diff --git a/BUILDING_WILD_API.md b/BUILDING_WILD_API.md
index 619daf3..746bf70 100644
--- a/BUILDING_WILD_API.md
+++ b/BUILDING_WILD_API.md
@@ -38,6 +38,7 @@
 - Write unit tests for all functions and methods.
 - Make and use common modules. For example, one module should handle all interactions with talosctl. Another module should handle all interactions with kubectl.
 - If the code is getting long and complex, break it into smaller modules.
+- API requests and responses should be valid JSON. Object attribute names should use standard JSON camelCase.
 
 ### Features
 
diff --git a/internal/api/v1/handlers.go b/internal/api/v1/handlers.go
index 64af0d9..b2f0402 100644
--- a/internal/api/v1/handlers.go
+++ b/internal/api/v1/handlers.go
@@ -30,6 +30,7 @@ type API struct {
     context     *context.Manager
     instance    *instance.Manager
     dnsmasq     *dnsmasq.ConfigGenerator
+    opsMgr      *operations.Manager     // Operations manager
     broadcaster *operations.Broadcaster // SSE broadcaster for operation output
 }
@@ -57,6 +58,7 @@ func NewAPI(dataDir, appsDir string) (*API, error) {
         context:     context.NewManager(dataDir),
         instance:    instance.NewManager(dataDir),
         dnsmasq:     dnsmasq.NewConfigGenerator(dnsmasqConfigPath),
+        opsMgr:      operations.NewManager(dataDir),
         broadcaster: operations.NewBroadcaster(),
     }, nil
 }
@@ -85,6 +87,7 @@ func (api *API) RegisterRoutes(r *mux.Router) {
     r.HandleFunc("/api/v1/instances/{name}/nodes/discover", api.NodeDiscover).Methods("POST")
     r.HandleFunc("/api/v1/instances/{name}/nodes/detect", api.NodeDetect).Methods("POST")
     r.HandleFunc("/api/v1/instances/{name}/discovery", api.NodeDiscoveryStatus).Methods("GET")
+    r.HandleFunc("/api/v1/instances/{name}/discovery/cancel", api.NodeDiscoveryCancel).Methods("POST")
    r.HandleFunc("/api/v1/instances/{name}/nodes/hardware/{ip}", api.NodeHardware).Methods("GET")
     r.HandleFunc("/api/v1/instances/{name}/nodes/fetch-templates", api.NodeFetchTemplates).Methods("POST")
     r.HandleFunc("/api/v1/instances/{name}/nodes", api.NodeAdd).Methods("POST")
diff --git a/internal/api/v1/handlers_cluster.go b/internal/api/v1/handlers_cluster.go
index 8c7f6f4..d2f21aa 100644
--- a/internal/api/v1/handlers_cluster.go
+++ b/internal/api/v1/handlers_cluster.go
@@ -46,15 +46,15 @@ func (api *API) ClusterGenerateConfig(w http.ResponseWriter, r *http.Request) {
     }
 
     // Create cluster config
-    config := cluster.ClusterConfig{
+    clusterConfig := cluster.ClusterConfig{
         ClusterName: clusterName,
         VIP:         vip,
         Version:     version,
     }
 
     // Generate configuration
-    clusterMgr := cluster.NewManager(api.dataDir)
-    if err := clusterMgr.GenerateConfig(instanceName, &config); err != nil {
+    clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
+    if err := clusterMgr.GenerateConfig(instanceName, &clusterConfig); err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to generate config: %v", err))
         return
     }
@@ -90,26 +90,14 @@ func (api *API) ClusterBootstrap(w http.ResponseWriter, r *http.Request) {
         return
     }
 
-    // Start bootstrap operation
-    opsMgr := operations.NewManager(api.dataDir)
-    opID, err := opsMgr.Start(instanceName, "bootstrap", req.Node)
+    // Bootstrap with progress tracking
+    clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
+    opID, err := clusterMgr.Bootstrap(instanceName, req.Node)
     if err != nil {
-        respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to start operation: %v", err))
+        respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to start bootstrap: %v", err))
         return
     }
 
-    // Bootstrap in background
-    go func() {
-        clusterMgr := cluster.NewManager(api.dataDir)
-        _ = opsMgr.UpdateStatus(instanceName, opID, "running")
-
-        if err := clusterMgr.Bootstrap(instanceName, req.Node); err != nil {
-            _ = opsMgr.Update(instanceName, opID, "failed", err.Error(), 0)
-        } else {
-            _ = opsMgr.Update(instanceName, opID, "completed", "Bootstrap completed", 100)
-        }
-    }()
-
     respondJSON(w, http.StatusAccepted, map[string]string{
         "operation_id": opID,
         "message":      "Bootstrap initiated",
@@ -138,7 +126,7 @@ func (api *API) ClusterConfigureEndpoints(w http.ResponseWriter, r *http.Request
     }
 
     // Configure endpoints
-    clusterMgr := cluster.NewManager(api.dataDir)
+    clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
     if err := clusterMgr.ConfigureEndpoints(instanceName, req.IncludeNodes); err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to configure endpoints: %v", err))
         return
@@ -161,7 +149,7 @@ func (api *API) ClusterGetStatus(w http.ResponseWriter, r *http.Request) {
     }
 
     // Get status
-    clusterMgr := cluster.NewManager(api.dataDir)
+    clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
     status, err := clusterMgr.GetStatus(instanceName)
     if err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to get status: %v", err))
@@ -183,7 +171,7 @@ func (api *API) ClusterHealth(w http.ResponseWriter, r *http.Request) {
     }
 
     // Get health checks
-    clusterMgr := cluster.NewManager(api.dataDir)
+    clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
     checks, err := clusterMgr.Health(instanceName)
     if err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to get health: %v", err))
@@ -219,7 +207,7 @@ func (api *API) ClusterGetKubeconfig(w http.ResponseWriter, r *http.Request) {
     }
 
     // Get kubeconfig
-    clusterMgr := cluster.NewManager(api.dataDir)
+    clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
     kubeconfig, err := clusterMgr.GetKubeconfig(instanceName)
     if err != nil {
         respondError(w, http.StatusNotFound, fmt.Sprintf("Kubeconfig not found: %v", err))
@@ -243,7 +231,7 @@ func (api *API) ClusterGenerateKubeconfig(w http.ResponseWriter, r *http.Request
     }
 
     // Regenerate kubeconfig from cluster
-    clusterMgr := cluster.NewManager(api.dataDir)
+    clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
     if err := clusterMgr.RegenerateKubeconfig(instanceName); err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to generate kubeconfig: %v", err))
         return
@@ -266,7 +254,7 @@ func (api *API) ClusterGetTalosconfig(w http.ResponseWriter, r *http.Request) {
     }
 
     // Get talosconfig
-    clusterMgr := cluster.NewManager(api.dataDir)
+    clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
     talosconfig, err := clusterMgr.GetTalosconfig(instanceName)
     if err != nil {
         respondError(w, http.StatusNotFound, fmt.Sprintf("Talosconfig not found: %v", err))
@@ -314,7 +302,7 @@ func (api *API) ClusterReset(w http.ResponseWriter, r *http.Request) {
 
     // Reset in background
     go func() {
-        clusterMgr := cluster.NewManager(api.dataDir)
+        clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
         _ = opsMgr.UpdateStatus(instanceName, opID, "running")
 
         if err := clusterMgr.Reset(instanceName, req.Confirm); err != nil {
diff --git a/internal/api/v1/handlers_node.go b/internal/api/v1/handlers_node.go
index 1c94bba..303feb4 100644
--- a/internal/api/v1/handlers_node.go
+++ b/internal/api/v1/handlers_node.go
@@ -12,6 +12,7 @@ import (
 )
 
 // NodeDiscover initiates node discovery
+// Accepts optional subnet parameter. If no subnet provided, auto-detects local networks.
 func (api *API) NodeDiscover(w http.ResponseWriter, r *http.Request) {
     vars := mux.Vars(r)
     instanceName := vars["name"]
@@ -22,10 +23,9 @@
         return
     }
 
-    // Parse request body - support both subnet and ip_list formats
+    // Parse request body - only subnet is supported
     var req struct {
-        Subnet string   `json:"subnet"`
-        IPList []string `json:"ip_list"`
+        Subnet string `json:"subnet,omitempty"`
     }
 
     if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@@ -33,16 +33,38 @@
         return
     }
 
-    // If subnet provided, use it as a single "IP" for discovery
-    // The discovery manager will scan this subnet
+    // Build IP list
     var ipList []string
+    var err error
+
     if req.Subnet != "" {
-        ipList = []string{req.Subnet}
-    } else if len(req.IPList) > 0 {
-        ipList = req.IPList
+        // Expand provided CIDR notation to individual IPs
+        ipList, err = discovery.ExpandSubnet(req.Subnet)
+        if err != nil {
+            respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid subnet: %v", err))
+            return
+        }
     } else {
-        respondError(w, http.StatusBadRequest, "subnet or ip_list is required")
-        return
+        // Auto-detect: Get local networks when no subnet provided
+        networks, err := discovery.GetLocalNetworks()
+        if err != nil {
+            respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to detect local networks: %v", err))
+            return
+        }
+
+        if len(networks) == 0 {
+            respondError(w, http.StatusNotFound, "No local networks found")
+            return
+        }
+
+        // Expand all detected networks
+        for _, network := range networks {
+            ips, err := discovery.ExpandSubnet(network)
+            if err != nil {
+                continue // Skip invalid networks
+            }
+            ipList = append(ipList, ips...)
+        }
     }
 
     // Start discovery
@@ -52,9 +74,10 @@
         return
     }
 
-    respondJSON(w, http.StatusAccepted, map[string]string{
-        "message": "Discovery started",
-        "status":  "running",
+    respondJSON(w, http.StatusAccepted, map[string]interface{}{
+        "message":     "Discovery started",
+        "status":      "running",
+        "ips_to_scan": len(ipList),
     })
 }
 
@@ -92,7 +115,7 @@ func (api *API) NodeHardware(w http.ResponseWriter, r *http.Request) {
     }
 
     // Detect hardware
-    nodeMgr := node.NewManager(api.dataDir)
+    nodeMgr := node.NewManager(api.dataDir, instanceName)
     hwInfo, err := nodeMgr.DetectHardware(nodeIP)
     if err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to detect hardware: %v", err))
@@ -103,6 +126,7 @@
 }
 
 // NodeDetect detects hardware on a single node (POST with IP in body)
+// IP address is required.
 func (api *API) NodeDetect(w http.ResponseWriter, r *http.Request) {
     vars := mux.Vars(r)
     instanceName := vars["name"]
@@ -123,13 +147,14 @@
         return
     }
 
+    // Validate IP is provided
     if req.IP == "" {
-        respondError(w, http.StatusBadRequest, "ip is required")
+        respondError(w, http.StatusBadRequest, "IP address is required")
         return
     }
 
-    // Detect hardware
-    nodeMgr := node.NewManager(api.dataDir)
+    // Detect hardware for specific IP
+    nodeMgr := node.NewManager(api.dataDir, instanceName)
     hwInfo, err := nodeMgr.DetectHardware(req.IP)
     if err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to detect hardware: %v", err))
@@ -158,7 +183,7 @@
     }
 
     // Add node
-    nodeMgr := node.NewManager(api.dataDir)
+    nodeMgr := node.NewManager(api.dataDir, instanceName)
     if err := nodeMgr.Add(instanceName, &nodeData); err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to add node: %v", err))
         return
@@ -182,7 +207,7 @@
     }
 
     // List nodes
-    nodeMgr := node.NewManager(api.dataDir)
+    nodeMgr := node.NewManager(api.dataDir, instanceName)
     nodes, err := nodeMgr.List(instanceName)
     if err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to list nodes: %v", err))
@@ -207,7 +232,7 @@
     }
 
     // Get node
-    nodeMgr := node.NewManager(api.dataDir)
+    nodeMgr := node.NewManager(api.dataDir, instanceName)
     nodeData, err := nodeMgr.Get(instanceName, nodeIdentifier)
     if err != nil {
         respondError(w, http.StatusNotFound, fmt.Sprintf("Node not found: %v", err))
@@ -233,7 +258,7 @@
     opts := node.ApplyOptions{}
 
     // Apply node configuration
-    nodeMgr := node.NewManager(api.dataDir)
+    nodeMgr := node.NewManager(api.dataDir, instanceName)
     if err := nodeMgr.Apply(instanceName, nodeIdentifier, opts); err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to apply node configuration: %v", err))
         return
@@ -265,7 +290,7 @@
     }
 
     // Update node
-    nodeMgr := node.NewManager(api.dataDir)
+    nodeMgr := node.NewManager(api.dataDir, instanceName)
     if err := nodeMgr.Update(instanceName, nodeIdentifier, updates); err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update node: %v", err))
         return
@@ -289,7 +314,7 @@
     }
 
     // Fetch templates
-    nodeMgr := node.NewManager(api.dataDir)
+    nodeMgr := node.NewManager(api.dataDir, instanceName)
     if err := nodeMgr.FetchTemplates(instanceName); err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to fetch templates: %v", err))
         return
@@ -313,7 +338,7 @@
     }
 
     // Delete node
-    nodeMgr := node.NewManager(api.dataDir)
+    nodeMgr := node.NewManager(api.dataDir, instanceName)
     if err := nodeMgr.Delete(instanceName, nodeIdentifier); err != nil {
         respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete node: %v", err))
         return
@@ -323,3 +348,26 @@
         "message": "Node deleted successfully",
     })
 }
+
+// NodeDiscoveryCancel cancels an in-progress discovery operation
+func (api *API) NodeDiscoveryCancel(w http.ResponseWriter, r *http.Request) {
+    vars := mux.Vars(r)
+    instanceName := vars["name"]
+
+    // Validate instance exists
+    if err := api.instance.ValidateInstance(instanceName); err != nil {
+        respondError(w, http.StatusNotFound, fmt.Sprintf("Instance not found: %v", err))
+        return
+    }
+
+    // Cancel discovery
+    discoveryMgr := discovery.NewManager(api.dataDir, instanceName)
+    if err := discoveryMgr.CancelDiscovery(instanceName); err != nil {
+        respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to cancel discovery: %v", err))
+        return
+    }
+
+    respondJSON(w, http.StatusOK, map[string]string{
+        "message": "Discovery cancelled successfully",
+    })
+}
diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go
index 73016ba..24437ca 100644
--- a/internal/cluster/cluster.go
+++ b/internal/cluster/cluster.go
@@ -1,6 +1,7 @@
 package cluster
 
 import (
+    "context"
     "encoding/json"
     "fmt"
     "log"
@@ -10,6 +11,7 @@ import (
     "strings"
     "time"
 
+    "github.com/wild-cloud/wild-central/daemon/internal/operations"
     "github.com/wild-cloud/wild-central/daemon/internal/storage"
     "github.com/wild-cloud/wild-central/daemon/internal/tools"
 )
@@ -18,13 +20,15 @@ type Manager struct {
     dataDir  string
     talosctl *tools.Talosctl
+    opsMgr   *operations.Manager
 }
 
 // NewManager creates a new cluster manager
-func NewManager(dataDir string) *Manager {
+func NewManager(dataDir string, opsMgr *operations.Manager) *Manager {
     return &Manager{
         dataDir:  dataDir,
         talosctl: tools.NewTalosctl(),
+        opsMgr:   opsMgr,
     }
 }
@@ -96,11 +100,28 @@ func (m *Manager) GenerateConfig(instanceName string, config *ClusterConfig) err
     return nil
 }
 
-// Bootstrap bootstraps the cluster on the specified node
-func (m *Manager) Bootstrap(instanceName, nodeName string) error {
-    // Get node configuration to find the target IP
-    configPath := tools.GetInstanceConfigPath(m.dataDir, instanceName)
+// Bootstrap bootstraps the cluster on the specified node with progress tracking
+func (m *Manager) Bootstrap(instanceName, nodeName string) (string, error) {
+    // Create operation for tracking
+    opID, err := m.opsMgr.Start(instanceName, "bootstrap", nodeName)
+    if err != nil {
+        return "", fmt.Errorf("failed to start bootstrap operation: %w", err)
+    }
+    // Run bootstrap asynchronously
+    go func() {
+        if err := m.runBootstrapWithTracking(instanceName, nodeName, opID); err != nil {
+            _ = m.opsMgr.Update(instanceName, opID, "failed", err.Error(), 0)
+        }
+    }()
+
+    return opID, nil
+}
+
+// runBootstrapWithTracking runs the bootstrap process with detailed progress tracking
+func (m *Manager) runBootstrapWithTracking(instanceName, nodeName, opID string) error {
+    ctx := context.Background()
+
+    configPath := tools.GetInstanceConfigPath(m.dataDir, instanceName)
     yq := tools.NewYQ()
 
     // Get node's target IP
@@ -114,17 +135,71 @@
         return fmt.Errorf("node %s does not have a target IP configured", nodeName)
     }
 
-    // Get talosconfig path for this instance
+    // Get VIP
+    vipRaw, err := yq.Get(configPath, ".cluster.nodes.control.vip")
+    if err != nil {
+        return fmt.Errorf("failed to get VIP: %w", err)
+    }
+
+    vip := tools.CleanYQOutput(vipRaw)
+    if vip == "" || vip == "null" {
+        return fmt.Errorf("control plane VIP not configured")
+    }
+
+    // Step 0: Run talosctl bootstrap
+    if err := m.runBootstrapCommand(instanceName, nodeIP, opID); err != nil {
+        return err
+    }
+
+    // Step 1: Wait for etcd health
+    if err := m.waitForEtcd(ctx, instanceName, nodeIP, opID); err != nil {
+        return err
+    }
+
+    // Step 2: Wait for VIP assignment
+    if err := m.waitForVIP(ctx, instanceName, nodeIP, vip, opID); err != nil {
+        return err
+    }
+
+    // Step 3: Wait for control plane components
+    if err := m.waitForControlPlane(ctx, instanceName, nodeIP, opID); err != nil {
+        return err
+    }
+
+    // Step 4: Wait for API server on VIP
+    if err := m.waitForAPIServer(ctx, instanceName, vip, opID); err != nil {
+        return err
+    }
+
+    // Step 5: Configure cluster access
+    if err := m.configureClusterAccess(instanceName, vip, opID); err != nil {
+        return err
+    }
+
+    // Step 6: Verify node registration
+    if err := m.waitForNodeRegistration(ctx, instanceName, opID); err != nil {
+        return err
+    }
+
+    // Mark as completed
+    _ = m.opsMgr.Update(instanceName, opID, "completed", "Bootstrap completed successfully", 100)
+    return nil
+}
+
+// runBootstrapCommand executes the initial bootstrap command
+func (m *Manager) runBootstrapCommand(instanceName, nodeIP, opID string) error {
+    _ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 0, "bootstrap", 1, 1, "Running talosctl bootstrap command")
+
     talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName)
 
-    // Set talosctl endpoint (with proper context via TALOSCONFIG env var)
+    // Set talosctl endpoint
     cmdEndpoint := exec.Command("talosctl", "config", "endpoint", nodeIP)
     tools.WithTalosconfig(cmdEndpoint, talosconfigPath)
     if output, err := cmdEndpoint.CombinedOutput(); err != nil {
         return fmt.Errorf("failed to set talosctl endpoint: %w\nOutput: %s", err, string(output))
     }
 
-    // Bootstrap command (with proper context via TALOSCONFIG env var)
+    // Bootstrap command
     cmd := exec.Command("talosctl", "bootstrap", "--nodes", nodeIP)
     tools.WithTalosconfig(cmd, talosconfigPath)
     output, err := cmd.CombinedOutput()
@@ -132,16 +207,152 @@
         return fmt.Errorf("failed to bootstrap cluster: %w\nOutput: %s", err, string(output))
     }
 
-    // Retrieve kubeconfig after bootstrap (best-effort with retry)
-    log.Printf("Waiting for Kubernetes API server to become ready...")
-    if err := m.retrieveKubeconfigFromCluster(instanceName, nodeIP, 5*time.Minute); err != nil {
-        log.Printf("Warning: %v", err)
-        log.Printf("You can retrieve it manually later using: wild cluster kubeconfig --generate")
+    return nil
+}
+
+// waitForEtcd waits for etcd to become healthy
+func (m *Manager) waitForEtcd(ctx context.Context, instanceName, nodeIP, opID string) error {
+    maxAttempts := 30
+    talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName)
+
+    for attempt := 1; attempt <= maxAttempts; attempt++ {
+        _ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 1, "etcd", attempt, maxAttempts, "Waiting for etcd to become healthy")
+
+        cmd := exec.Command("talosctl", "-n", nodeIP, "etcd", "status")
+        tools.WithTalosconfig(cmd, talosconfigPath)
+        output, err := cmd.CombinedOutput()
+
+        if err == nil && strings.Contains(string(output), nodeIP) {
+            return nil
+        }
+
+        if attempt < maxAttempts {
+            time.Sleep(10 * time.Second)
+        }
+    }
+
+    return fmt.Errorf("etcd did not become healthy after %d attempts", maxAttempts)
+}
+
+// waitForVIP waits for VIP to be assigned to the node
+func (m *Manager) waitForVIP(ctx context.Context, instanceName, nodeIP, vip, opID string) error {
+    maxAttempts := 90
+    talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName)
+
+    for attempt := 1; attempt <= maxAttempts; attempt++ {
+        _ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 2, "vip", attempt, maxAttempts, "Waiting for VIP assignment")
"Waiting for VIP assignment") + + cmd := exec.Command("talosctl", "-n", nodeIP, "get", "addresses") + tools.WithTalosconfig(cmd, talosconfigPath) + output, err := cmd.CombinedOutput() + + if err == nil && strings.Contains(string(output), vip+"/32") { + return nil + } + + if attempt < maxAttempts { + time.Sleep(10 * time.Second) + } + } + + return fmt.Errorf("VIP was not assigned after %d attempts", maxAttempts) +} + +// waitForControlPlane waits for control plane components to start +func (m *Manager) waitForControlPlane(ctx context.Context, instanceName, nodeIP, opID string) error { + maxAttempts := 60 + talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName) + + for attempt := 1; attempt <= maxAttempts; attempt++ { + _ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 3, "controlplane", attempt, maxAttempts, "Waiting for control plane components") + + cmd := exec.Command("talosctl", "-n", nodeIP, "containers", "-k") + tools.WithTalosconfig(cmd, talosconfigPath) + output, err := cmd.CombinedOutput() + + if err == nil && strings.Contains(string(output), "kube-") { + return nil + } + + if attempt < maxAttempts { + time.Sleep(10 * time.Second) + } + } + + return fmt.Errorf("control plane components did not start after %d attempts", maxAttempts) +} + +// waitForAPIServer waits for Kubernetes API server to respond +func (m *Manager) waitForAPIServer(ctx context.Context, instanceName, vip, opID string) error { + maxAttempts := 60 + apiURL := fmt.Sprintf("https://%s:6443/healthz", vip) + + for attempt := 1; attempt <= maxAttempts; attempt++ { + _ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 4, "apiserver", attempt, maxAttempts, "Waiting for Kubernetes API server") + + cmd := exec.Command("curl", "-k", "-s", "--max-time", "5", apiURL) + output, err := cmd.CombinedOutput() + + if err == nil && strings.Contains(string(output), "ok") { + return nil + } + + if attempt < maxAttempts { + time.Sleep(10 * time.Second) + } + } + + return fmt.Errorf("API server did not respond after %d attempts", maxAttempts) +} + +// configureClusterAccess configures talosctl and kubectl to use the VIP +func (m *Manager) configureClusterAccess(instanceName, vip, opID string) error { + _ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 5, "configure", 1, 1, "Configuring cluster access") + + talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName) + kubeconfigPath := tools.GetKubeconfigPath(m.dataDir, instanceName) + + // Set talosctl endpoint to VIP + cmdEndpoint := exec.Command("talosctl", "config", "endpoint", vip) + tools.WithTalosconfig(cmdEndpoint, talosconfigPath) + if output, err := cmdEndpoint.CombinedOutput(); err != nil { + return fmt.Errorf("failed to set talosctl endpoint: %w\nOutput: %s", err, string(output)) + } + + // Retrieve kubeconfig + cmdKubeconfig := exec.Command("talosctl", "kubeconfig", "--nodes", vip, kubeconfigPath) + tools.WithTalosconfig(cmdKubeconfig, talosconfigPath) + if output, err := cmdKubeconfig.CombinedOutput(); err != nil { + return fmt.Errorf("failed to retrieve kubeconfig: %w\nOutput: %s", err, string(output)) } return nil } +// waitForNodeRegistration waits for the node to register with Kubernetes +func (m *Manager) waitForNodeRegistration(ctx context.Context, instanceName, opID string) error { + maxAttempts := 10 + kubeconfigPath := tools.GetKubeconfigPath(m.dataDir, instanceName) + + for attempt := 1; attempt <= maxAttempts; attempt++ { + _ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 6, "nodes", attempt, maxAttempts, "Waiting 
for node registration") + + cmd := exec.Command("kubectl", "get", "nodes") + tools.WithKubeconfig(cmd, kubeconfigPath) + output, err := cmd.CombinedOutput() + + if err == nil && strings.Contains(string(output), "Ready") { + return nil + } + + if attempt < maxAttempts { + time.Sleep(10 * time.Second) + } + } + + return fmt.Errorf("node did not register after %d attempts", maxAttempts) +} + // retrieveKubeconfigFromCluster retrieves kubeconfig from the cluster with retry logic func (m *Manager) retrieveKubeconfigFromCluster(instanceName, nodeIP string, timeout time.Duration) error { kubeconfigPath := tools.GetKubeconfigPath(m.dataDir, instanceName) diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index f2e7f02..2f112bf 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -3,6 +3,7 @@ package discovery import ( "encoding/json" "fmt" + "net" "os" "path/filepath" "sync" @@ -28,19 +29,17 @@ func NewManager(dataDir string, instanceName string) *Manager { return &Manager{ dataDir: dataDir, - nodeMgr: node.NewManager(dataDir), + nodeMgr: node.NewManager(dataDir, instanceName), talosctl: tools.NewTalosconfigWithConfig(talosconfigPath), } } -// DiscoveredNode represents a discovered node on the network +// DiscoveredNode represents a discovered node on the network (maintenance mode only) type DiscoveredNode struct { - IP string `json:"ip"` - Hostname string `json:"hostname,omitempty"` - MaintenanceMode bool `json:"maintenance_mode"` - Version string `json:"version,omitempty"` - Interface string `json:"interface,omitempty"` - Disks []string `json:"disks,omitempty"` + IP string `json:"ip"` + Hostname string `json:"hostname,omitempty"` + MaintenanceMode bool `json:"maintenance_mode"` + Version string `json:"version,omitempty"` } // DiscoveryStatus represents the current state of discovery @@ -130,17 +129,42 @@ func (m *Manager) runDiscovery(instanceName string, ipList []string) { _ = m.writeDiscoveryStatus(instanceName, status) }() - // Discover nodes by probing each IP - discoveredNodes := []DiscoveredNode{} + // Discover nodes by probing each IP in parallel + var wg sync.WaitGroup + resultsChan := make(chan DiscoveredNode, len(ipList)) + + // Limit concurrent scans to avoid overwhelming the network + semaphore := make(chan struct{}, 50) for _, ip := range ipList { - node, err := m.probeNode(ip) - if err != nil { - // Node not reachable or not a Talos node - continue - } + wg.Add(1) + go func(ip string) { + defer wg.Done() - discoveredNodes = append(discoveredNodes, *node) + // Acquire semaphore + semaphore <- struct{}{} + defer func() { <-semaphore }() + + node, err := m.probeNode(ip) + if err != nil { + // Node not reachable or not a Talos node + return + } + + resultsChan <- *node + }(ip) + } + + // Close results channel when all goroutines complete + go func() { + wg.Wait() + close(resultsChan) + }() + + // Collect results and update status incrementally + discoveredNodes := []DiscoveredNode{} + for node := range resultsChan { + discoveredNodes = append(discoveredNodes, node) // Update status incrementally m.discoveryMu.Lock() @@ -151,37 +175,20 @@ func (m *Manager) runDiscovery(instanceName string, ipList []string) { } } -// probeNode attempts to detect if a node is running Talos +// probeNode attempts to detect if a node is running Talos in maintenance mode func (m *Manager) probeNode(ip string) (*DiscoveredNode, error) { - // Attempt to get version (quick connectivity test) - version, err := m.talosctl.GetVersion(ip, false) + // Try 
+    version, err := m.talosctl.GetVersion(ip, true)
     if err != nil {
+        // Not in maintenance mode or not reachable
         return nil, err
     }
 
-    // Node is reachable, get hardware info
-    hwInfo, err := m.nodeMgr.DetectHardware(ip)
-    if err != nil {
-        // Still count it as discovered even if we can't get full hardware
-        return &DiscoveredNode{
-            IP:              ip,
-            MaintenanceMode: false,
-            Version:         version,
-        }, nil
-    }
-
-    // Extract just the disk paths for discovery output
-    diskPaths := make([]string, len(hwInfo.Disks))
-    for i, disk := range hwInfo.Disks {
-        diskPaths[i] = disk.Path
-    }
-
+    // If insecure connection works, node is in maintenance mode
     return &DiscoveredNode{
         IP:              ip,
-        MaintenanceMode: hwInfo.MaintenanceMode,
+        MaintenanceMode: true,
         Version:         version,
-        Interface:       hwInfo.Interface,
-        Disks:           diskPaths,
     }, nil
 }
 
@@ -245,3 +252,132 @@
 
     return nil
 }
+
+// CancelDiscovery cancels an in-progress discovery operation
+func (m *Manager) CancelDiscovery(instanceName string) error {
+    m.discoveryMu.Lock()
+    defer m.discoveryMu.Unlock()
+
+    // Get current status
+    status, err := m.GetDiscoveryStatus(instanceName)
+    if err != nil {
+        return err
+    }
+
+    if !status.Active {
+        return fmt.Errorf("no discovery in progress")
+    }
+
+    // Mark discovery as cancelled
+    status.Active = false
+    status.Error = "Discovery cancelled by user"
+
+    if err := m.writeDiscoveryStatus(instanceName, status); err != nil {
+        return err
+    }
+
+    return nil
+}
+
+// GetLocalNetworks discovers local network interfaces and returns their CIDR addresses
+// Skips loopback, link-local, and down interfaces
+// Only returns IPv4 networks
+func GetLocalNetworks() ([]string, error) {
+    interfaces, err := net.Interfaces()
+    if err != nil {
+        return nil, fmt.Errorf("failed to get network interfaces: %w", err)
+    }
+
+    var networks []string
+    for _, iface := range interfaces {
+        // Skip loopback and down interfaces
+        if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 {
+            continue
+        }
+
+        addrs, err := iface.Addrs()
+        if err != nil {
+            continue
+        }
+
+        for _, addr := range addrs {
+            ipnet, ok := addr.(*net.IPNet)
+            if !ok {
+                continue
+            }
+
+            // Only IPv4 for now
+            if ipnet.IP.To4() == nil {
+                continue
+            }
+
+            // Skip link-local addresses (169.254.0.0/16)
+            if ipnet.IP.IsLinkLocalUnicast() {
+                continue
+            }
+
+            networks = append(networks, ipnet.String())
+        }
+    }
+
+    return networks, nil
+}
+
+// ExpandSubnet expands a CIDR notation subnet into individual IP addresses
+// Example: "192.168.8.0/24" → ["192.168.8.1", "192.168.8.2", ..., "192.168.8.254"]
+// Also handles single IPs (without CIDR notation)
+func ExpandSubnet(subnet string) ([]string, error) {
+    // Check if it's a CIDR notation
+    ip, ipnet, err := net.ParseCIDR(subnet)
+    if err != nil {
+        // Not a CIDR, might be single IP
+        if net.ParseIP(subnet) != nil {
+            return []string{subnet}, nil
+        }
+        return nil, fmt.Errorf("invalid IP or CIDR: %s", subnet)
+    }
+
+    // Special case: /32 (single host) - just return the IP
+    ones, _ := ipnet.Mask.Size()
+    if ones == 32 {
+        return []string{ip.String()}, nil
+    }
+
+    var ips []string
+
+    // Iterate through all IPs in the subnet
+    for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); incIP(ip) {
+        // Skip network address (first IP)
+        if ip.Equal(ipnet.IP) {
+            continue
+        }
+
+        // Skip broadcast address (last IP)
+        if isLastIP(ip, ipnet) {
+            continue
+        }
+
+        ips = append(ips, ip.String())
+    }
+
+    return ips, nil
+}
+
+// incIP increments an IP address
+func incIP(ip net.IP) {
+    for j := len(ip) - 1; j >= 0; j-- {
+        ip[j]++
+        if ip[j] > 0 {
+            break
+        }
+    }
+}
+
+// isLastIP checks if an IP is the last IP in a subnet (broadcast address)
+func isLastIP(ip net.IP, ipnet *net.IPNet) bool {
+    lastIP := make(net.IP, len(ip))
+    for i := range ip {
+        lastIP[i] = ip[i] | ^ipnet.Mask[i]
+    }
+    return ip.Equal(lastIP)
+}
diff --git a/internal/node/node.go b/internal/node/node.go
index 6ac763a..77f8cb5 100644
--- a/internal/node/node.go
+++ b/internal/node/node.go
@@ -20,11 +20,22 @@ type Manager struct {
 }
 
 // NewManager creates a new node manager
-func NewManager(dataDir string) *Manager {
+func NewManager(dataDir string, instanceName string) *Manager {
+    var talosctl *tools.Talosctl
+
+    // If instanceName is provided, use instance-specific talosconfig
+    // Otherwise, create basic talosctl (will use --insecure mode)
+    if instanceName != "" {
+        talosconfigPath := tools.GetTalosconfigPath(dataDir, instanceName)
+        talosctl = tools.NewTalosconfigWithConfig(talosconfigPath)
+    } else {
+        talosctl = tools.NewTalosctl()
+    }
+
     return &Manager{
         dataDir:   dataDir,
         configMgr: config.NewManager(),
-        talosctl:  tools.NewTalosctl(),
+        talosctl:  talosctl,
     }
 }
 
@@ -254,12 +265,14 @@ func (m *Manager) Delete(instanceName, nodeIdentifier string) error {
     configPath := filepath.Join(instancePath, "config.yaml")
 
     // Delete node from config.yaml
-    // Path: cluster.nodes.active.{hostname}
-    nodePath := fmt.Sprintf("cluster.nodes.active.%s", node.Hostname)
+    // Path: .cluster.nodes.active["hostname"]
+    // Use bracket notation to safely handle hostnames with special characters
+    nodePath := fmt.Sprintf(".cluster.nodes.active[\"%s\"]", node.Hostname)
     yq := tools.NewYQ()
 
     // Use yq to delete the node
-    _, err = yq.Exec("eval", "-i", fmt.Sprintf("del(%s)", nodePath), configPath)
+    delExpr := fmt.Sprintf("del(%s)", nodePath)
+    _, err = yq.Exec("eval", "-i", delExpr, configPath)
     if err != nil {
         return fmt.Errorf("failed to delete node: %w", err)
     }
@@ -268,10 +281,20 @@
 }
 
 // DetectHardware queries node hardware information via talosctl
+// Automatically detects maintenance mode by trying insecure first, then secure
 func (m *Manager) DetectHardware(nodeIP string) (*HardwareInfo, error) {
-    // Query node with insecure flag (maintenance mode)
-    insecure := true
+    // Try insecure first (maintenance mode)
+    hwInfo, err := m.detectHardwareWithMode(nodeIP, true)
+    if err == nil {
+        return hwInfo, nil
+    }
+    // Fall back to secure (configured node)
+    return m.detectHardwareWithMode(nodeIP, false)
+}
+
+// detectHardwareWithMode queries node hardware with specified connection mode
+func (m *Manager) detectHardwareWithMode(nodeIP string, insecure bool) (*HardwareInfo, error) {
 
     // Try to get default interface (with default route)
     iface, err := m.talosctl.GetDefaultInterface(nodeIP, insecure)
     if err != nil {
@@ -299,10 +322,11 @@
         Interface:       iface,
         Disks:           disks,
         SelectedDisk:    selectedDisk,
-        MaintenanceMode: true,
+        MaintenanceMode: insecure, // If we used insecure, it's in maintenance mode
     }, nil
 }
 
+
 // Apply generates configuration and applies it to node
 // This follows the wild-node-apply flow:
 // 1. Auto-fetch templates if missing
@@ -380,9 +404,9 @@ func (m *Manager) Apply(instanceName, nodeIdentifier string, opts ApplyOptions)
     // Determine which IP to use and whether node is in maintenance mode
     //
     // Three scenarios:
-    // 1. Production node (currentIP empty/same, maintenance=false): use targetIP, no --insecure
+    // 1. Production node (already applied, maintenance=false): use targetIP, no --insecure
     // 2. IP changing (currentIP != targetIP): use currentIP, --insecure (always maintenance)
-    // 3. Maintenance at target (maintenance=true, no IP change): use targetIP, --insecure
+    // 3. Fresh/maintenance node (never applied OR maintenance=true): use targetIP, --insecure
 
     var deployIP string
     var maintenanceMode bool
@@ -390,12 +414,13 @@
     if node.CurrentIP != "" && node.CurrentIP != node.TargetIP {
         // Scenario 2: IP is changing - node is at currentIP, moving to targetIP
         deployIP = node.CurrentIP
         maintenanceMode = true
-    } else if node.Maintenance {
-        // Scenario 3: Explicit maintenance mode, no IP change
+    } else if node.Maintenance || !node.Applied {
+        // Scenario 3: Explicit maintenance mode OR never been applied (fresh node)
+        // Fresh nodes need --insecure because they have self-signed certificates
         deployIP = node.TargetIP
         maintenanceMode = true
     } else {
-        // Scenario 1: Production node at target IP
+        // Scenario 1: Production node at target IP (already applied, not in maintenance)
         deployIP = node.TargetIP
         maintenanceMode = false
     }
diff --git a/internal/operations/operations.go b/internal/operations/operations.go
index 93496f8..8db6a31 100644
--- a/internal/operations/operations.go
+++ b/internal/operations/operations.go
@@ -11,6 +11,9 @@ import (
     "github.com/wild-cloud/wild-central/daemon/internal/tools"
 )
 
+// Bootstrap step constants
+const totalBootstrapSteps = 7
+
 // Manager handles async operation tracking
 type Manager struct {
     dataDir string
@@ -23,18 +26,33 @@ func NewManager(dataDir string) *Manager {
     }
 }
 
+// BootstrapProgress tracks detailed bootstrap progress
+type BootstrapProgress struct {
+    CurrentStep     int    `json:"current_step"` // 0-6
+    StepName        string `json:"step_name"`
+    Attempt         int    `json:"attempt"`
+    MaxAttempts     int    `json:"max_attempts"`
+    StepDescription string `json:"step_description"`
+}
+
+// OperationDetails contains operation-specific details
+type OperationDetails struct {
+    BootstrapProgress *BootstrapProgress `json:"bootstrap,omitempty"`
+}
+
 // Operation represents a long-running operation
 type Operation struct {
-    ID        string    `json:"id"`
-    Type      string    `json:"type"` // discover, setup, download, bootstrap
-    Target    string    `json:"target"`
-    Instance  string    `json:"instance"`
-    Status    string    `json:"status"` // pending, running, completed, failed, cancelled
-    Message   string    `json:"message,omitempty"`
-    Progress  int       `json:"progress"` // 0-100
-    LogFile   string    `json:"logFile,omitempty"` // Path to output log file
-    StartedAt time.Time `json:"started_at"`
-    EndedAt   time.Time `json:"ended_at,omitempty"`
+    ID        string            `json:"id"`
+    Type      string            `json:"type"` // discover, setup, download, bootstrap
+    Target    string            `json:"target"`
+    Instance  string            `json:"instance"`
+    Status    string            `json:"status"` // pending, running, completed, failed, cancelled
+    Message   string            `json:"message,omitempty"`
+    Progress  int               `json:"progress"` // 0-100
+    Details   *OperationDetails `json:"details,omitempty"` // Operation-specific details
+    LogFile   string            `json:"logFile,omitempty"` // Path to output log file
+    StartedAt time.Time         `json:"started_at"`
+    EndedAt   time.Time         `json:"ended_at,omitempty"`
`json:"ended_at,omitempty"` } // GetOperationsDir returns the operations directory for an instance @@ -79,19 +97,6 @@ func (m *Manager) Start(instanceName, opType, target string) (string, error) { return opID, nil } -// Get returns operation status -func (m *Manager) Get(opID string) (*Operation, error) { - // Operation ID contains instance name, but we need to find it - // For now, we'll scan all instances (not ideal but simple) - // Better approach: encode instance in operation ID or maintain index - - // Simplified: assume operation ID format is op_{type}_{target}_{timestamp} - // We need to know which instance to look in - // For now, return error if we can't find it - - // This needs improvement in actual implementation - return nil, fmt.Errorf("operation lookup not implemented - need instance context") -} // GetByInstance returns an operation for a specific instance func (m *Manager) GetByInstance(instanceName, opID string) (*Operation, error) { @@ -238,6 +243,31 @@ func (m *Manager) Cleanup(instanceName string, olderThan time.Duration) error { return nil } +// UpdateBootstrapProgress updates bootstrap-specific progress details +func (m *Manager) UpdateBootstrapProgress(instanceName, opID string, step int, stepName string, attempt, maxAttempts int, stepDescription string) error { + op, err := m.GetByInstance(instanceName, opID) + if err != nil { + return err + } + + if op.Details == nil { + op.Details = &OperationDetails{} + } + + op.Details.BootstrapProgress = &BootstrapProgress{ + CurrentStep: step, + StepName: stepName, + Attempt: attempt, + MaxAttempts: maxAttempts, + StepDescription: stepDescription, + } + + op.Progress = (step * 100) / (totalBootstrapSteps - 1) + op.Message = fmt.Sprintf("Step %d/%d: %s (attempt %d/%d)", step+1, totalBootstrapSteps, stepName, attempt, maxAttempts) + + return m.writeOperation(op) +} + // writeOperation writes operation to disk func (m *Manager) writeOperation(op *Operation) error { opsDir := m.GetOperationsDir(op.Instance) diff --git a/internal/tools/talosctl.go b/internal/tools/talosctl.go index 3db46f7..d2e2c0e 100644 --- a/internal/tools/talosctl.go +++ b/internal/tools/talosctl.go @@ -1,10 +1,12 @@ package tools import ( + "context" "encoding/json" "fmt" "os/exec" "strings" + "time" ) // Talosctl provides a thin wrapper around the talosctl command-line tool @@ -92,8 +94,11 @@ func (t *Talosctl) GetDisks(nodeIP string, insecure bool) ([]DiskInfo, error) { args = append(args, "--insecure") } + // Build args with talosconfig if available + finalArgs := t.buildArgs(args) + // Use jq to slurp the NDJSON into an array (like v.PoC does with jq -s) - talosCmd := exec.Command("talosctl", args...) + talosCmd := exec.Command("talosctl", finalArgs...) jqCmd := exec.Command("jq", "-s", ".") // Pipe talosctl output to jq @@ -171,8 +176,11 @@ func (t *Talosctl) getResourceJSON(resourceType, nodeIP string, insecure bool) ( args = append(args, "--insecure") } + // Build args with talosconfig if available + finalArgs := t.buildArgs(args) + // Use jq to slurp the NDJSON into an array - talosCmd := exec.Command("talosctl", args...) + talosCmd := exec.Command("talosctl", finalArgs...) 
jqCmd := exec.Command("jq", "-s", ".") // Pipe talosctl output to jq @@ -280,20 +288,45 @@ func (t *Talosctl) GetPhysicalInterface(nodeIP string, insecure bool) (string, e // GetVersion gets Talos version from a node func (t *Talosctl) GetVersion(nodeIP string, insecure bool) (string, error) { - args := t.buildArgs([]string{ - "version", - "--nodes", nodeIP, - "--short", - }) + var args []string + // When using insecure mode (for maintenance mode nodes), don't use talosconfig + // Insecure mode is for unconfigured nodes that don't have authentication set up if insecure { - args = append(args, "--insecure") + args = []string{ + "version", + "--nodes", nodeIP, + "--short", + "--insecure", + } + } else { + // For configured nodes, use talosconfig if available + args = t.buildArgs([]string{ + "version", + "--nodes", nodeIP, + "--short", + }) } - cmd := exec.Command("talosctl", args...) + // Use context with timeout to prevent hanging on unreachable nodes + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + cmd := exec.CommandContext(ctx, "talosctl", args...) output, err := cmd.CombinedOutput() + outputStr := string(output) + + // Special case: In maintenance mode, talosctl version returns an error + // "API is not implemented in maintenance mode" but this means the node IS reachable + // and IS in maintenance mode, so we treat this as a success + if err != nil && strings.Contains(outputStr, "API is not implemented in maintenance mode") { + // Extract client version from output as the node version + // Since we can't get server version in maintenance mode + return "maintenance", nil + } + if err != nil { - return "", fmt.Errorf("talosctl version failed: %w\nOutput: %s", err, string(output)) + return "", fmt.Errorf("talosctl version failed: %w\nOutput: %s", err, outputStr) } // Parse output to extract server version