diff --git a/internal/api/v1/handlers.go b/internal/api/v1/handlers.go index 14fa8ee..5983bd2 100644 --- a/internal/api/v1/handlers.go +++ b/internal/api/v1/handlers.go @@ -138,6 +138,8 @@ func (api *API) RegisterRoutes(r *mux.Router) { r.HandleFunc("/api/v1/instances/{name}/services/{service}/fetch", api.ServicesFetch).Methods("POST") r.HandleFunc("/api/v1/instances/{name}/services/{service}/compile", api.ServicesCompile).Methods("POST") r.HandleFunc("/api/v1/instances/{name}/services/{service}/deploy", api.ServicesDeploy).Methods("POST") + r.HandleFunc("/api/v1/instances/{name}/services/{service}/logs", api.ServicesGetLogs).Methods("GET") + r.HandleFunc("/api/v1/instances/{name}/services/{service}/config", api.ServicesUpdateConfig).Methods("PATCH") // Apps r.HandleFunc("/api/v1/apps", api.AppsListAvailable).Methods("GET") diff --git a/internal/api/v1/handlers_services.go b/internal/api/v1/handlers_services.go index 7e53cf9..d47cdac 100644 --- a/internal/api/v1/handlers_services.go +++ b/internal/api/v1/handlers_services.go @@ -11,6 +11,7 @@ import ( "github.com/gorilla/mux" "gopkg.in/yaml.v3" + "github.com/wild-cloud/wild-central/daemon/internal/contracts" "github.com/wild-cloud/wild-central/daemon/internal/operations" "github.com/wild-cloud/wild-central/daemon/internal/services" ) @@ -225,11 +226,11 @@ func (api *API) ServicesGetStatus(w http.ResponseWriter, r *http.Request) { return } - // Get status + // Get detailed status servicesMgr := services.NewManager(api.dataDir) - status, err := servicesMgr.GetStatus(instanceName, serviceName) + status, err := servicesMgr.GetDetailedStatus(instanceName, serviceName) if err != nil { - respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to get status: %v", err)) + respondError(w, http.StatusNotFound, fmt.Sprintf("Failed to get status: %v", err)) return } @@ -422,3 +423,110 @@ func (api *API) ServicesDeploy(w http.ResponseWriter, r *http.Request) { "message": fmt.Sprintf("Service %s deployed successfully", serviceName), }) } + +// ServicesGetLogs retrieves or streams service logs +func (api *API) ServicesGetLogs(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + instanceName := vars["name"] + serviceName := vars["service"] + + // Validate instance exists + if err := api.instance.ValidateInstance(instanceName); err != nil { + respondError(w, http.StatusNotFound, fmt.Sprintf("Instance not found: %v", err)) + return + } + + // Parse query parameters + query := r.URL.Query() + logsReq := contracts.ServiceLogsRequest{ + Container: query.Get("container"), + Follow: query.Get("follow") == "true", + Previous: query.Get("previous") == "true", + Since: query.Get("since"), + } + + // Parse tail parameter + if tailStr := query.Get("tail"); tailStr != "" { + var tail int + if _, err := fmt.Sscanf(tailStr, "%d", &tail); err == nil { + logsReq.Tail = tail + } + } + + // Validate parameters + if logsReq.Tail < 0 { + respondError(w, http.StatusBadRequest, "tail parameter must be positive") + return + } + if logsReq.Tail > 5000 { + respondError(w, http.StatusBadRequest, "tail parameter cannot exceed 5000") + return + } + if logsReq.Previous && logsReq.Follow { + respondError(w, http.StatusBadRequest, "previous and follow cannot be used together") + return + } + + servicesMgr := services.NewManager(api.dataDir) + + // Stream logs with SSE if follow=true + if logsReq.Follow { + // Set SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + // Stream logs + if err := servicesMgr.StreamLogs(instanceName, serviceName, logsReq, w); err != nil { + // Log error but can't send response (SSE already started) + fmt.Printf("Error streaming logs: %v\n", err) + } + return + } + + // Get buffered logs + logsResp, err := servicesMgr.GetLogs(instanceName, serviceName, logsReq) + if err != nil { + respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to get logs: %v", err)) + return + } + + respondJSON(w, http.StatusOK, logsResp) +} + +// ServicesUpdateConfig updates service configuration +func (api *API) ServicesUpdateConfig(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + instanceName := vars["name"] + serviceName := vars["service"] + + // Validate instance exists + if err := api.instance.ValidateInstance(instanceName); err != nil { + respondError(w, http.StatusNotFound, fmt.Sprintf("Instance not found: %v", err)) + return + } + + // Parse request body + var update contracts.ServiceConfigUpdate + if err := json.NewDecoder(r.Body).Decode(&update); err != nil { + respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: %v", err)) + return + } + + // Validate request + if update.Config == nil || len(update.Config) == 0 { + respondError(w, http.StatusBadRequest, "config field is required and must not be empty") + return + } + + // Update config + servicesMgr := services.NewManager(api.dataDir) + response, err := servicesMgr.UpdateConfig(instanceName, serviceName, update, api.broadcaster) + if err != nil { + respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update config: %v", err)) + return + } + + respondJSON(w, http.StatusOK, response) +} diff --git a/internal/contracts/services.go b/internal/contracts/services.go new file mode 100644 index 0000000..4ff5a27 --- /dev/null +++ b/internal/contracts/services.go @@ -0,0 +1,339 @@ +// Package contracts contains API contracts for service management endpoints +package contracts + +import "time" + +// ============================== +// Request/Response Types +// ============================== + +// ServiceManifest represents basic service information +type ServiceManifest struct { + Name string `json:"name"` + Description string `json:"description"` + Namespace string `json:"namespace"` + ConfigReferences []string `json:"configReferences,omitempty"` + ServiceConfig map[string]ConfigDefinition `json:"serviceConfig,omitempty"` +} + +// ConfigDefinition defines config that should be prompted during service setup +type ConfigDefinition struct { + Path string `json:"path"` + Prompt string `json:"prompt"` + Default string `json:"default"` + Type string `json:"type,omitempty"` +} + +// PodStatus represents the status of a single pod +type PodStatus struct { + Name string `json:"name"` // Pod name + Status string `json:"status"` // Pod phase: Running, Pending, Failed, etc. + Ready string `json:"ready"` // Ready containers e.g. "1/1", "0/1" + Restarts int `json:"restarts"` // Container restart count + Age string `json:"age"` // Human-readable age e.g. "2h", "5m" + Node string `json:"node"` // Node name where pod is scheduled + IP string `json:"ip,omitempty"` // Pod IP if available +} + +// DetailedServiceStatus provides comprehensive service status +type DetailedServiceStatus struct { + Name string `json:"name"` // Service name + Namespace string `json:"namespace"` // Kubernetes namespace + DeploymentStatus string `json:"deploymentStatus"` // "Ready", "Progressing", "Degraded", "NotFound" + Replicas ReplicaStatus `json:"replicas"` // Desired/current/ready replicas + Pods []PodStatus `json:"pods"` // Pod details + Config map[string]interface{} `json:"config,omitempty"` // Current config from config.yaml + Manifest *ServiceManifest `json:"manifest,omitempty"` // Service manifest if available + LastUpdated time.Time `json:"lastUpdated"` // Timestamp of status +} + +// ReplicaStatus tracks deployment replica counts +type ReplicaStatus struct { + Desired int32 `json:"desired"` // Desired replica count + Current int32 `json:"current"` // Current replica count + Ready int32 `json:"ready"` // Ready replica count + Available int32 `json:"available"` // Available replica count +} + +// ServiceLogsRequest query parameters for log retrieval +type ServiceLogsRequest struct { + Container string `json:"container,omitempty"` // Specific container name (optional) + Tail int `json:"tail,omitempty"` // Number of lines from end (default: 100) + Follow bool `json:"follow,omitempty"` // Stream logs via SSE + Previous bool `json:"previous,omitempty"` // Get previous container logs + Since string `json:"since,omitempty"` // RFC3339 or duration string e.g. "10m" +} + +// ServiceLogsResponse for non-streaming log retrieval +type ServiceLogsResponse struct { + Service string `json:"service"` // Service name + Namespace string `json:"namespace"` // Kubernetes namespace + Container string `json:"container,omitempty"` // Container name if specified + Lines []string `json:"lines"` // Log lines + Truncated bool `json:"truncated"` // Whether logs were truncated + Timestamp time.Time `json:"timestamp"` // Response timestamp +} + +// ServiceLogsSSEEvent for streaming logs via Server-Sent Events +type ServiceLogsSSEEvent struct { + Type string `json:"type"` // "log", "error", "end" + Line string `json:"line,omitempty"` // Log line content + Error string `json:"error,omitempty"` // Error message if type="error" + Container string `json:"container,omitempty"` // Container source + Timestamp time.Time `json:"timestamp"` // Event timestamp +} + +// ServiceConfigUpdate request to update service configuration +type ServiceConfigUpdate struct { + Config map[string]interface{} `json:"config"` // Configuration updates + Redeploy bool `json:"redeploy"` // Trigger recompilation/redeployment + Fetch bool `json:"fetch"` // Fetch fresh templates before redeployment +} + +// ServiceConfigResponse response after config update +type ServiceConfigResponse struct { + Service string `json:"service"` // Service name + Namespace string `json:"namespace"` // Kubernetes namespace + Config map[string]interface{} `json:"config"` // Updated configuration + Redeployed bool `json:"redeployed"` // Whether service was redeployed + Message string `json:"message"` // Success/info message +} + +// ============================== +// Error Response +// ============================== + +// ErrorResponse standard error format for all endpoints +type ErrorResponse struct { + Error ErrorDetail `json:"error"` +} + +// ErrorDetail contains error information +type ErrorDetail struct { + Code string `json:"code"` // Machine-readable error code + Message string `json:"message"` // Human-readable error message + Details map[string]interface{} `json:"details,omitempty"` // Additional error context +} + +// Standard error codes +const ( + ErrCodeNotFound = "SERVICE_NOT_FOUND" + ErrCodeInstanceNotFound = "INSTANCE_NOT_FOUND" + ErrCodeInvalidRequest = "INVALID_REQUEST" + ErrCodeKubectlFailed = "KUBECTL_FAILED" + ErrCodeConfigInvalid = "CONFIG_INVALID" + ErrCodeDeploymentFailed = "DEPLOYMENT_FAILED" + ErrCodeStreamingError = "STREAMING_ERROR" + ErrCodeInternalError = "INTERNAL_ERROR" +) + +// ============================== +// API Endpoint Specifications +// ============================== + +/* +1. GET /api/v1/instances/{name}/services/{service}/status + +Purpose: Returns comprehensive service status including pods and health +Response Codes: + - 200 OK: Service status retrieved successfully + - 404 Not Found: Instance or service not found + - 500 Internal Server Error: kubectl command failed + +Example Request: + GET /api/v1/instances/production/services/nginx/status + +Example Response (200 OK): +{ + "name": "nginx", + "namespace": "nginx", + "deploymentStatus": "Ready", + "replicas": { + "desired": 3, + "current": 3, + "ready": 3, + "available": 3 + }, + "pods": [ + { + "name": "nginx-7c5464c66d-abc123", + "status": "Running", + "ready": "1/1", + "restarts": 0, + "age": "2h", + "node": "worker-1", + "ip": "10.42.1.5" + } + ], + "config": { + "nginx.image": "nginx:1.21", + "nginx.replicas": 3 + }, + "manifest": { + "name": "nginx", + "description": "NGINX web server", + "namespace": "nginx" + }, + "lastUpdated": "2024-01-15T10:30:00Z" +} + +Example Error Response (404): +{ + "error": { + "code": "SERVICE_NOT_FOUND", + "message": "Service nginx not found in instance production", + "details": { + "instance": "production", + "service": "nginx" + } + } +} +*/ + +/* +2. GET /api/v1/instances/{name}/services/{service}/logs + +Purpose: Retrieve or stream service logs +Query Parameters: + - container (string): Specific container name + - tail (int): Number of lines from end (default: 100, max: 5000) + - follow (bool): Stream logs via SSE (default: false) + - previous (bool): Get previous container logs (default: false) + - since (string): RFC3339 timestamp or duration (e.g. "10m") + +Response Codes: + - 200 OK: Logs retrieved successfully (or SSE stream started) + - 400 Bad Request: Invalid query parameters + - 404 Not Found: Instance, service, or container not found + - 500 Internal Server Error: kubectl command failed + +Example Request (buffered): + GET /api/v1/instances/production/services/nginx/logs?tail=50 + +Example Response (200 OK): +{ + "service": "nginx", + "namespace": "nginx", + "container": "nginx", + "lines": [ + "2024/01/15 10:00:00 [notice] Configuration loaded", + "2024/01/15 10:00:01 [info] Server started on port 80" + ], + "truncated": false, + "timestamp": "2024-01-15T10:30:00Z" +} + +Example Request (streaming): + GET /api/v1/instances/production/services/nginx/logs?follow=true + Accept: text/event-stream + +Example SSE Response: +data: {"type":"log","line":"2024/01/15 10:00:00 [notice] Configuration loaded","container":"nginx","timestamp":"2024-01-15T10:30:00Z"} + +data: {"type":"log","line":"2024/01/15 10:00:01 [info] Request from 10.0.0.1","container":"nginx","timestamp":"2024-01-15T10:30:01Z"} + +data: {"type":"error","error":"Container restarting","timestamp":"2024-01-15T10:30:02Z"} + +data: {"type":"end","timestamp":"2024-01-15T10:30:03Z"} +*/ + +/* +3. PATCH /api/v1/instances/{name}/services/{service}/config + +Purpose: Update service configuration in config.yaml and optionally redeploy +Request Body: ServiceConfigUpdate (JSON) +Response Codes: + - 200 OK: Configuration updated successfully + - 400 Bad Request: Invalid configuration + - 404 Not Found: Instance or service not found + - 500 Internal Server Error: Update or deployment failed + +Example Request: + PATCH /api/v1/instances/production/services/nginx/config + Content-Type: application/json + + { + "config": { + "nginx.image": "nginx:1.22", + "nginx.replicas": 5, + "nginx.resources.memory": "512Mi" + }, + "redeploy": true + } + +Example Response (200 OK): +{ + "service": "nginx", + "namespace": "nginx", + "config": { + "nginx.image": "nginx:1.22", + "nginx.replicas": 5, + "nginx.resources.memory": "512Mi" + }, + "redeployed": true, + "message": "Service configuration updated and redeployed successfully" +} + +Example Error Response (400): +{ + "error": { + "code": "CONFIG_INVALID", + "message": "Invalid configuration: nginx.replicas must be a positive integer", + "details": { + "field": "nginx.replicas", + "value": -1, + "constraint": "positive integer" + } + } +} +*/ + +// ============================== +// Validation Rules +// ============================== + +/* +Query Parameter Validation: + +ServiceLogsRequest: +- tail: Must be between 1 and 5000 (default: 100) +- since: Must be valid RFC3339 timestamp or Go duration string (e.g. "5m", "1h") +- container: Must match existing container name if specified +- follow: When true, response uses Server-Sent Events (SSE) +- previous: Cannot be combined with follow=true + +ServiceConfigUpdate: +- config: Must be valid YAML-compatible structure +- config keys: Must follow service's expected configuration schema +- redeploy: When true, triggers kustomize recompilation and kubectl apply + +Path Parameters: +- instance name: Must match existing instance directory +- service name: Must match installed service name +*/ + +// ============================== +// HTTP Status Code Summary +// ============================== + +/* +200 OK: +- Service status retrieved successfully +- Logs retrieved successfully (non-streaming) +- Configuration updated successfully + +400 Bad Request: +- Invalid query parameters +- Invalid configuration in request body +- Validation errors + +404 Not Found: +- Instance does not exist +- Service not installed in instance +- Container name not found (for logs) + +500 Internal Server Error: +- kubectl command execution failed +- File system operations failed +- Unexpected errors during processing +*/ diff --git a/internal/services/config.go b/internal/services/config.go new file mode 100644 index 0000000..473bba7 --- /dev/null +++ b/internal/services/config.go @@ -0,0 +1,143 @@ +package services + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "gopkg.in/yaml.v3" + + "github.com/wild-cloud/wild-central/daemon/internal/contracts" + "github.com/wild-cloud/wild-central/daemon/internal/operations" + "github.com/wild-cloud/wild-central/daemon/internal/storage" +) + +// UpdateConfig updates service configuration and optionally redeploys +func (m *Manager) UpdateConfig(instanceName, serviceName string, update contracts.ServiceConfigUpdate, broadcaster *operations.Broadcaster) (*contracts.ServiceConfigResponse, error) { + // 1. Validate service exists + manifest, err := m.GetManifest(serviceName) + if err != nil { + return nil, fmt.Errorf("service not found: %w", err) + } + + namespace := manifest.Namespace + if deployment, ok := serviceDeployments[serviceName]; ok { + namespace = deployment.namespace + } + + // 2. Load instance config + instanceDir := filepath.Join(m.dataDir, "instances", instanceName) + configPath := filepath.Join(instanceDir, "config.yaml") + + if !storage.FileExists(configPath) { + return nil, fmt.Errorf("config file not found for instance %s", instanceName) + } + + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read config: %w", err) + } + + var config map[string]interface{} + if err := yaml.Unmarshal(configData, &config); err != nil { + return nil, fmt.Errorf("failed to parse config: %w", err) + } + + // 3. Validate config keys against service manifest + validPaths := make(map[string]bool) + for _, path := range manifest.ConfigReferences { + validPaths[path] = true + } + for _, cfg := range manifest.ServiceConfig { + validPaths[cfg.Path] = true + } + + for key := range update.Config { + if !validPaths[key] { + return nil, fmt.Errorf("invalid config key '%s' for service %s", key, serviceName) + } + } + + // 4. Update config values + for key, value := range update.Config { + if err := setNestedValue(config, key, value); err != nil { + return nil, fmt.Errorf("failed to set config key '%s': %w", key, err) + } + } + + // 5. Write updated config + updatedData, err := yaml.Marshal(config) + if err != nil { + return nil, fmt.Errorf("failed to marshal config: %w", err) + } + + if err := os.WriteFile(configPath, updatedData, 0644); err != nil { + return nil, fmt.Errorf("failed to write config: %w", err) + } + + // 6. Redeploy if requested + redeployed := false + if update.Redeploy { + // Fetch fresh templates if requested + if update.Fetch { + if err := m.Fetch(instanceName, serviceName); err != nil { + return nil, fmt.Errorf("failed to fetch templates: %w", err) + } + } + + // Recompile templates + if err := m.Compile(instanceName, serviceName); err != nil { + return nil, fmt.Errorf("failed to recompile templates: %w", err) + } + + // Redeploy service + if err := m.Deploy(instanceName, serviceName, "", broadcaster); err != nil { + return nil, fmt.Errorf("failed to redeploy service: %w", err) + } + + redeployed = true + } + + // 7. Build response + message := "Service configuration updated successfully" + if redeployed { + message = "Service configuration updated and redeployed successfully" + } + + return &contracts.ServiceConfigResponse{ + Service: serviceName, + Namespace: namespace, + Config: update.Config, + Redeployed: redeployed, + Message: message, + }, nil +} + +// setNestedValue sets a value in a nested map using dot notation +func setNestedValue(data map[string]interface{}, path string, value interface{}) error { + keys := strings.Split(path, ".") + current := data + + for i, key := range keys { + if i == len(keys)-1 { + // Last key - set the value + current[key] = value + return nil + } + + // Navigate to the next level + if next, ok := current[key].(map[string]interface{}); ok { + current = next + } else if current[key] == nil { + // Create intermediate map if it doesn't exist + next := make(map[string]interface{}) + current[key] = next + current = next + } else { + return fmt.Errorf("path '%s' conflicts with existing non-map value at '%s'", path, key) + } + } + + return nil +} diff --git a/internal/services/logs.go b/internal/services/logs.go new file mode 100644 index 0000000..166e6cb --- /dev/null +++ b/internal/services/logs.go @@ -0,0 +1,283 @@ +package services + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "strings" + "time" + + "github.com/wild-cloud/wild-central/daemon/internal/contracts" + "github.com/wild-cloud/wild-central/daemon/internal/storage" + "github.com/wild-cloud/wild-central/daemon/internal/tools" +) + +// GetLogs retrieves buffered logs from a service +func (m *Manager) GetLogs(instanceName, serviceName string, opts contracts.ServiceLogsRequest) (*contracts.ServiceLogsResponse, error) { + // 1. Get service namespace + manifest, err := m.GetManifest(serviceName) + if err != nil { + return nil, fmt.Errorf("service not found: %w", err) + } + + namespace := manifest.Namespace + if deployment, ok := serviceDeployments[serviceName]; ok { + namespace = deployment.namespace + } + + // 2. Get kubeconfig path + kubeconfigPath := tools.GetKubeconfigPath(m.dataDir, instanceName) + if !storage.FileExists(kubeconfigPath) { + return nil, fmt.Errorf("kubeconfig not found - cluster may not be bootstrapped") + } + + kubectl := tools.NewKubectl(kubeconfigPath) + + // 3. Get pod name (use first pod if no specific container specified) + podName := "" + if opts.Container == "" { + // Get first pod in namespace + podName, err = kubectl.GetFirstPodName(namespace) + if err != nil { + // Check if it's because there are no pods + pods, _ := kubectl.GetPods(namespace) + if len(pods) == 0 { + // Return empty logs response instead of error when no pods exist + return &contracts.ServiceLogsResponse{ + Lines: []string{"No pods found for service. The service may not be deployed yet."}, + }, nil + } + return nil, fmt.Errorf("failed to find pod: %w", err) + } + + // If no container specified, get first container + containers, err := kubectl.GetPodContainers(namespace, podName) + if err != nil { + return nil, fmt.Errorf("failed to get pod containers: %w", err) + } + if len(containers) > 0 { + opts.Container = containers[0] + } + } else { + // Find pod with specified container + pods, err := kubectl.GetPods(namespace) + if err != nil { + return nil, fmt.Errorf("failed to list pods: %w", err) + } + if len(pods) > 0 { + podName = pods[0].Name + } else { + return nil, fmt.Errorf("no pods found in namespace %s", namespace) + } + } + + // 4. Set default tail if not specified + if opts.Tail == 0 { + opts.Tail = 100 + } + // Enforce maximum tail + if opts.Tail > 5000 { + opts.Tail = 5000 + } + + // 5. Get logs + logOpts := tools.LogOptions{ + Container: opts.Container, + Tail: opts.Tail, + Previous: opts.Previous, + Since: opts.Since, + } + + logs, err := kubectl.GetLogs(namespace, podName, logOpts) + if err != nil { + return nil, fmt.Errorf("failed to get logs: %w", err) + } + + // 6. Parse logs into lines + lines := strings.Split(strings.TrimSpace(logs), "\n") + truncated := false + if len(lines) > opts.Tail { + lines = lines[len(lines)-opts.Tail:] + truncated = true + } + + return &contracts.ServiceLogsResponse{ + Service: serviceName, + Namespace: namespace, + Container: opts.Container, + Lines: lines, + Truncated: truncated, + Timestamp: time.Now(), + }, nil +} + +// StreamLogs streams logs from a service using SSE +func (m *Manager) StreamLogs(instanceName, serviceName string, opts contracts.ServiceLogsRequest, writer io.Writer) error { + // 1. Get service namespace + manifest, err := m.GetManifest(serviceName) + if err != nil { + return fmt.Errorf("service not found: %w", err) + } + + namespace := manifest.Namespace + if deployment, ok := serviceDeployments[serviceName]; ok { + namespace = deployment.namespace + } + + // 2. Get kubeconfig path + kubeconfigPath := tools.GetKubeconfigPath(m.dataDir, instanceName) + if !storage.FileExists(kubeconfigPath) { + return fmt.Errorf("kubeconfig not found - cluster may not be bootstrapped") + } + + kubectl := tools.NewKubectl(kubeconfigPath) + + // 3. Get pod name + podName := "" + if opts.Container == "" { + podName, err = kubectl.GetFirstPodName(namespace) + if err != nil { + // Check if it's because there are no pods + pods, _ := kubectl.GetPods(namespace) + if len(pods) == 0 { + // Send a message event indicating no pods + fmt.Fprintf(writer, "data: No pods found for service. The service may not be deployed yet.\n\n") + return nil + } + return fmt.Errorf("failed to find pod: %w", err) + } + + // Get first container + containers, err := kubectl.GetPodContainers(namespace, podName) + if err != nil { + return fmt.Errorf("failed to get pod containers: %w", err) + } + if len(containers) > 0 { + opts.Container = containers[0] + } + } else { + pods, err := kubectl.GetPods(namespace) + if err != nil { + return fmt.Errorf("failed to list pods: %w", err) + } + if len(pods) > 0 { + podName = pods[0].Name + } else { + return fmt.Errorf("no pods found in namespace %s", namespace) + } + } + + // 4. Set default tail for streaming + if opts.Tail == 0 { + opts.Tail = 50 + } + + // 5. Stream logs + logOpts := tools.LogOptions{ + Container: opts.Container, + Tail: opts.Tail, + Since: opts.Since, + } + + cmd, err := kubectl.StreamLogs(namespace, podName, logOpts) + if err != nil { + return fmt.Errorf("failed to start log stream: %w", err) + } + + // Get stdout pipe + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("failed to get stdout pipe: %w", err) + } + + stderr, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("failed to get stderr pipe: %w", err) + } + + // Start command + if err := cmd.Start(); err != nil { + return fmt.Errorf("failed to start kubectl logs: %w", err) + } + + // Stream logs line by line as SSE events + scanner := bufio.NewScanner(stdout) + errScanner := bufio.NewScanner(stderr) + + // Channel to signal completion + done := make(chan error, 1) + + // Read stderr in background + go func() { + for errScanner.Scan() { + event := contracts.ServiceLogsSSEEvent{ + Type: "error", + Error: errScanner.Text(), + Container: opts.Container, + Timestamp: time.Now(), + } + writeSSEEvent(writer, event) + } + }() + + // Read stdout + go func() { + for scanner.Scan() { + event := contracts.ServiceLogsSSEEvent{ + Type: "log", + Line: scanner.Text(), + Container: opts.Container, + Timestamp: time.Now(), + } + if err := writeSSEEvent(writer, event); err != nil { + done <- err + return + } + } + + if err := scanner.Err(); err != nil { + done <- err + return + } + + done <- nil + }() + + // Wait for completion or error + err = <-done + cmd.Process.Kill() + + // Send end event + endEvent := contracts.ServiceLogsSSEEvent{ + Type: "end", + Timestamp: time.Now(), + } + writeSSEEvent(writer, endEvent) + + return err +} + +// writeSSEEvent writes an SSE event to the writer +func writeSSEEvent(w io.Writer, event contracts.ServiceLogsSSEEvent) error { + // Marshal the event to JSON safely + jsonData, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal SSE event: %w", err) + } + + // Write SSE format: "data: \n\n" + data := fmt.Sprintf("data: %s\n\n", jsonData) + + _, err = w.Write([]byte(data)) + if err != nil { + return err + } + + // Flush if writer supports it + if flusher, ok := w.(interface{ Flush() }); ok { + flusher.Flush() + } + + return nil +} diff --git a/internal/services/services.go b/internal/services/services.go index 6b0911e..a81e670 100644 --- a/internal/services/services.go +++ b/internal/services/services.go @@ -37,10 +37,25 @@ func NewManager(dataDir string) *Manager { manifest, err := setup.GetManifest(serviceName) if err == nil { // Convert setup.ServiceManifest to services.ServiceManifest + // Convert setup.ConfigDefinition map to services.ConfigDefinition map + serviceConfig := make(map[string]ConfigDefinition) + for key, cfg := range manifest.ServiceConfig { + serviceConfig[key] = ConfigDefinition{ + Path: cfg.Path, + Prompt: cfg.Prompt, + Default: cfg.Default, + Type: cfg.Type, + } + } + manifests[serviceName] = &ServiceManifest{ - Name: manifest.Name, - Description: manifest.Description, - Category: manifest.Category, + Name: manifest.Name, + Description: manifest.Description, + Namespace: manifest.Namespace, + Category: manifest.Category, + Dependencies: manifest.Dependencies, + ConfigReferences: manifest.ConfigReferences, + ServiceConfig: serviceConfig, } } } @@ -60,6 +75,7 @@ type Service struct { Version string `json:"version"` Namespace string `json:"namespace"` Dependencies []string `json:"dependencies,omitempty"` + HasConfig bool `json:"hasConfig"` // Whether service has configurable fields } // Base services in Wild Cloud (kept for reference/validation) @@ -147,12 +163,14 @@ func (m *Manager) List(instanceName string) ([]Service, error) { // Get service info from manifest if available var namespace, description, version string var dependencies []string + var hasConfig bool if manifest, ok := m.manifests[name]; ok { namespace = manifest.Namespace description = manifest.Description version = manifest.Category // Using category as version for now dependencies = manifest.Dependencies + hasConfig = len(manifest.ServiceConfig) > 0 } else { // Fall back to hardcoded map namespace = name + "-system" // default @@ -168,6 +186,7 @@ func (m *Manager) List(instanceName string) ([]Service, error) { Description: description, Version: version, Dependencies: dependencies, + HasConfig: hasConfig, } services = append(services, service) diff --git a/internal/services/status.go b/internal/services/status.go new file mode 100644 index 0000000..4ebd5e1 --- /dev/null +++ b/internal/services/status.go @@ -0,0 +1,148 @@ +package services + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "gopkg.in/yaml.v3" + + "github.com/wild-cloud/wild-central/daemon/internal/contracts" + "github.com/wild-cloud/wild-central/daemon/internal/storage" + "github.com/wild-cloud/wild-central/daemon/internal/tools" +) + +// GetDetailedStatus returns comprehensive service status including pods and health +func (m *Manager) GetDetailedStatus(instanceName, serviceName string) (*contracts.DetailedServiceStatus, error) { + // 1. Get service manifest and namespace + manifest, err := m.GetManifest(serviceName) + if err != nil { + return nil, fmt.Errorf("service not found: %w", err) + } + + namespace := manifest.Namespace + deploymentName := manifest.GetDeploymentName() + + // Check hardcoded map for correct deployment name + if deployment, ok := serviceDeployments[serviceName]; ok { + namespace = deployment.namespace + deploymentName = deployment.deploymentName + } + + // 2. Get kubeconfig path + kubeconfigPath := tools.GetKubeconfigPath(m.dataDir, instanceName) + if !storage.FileExists(kubeconfigPath) { + return &contracts.DetailedServiceStatus{ + Name: serviceName, + Namespace: namespace, + DeploymentStatus: "NotFound", + Replicas: contracts.ReplicaStatus{}, + Pods: []contracts.PodStatus{}, + LastUpdated: time.Now(), + }, nil + } + + kubectl := tools.NewKubectl(kubeconfigPath) + + // 3. Get deployment information + deploymentInfo, err := kubectl.GetDeployment(deploymentName, namespace) + deploymentStatus := "NotFound" + replicas := contracts.ReplicaStatus{} + + if err == nil { + replicas = contracts.ReplicaStatus{ + Desired: deploymentInfo.Desired, + Current: deploymentInfo.Current, + Ready: deploymentInfo.Ready, + Available: deploymentInfo.Available, + } + + // Determine deployment status + if deploymentInfo.Ready == deploymentInfo.Desired && deploymentInfo.Desired > 0 { + deploymentStatus = "Ready" + } else if deploymentInfo.Ready < deploymentInfo.Desired { + if deploymentInfo.Current > deploymentInfo.Desired { + deploymentStatus = "Progressing" + } else { + deploymentStatus = "Degraded" + } + } else if deploymentInfo.Desired == 0 { + deploymentStatus = "Scaled to Zero" + } + } + + // 4. Get pod information + podInfos, err := kubectl.GetPods(namespace) + pods := make([]contracts.PodStatus, 0, len(podInfos)) + + if err == nil { + for _, podInfo := range podInfos { + pods = append(pods, contracts.PodStatus{ + Name: podInfo.Name, + Status: podInfo.Status, + Ready: podInfo.Ready, + Restarts: podInfo.Restarts, + Age: podInfo.Age, + Node: podInfo.Node, + IP: podInfo.IP, + }) + } + } + + // 5. Load current config values + instanceDir := filepath.Join(m.dataDir, "instances", instanceName) + configPath := filepath.Join(instanceDir, "config.yaml") + configValues := make(map[string]interface{}) + + if storage.FileExists(configPath) { + configData, err := os.ReadFile(configPath) + if err == nil { + var instanceConfig map[string]interface{} + if err := yaml.Unmarshal(configData, &instanceConfig); err == nil { + // Extract values for all config paths + for _, path := range manifest.ConfigReferences { + if value := getNestedValue(instanceConfig, path); value != nil { + configValues[path] = value + } + } + for _, cfg := range manifest.ServiceConfig { + if value := getNestedValue(instanceConfig, cfg.Path); value != nil { + configValues[cfg.Path] = value + } + } + } + } + } + + // 6. Convert ServiceConfig to contracts.ConfigDefinition + contractsServiceConfig := make(map[string]contracts.ConfigDefinition) + for key, cfg := range manifest.ServiceConfig { + contractsServiceConfig[key] = contracts.ConfigDefinition{ + Path: cfg.Path, + Prompt: cfg.Prompt, + Default: cfg.Default, + Type: cfg.Type, + } + } + + // 7. Build detailed status response + status := &contracts.DetailedServiceStatus{ + Name: serviceName, + Namespace: namespace, + DeploymentStatus: deploymentStatus, + Replicas: replicas, + Pods: pods, + Config: configValues, + Manifest: &contracts.ServiceManifest{ + Name: manifest.Name, + Description: manifest.Description, + Namespace: manifest.Namespace, + ConfigReferences: manifest.ConfigReferences, + ServiceConfig: contractsServiceConfig, + }, + LastUpdated: time.Now(), + } + + return status, nil +} diff --git a/internal/setup/embedded.go b/internal/setup/embedded.go index 3f11507..2eca157 100644 --- a/internal/setup/embedded.go +++ b/internal/setup/embedded.go @@ -19,11 +19,22 @@ var clusterServices = setupFS // ServiceManifest represents the wild-manifest.yaml structure type ServiceManifest struct { - Name string `yaml:"name"` - Description string `yaml:"description"` - Version string `yaml:"version"` - Category string `yaml:"category"` - // Add other fields as needed from wild-manifest.yaml + Name string `yaml:"name"` + Description string `yaml:"description"` + Version string `yaml:"version"` + Category string `yaml:"category"` + Namespace string `yaml:"namespace"` + Dependencies []string `yaml:"dependencies,omitempty"` + ConfigReferences []string `yaml:"configReferences,omitempty"` + ServiceConfig map[string]ConfigDefinition `yaml:"serviceConfig,omitempty"` +} + +// ConfigDefinition defines config that should be prompted during service setup +type ConfigDefinition struct { + Path string `yaml:"path"` + Prompt string `yaml:"prompt"` + Default string `yaml:"default"` + Type string `yaml:"type,omitempty"` } // ListServices returns all available cluster services diff --git a/internal/tools/kubectl.go b/internal/tools/kubectl.go index b53b1a1..f02528a 100644 --- a/internal/tools/kubectl.go +++ b/internal/tools/kubectl.go @@ -1,7 +1,11 @@ package tools import ( + "encoding/json" + "fmt" "os/exec" + "strings" + "time" ) // Kubectl provides a thin wrapper around the kubectl command-line tool @@ -31,3 +35,251 @@ func (k *Kubectl) DeploymentExists(name, namespace string) bool { err := cmd.Run() return err == nil } + +// PodInfo represents pod information from kubectl +type PodInfo struct { + Name string + Status string + Ready string + Restarts int + Age string + Node string + IP string +} + +// DeploymentInfo represents deployment information +type DeploymentInfo struct { + Desired int32 + Current int32 + Ready int32 + Available int32 +} + +// GetPods retrieves pod information for a namespace +func (k *Kubectl) GetPods(namespace string) ([]PodInfo, error) { + args := []string{ + "get", "pods", + "-n", namespace, + "-o", "json", + } + + if k.kubeconfigPath != "" { + args = append([]string{"--kubeconfig", k.kubeconfigPath}, args...) + } + + cmd := exec.Command("kubectl", args...) + output, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("failed to get pods: %w", err) + } + + var podList struct { + Items []struct { + Metadata struct { + Name string `json:"name"` + CreationTimestamp time.Time `json:"creationTimestamp"` + } `json:"metadata"` + Spec struct { + NodeName string `json:"nodeName"` + } `json:"spec"` + Status struct { + Phase string `json:"phase"` + PodIP string `json:"podIP"` + ContainerStatuses []struct { + Ready bool `json:"ready"` + RestartCount int `json:"restartCount"` + } `json:"containerStatuses"` + } `json:"status"` + } `json:"items"` + } + + if err := json.Unmarshal(output, &podList); err != nil { + return nil, fmt.Errorf("failed to parse pod list: %w", err) + } + + pods := make([]PodInfo, 0, len(podList.Items)) + for _, pod := range podList.Items { + // Calculate ready containers + readyCount := 0 + totalCount := len(pod.Status.ContainerStatuses) + restarts := 0 + for _, cs := range pod.Status.ContainerStatuses { + if cs.Ready { + readyCount++ + } + restarts += cs.RestartCount + } + + // Calculate age + age := formatAge(time.Since(pod.Metadata.CreationTimestamp)) + + pods = append(pods, PodInfo{ + Name: pod.Metadata.Name, + Status: pod.Status.Phase, + Ready: fmt.Sprintf("%d/%d", readyCount, totalCount), + Restarts: restarts, + Age: age, + Node: pod.Spec.NodeName, + IP: pod.Status.PodIP, + }) + } + + return pods, nil +} + +// GetDeployment retrieves deployment information +func (k *Kubectl) GetDeployment(name, namespace string) (*DeploymentInfo, error) { + args := []string{ + "get", "deployment", name, + "-n", namespace, + "-o", "json", + } + + if k.kubeconfigPath != "" { + args = append([]string{"--kubeconfig", k.kubeconfigPath}, args...) + } + + cmd := exec.Command("kubectl", args...) + output, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("failed to get deployment: %w", err) + } + + var deployment struct { + Status struct { + Replicas int32 `json:"replicas"` + UpdatedReplicas int32 `json:"updatedReplicas"` + ReadyReplicas int32 `json:"readyReplicas"` + AvailableReplicas int32 `json:"availableReplicas"` + } `json:"status"` + Spec struct { + Replicas int32 `json:"replicas"` + } `json:"spec"` + } + + if err := json.Unmarshal(output, &deployment); err != nil { + return nil, fmt.Errorf("failed to parse deployment: %w", err) + } + + return &DeploymentInfo{ + Desired: deployment.Spec.Replicas, + Current: deployment.Status.Replicas, + Ready: deployment.Status.ReadyReplicas, + Available: deployment.Status.AvailableReplicas, + }, nil +} + +// GetLogs retrieves logs from a pod +func (k *Kubectl) GetLogs(namespace, podName string, opts LogOptions) (string, error) { + args := []string{ + "logs", podName, + "-n", namespace, + } + + if opts.Container != "" { + args = append(args, "-c", opts.Container) + } + if opts.Tail > 0 { + args = append(args, "--tail", fmt.Sprintf("%d", opts.Tail)) + } + if opts.Previous { + args = append(args, "--previous") + } + if opts.Since != "" { + args = append(args, "--since", opts.Since) + } + + if k.kubeconfigPath != "" { + args = append([]string{"--kubeconfig", k.kubeconfigPath}, args...) + } + + cmd := exec.Command("kubectl", args...) + output, err := cmd.Output() + if err != nil { + return "", fmt.Errorf("failed to get logs: %w", err) + } + + return string(output), nil +} + +// LogOptions configures log retrieval +type LogOptions struct { + Container string + Tail int + Previous bool + Since string +} + +// StreamLogs streams logs from a pod +func (k *Kubectl) StreamLogs(namespace, podName string, opts LogOptions) (*exec.Cmd, error) { + args := []string{ + "logs", podName, + "-n", namespace, + "-f", // follow + } + + if opts.Container != "" { + args = append(args, "-c", opts.Container) + } + if opts.Tail > 0 { + args = append(args, "--tail", fmt.Sprintf("%d", opts.Tail)) + } + if opts.Since != "" { + args = append(args, "--since", opts.Since) + } + + if k.kubeconfigPath != "" { + args = append([]string{"--kubeconfig", k.kubeconfigPath}, args...) + } + + cmd := exec.Command("kubectl", args...) + return cmd, nil +} + +// formatAge converts a duration to a human-readable age string +func formatAge(d time.Duration) string { + if d < time.Minute { + return fmt.Sprintf("%ds", int(d.Seconds())) + } + if d < time.Hour { + return fmt.Sprintf("%dm", int(d.Minutes())) + } + if d < 24*time.Hour { + return fmt.Sprintf("%dh", int(d.Hours())) + } + return fmt.Sprintf("%dd", int(d.Hours()/24)) +} + +// GetFirstPodName returns the name of the first pod in a namespace +func (k *Kubectl) GetFirstPodName(namespace string) (string, error) { + pods, err := k.GetPods(namespace) + if err != nil { + return "", err + } + if len(pods) == 0 { + return "", fmt.Errorf("no pods found in namespace %s", namespace) + } + return pods[0].Name, nil +} + +// GetPodContainers returns container names for a pod +func (k *Kubectl) GetPodContainers(namespace, podName string) ([]string, error) { + args := []string{ + "get", "pod", podName, + "-n", namespace, + "-o", "jsonpath={.spec.containers[*].name}", + } + + if k.kubeconfigPath != "" { + args = append([]string{"--kubeconfig", k.kubeconfigPath}, args...) + } + + cmd := exec.Command("kubectl", args...) + output, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("failed to get pod containers: %w", err) + } + + containerNames := strings.Fields(string(output)) + return containerNames, nil +}