feat(api): Enhance NodeDiscover with subnet auto-detection and discovery cancellation

- Updated NodeDiscover to accept an optional subnet parameter, with auto-detection of local networks if none is provided.
- Removed support for IP list format in NodeDiscover request body.
- Implemented discovery cancellation via the new NodeDiscoveryCancel endpoint.
- Improved error handling and response messages for better clarity.
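
A request against the updated endpoint looks like this (a minimal sketch; the host, port, and instance name are assumptions, not part of this change):

    package main

    import (
        "bytes"
        "fmt"
        "net/http"
    )

    func main() {
        // Scan an explicit subnet; send "{}" instead to let the server
        // auto-detect local networks.
        body := bytes.NewBufferString(`{"subnet": "192.168.8.0/24"}`)
        resp, err := http.Post(
            "http://localhost:8080/api/v1/instances/prod/nodes/discover",
            "application/json", body)
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        fmt.Println(resp.Status) // expect 202 Accepted, with ips_to_scan in the body
    }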

feat(cluster): Add operation tracking for cluster bootstrap process

- Integrated the operations manager into the cluster manager to track bootstrap progress.
- Refactored Bootstrap method to run asynchronously with detailed progress updates.
- Added methods to wait for various bootstrap steps (etcd health, VIP assignment, control plane readiness, etc.).
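
The start-then-track shape, reduced to a minimal sketch (the names and ID scheme are simplified stand-ins for the opsMgr.Start/Update calls in the diff below):

    package main

    import (
        "fmt"
        "time"
    )

    // startTracked returns an operation ID immediately and runs the work in
    // the background, recording a terminal status when it finishes.
    func startTracked(run func() error, update func(status, msg string, pct int)) string {
        opID := fmt.Sprintf("op_bootstrap_%d", time.Now().UnixNano()) // hypothetical ID scheme
        go func() {
            if err := run(); err != nil {
                update("failed", err.Error(), 0)
                return
            }
            update("completed", "Bootstrap completed successfully", 100)
        }()
        return opID
    }

    func main() {
        id := startTracked(
            func() error { time.Sleep(100 * time.Millisecond); return nil },
            func(s, m string, p int) { fmt.Println(s, m, p) },
        )
        fmt.Println("started", id)
        time.Sleep(200 * time.Millisecond) // give the goroutine time to finish
    }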

fix(discovery): Optimize node discovery process and improve maintenance mode detection

- Enhanced node discovery to run in parallel, with a semaphore limiting concurrent scans.
- Updated probeNode to detect maintenance mode more reliably.
- Added functions to expand CIDR notation into individual IP addresses and retrieve local network interfaces.
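
The concurrency pattern in isolation (the 50-slot cap matches the diff; probe stands in for probeNode):

    package main

    import (
        "fmt"
        "sync"
    )

    func scan(ips []string, probe func(string) error) []string {
        var wg sync.WaitGroup
        results := make(chan string, len(ips))
        sem := make(chan struct{}, 50) // cap concurrent probes
        for _, ip := range ips {
            wg.Add(1)
            go func(ip string) {
                defer wg.Done()
                sem <- struct{}{}        // acquire a slot
                defer func() { <-sem }() // release it
                if probe(ip) == nil {
                    results <- ip
                }
            }(ip)
        }
        go func() { wg.Wait(); close(results) }()
        var found []string
        for ip := range results {
            found = append(found, ip)
        }
        return found
    }

    func main() {
        probe := func(string) error { return nil } // always "reachable" for the demo
        fmt.Println(scan([]string{"192.168.8.10", "192.168.8.11"}, probe))
    }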

refactor(node): Update node manager to handle instance-specific configurations

- Modified NewManager to accept an instanceName so node operations use the instance-specific talosconfig.
- Improved hardware detection logic to handle maintenance mode scenarios.
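
Call sites in brief (a fragment, not a complete program):

    // With an instance name: uses that instance's talosconfig.
    nodeMgr := node.NewManager(dataDir, instanceName)

    // With an empty name: a bare talosctl wrapper, suitable for
    // --insecure probes of maintenance-mode nodes.
    bareMgr := node.NewManager(dataDir, "")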

feat(operations): Implement detailed bootstrap progress tracking

- Introduced BootstrapProgress struct to track and report the status of bootstrap operations.
- Updated operation management to include bootstrap-specific details.
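
Marshaled, the new details payload looks roughly like this (a sketch reusing the struct tags from the diff below):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type BootstrapProgress struct {
        CurrentStep     int    `json:"current_step"`
        StepName        string `json:"step_name"`
        Attempt         int    `json:"attempt"`
        MaxAttempts     int    `json:"max_attempts"`
        StepDescription string `json:"step_description"`
    }

    func main() {
        b, _ := json.Marshal(BootstrapProgress{
            CurrentStep: 1, StepName: "etcd", Attempt: 3, MaxAttempts: 30,
            StepDescription: "Waiting for etcd to become healthy",
        })
        fmt.Println(string(b))
        // {"current_step":1,"step_name":"etcd","attempt":3,"max_attempts":30,"step_description":"Waiting for etcd to become healthy"}
    }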

fix(tools): Improve talosctl command execution with context and error handling

- Added context with timeout to talosctl commands to prevent hanging on unreachable nodes.
- Enhanced error handling for version retrieval in maintenance mode.
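
The timeout pattern in isolation (the 10-second deadline matches the diff; the target IP is a placeholder):

    package main

    import (
        "context"
        "fmt"
        "os/exec"
        "time"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        // CommandContext kills the process if the deadline expires first,
        // so an unreachable node can no longer hang the caller.
        cmd := exec.CommandContext(ctx, "talosctl",
            "version", "--nodes", "192.168.8.10", "--short", "--insecure")
        out, err := cmd.CombinedOutput()
        if ctx.Err() == context.DeadlineExceeded {
            fmt.Println("node unreachable: timed out")
            return
        }
        fmt.Println(string(out), err)
    }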
2025-11-04 17:16:16 +00:00
parent 005dc30aa5
commit 7cd434aabf
9 changed files with 623 additions and 148 deletions

View File

@@ -38,6 +38,7 @@
- Write unit tests for all functions and methods.
- Make and use common modules. For example, one module should handle all interactions with talosctl. Another module should handle all interactions with kubectl.
- If the code is getting long and complex, break it into smaller modules.
- API requests and responses should be valid JSON. Object attribute names should use standard JSON camelCase.
### Features

View File

@@ -30,6 +30,7 @@ type API struct {
context *context.Manager
instance *instance.Manager
dnsmasq *dnsmasq.ConfigGenerator
opsMgr *operations.Manager // Operations manager
broadcaster *operations.Broadcaster // SSE broadcaster for operation output
}
@@ -57,6 +58,7 @@ func NewAPI(dataDir, appsDir string) (*API, error) {
context: context.NewManager(dataDir),
instance: instance.NewManager(dataDir),
dnsmasq: dnsmasq.NewConfigGenerator(dnsmasqConfigPath),
opsMgr: operations.NewManager(dataDir),
broadcaster: operations.NewBroadcaster(),
}, nil
}
@@ -85,6 +87,7 @@ func (api *API) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/api/v1/instances/{name}/nodes/discover", api.NodeDiscover).Methods("POST")
r.HandleFunc("/api/v1/instances/{name}/nodes/detect", api.NodeDetect).Methods("POST")
r.HandleFunc("/api/v1/instances/{name}/discovery", api.NodeDiscoveryStatus).Methods("GET")
r.HandleFunc("/api/v1/instances/{name}/discovery/cancel", api.NodeDiscoveryCancel).Methods("POST")
r.HandleFunc("/api/v1/instances/{name}/nodes/hardware/{ip}", api.NodeHardware).Methods("GET")
r.HandleFunc("/api/v1/instances/{name}/nodes/fetch-templates", api.NodeFetchTemplates).Methods("POST")
r.HandleFunc("/api/v1/instances/{name}/nodes", api.NodeAdd).Methods("POST")

View File

@@ -46,15 +46,15 @@ func (api *API) ClusterGenerateConfig(w http.ResponseWriter, r *http.Request) {
}
// Create cluster config
config := cluster.ClusterConfig{
clusterConfig := cluster.ClusterConfig{
ClusterName: clusterName,
VIP: vip,
Version: version,
}
// Generate configuration
clusterMgr := cluster.NewManager(api.dataDir)
if err := clusterMgr.GenerateConfig(instanceName, &config); err != nil {
clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
if err := clusterMgr.GenerateConfig(instanceName, &clusterConfig); err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to generate config: %v", err))
return
}
@@ -90,26 +90,14 @@ func (api *API) ClusterBootstrap(w http.ResponseWriter, r *http.Request) {
return
}
// Start bootstrap operation
opsMgr := operations.NewManager(api.dataDir)
opID, err := opsMgr.Start(instanceName, "bootstrap", req.Node)
// Bootstrap with progress tracking
clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
opID, err := clusterMgr.Bootstrap(instanceName, req.Node)
if err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to start operation: %v", err))
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to start bootstrap: %v", err))
return
}
// Bootstrap in background
go func() {
clusterMgr := cluster.NewManager(api.dataDir)
_ = opsMgr.UpdateStatus(instanceName, opID, "running")
if err := clusterMgr.Bootstrap(instanceName, req.Node); err != nil {
_ = opsMgr.Update(instanceName, opID, "failed", err.Error(), 0)
} else {
_ = opsMgr.Update(instanceName, opID, "completed", "Bootstrap completed", 100)
}
}()
respondJSON(w, http.StatusAccepted, map[string]string{
"operation_id": opID,
"message": "Bootstrap initiated",
@@ -138,7 +126,7 @@ func (api *API) ClusterConfigureEndpoints(w http.ResponseWriter, r *http.Request
}
// Configure endpoints
clusterMgr := cluster.NewManager(api.dataDir)
clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
if err := clusterMgr.ConfigureEndpoints(instanceName, req.IncludeNodes); err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to configure endpoints: %v", err))
return
@@ -161,7 +149,7 @@ func (api *API) ClusterGetStatus(w http.ResponseWriter, r *http.Request) {
}
// Get status
clusterMgr := cluster.NewManager(api.dataDir)
clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
status, err := clusterMgr.GetStatus(instanceName)
if err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to get status: %v", err))
@@ -183,7 +171,7 @@ func (api *API) ClusterHealth(w http.ResponseWriter, r *http.Request) {
}
// Get health checks
clusterMgr := cluster.NewManager(api.dataDir)
clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
checks, err := clusterMgr.Health(instanceName)
if err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to get health: %v", err))
@@ -219,7 +207,7 @@ func (api *API) ClusterGetKubeconfig(w http.ResponseWriter, r *http.Request) {
}
// Get kubeconfig
clusterMgr := cluster.NewManager(api.dataDir)
clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
kubeconfig, err := clusterMgr.GetKubeconfig(instanceName)
if err != nil {
respondError(w, http.StatusNotFound, fmt.Sprintf("Kubeconfig not found: %v", err))
@@ -243,7 +231,7 @@ func (api *API) ClusterGenerateKubeconfig(w http.ResponseWriter, r *http.Request
}
// Regenerate kubeconfig from cluster
clusterMgr := cluster.NewManager(api.dataDir)
clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
if err := clusterMgr.RegenerateKubeconfig(instanceName); err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to generate kubeconfig: %v", err))
return
@@ -266,7 +254,7 @@ func (api *API) ClusterGetTalosconfig(w http.ResponseWriter, r *http.Request) {
}
// Get talosconfig
clusterMgr := cluster.NewManager(api.dataDir)
clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
talosconfig, err := clusterMgr.GetTalosconfig(instanceName)
if err != nil {
respondError(w, http.StatusNotFound, fmt.Sprintf("Talosconfig not found: %v", err))
@@ -314,7 +302,7 @@ func (api *API) ClusterReset(w http.ResponseWriter, r *http.Request) {
// Reset in background
go func() {
clusterMgr := cluster.NewManager(api.dataDir)
clusterMgr := cluster.NewManager(api.dataDir, api.opsMgr)
_ = opsMgr.UpdateStatus(instanceName, opID, "running")
if err := clusterMgr.Reset(instanceName, req.Confirm); err != nil {

View File

@@ -12,6 +12,7 @@ import (
)
// NodeDiscover initiates node discovery
// Accepts optional subnet parameter. If no subnet provided, auto-detects local networks.
func (api *API) NodeDiscover(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
instanceName := vars["name"]
@@ -22,10 +23,9 @@ func (api *API) NodeDiscover(w http.ResponseWriter, r *http.Request) {
return
}
// Parse request body - support both subnet and ip_list formats
// Parse request body - only subnet is supported
var req struct {
Subnet string `json:"subnet"`
IPList []string `json:"ip_list"`
Subnet string `json:"subnet,omitempty"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@@ -33,16 +33,38 @@ func (api *API) NodeDiscover(w http.ResponseWriter, r *http.Request) {
return
}
// If subnet provided, use it as a single "IP" for discovery
// The discovery manager will scan this subnet
// Build IP list
var ipList []string
var err error
if req.Subnet != "" {
ipList = []string{req.Subnet}
} else if len(req.IPList) > 0 {
ipList = req.IPList
// Expand provided CIDR notation to individual IPs
ipList, err = discovery.ExpandSubnet(req.Subnet)
if err != nil {
respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid subnet: %v", err))
return
}
} else {
respondError(w, http.StatusBadRequest, "subnet or ip_list is required")
return
// Auto-detect: Get local networks when no subnet provided
networks, err := discovery.GetLocalNetworks()
if err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to detect local networks: %v", err))
return
}
if len(networks) == 0 {
respondError(w, http.StatusNotFound, "No local networks found")
return
}
// Expand all detected networks
for _, network := range networks {
ips, err := discovery.ExpandSubnet(network)
if err != nil {
continue // Skip invalid networks
}
ipList = append(ipList, ips...)
}
}
// Start discovery
@@ -52,9 +74,10 @@ func (api *API) NodeDiscover(w http.ResponseWriter, r *http.Request) {
return
}
respondJSON(w, http.StatusAccepted, map[string]string{
"message": "Discovery started",
"status": "running",
respondJSON(w, http.StatusAccepted, map[string]interface{}{
"message": "Discovery started",
"status": "running",
"ips_to_scan": len(ipList),
})
}
@@ -92,7 +115,7 @@ func (api *API) NodeHardware(w http.ResponseWriter, r *http.Request) {
}
// Detect hardware
nodeMgr := node.NewManager(api.dataDir)
nodeMgr := node.NewManager(api.dataDir, instanceName)
hwInfo, err := nodeMgr.DetectHardware(nodeIP)
if err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to detect hardware: %v", err))
@@ -103,6 +126,7 @@ func (api *API) NodeHardware(w http.ResponseWriter, r *http.Request) {
}
// NodeDetect detects hardware on a single node (POST with IP in body)
// IP address is required.
func (api *API) NodeDetect(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
instanceName := vars["name"]
@@ -123,13 +147,14 @@ func (api *API) NodeDetect(w http.ResponseWriter, r *http.Request) {
return
}
// Validate IP is provided
if req.IP == "" {
respondError(w, http.StatusBadRequest, "ip is required")
respondError(w, http.StatusBadRequest, "IP address is required")
return
}
// Detect hardware
nodeMgr := node.NewManager(api.dataDir)
// Detect hardware for specific IP
nodeMgr := node.NewManager(api.dataDir, instanceName)
hwInfo, err := nodeMgr.DetectHardware(req.IP)
if err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to detect hardware: %v", err))
@@ -158,7 +183,7 @@ func (api *API) NodeAdd(w http.ResponseWriter, r *http.Request) {
}
// Add node
nodeMgr := node.NewManager(api.dataDir)
nodeMgr := node.NewManager(api.dataDir, instanceName)
if err := nodeMgr.Add(instanceName, &nodeData); err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to add node: %v", err))
return
@@ -182,7 +207,7 @@ func (api *API) NodeList(w http.ResponseWriter, r *http.Request) {
}
// List nodes
nodeMgr := node.NewManager(api.dataDir)
nodeMgr := node.NewManager(api.dataDir, instanceName)
nodes, err := nodeMgr.List(instanceName)
if err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to list nodes: %v", err))
@@ -207,7 +232,7 @@ func (api *API) NodeGet(w http.ResponseWriter, r *http.Request) {
}
// Get node
nodeMgr := node.NewManager(api.dataDir)
nodeMgr := node.NewManager(api.dataDir, instanceName)
nodeData, err := nodeMgr.Get(instanceName, nodeIdentifier)
if err != nil {
respondError(w, http.StatusNotFound, fmt.Sprintf("Node not found: %v", err))
@@ -233,7 +258,7 @@ func (api *API) NodeApply(w http.ResponseWriter, r *http.Request) {
opts := node.ApplyOptions{}
// Apply node configuration
nodeMgr := node.NewManager(api.dataDir)
nodeMgr := node.NewManager(api.dataDir, instanceName)
if err := nodeMgr.Apply(instanceName, nodeIdentifier, opts); err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to apply node configuration: %v", err))
return
@@ -265,7 +290,7 @@ func (api *API) NodeUpdate(w http.ResponseWriter, r *http.Request) {
}
// Update node
nodeMgr := node.NewManager(api.dataDir)
nodeMgr := node.NewManager(api.dataDir, instanceName)
if err := nodeMgr.Update(instanceName, nodeIdentifier, updates); err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update node: %v", err))
return
@@ -289,7 +314,7 @@ func (api *API) NodeFetchTemplates(w http.ResponseWriter, r *http.Request) {
}
// Fetch templates
nodeMgr := node.NewManager(api.dataDir)
nodeMgr := node.NewManager(api.dataDir, instanceName)
if err := nodeMgr.FetchTemplates(instanceName); err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to fetch templates: %v", err))
return
@@ -313,7 +338,7 @@ func (api *API) NodeDelete(w http.ResponseWriter, r *http.Request) {
}
// Delete node
nodeMgr := node.NewManager(api.dataDir)
nodeMgr := node.NewManager(api.dataDir, instanceName)
if err := nodeMgr.Delete(instanceName, nodeIdentifier); err != nil {
respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete node: %v", err))
return
@@ -323,3 +348,26 @@ func (api *API) NodeDelete(w http.ResponseWriter, r *http.Request) {
"message": "Node deleted successfully",
})
}
// NodeDiscoveryCancel cancels an in-progress discovery operation
func (api *API) NodeDiscoveryCancel(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
instanceName := vars["name"]
// Validate instance exists
if err := api.instance.ValidateInstance(instanceName); err != nil {
respondError(w, http.StatusNotFound, fmt.Sprintf("Instance not found: %v", err))
return
}
// Cancel discovery
discoveryMgr := discovery.NewManager(api.dataDir, instanceName)
if err := discoveryMgr.CancelDiscovery(instanceName); err != nil {
respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to cancel discovery: %v", err))
return
}
respondJSON(w, http.StatusOK, map[string]string{
"message": "Discovery cancelled successfully",
})
}

View File

@@ -1,6 +1,7 @@
package cluster
import (
"context"
"encoding/json"
"fmt"
"log"
@@ -10,6 +11,7 @@ import (
"strings"
"time"
"github.com/wild-cloud/wild-central/daemon/internal/operations"
"github.com/wild-cloud/wild-central/daemon/internal/storage"
"github.com/wild-cloud/wild-central/daemon/internal/tools"
)
@@ -18,13 +20,15 @@ import (
type Manager struct {
dataDir string
talosctl *tools.Talosctl
opsMgr *operations.Manager
}
// NewManager creates a new cluster manager
func NewManager(dataDir string) *Manager {
func NewManager(dataDir string, opsMgr *operations.Manager) *Manager {
return &Manager{
dataDir: dataDir,
talosctl: tools.NewTalosctl(),
opsMgr: opsMgr,
}
}
@@ -96,11 +100,28 @@ func (m *Manager) GenerateConfig(instanceName string, config *ClusterConfig) err
return nil
}
// Bootstrap bootstraps the cluster on the specified node
func (m *Manager) Bootstrap(instanceName, nodeName string) error {
// Get node configuration to find the target IP
configPath := tools.GetInstanceConfigPath(m.dataDir, instanceName)
// Bootstrap bootstraps the cluster on the specified node with progress tracking
func (m *Manager) Bootstrap(instanceName, nodeName string) (string, error) {
// Create operation for tracking
opID, err := m.opsMgr.Start(instanceName, "bootstrap", nodeName)
if err != nil {
return "", fmt.Errorf("failed to start bootstrap operation: %w", err)
}
// Run bootstrap asynchronously
go func() {
if err := m.runBootstrapWithTracking(instanceName, nodeName, opID); err != nil {
_ = m.opsMgr.Update(instanceName, opID, "failed", err.Error(), 0)
}
}()
return opID, nil
}
// runBootstrapWithTracking runs the bootstrap process with detailed progress tracking
func (m *Manager) runBootstrapWithTracking(instanceName, nodeName, opID string) error {
ctx := context.Background()
configPath := tools.GetInstanceConfigPath(m.dataDir, instanceName)
yq := tools.NewYQ()
// Get node's target IP
@@ -114,17 +135,71 @@ func (m *Manager) Bootstrap(instanceName, nodeName string) error {
return fmt.Errorf("node %s does not have a target IP configured", nodeName)
}
// Get talosconfig path for this instance
// Get VIP
vipRaw, err := yq.Get(configPath, ".cluster.nodes.control.vip")
if err != nil {
return fmt.Errorf("failed to get VIP: %w", err)
}
vip := tools.CleanYQOutput(vipRaw)
if vip == "" || vip == "null" {
return fmt.Errorf("control plane VIP not configured")
}
// Step 0: Run talosctl bootstrap
if err := m.runBootstrapCommand(instanceName, nodeIP, opID); err != nil {
return err
}
// Step 1: Wait for etcd health
if err := m.waitForEtcd(ctx, instanceName, nodeIP, opID); err != nil {
return err
}
// Step 2: Wait for VIP assignment
if err := m.waitForVIP(ctx, instanceName, nodeIP, vip, opID); err != nil {
return err
}
// Step 3: Wait for control plane components
if err := m.waitForControlPlane(ctx, instanceName, nodeIP, opID); err != nil {
return err
}
// Step 4: Wait for API server on VIP
if err := m.waitForAPIServer(ctx, instanceName, vip, opID); err != nil {
return err
}
// Step 5: Configure cluster access
if err := m.configureClusterAccess(instanceName, vip, opID); err != nil {
return err
}
// Step 6: Verify node registration
if err := m.waitForNodeRegistration(ctx, instanceName, opID); err != nil {
return err
}
// Mark as completed
_ = m.opsMgr.Update(instanceName, opID, "completed", "Bootstrap completed successfully", 100)
return nil
}
// runBootstrapCommand executes the initial bootstrap command
func (m *Manager) runBootstrapCommand(instanceName, nodeIP, opID string) error {
_ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 0, "bootstrap", 1, 1, "Running talosctl bootstrap command")
talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName)
// Set talosctl endpoint (with proper context via TALOSCONFIG env var)
// Set talosctl endpoint
cmdEndpoint := exec.Command("talosctl", "config", "endpoint", nodeIP)
tools.WithTalosconfig(cmdEndpoint, talosconfigPath)
if output, err := cmdEndpoint.CombinedOutput(); err != nil {
return fmt.Errorf("failed to set talosctl endpoint: %w\nOutput: %s", err, string(output))
}
// Bootstrap command (with proper context via TALOSCONFIG env var)
// Bootstrap command
cmd := exec.Command("talosctl", "bootstrap", "--nodes", nodeIP)
tools.WithTalosconfig(cmd, talosconfigPath)
output, err := cmd.CombinedOutput()
@@ -132,16 +207,152 @@ func (m *Manager) Bootstrap(instanceName, nodeName string) error {
return fmt.Errorf("failed to bootstrap cluster: %w\nOutput: %s", err, string(output))
}
// Retrieve kubeconfig after bootstrap (best-effort with retry)
log.Printf("Waiting for Kubernetes API server to become ready...")
if err := m.retrieveKubeconfigFromCluster(instanceName, nodeIP, 5*time.Minute); err != nil {
log.Printf("Warning: %v", err)
log.Printf("You can retrieve it manually later using: wild cluster kubeconfig --generate")
return nil
}
// waitForEtcd waits for etcd to become healthy
func (m *Manager) waitForEtcd(ctx context.Context, instanceName, nodeIP, opID string) error {
maxAttempts := 30
talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName)
for attempt := 1; attempt <= maxAttempts; attempt++ {
_ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 1, "etcd", attempt, maxAttempts, "Waiting for etcd to become healthy")
cmd := exec.Command("talosctl", "-n", nodeIP, "etcd", "status")
tools.WithTalosconfig(cmd, talosconfigPath)
output, err := cmd.CombinedOutput()
if err == nil && strings.Contains(string(output), nodeIP) {
return nil
}
if attempt < maxAttempts {
time.Sleep(10 * time.Second)
}
}
return fmt.Errorf("etcd did not become healthy after %d attempts", maxAttempts)
}
// waitForVIP waits for VIP to be assigned to the node
func (m *Manager) waitForVIP(ctx context.Context, instanceName, nodeIP, vip, opID string) error {
maxAttempts := 90
talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName)
for attempt := 1; attempt <= maxAttempts; attempt++ {
_ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 2, "vip", attempt, maxAttempts, "Waiting for VIP assignment")
cmd := exec.Command("talosctl", "-n", nodeIP, "get", "addresses")
tools.WithTalosconfig(cmd, talosconfigPath)
output, err := cmd.CombinedOutput()
if err == nil && strings.Contains(string(output), vip+"/32") {
return nil
}
if attempt < maxAttempts {
time.Sleep(10 * time.Second)
}
}
return fmt.Errorf("VIP was not assigned after %d attempts", maxAttempts)
}
// waitForControlPlane waits for control plane components to start
func (m *Manager) waitForControlPlane(ctx context.Context, instanceName, nodeIP, opID string) error {
maxAttempts := 60
talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName)
for attempt := 1; attempt <= maxAttempts; attempt++ {
_ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 3, "controlplane", attempt, maxAttempts, "Waiting for control plane components")
cmd := exec.Command("talosctl", "-n", nodeIP, "containers", "-k")
tools.WithTalosconfig(cmd, talosconfigPath)
output, err := cmd.CombinedOutput()
if err == nil && strings.Contains(string(output), "kube-") {
return nil
}
if attempt < maxAttempts {
time.Sleep(10 * time.Second)
}
}
return fmt.Errorf("control plane components did not start after %d attempts", maxAttempts)
}
// waitForAPIServer waits for Kubernetes API server to respond
func (m *Manager) waitForAPIServer(ctx context.Context, instanceName, vip, opID string) error {
maxAttempts := 60
apiURL := fmt.Sprintf("https://%s:6443/healthz", vip)
for attempt := 1; attempt <= maxAttempts; attempt++ {
_ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 4, "apiserver", attempt, maxAttempts, "Waiting for Kubernetes API server")
cmd := exec.Command("curl", "-k", "-s", "--max-time", "5", apiURL)
output, err := cmd.CombinedOutput()
if err == nil && strings.Contains(string(output), "ok") {
return nil
}
if attempt < maxAttempts {
time.Sleep(10 * time.Second)
}
}
return fmt.Errorf("API server did not respond after %d attempts", maxAttempts)
}
// configureClusterAccess configures talosctl and kubectl to use the VIP
func (m *Manager) configureClusterAccess(instanceName, vip, opID string) error {
_ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 5, "configure", 1, 1, "Configuring cluster access")
talosconfigPath := tools.GetTalosconfigPath(m.dataDir, instanceName)
kubeconfigPath := tools.GetKubeconfigPath(m.dataDir, instanceName)
// Set talosctl endpoint to VIP
cmdEndpoint := exec.Command("talosctl", "config", "endpoint", vip)
tools.WithTalosconfig(cmdEndpoint, talosconfigPath)
if output, err := cmdEndpoint.CombinedOutput(); err != nil {
return fmt.Errorf("failed to set talosctl endpoint: %w\nOutput: %s", err, string(output))
}
// Retrieve kubeconfig
cmdKubeconfig := exec.Command("talosctl", "kubeconfig", "--nodes", vip, kubeconfigPath)
tools.WithTalosconfig(cmdKubeconfig, talosconfigPath)
if output, err := cmdKubeconfig.CombinedOutput(); err != nil {
return fmt.Errorf("failed to retrieve kubeconfig: %w\nOutput: %s", err, string(output))
}
return nil
}
// waitForNodeRegistration waits for the node to register with Kubernetes
func (m *Manager) waitForNodeRegistration(ctx context.Context, instanceName, opID string) error {
maxAttempts := 10
kubeconfigPath := tools.GetKubeconfigPath(m.dataDir, instanceName)
for attempt := 1; attempt <= maxAttempts; attempt++ {
_ = m.opsMgr.UpdateBootstrapProgress(instanceName, opID, 6, "nodes", attempt, maxAttempts, "Waiting for node registration")
cmd := exec.Command("kubectl", "get", "nodes")
tools.WithKubeconfig(cmd, kubeconfigPath)
output, err := cmd.CombinedOutput()
if err == nil && strings.Contains(string(output), "Ready") {
return nil
}
if attempt < maxAttempts {
time.Sleep(10 * time.Second)
}
}
return fmt.Errorf("node did not register after %d attempts", maxAttempts)
}
// retrieveKubeconfigFromCluster retrieves kubeconfig from the cluster with retry logic
func (m *Manager) retrieveKubeconfigFromCluster(instanceName, nodeIP string, timeout time.Duration) error {
kubeconfigPath := tools.GetKubeconfigPath(m.dataDir, instanceName)

View File

@@ -3,6 +3,7 @@ package discovery
import (
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"sync"
@@ -28,19 +29,17 @@ func NewManager(dataDir string, instanceName string) *Manager {
return &Manager{
dataDir: dataDir,
nodeMgr: node.NewManager(dataDir),
nodeMgr: node.NewManager(dataDir, instanceName),
talosctl: tools.NewTalosconfigWithConfig(talosconfigPath),
}
}
// DiscoveredNode represents a discovered node on the network
// DiscoveredNode represents a discovered node on the network (maintenance mode only)
type DiscoveredNode struct {
IP string `json:"ip"`
Hostname string `json:"hostname,omitempty"`
MaintenanceMode bool `json:"maintenance_mode"`
Version string `json:"version,omitempty"`
Interface string `json:"interface,omitempty"`
Disks []string `json:"disks,omitempty"`
IP string `json:"ip"`
Hostname string `json:"hostname,omitempty"`
MaintenanceMode bool `json:"maintenance_mode"`
Version string `json:"version,omitempty"`
}
// DiscoveryStatus represents the current state of discovery
@@ -130,17 +129,42 @@ func (m *Manager) runDiscovery(instanceName string, ipList []string) {
_ = m.writeDiscoveryStatus(instanceName, status)
}()
// Discover nodes by probing each IP
discoveredNodes := []DiscoveredNode{}
// Discover nodes by probing each IP in parallel
var wg sync.WaitGroup
resultsChan := make(chan DiscoveredNode, len(ipList))
// Limit concurrent scans to avoid overwhelming the network
semaphore := make(chan struct{}, 50)
for _, ip := range ipList {
node, err := m.probeNode(ip)
if err != nil {
// Node not reachable or not a Talos node
continue
}
wg.Add(1)
go func(ip string) {
defer wg.Done()
discoveredNodes = append(discoveredNodes, *node)
// Acquire semaphore
semaphore <- struct{}{}
defer func() { <-semaphore }()
node, err := m.probeNode(ip)
if err != nil {
// Node not reachable or not a Talos node
return
}
resultsChan <- *node
}(ip)
}
// Close results channel when all goroutines complete
go func() {
wg.Wait()
close(resultsChan)
}()
// Collect results and update status incrementally
discoveredNodes := []DiscoveredNode{}
for node := range resultsChan {
discoveredNodes = append(discoveredNodes, node)
// Update status incrementally
m.discoveryMu.Lock()
@@ -151,37 +175,20 @@ func (m *Manager) runDiscovery(instanceName string, ipList []string) {
}
}
// probeNode attempts to detect if a node is running Talos
// probeNode attempts to detect if a node is running Talos in maintenance mode
func (m *Manager) probeNode(ip string) (*DiscoveredNode, error) {
// Attempt to get version (quick connectivity test)
version, err := m.talosctl.GetVersion(ip, false)
// Try insecure connection first (maintenance mode)
version, err := m.talosctl.GetVersion(ip, true)
if err != nil {
// Not in maintenance mode or not reachable
return nil, err
}
// Node is reachable, get hardware info
hwInfo, err := m.nodeMgr.DetectHardware(ip)
if err != nil {
// Still count it as discovered even if we can't get full hardware
return &DiscoveredNode{
IP: ip,
MaintenanceMode: false,
Version: version,
}, nil
}
// Extract just the disk paths for discovery output
diskPaths := make([]string, len(hwInfo.Disks))
for i, disk := range hwInfo.Disks {
diskPaths[i] = disk.Path
}
// If insecure connection works, node is in maintenance mode
return &DiscoveredNode{
IP: ip,
MaintenanceMode: hwInfo.MaintenanceMode,
MaintenanceMode: true,
Version: version,
Interface: hwInfo.Interface,
Disks: diskPaths,
}, nil
}
@@ -245,3 +252,132 @@ func (m *Manager) writeDiscoveryStatus(instanceName string, status *DiscoverySta
return nil
}
// CancelDiscovery cancels an in-progress discovery operation
func (m *Manager) CancelDiscovery(instanceName string) error {
m.discoveryMu.Lock()
defer m.discoveryMu.Unlock()
// Get current status
status, err := m.GetDiscoveryStatus(instanceName)
if err != nil {
return err
}
if !status.Active {
return fmt.Errorf("no discovery in progress")
}
// Mark discovery as cancelled
status.Active = false
status.Error = "Discovery cancelled by user"
if err := m.writeDiscoveryStatus(instanceName, status); err != nil {
return err
}
return nil
}
// GetLocalNetworks discovers local network interfaces and returns their CIDR addresses
// Skips loopback, link-local, and down interfaces
// Only returns IPv4 networks
func GetLocalNetworks() ([]string, error) {
interfaces, err := net.Interfaces()
if err != nil {
return nil, fmt.Errorf("failed to get network interfaces: %w", err)
}
var networks []string
for _, iface := range interfaces {
// Skip loopback and down interfaces
if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 {
continue
}
addrs, err := iface.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
ipnet, ok := addr.(*net.IPNet)
if !ok {
continue
}
// Only IPv4 for now
if ipnet.IP.To4() == nil {
continue
}
// Skip link-local addresses (169.254.0.0/16)
if ipnet.IP.IsLinkLocalUnicast() {
continue
}
networks = append(networks, ipnet.String())
}
}
return networks, nil
}
// ExpandSubnet expands a CIDR notation subnet into individual IP addresses
// Example: "192.168.8.0/24" → ["192.168.8.1", "192.168.8.2", ..., "192.168.8.254"]
// Also handles single IPs (without CIDR notation)
func ExpandSubnet(subnet string) ([]string, error) {
// Check if it's a CIDR notation
ip, ipnet, err := net.ParseCIDR(subnet)
if err != nil {
// Not a CIDR, might be single IP
if net.ParseIP(subnet) != nil {
return []string{subnet}, nil
}
return nil, fmt.Errorf("invalid IP or CIDR: %s", subnet)
}
// Special case: /32 (single host) - just return the IP
ones, _ := ipnet.Mask.Size()
if ones == 32 {
return []string{ip.String()}, nil
}
var ips []string
// Iterate through all IPs in the subnet
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); incIP(ip) {
// Skip network address (first IP)
if ip.Equal(ipnet.IP) {
continue
}
// Skip broadcast address (last IP)
if isLastIP(ip, ipnet) {
continue
}
ips = append(ips, ip.String())
}
return ips, nil
}
// incIP increments an IP address
func incIP(ip net.IP) {
for j := len(ip) - 1; j >= 0; j-- {
ip[j]++
if ip[j] > 0 {
break
}
}
}
// isLastIP checks if an IP is the last IP in a subnet (broadcast address)
func isLastIP(ip net.IP, ipnet *net.IPNet) bool {
lastIP := make(net.IP, len(ip))
for i := range ip {
lastIP[i] = ip[i] | ^ipnet.Mask[i]
}
return ip.Equal(lastIP)
}

View File

@@ -20,11 +20,22 @@ type Manager struct {
}
// NewManager creates a new node manager
func NewManager(dataDir string) *Manager {
func NewManager(dataDir string, instanceName string) *Manager {
var talosctl *tools.Talosctl
// If instanceName is provided, use instance-specific talosconfig
// Otherwise, create basic talosctl (will use --insecure mode)
if instanceName != "" {
talosconfigPath := tools.GetTalosconfigPath(dataDir, instanceName)
talosctl = tools.NewTalosconfigWithConfig(talosconfigPath)
} else {
talosctl = tools.NewTalosctl()
}
return &Manager{
dataDir: dataDir,
configMgr: config.NewManager(),
talosctl: tools.NewTalosctl(),
talosctl: talosctl,
}
}
@@ -254,12 +265,14 @@ func (m *Manager) Delete(instanceName, nodeIdentifier string) error {
configPath := filepath.Join(instancePath, "config.yaml")
// Delete node from config.yaml
// Path: cluster.nodes.active.{hostname}
nodePath := fmt.Sprintf("cluster.nodes.active.%s", node.Hostname)
// Path: .cluster.nodes.active["hostname"]
// Use bracket notation to safely handle hostnames with special characters
nodePath := fmt.Sprintf(".cluster.nodes.active[\"%s\"]", node.Hostname)
yq := tools.NewYQ()
// Use yq to delete the node
_, err = yq.Exec("eval", "-i", fmt.Sprintf("del(%s)", nodePath), configPath)
delExpr := fmt.Sprintf("del(%s)", nodePath)
_, err = yq.Exec("eval", "-i", delExpr, configPath)
if err != nil {
return fmt.Errorf("failed to delete node: %w", err)
}
@@ -268,10 +281,20 @@ func (m *Manager) Delete(instanceName, nodeIdentifier string) error {
}
// DetectHardware queries node hardware information via talosctl
// Automatically detects maintenance mode by trying insecure first, then secure
func (m *Manager) DetectHardware(nodeIP string) (*HardwareInfo, error) {
// Query node with insecure flag (maintenance mode)
insecure := true
// Try insecure first (maintenance mode)
hwInfo, err := m.detectHardwareWithMode(nodeIP, true)
if err == nil {
return hwInfo, nil
}
// Fall back to secure (configured node)
return m.detectHardwareWithMode(nodeIP, false)
}
// detectHardwareWithMode queries node hardware with specified connection mode
func (m *Manager) detectHardwareWithMode(nodeIP string, insecure bool) (*HardwareInfo, error) {
// Try to get default interface (with default route)
iface, err := m.talosctl.GetDefaultInterface(nodeIP, insecure)
if err != nil {
@@ -299,10 +322,11 @@ func (m *Manager) DetectHardware(nodeIP string) (*HardwareInfo, error) {
Interface: iface,
Disks: disks,
SelectedDisk: selectedDisk,
MaintenanceMode: true,
MaintenanceMode: insecure, // If we used insecure, it's in maintenance mode
}, nil
}
// Apply generates configuration and applies it to node
// This follows the wild-node-apply flow:
// 1. Auto-fetch templates if missing
@@ -380,9 +404,9 @@ func (m *Manager) Apply(instanceName, nodeIdentifier string, opts ApplyOptions)
// Determine which IP to use and whether node is in maintenance mode
//
// Three scenarios:
// 1. Production node (currentIP empty/same, maintenance=false): use targetIP, no --insecure
// 1. Production node (already applied, maintenance=false): use targetIP, no --insecure
// 2. IP changing (currentIP != targetIP): use currentIP, --insecure (always maintenance)
// 3. Maintenance at target (maintenance=true, no IP change): use targetIP, --insecure
// 3. Fresh/maintenance node (never applied OR maintenance=true): use targetIP, --insecure
var deployIP string
var maintenanceMode bool
@@ -390,12 +414,13 @@ func (m *Manager) Apply(instanceName, nodeIdentifier string, opts ApplyOptions)
// Scenario 2: IP is changing - node is at currentIP, moving to targetIP
deployIP = node.CurrentIP
maintenanceMode = true
} else if node.Maintenance {
// Scenario 3: Explicit maintenance mode, no IP change
} else if node.Maintenance || !node.Applied {
// Scenario 3: Explicit maintenance mode OR never been applied (fresh node)
// Fresh nodes need --insecure because they have self-signed certificates
deployIP = node.TargetIP
maintenanceMode = true
} else {
// Scenario 1: Production node at target IP
// Scenario 1: Production node at target IP (already applied, not in maintenance)
deployIP = node.TargetIP
maintenanceMode = false
}

View File

@@ -11,6 +11,9 @@ import (
"github.com/wild-cloud/wild-central/daemon/internal/tools"
)
// Bootstrap step constants
const totalBootstrapSteps = 7
// Manager handles async operation tracking
type Manager struct {
dataDir string
@@ -23,18 +26,33 @@ func NewManager(dataDir string) *Manager {
}
}
// BootstrapProgress tracks detailed bootstrap progress
type BootstrapProgress struct {
CurrentStep int `json:"current_step"` // 0-6
StepName string `json:"step_name"`
Attempt int `json:"attempt"`
MaxAttempts int `json:"max_attempts"`
StepDescription string `json:"step_description"`
}
// OperationDetails contains operation-specific details
type OperationDetails struct {
BootstrapProgress *BootstrapProgress `json:"bootstrap,omitempty"`
}
// Operation represents a long-running operation
type Operation struct {
ID string `json:"id"`
Type string `json:"type"` // discover, setup, download, bootstrap
Target string `json:"target"`
Instance string `json:"instance"`
Status string `json:"status"` // pending, running, completed, failed, cancelled
Message string `json:"message,omitempty"`
Progress int `json:"progress"` // 0-100
LogFile string `json:"logFile,omitempty"` // Path to output log file
StartedAt time.Time `json:"started_at"`
EndedAt time.Time `json:"ended_at,omitempty"`
ID string `json:"id"`
Type string `json:"type"` // discover, setup, download, bootstrap
Target string `json:"target"`
Instance string `json:"instance"`
Status string `json:"status"` // pending, running, completed, failed, cancelled
Message string `json:"message,omitempty"`
Progress int `json:"progress"` // 0-100
Details *OperationDetails `json:"details,omitempty"` // Operation-specific details
LogFile string `json:"logFile,omitempty"` // Path to output log file
StartedAt time.Time `json:"started_at"`
EndedAt time.Time `json:"ended_at,omitempty"`
}
// GetOperationsDir returns the operations directory for an instance
@@ -79,19 +97,6 @@ func (m *Manager) Start(instanceName, opType, target string) (string, error) {
return opID, nil
}
// Get returns operation status
func (m *Manager) Get(opID string) (*Operation, error) {
// Operation ID contains instance name, but we need to find it
// For now, we'll scan all instances (not ideal but simple)
// Better approach: encode instance in operation ID or maintain index
// Simplified: assume operation ID format is op_{type}_{target}_{timestamp}
// We need to know which instance to look in
// For now, return error if we can't find it
// This needs improvement in actual implementation
return nil, fmt.Errorf("operation lookup not implemented - need instance context")
}
// GetByInstance returns an operation for a specific instance
func (m *Manager) GetByInstance(instanceName, opID string) (*Operation, error) {
@@ -238,6 +243,31 @@ func (m *Manager) Cleanup(instanceName string, olderThan time.Duration) error {
return nil
}
// UpdateBootstrapProgress updates bootstrap-specific progress details
func (m *Manager) UpdateBootstrapProgress(instanceName, opID string, step int, stepName string, attempt, maxAttempts int, stepDescription string) error {
op, err := m.GetByInstance(instanceName, opID)
if err != nil {
return err
}
if op.Details == nil {
op.Details = &OperationDetails{}
}
op.Details.BootstrapProgress = &BootstrapProgress{
CurrentStep: step,
StepName: stepName,
Attempt: attempt,
MaxAttempts: maxAttempts,
StepDescription: stepDescription,
}
op.Progress = (step * 100) / (totalBootstrapSteps - 1)
op.Message = fmt.Sprintf("Step %d/%d: %s (attempt %d/%d)", step+1, totalBootstrapSteps, stepName, attempt, maxAttempts)
return m.writeOperation(op)
}
// writeOperation writes operation to disk
func (m *Manager) writeOperation(op *Operation) error {
opsDir := m.GetOperationsDir(op.Instance)

View File

@@ -1,10 +1,12 @@
package tools
import (
"context"
"encoding/json"
"fmt"
"os/exec"
"strings"
"time"
)
// Talosctl provides a thin wrapper around the talosctl command-line tool
@@ -92,8 +94,11 @@ func (t *Talosctl) GetDisks(nodeIP string, insecure bool) ([]DiskInfo, error) {
args = append(args, "--insecure")
}
// Build args with talosconfig if available
finalArgs := t.buildArgs(args)
// Use jq to slurp the NDJSON into an array (like v.PoC does with jq -s)
talosCmd := exec.Command("talosctl", args...)
talosCmd := exec.Command("talosctl", finalArgs...)
jqCmd := exec.Command("jq", "-s", ".")
// Pipe talosctl output to jq
@@ -171,8 +176,11 @@ func (t *Talosctl) getResourceJSON(resourceType, nodeIP string, insecure bool) (
args = append(args, "--insecure")
}
// Build args with talosconfig if available
finalArgs := t.buildArgs(args)
// Use jq to slurp the NDJSON into an array
talosCmd := exec.Command("talosctl", args...)
talosCmd := exec.Command("talosctl", finalArgs...)
jqCmd := exec.Command("jq", "-s", ".")
// Pipe talosctl output to jq
@@ -280,20 +288,45 @@ func (t *Talosctl) GetPhysicalInterface(nodeIP string, insecure bool) (string, e
// GetVersion gets Talos version from a node
func (t *Talosctl) GetVersion(nodeIP string, insecure bool) (string, error) {
args := t.buildArgs([]string{
"version",
"--nodes", nodeIP,
"--short",
})
var args []string
// When using insecure mode (for maintenance mode nodes), don't use talosconfig
// Insecure mode is for unconfigured nodes that don't have authentication set up
if insecure {
args = append(args, "--insecure")
args = []string{
"version",
"--nodes", nodeIP,
"--short",
"--insecure",
}
} else {
// For configured nodes, use talosconfig if available
args = t.buildArgs([]string{
"version",
"--nodes", nodeIP,
"--short",
})
}
cmd := exec.Command("talosctl", args...)
// Use context with timeout to prevent hanging on unreachable nodes
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, "talosctl", args...)
output, err := cmd.CombinedOutput()
outputStr := string(output)
// Special case: In maintenance mode, talosctl version returns an error
// "API is not implemented in maintenance mode" but this means the node IS reachable
// and IS in maintenance mode, so we treat this as a success
if err != nil && strings.Contains(outputStr, "API is not implemented in maintenance mode") {
// The server version is unavailable in maintenance mode, so report a
// sentinel value instead of a real version string
return "maintenance", nil
}
if err != nil {
return "", fmt.Errorf("talosctl version failed: %w\nOutput: %s", err, string(output))
return "", fmt.Errorf("talosctl version failed: %w\nOutput: %s", err, outputStr)
}
// Parse output to extract server version