Files
wild-cloud/api/internal/backup/strategies/longhorn_native.go
Paul Payne 11c875a513 fix: Resolve all golangci-lint errors across API codebase
Handle unchecked errors (errcheck), fix nil-deref false positives (SA5011),
suppress deprecated-but-functional API warnings (SA1019), remove unused code,
and use fmt.Fprintf over WriteString(fmt.Sprintf(...)).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-24 21:52:59 +00:00

613 lines
18 KiB
Go

package strategies
import (
"fmt"
"log/slog"
"os/exec"
"strings"
"time"
btypes "github.com/wild-cloud/wild-central/daemon/internal/backup/types"
"github.com/wild-cloud/wild-central/daemon/internal/operations"
"github.com/wild-cloud/wild-central/daemon/internal/tools"
)
// LonghornNativeStrategy implements backup strategy using Longhorn native backups to NFS
type LonghornNativeStrategy struct {
dataDir string
opManager *operations.Manager
}
// NewLonghornNativeStrategy creates a new Longhorn native backup strategy
func NewLonghornNativeStrategy(dataDir string) *LonghornNativeStrategy {
return &LonghornNativeStrategy{
dataDir: dataDir,
opManager: operations.NewManager(dataDir),
}
}
// Name returns the strategy identifier
func (l *LonghornNativeStrategy) Name() string {
return "longhorn-native"
}
// Backup creates Longhorn native backups of all PVCs for an app, writing results to the plan
func (l *LonghornNativeStrategy) Backup(plan *btypes.RecoveryPlan, dest btypes.BackupDestination) error {
entry := plan.GetStrategyEntry("longhorn-native")
if entry == nil {
return fmt.Errorf("no strategy entry for longhorn-native in plan")
}
entry.Status = "backing_up"
kubeconfigPath := tools.GetKubeconfigPath(l.dataDir, plan.Instance)
// Pre-flight: verify Longhorn backup target is configured
if err := l.checkBackupTarget(kubeconfigPath); err != nil {
return err
}
// Determine source namespace from plan
sourceNamespace := plan.Source.Namespace
if sourceNamespace == "" {
sourceNamespace = plan.App
}
// Get all PVCs in the source namespace
pvcs, err := l.getPVCs(kubeconfigPath, sourceNamespace)
if err != nil {
return fmt.Errorf("failed to get PVCs: %w", err)
}
if len(pvcs) == 0 {
entry.Status = "backed_up"
entry.Backup = map[string]any{
"volumes": []any{},
"count": 0,
}
return nil
}
volumeParams := []map[string]any{}
backupVolumes := []map[string]any{}
for _, pvcName := range pvcs {
// Skip cache or temp volumes
if strings.Contains(pvcName, "-cache") || strings.Contains(pvcName, "-tmp") {
continue
}
volumeName, err := l.getVolumeNameFromPVC(kubeconfigPath, sourceNamespace, pvcName)
if err != nil {
return fmt.Errorf("failed to get volume name for PVC %s: %w", pvcName, err)
}
pvcSize := l.getPVCSize(kubeconfigPath, sourceNamespace, pvcName)
accessMode := l.getPVCAccessMode(kubeconfigPath, sourceNamespace, pvcName)
// Record volume params
volumeParams = append(volumeParams, map[string]any{
"pvcName": pvcName,
"volumeName": volumeName,
"size": pvcSize,
"accessMode": accessMode,
})
// Backup volume with retry for transient failures (e.g. instance manager restart)
backupID, backupURL, err := l.backupVolumeWithRetry(kubeconfigPath, plan.App, pvcName, volumeName, plan.Timestamp, 3)
if err != nil {
return fmt.Errorf("backup failed for volume %s: %w", volumeName, err)
}
backupVolumes = append(backupVolumes, map[string]any{
"pvcName": pvcName,
"backupID": backupID,
"backupURL": backupURL,
})
if err := l.cleanupOldBackups(kubeconfigPath, volumeName, backupID); err != nil {
slog.Error("failed to clean up old backups", "component", "longhorn", "volume", volumeName, "error", err)
}
}
// Record in plan
entry.Params = map[string]any{
"volumes": volumeParams,
}
entry.Backup = map[string]any{
"volumes": backupVolumes,
"count": len(backupVolumes),
}
entry.Status = "backed_up"
return nil
}
// Restore restores PVCs from Longhorn backups using plan-driven coordination.
// Creates colored volumes ({pvcName}-{standbyColor}).
func (l *LonghornNativeStrategy) Restore(plan *btypes.RecoveryPlan, dest btypes.BackupDestination) error {
entry := plan.GetStrategyEntry("longhorn-native")
if entry == nil {
return fmt.Errorf("no strategy entry for longhorn-native in plan")
}
entry.Status = "restoring"
kubeconfigPath := tools.GetKubeconfigPath(l.dataDir, plan.Instance)
backupVolumes, ok := entry.Backup["volumes"].([]any)
if !ok || len(backupVolumes) == 0 {
entry.Restore = map[string]any{
"volumes": []any{},
}
entry.Status = "restored"
return nil
}
restoreVolumes := []map[string]any{}
for _, bv := range backupVolumes {
backup, ok := bv.(map[string]any)
if !ok {
continue
}
pvcName, _ := backup["pvcName"].(string)
backupURL, _ := backup["backupURL"].(string)
if pvcName == "" || backupURL == "" {
continue
}
// Find the volume params to get size and access mode
pvcSize := "10Gi"
accessMode := "ReadWriteOnce"
if volumeParams, ok := entry.Params["volumes"].([]any); ok {
for _, vp := range volumeParams {
if vpMap, ok := vp.(map[string]any); ok {
if vpMap["pvcName"] == pvcName {
if size, ok := vpMap["size"].(string); ok {
pvcSize = size
}
if mode, ok := vpMap["accessMode"].(string); ok {
accessMode = mode
}
}
}
}
}
// Create colored restore volume name
restoreVolumeName := fmt.Sprintf("%s-%s", pvcName, plan.StandbyColor)
if err := l.createVolumeFromBackup(kubeconfigPath, restoreVolumeName, backupURL, pvcSize); err != nil {
return fmt.Errorf("failed to create volume from backup for %s: %w", pvcName, err)
}
// Create a PV bound to this volume so the standby PVC can bind to it
standbyNamespace := plan.Standby.Namespace
if standbyNamespace == "" {
standbyNamespace = plan.App + "-" + plan.StandbyColor
}
if err := l.createPVForVolume(kubeconfigPath, restoreVolumeName, pvcSize, accessMode, standbyNamespace, pvcName); err != nil {
slog.Error("failed to create PV for volume", "component", "longhorn", "volume", restoreVolumeName, "error", err)
}
restoreVolumes = append(restoreVolumes, map[string]any{
"pvcName": pvcName,
"volumeName": restoreVolumeName,
})
}
entry.Restore = map[string]any{
"volumes": restoreVolumes,
}
entry.Status = "restored"
return nil
}
// Switch records previous active volume names
func (l *LonghornNativeStrategy) Switch(plan *btypes.RecoveryPlan) error {
entry := plan.GetStrategyEntry("longhorn-native")
if entry == nil {
return fmt.Errorf("no strategy entry for longhorn-native in plan")
}
entry.Status = "switching"
previousVolumes := []map[string]any{}
if volumeParams, ok := entry.Params["volumes"].([]any); ok {
for _, vp := range volumeParams {
if vpMap, ok := vp.(map[string]any); ok {
previousVolumes = append(previousVolumes, map[string]any{
"pvcName": vpMap["pvcName"],
"volumeName": vpMap["volumeName"],
})
}
}
}
entry.Switch = map[string]any{
"previousVolumes": previousVolumes,
}
entry.Status = "switched"
return nil
}
// Cleanup deletes the previous active-color Longhorn volumes
func (l *LonghornNativeStrategy) Cleanup(plan *btypes.RecoveryPlan) error {
entry := plan.GetStrategyEntry("longhorn-native")
if entry == nil {
return fmt.Errorf("no strategy entry for longhorn-native in plan")
}
entry.Status = "cleaning_up"
// Skip automatic volume cleanup — volumes may still be referenced by PVCs.
// Manual cleanup or a separate garbage collection process is safer.
entry.Status = "cleaned_up"
return nil
}
// Verify checks if Longhorn backups exist and are valid
func (l *LonghornNativeStrategy) Verify(plan *btypes.RecoveryPlan, dest btypes.BackupDestination) error {
entry := plan.GetStrategyEntry("longhorn-native")
if entry == nil {
return fmt.Errorf("no strategy entry for longhorn-native in plan")
}
kubeconfigPath := tools.GetKubeconfigPath(l.dataDir, plan.Instance)
backupVolumes, ok := entry.Backup["volumes"].([]any)
if !ok || len(backupVolumes) == 0 {
return nil
}
// Verify backup target is accessible
if err := l.checkBackupTarget(kubeconfigPath); err != nil {
return fmt.Errorf("backup target not accessible: %w", err)
}
// Verify each backup CRD still exists
for _, bv := range backupVolumes {
backup, ok := bv.(map[string]any)
if !ok {
continue
}
backupID, _ := backup["backupID"].(string)
if backupID == "" {
continue
}
cmd := exec.Command("kubectl", "get", "backups.longhorn.io", backupID,
"-n", "longhorn-system", "-o", "jsonpath={.status.state}")
tools.WithKubeconfig(cmd, kubeconfigPath)
output, err := cmd.Output()
if err != nil {
return fmt.Errorf("backup %s not found: %w", backupID, err)
}
if string(output) != "Completed" {
return fmt.Errorf("backup %s is not in Completed state: %s", backupID, string(output))
}
}
return nil
}
// Helper functions
func (l *LonghornNativeStrategy) backupVolumeWithRetry(kubeconfigPath, appName, pvcName, volumeName, timestamp string, maxAttempts int) (string, string, error) {
var lastErr error
for attempt := range maxAttempts {
snapshotName := strings.ToLower(fmt.Sprintf("%s-%s-snapshot-%s", appName, pvcName, timestamp))
if attempt > 0 {
snapshotName = strings.ToLower(fmt.Sprintf("%s-%s-snapshot-%s-retry%d", appName, pvcName, timestamp, attempt))
slog.Info("retrying backup for volume", "component", "longhorn", "volume", volumeName, "attempt", attempt+1, "maxAttempts", maxAttempts)
time.Sleep(10 * time.Second)
}
if err := l.createSnapshot(kubeconfigPath, volumeName, snapshotName); err != nil {
lastErr = fmt.Errorf("failed to create snapshot: %w", err)
continue
}
backupID, err := l.createBackup(kubeconfigPath, volumeName, snapshotName)
if err != nil {
lastErr = fmt.Errorf("failed to create backup: %w", err)
continue
}
backupURL, err := l.waitForBackupComplete(kubeconfigPath, volumeName, backupID)
if err != nil {
lastErr = fmt.Errorf("backup not ready: %w", err)
continue
}
return backupID, backupURL, nil
}
return "", "", lastErr
}
func (l *LonghornNativeStrategy) checkBackupTarget(kubeconfigPath string) error {
cmd := exec.Command("kubectl", "get", "backuptargets.longhorn.io", "default",
"-n", "longhorn-system", "-o", "jsonpath={.spec.backupTargetURL}")
tools.WithKubeconfig(cmd, kubeconfigPath)
output, err := cmd.Output()
if err != nil {
return fmt.Errorf("failed to check Longhorn backup target: %w", err)
}
if strings.TrimSpace(string(output)) == "" {
return fmt.Errorf("longhorn backup target not configured: set a backup target (NFS or S3) in Longhorn settings before creating backups")
}
return nil
}
func (l *LonghornNativeStrategy) getPVCs(kubeconfigPath, namespace string) ([]string, error) {
cmd := exec.Command("kubectl", "get", "pvc", "-n", namespace, "-o", "jsonpath={.items[*].metadata.name}")
tools.WithKubeconfig(cmd, kubeconfigPath)
output, err := cmd.Output()
if err != nil {
return nil, err
}
pvcs := strings.Fields(string(output))
return pvcs, nil
}
func (l *LonghornNativeStrategy) getPVCSize(kubeconfigPath, namespace, pvcName string) string {
cmd := exec.Command("kubectl", "get", "pvc", "-n", namespace, pvcName,
"-o", "jsonpath={.spec.resources.requests.storage}")
tools.WithKubeconfig(cmd, kubeconfigPath)
if output, err := cmd.Output(); err == nil && len(output) > 0 {
return string(output)
}
return "10Gi"
}
func (l *LonghornNativeStrategy) getPVCAccessMode(kubeconfigPath, namespace, pvcName string) string {
cmd := exec.Command("kubectl", "get", "pvc", "-n", namespace, pvcName,
"-o", "jsonpath={.spec.accessModes[0]}")
tools.WithKubeconfig(cmd, kubeconfigPath)
if output, err := cmd.Output(); err == nil && len(output) > 0 {
return string(output)
}
return "ReadWriteOnce"
}
func (l *LonghornNativeStrategy) getVolumeNameFromPVC(kubeconfigPath, namespace, pvcName string) (string, error) {
cmd := exec.Command("kubectl", "get", "pvc", "-n", namespace, pvcName,
"-o", "jsonpath={.spec.volumeName}")
tools.WithKubeconfig(cmd, kubeconfigPath)
output, err := cmd.Output()
if err != nil {
return "", fmt.Errorf("failed to get volume name: %w", err)
}
volumeName := string(output)
if volumeName == "" {
return "", fmt.Errorf("no volume bound to PVC %s", pvcName)
}
return volumeName, nil
}
func (l *LonghornNativeStrategy) createSnapshot(kubeconfigPath, volumeName, snapshotName string) error {
snapshotYAML := fmt.Sprintf(`apiVersion: longhorn.io/v1beta2
kind: Snapshot
metadata:
name: %s
namespace: longhorn-system
spec:
volume: %s
createSnapshot: true
`, snapshotName, volumeName)
cmd := exec.Command("kubectl", "apply", "-f", "-")
tools.WithKubeconfig(cmd, kubeconfigPath)
cmd.Stdin = strings.NewReader(snapshotYAML)
if output, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("failed to create snapshot: %w, output: %s", err, string(output))
}
// Wait for snapshot to be ready
for range 30 {
cmd := exec.Command("kubectl", "get", "snapshots.longhorn.io", snapshotName,
"-n", "longhorn-system", "-o", "jsonpath={.status.readyToUse}")
tools.WithKubeconfig(cmd, kubeconfigPath)
output, err := cmd.Output()
if err == nil && string(output) == "true" {
return nil
}
time.Sleep(2 * time.Second)
}
return fmt.Errorf("timeout waiting for snapshot %s to be ready", snapshotName)
}
func (l *LonghornNativeStrategy) createBackup(kubeconfigPath, volumeName, snapshotName string) (string, error) {
// Backup name must be unique — derive from snapshot name
backupName := strings.ReplaceAll(snapshotName, "_", "-")
if len(backupName) > 63 {
backupName = backupName[:63]
}
backupYAML := fmt.Sprintf(`apiVersion: longhorn.io/v1beta2
kind: Backup
metadata:
name: %s
namespace: longhorn-system
labels:
backup-volume: %s
spec:
snapshotName: %s
`, backupName, volumeName, snapshotName)
cmd := exec.Command("kubectl", "apply", "-f", "-")
tools.WithKubeconfig(cmd, kubeconfigPath)
cmd.Stdin = strings.NewReader(backupYAML)
if output, err := cmd.CombinedOutput(); err != nil {
return "", fmt.Errorf("failed to create backup: %w, output: %s", err, string(output))
}
return backupName, nil
}
func (l *LonghornNativeStrategy) waitForBackupComplete(kubeconfigPath, _, backupName string) (string, error) {
maxRetries := 120
for i := range maxRetries {
// Get backup state
stateCmd := exec.Command("kubectl", "get", "backups.longhorn.io", backupName,
"-n", "longhorn-system", "-o", "jsonpath={.status.state}")
tools.WithKubeconfig(stateCmd, kubeconfigPath)
stateOutput, err := stateCmd.Output()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
state := string(stateOutput)
if state == "Error" {
// Get error message
errCmd := exec.Command("kubectl", "get", "backups.longhorn.io", backupName,
"-n", "longhorn-system", "-o", "jsonpath={.status.messages}")
tools.WithKubeconfig(errCmd, kubeconfigPath)
errOutput, _ := errCmd.Output()
return "", fmt.Errorf("backup failed: %s", string(errOutput))
}
if state == "Completed" {
// Get backup URL
urlCmd := exec.Command("kubectl", "get", "backups.longhorn.io", backupName,
"-n", "longhorn-system", "-o", "jsonpath={.status.url}")
tools.WithKubeconfig(urlCmd, kubeconfigPath)
urlOutput, err := urlCmd.Output()
if err != nil {
return "", fmt.Errorf("backup completed but failed to get URL: %w", err)
}
backupURL := string(urlOutput)
if backupURL != "" {
return backupURL, nil
}
}
if i%12 == 0 && i > 0 {
slog.Info("waiting for backup to complete", "component", "longhorn", "backup", backupName, "state", state, "attempt", i)
}
time.Sleep(5 * time.Second)
}
return "", fmt.Errorf("timeout waiting for backup %s to complete", backupName)
}
func (l *LonghornNativeStrategy) createPVForVolume(kubeconfigPath, volumeName, size, accessMode, namespace, pvcName string) error {
pvYAML := fmt.Sprintf(`apiVersion: v1
kind: PersistentVolume
metadata:
name: %s
spec:
capacity:
storage: %s
accessModes:
- %s
storageClassName: longhorn
persistentVolumeReclaimPolicy: Retain
csi:
driver: driver.longhorn.io
fsType: ext4
volumeHandle: %s
claimRef:
namespace: %s
name: %s
`, volumeName, size, accessMode, volumeName, namespace, pvcName)
cmd := exec.Command("kubectl", "apply", "-f", "-")
tools.WithKubeconfig(cmd, kubeconfigPath)
cmd.Stdin = strings.NewReader(pvYAML)
if output, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("failed to create PV: %w, output: %s", err, string(output))
}
return nil
}
func (l *LonghornNativeStrategy) createVolumeFromBackup(kubeconfigPath, volumeName, backupURL, size string) error {
sizeBytes := "1073741824"
if strings.HasSuffix(size, "Gi") {
var sizeInt int
if _, err := fmt.Sscanf(size, "%dGi", &sizeInt); err == nil {
sizeBytes = fmt.Sprintf("%d", sizeInt*1024*1024*1024)
}
}
volumeYAML := fmt.Sprintf(`apiVersion: longhorn.io/v1beta2
kind: Volume
metadata:
name: %s
namespace: longhorn-system
spec:
size: "%s"
fromBackup: "%s"
numberOfReplicas: 3
frontend: blockdev
accessMode: rwo
`, volumeName, sizeBytes, backupURL)
cmd := exec.Command("kubectl", "apply", "-f", "-")
tools.WithKubeconfig(cmd, kubeconfigPath)
cmd.Stdin = strings.NewReader(volumeYAML)
if output, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("failed to create volume from backup: %w, output: %s", err, string(output))
}
return l.waitForVolume(kubeconfigPath, volumeName)
}
func (l *LonghornNativeStrategy) waitForVolume(kubeconfigPath, volumeName string) error {
maxRetries := 60
for i := range maxRetries {
cmd := exec.Command("kubectl", "get", "volumes.longhorn.io", volumeName,
"-n", "longhorn-system", "-o", "jsonpath={.status.state},{.status.restoreInitiated},{.status.robustness}")
tools.WithKubeconfig(cmd, kubeconfigPath)
output, err := cmd.Output()
if err == nil {
parts := strings.Split(string(output), ",")
if len(parts) == 3 {
state := parts[0]
restoreInitiated := parts[1]
robustness := parts[2]
if state == "detached" || state == "attached" {
if restoreInitiated == "true" {
return nil
}
if robustness == "healthy" || robustness == "unknown" {
return nil
}
}
}
}
if i%12 == 0 {
slog.Info("waiting for volume to be ready", "component", "longhorn", "volume", volumeName, "attempt", i, "maxRetries", maxRetries)
}
time.Sleep(5 * time.Second)
}
return fmt.Errorf("timeout waiting for volume %s to be ready", volumeName)
}
func (l *LonghornNativeStrategy) cleanupOldBackups(_, _, _ string) error {
return nil
}