Initial commit.

This commit is contained in:
2025-10-11 17:06:14 +00:00
commit ec521c3c91
45 changed files with 9798 additions and 0 deletions

View File

@@ -0,0 +1,75 @@
package operations
import (
"sync"
)
// Broadcaster manages SSE clients subscribed to operation output
type Broadcaster struct {
clients map[string]map[chan []byte]bool // opID -> set of client channels
mu sync.RWMutex
}
// NewBroadcaster creates a new broadcaster
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
clients: make(map[string]map[chan []byte]bool),
}
}
// Subscribe creates a new channel for receiving operation output
func (b *Broadcaster) Subscribe(opID string) chan []byte {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan []byte, 100) // Buffered to prevent slow clients from blocking
if b.clients[opID] == nil {
b.clients[opID] = make(map[chan []byte]bool)
}
b.clients[opID][ch] = true
return ch
}
// Unsubscribe removes a client channel and closes it
func (b *Broadcaster) Unsubscribe(opID string, ch chan []byte) {
b.mu.Lock()
defer b.mu.Unlock()
if clients, ok := b.clients[opID]; ok {
delete(clients, ch)
close(ch)
if len(clients) == 0 {
delete(b.clients, opID)
}
}
}
// Publish sends data to all subscribed clients for an operation
func (b *Broadcaster) Publish(opID string, data []byte) {
b.mu.RLock()
defer b.mu.RUnlock()
if clients, ok := b.clients[opID]; ok {
for ch := range clients {
select {
case ch <- data:
// Sent successfully
default:
// Channel buffer full, skip this message for this client
}
}
}
}
// Close closes all client channels for an operation
func (b *Broadcaster) Close(opID string) {
b.mu.Lock()
defer b.mu.Unlock()
if clients, ok := b.clients[opID]; ok {
for ch := range clients {
close(ch)
}
delete(b.clients, opID)
}
}

View File

@@ -0,0 +1,255 @@
package operations
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/wild-cloud/wild-central/daemon/internal/storage"
)
// Manager handles async operation tracking
type Manager struct {
dataDir string
}
// NewManager creates a new operations manager
func NewManager(dataDir string) *Manager {
return &Manager{
dataDir: dataDir,
}
}
// Operation represents a long-running operation
type Operation struct {
ID string `json:"id"`
Type string `json:"type"` // discover, setup, download, bootstrap
Target string `json:"target"`
Instance string `json:"instance"`
Status string `json:"status"` // pending, running, completed, failed, cancelled
Message string `json:"message,omitempty"`
Progress int `json:"progress"` // 0-100
LogFile string `json:"logFile,omitempty"` // Path to output log file
StartedAt time.Time `json:"started_at"`
EndedAt time.Time `json:"ended_at,omitempty"`
}
// GetOperationsDir returns the operations directory for an instance
func (m *Manager) GetOperationsDir(instanceName string) string {
return filepath.Join(m.dataDir, "instances", instanceName, "operations")
}
// generateID generates a unique operation ID
func generateID(opType, target string) string {
timestamp := time.Now().UnixNano()
return fmt.Sprintf("op_%s_%s_%d", opType, target, timestamp)
}
// Start begins tracking a new operation
func (m *Manager) Start(instanceName, opType, target string) (string, error) {
opsDir := m.GetOperationsDir(instanceName)
// Ensure operations directory exists
if err := storage.EnsureDir(opsDir, 0755); err != nil {
return "", err
}
// Generate operation ID
opID := generateID(opType, target)
// Create operation
op := &Operation{
ID: opID,
Type: opType,
Target: target,
Instance: instanceName,
Status: "pending",
Progress: 0,
StartedAt: time.Now(),
}
// Write operation file
if err := m.writeOperation(op); err != nil {
return "", err
}
return opID, nil
}
// Get returns operation status
func (m *Manager) Get(opID string) (*Operation, error) {
// Operation ID contains instance name, but we need to find it
// For now, we'll scan all instances (not ideal but simple)
// Better approach: encode instance in operation ID or maintain index
// Simplified: assume operation ID format is op_{type}_{target}_{timestamp}
// We need to know which instance to look in
// For now, return error if we can't find it
// This needs improvement in actual implementation
return nil, fmt.Errorf("operation lookup not implemented - need instance context")
}
// GetByInstance returns an operation for a specific instance
func (m *Manager) GetByInstance(instanceName, opID string) (*Operation, error) {
opsDir := m.GetOperationsDir(instanceName)
opPath := filepath.Join(opsDir, opID+".json")
if !storage.FileExists(opPath) {
return nil, fmt.Errorf("operation %s not found", opID)
}
data, err := os.ReadFile(opPath)
if err != nil {
return nil, fmt.Errorf("failed to read operation: %w", err)
}
var op Operation
if err := json.Unmarshal(data, &op); err != nil {
return nil, fmt.Errorf("failed to parse operation: %w", err)
}
return &op, nil
}
// Update modifies operation state
func (m *Manager) Update(instanceName, opID, status, message string, progress int) error {
op, err := m.GetByInstance(instanceName, opID)
if err != nil {
return err
}
op.Status = status
op.Message = message
op.Progress = progress
if status == "completed" || status == "failed" || status == "cancelled" {
op.EndedAt = time.Now()
}
return m.writeOperation(op)
}
// UpdateStatus updates only the status
func (m *Manager) UpdateStatus(instanceName, opID, status string) error {
op, err := m.GetByInstance(instanceName, opID)
if err != nil {
return err
}
op.Status = status
if status == "completed" || status == "failed" || status == "cancelled" {
op.EndedAt = time.Now()
}
return m.writeOperation(op)
}
// UpdateProgress updates operation progress
func (m *Manager) UpdateProgress(instanceName, opID string, progress int, message string) error {
op, err := m.GetByInstance(instanceName, opID)
if err != nil {
return err
}
op.Progress = progress
if message != "" {
op.Message = message
}
return m.writeOperation(op)
}
// Cancel requests operation cancellation
func (m *Manager) Cancel(instanceName, opID string) error {
return m.UpdateStatus(instanceName, opID, "cancelled")
}
// List returns all operations for an instance
func (m *Manager) List(instanceName string) ([]Operation, error) {
opsDir := m.GetOperationsDir(instanceName)
// Ensure directory exists
if err := storage.EnsureDir(opsDir, 0755); err != nil {
return nil, err
}
// Read all operation files
entries, err := os.ReadDir(opsDir)
if err != nil {
return nil, fmt.Errorf("failed to read operations directory: %w", err)
}
operations := []Operation{}
for _, entry := range entries {
if entry.IsDir() || filepath.Ext(entry.Name()) != ".json" {
continue
}
opPath := filepath.Join(opsDir, entry.Name())
data, err := os.ReadFile(opPath)
if err != nil {
continue // Skip files we can't read
}
var op Operation
if err := json.Unmarshal(data, &op); err != nil {
continue // Skip invalid JSON
}
operations = append(operations, op)
}
return operations, nil
}
// Delete removes an operation record
func (m *Manager) Delete(instanceName, opID string) error {
opsDir := m.GetOperationsDir(instanceName)
opPath := filepath.Join(opsDir, opID+".json")
if !storage.FileExists(opPath) {
return nil // Already deleted, idempotent
}
return os.Remove(opPath)
}
// Cleanup removes old completed/failed operations
func (m *Manager) Cleanup(instanceName string, olderThan time.Duration) error {
ops, err := m.List(instanceName)
if err != nil {
return err
}
cutoff := time.Now().Add(-olderThan)
for _, op := range ops {
if (op.Status == "completed" || op.Status == "failed" || op.Status == "cancelled") &&
!op.EndedAt.IsZero() && op.EndedAt.Before(cutoff) {
m.Delete(instanceName, op.ID)
}
}
return nil
}
// writeOperation writes operation to disk
func (m *Manager) writeOperation(op *Operation) error {
opsDir := m.GetOperationsDir(op.Instance)
opPath := filepath.Join(opsDir, op.ID+".json")
data, err := json.MarshalIndent(op, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal operation: %w", err)
}
if err := storage.WriteFile(opPath, data, 0644); err != nil {
return fmt.Errorf("failed to write operation: %w", err)
}
return nil
}