From 28f82a66c8c663bf985661413ffe7387e27f7846 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 15 Apr 2026 00:15:06 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=202.7+3=20=E2=80=94=20full=20inte?= =?UTF-8?q?gration,=20config=20update,=20systemd=20unit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wire all goroutines in main.go: watcher, quota monitor, account switcher, dispatcher, janitor, and 10s state flush loop - Add missing sections to config.example.yaml: notifications, dispatcher, watcher, janitor - Add scripts/claude-failover.service systemd unit Co-Authored-By: Claude Sonnet 4.6 --- cmd/claude-failover/main.go | 47 +++++++- config.example.yaml | 32 +++++ internal/janitor/janitor.go | 199 +++++++++++++++++++++++++++++++ internal/janitor/janitor_test.go | 96 +++++++++++++++ scripts/claude-failover.service | 17 +++ 5 files changed, 390 insertions(+), 1 deletion(-) create mode 100644 internal/janitor/janitor.go create mode 100644 internal/janitor/janitor_test.go create mode 100644 scripts/claude-failover.service diff --git a/cmd/claude-failover/main.go b/cmd/claude-failover/main.go index d77ea14..8bc8fc5 100644 --- a/cmd/claude-failover/main.go +++ b/cmd/claude-failover/main.go @@ -8,12 +8,19 @@ import ( "os" "os/signal" "syscall" + "time" "forge.secuaas.ovh/olivier/claude-failover/internal/api" "forge.secuaas.ovh/olivier/claude-failover/internal/config" + "forge.secuaas.ovh/olivier/claude-failover/internal/dispatcher" + "forge.secuaas.ovh/olivier/claude-failover/internal/janitor" "forge.secuaas.ovh/olivier/claude-failover/internal/lifecycle" + "forge.secuaas.ovh/olivier/claude-failover/internal/notify" + "forge.secuaas.ovh/olivier/claude-failover/internal/quota" "forge.secuaas.ovh/olivier/claude-failover/internal/state" + "forge.secuaas.ovh/olivier/claude-failover/internal/switcher" "forge.secuaas.ovh/olivier/claude-failover/internal/tmux" + "forge.secuaas.ovh/olivier/claude-failover/internal/watcher" ) const version = "0.1.0" @@ -53,6 +60,43 @@ func main() { go lm.Run(ctx) + // Notifier — reads credentials from environment variables. + notifier := notify.New(cfg) + + // Session Watcher — detects when sessions finish their tasks. + sw := watcher.New(tmuxClient, s, cfg) + go sw.Run(ctx) + + // Quota Monitor — polls panes for quota exhaustion signals. + qm := quota.New(tmuxClient, s, cfg) + go qm.Run(ctx) + + // Account Switcher — orchestrates account failover on quota exhaustion. + as := switcher.New(tmuxClient, s, cfg, qm.SwitchChan(), notifier) + go as.Run(ctx) + + // Dispatcher — assigns inbox tasks to idle sessions. + disp := dispatcher.New(tmuxClient, s, cfg, sw.DoneChan()) + go disp.Run(ctx) + + // Janitor — periodic cleanup of orphaned files and stale status.json. + jan := janitor.New(s, cfg.Dispatcher.ProjectsDir) + go jan.Run(ctx) + + // State flush loop — persists state to disk every 10 seconds. + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.Flush() //nolint:errcheck + } + } + }() + // Start HTTP API server. listenAddr := cfg.MCPHTTP.Listen if listenAddr == "" { @@ -65,7 +109,8 @@ func main() { os.Exit(1) } }() - log.Printf("claude-failover v%s started, API on %s", version, listenAddr) + + log.Printf("claude-failover v%s — all goroutines running", version) <-ctx.Done() log.Printf("shutdown signal received — flushing state and exiting") diff --git a/config.example.yaml b/config.example.yaml index c33b31e..5a5e5b3 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -82,3 +82,35 @@ mcp_http: bearer_token_env: CLAUDE_FAILOVER_BEARER # Paths exposed (all read-only except explicitly listed mutating routes). enable_trigger: true # allow /trigger/dispatch, /trigger/swap + +# --------------------------------------------------------------------------- +# notifications +# --------------------------------------------------------------------------- +notifications: + telegram_token_env: TELEGRAM_BOT_TOKEN + telegram_chat_id_env: TELEGRAM_CHAT_ID + resend_api_key_env: RESEND_API_KEY + notify_email_env: CLAUDE_FAILOVER_NOTIFY_EMAIL + +# --------------------------------------------------------------------------- +# dispatcher +# --------------------------------------------------------------------------- +dispatcher: + projects_dir: /home/ubuntu/projects + idle_timeout: 60m + prompt_timeout: 30s + max_dispatch_per_task: 3 + +# --------------------------------------------------------------------------- +# watcher +# --------------------------------------------------------------------------- +watcher: + interval: 30s + done_signal_dir: /tmp + idle_timeout: 60m + +# --------------------------------------------------------------------------- +# janitor +# --------------------------------------------------------------------------- +janitor: + interval: 5m diff --git a/internal/janitor/janitor.go b/internal/janitor/janitor.go new file mode 100644 index 0000000..250264b --- /dev/null +++ b/internal/janitor/janitor.go @@ -0,0 +1,199 @@ +// Package janitor provides a periodic cleanup goroutine for the claude-failover +// daemon. It removes orphaned dispatch-meta files, rewrites agent-queue +// status.json with accurate filesystem counters, and deletes stale +// /tmp/agent-done-* files. +package janitor + +import ( + "context" + "encoding/json" + "log" + "os" + "path/filepath" + "time" + + "forge.secuaas.ovh/olivier/claude-failover/internal/state" +) + +const defaultInterval = 5 * time.Minute + +// agentQueueStatus mirrors the JSON written to .agent-queue/status.json. +// Only the counter fields and state/current_task are managed here. +type agentQueueStatus struct { + State string `json:"state"` + CurrentTask *string `json:"current_task"` + LastHB string `json:"last_heartbeat"` + InboxCount int `json:"inbox_count"` + DoneCount int `json:"done_count"` + FailedCount int `json:"failed_count"` +} + +// Janitor runs periodic filesystem cleanup tasks. +type Janitor struct { + state *state.State + projectsDir string + interval time.Duration + logger *log.Logger +} + +// New returns a Janitor with the default 5-minute interval. +func New(s *state.State, projectsDir string) *Janitor { + return &Janitor{ + state: s, + projectsDir: projectsDir, + interval: defaultInterval, + logger: log.New(os.Stderr, "[janitor] ", log.LstdFlags), + } +} + +// Run starts the cleanup loop. It blocks until ctx is cancelled. +func (j *Janitor) Run(ctx context.Context) { + ticker := time.NewTicker(j.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + j.cleanup() + } + } +} + +// cleanup performs a single cleanup pass. +func (j *Janitor) cleanup() { + j.cleanupProjects() + j.cleanupStaleAgentDoneFiles() +} + +// cleanupProjects iterates every project sub-directory inside projectsDir. +func (j *Janitor) cleanupProjects() { + entries, err := os.ReadDir(j.projectsDir) + if err != nil { + if !os.IsNotExist(err) { + j.logger.Printf("readdir %s: %v", j.projectsDir, err) + } + return + } + + // Build the set of session names currently working, keyed by task ID. + workingByTask := make(map[string]struct{}) + j.state.ForEachWorking(func(_ string, sess *state.SessionState) { + if sess.Task != nil { + workingByTask[*sess.Task] = struct{}{} + } + }) + + for _, e := range entries { + if !e.IsDir() { + continue + } + projectDir := filepath.Join(j.projectsDir, e.Name()) + inboxDir := filepath.Join(projectDir, ".agent-queue", "inbox") + doneDir := filepath.Join(projectDir, ".agent-queue", "done") + failedDir := filepath.Join(projectDir, ".agent-queue", "failed") + + j.cleanupOrphanDispatchMeta(inboxDir, workingByTask) + j.rewriteStatusJSON(projectDir, inboxDir, doneDir, failedDir) + } +} + +// cleanupOrphanDispatchMeta removes *.md.dispatch-meta files whose associated +// task is no longer tracked by any working session. +func (j *Janitor) cleanupOrphanDispatchMeta(inboxDir string, workingByTask map[string]struct{}) { + metas, err := filepath.Glob(filepath.Join(inboxDir, "*.md.dispatch-meta")) + if err != nil || len(metas) == 0 { + return + } + for _, metaPath := range metas { + // Derive the task name from the meta filename: strip the .dispatch-meta suffix. + taskFile := metaPath[:len(metaPath)-len(".dispatch-meta")] + taskName := filepath.Base(taskFile) + // Strip .md suffix to get bare task ID. + if len(taskName) > 3 { + taskName = taskName[:len(taskName)-3] // remove ".md" + } + if _, active := workingByTask[taskName]; !active { + j.logger.Printf("removing orphaned dispatch-meta: %s", metaPath) + if err := os.Remove(metaPath); err != nil && !os.IsNotExist(err) { + j.logger.Printf("remove %s: %v", metaPath, err) + } + } + } +} + +// rewriteStatusJSON recalculates real filesystem counters and rewrites +// .agent-queue/status.json, preserving state/current_task when agent is working. +func (j *Janitor) rewriteStatusJSON(projectDir, inboxDir, doneDir, failedDir string) { + statusPath := filepath.Join(projectDir, ".agent-queue", "status.json") + + inboxCount := countMDFiles(inboxDir) + doneCount := countMDFiles(doneDir) + failedCount := countMDFiles(failedDir) + + // Read existing status to preserve agent-owned fields. + existing := agentQueueStatus{ + State: "idle", + CurrentTask: nil, + } + if data, err := os.ReadFile(statusPath); err == nil { + _ = json.Unmarshal(data, &existing) + } + + // Only update counters; do NOT touch state/current_task if agent is working. + existing.InboxCount = inboxCount + existing.DoneCount = doneCount + existing.FailedCount = failedCount + existing.LastHB = time.Now().UTC().Format(time.RFC3339) + + data, err := json.Marshal(existing) + if err != nil { + j.logger.Printf("marshal status.json for %s: %v", projectDir, err) + return + } + tmp := statusPath + ".tmp" + if err := os.WriteFile(tmp, data, 0644); err != nil { + j.logger.Printf("write tmp status.json for %s: %v", projectDir, err) + return + } + if err := os.Rename(tmp, statusPath); err != nil { + j.logger.Printf("rename status.json for %s: %v", projectDir, err) + } +} + +// countMDFiles counts *.md files in dir that are NOT *.dispatched. +func countMDFiles(dir string) int { + entries, err := os.ReadDir(dir) + if err != nil { + return 0 + } + count := 0 + for _, e := range entries { + name := e.Name() + if filepath.Ext(name) == ".md" { + count++ + } + } + return count +} + +// cleanupStaleAgentDoneFiles deletes /tmp/agent-done-* files older than 1 hour. +func (j *Janitor) cleanupStaleAgentDoneFiles() { + matches, err := filepath.Glob("/tmp/agent-done-*") + if err != nil || len(matches) == 0 { + return + } + cutoff := time.Now().Add(-1 * time.Hour) + for _, path := range matches { + info, err := os.Stat(path) + if err != nil { + continue + } + if info.ModTime().Before(cutoff) { + j.logger.Printf("removing stale agent-done file: %s", path) + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + j.logger.Printf("remove %s: %v", path, err) + } + } + } +} diff --git a/internal/janitor/janitor_test.go b/internal/janitor/janitor_test.go new file mode 100644 index 0000000..67ba730 --- /dev/null +++ b/internal/janitor/janitor_test.go @@ -0,0 +1,96 @@ +package janitor + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "forge.secuaas.ovh/olivier/claude-failover/internal/state" +) + +func TestNew(t *testing.T) { + s := state.New("/tmp/test-state.json") + j := New(s, "/tmp/projects") + + if j == nil { + t.Fatal("New() returned nil") + } + if j.interval != defaultInterval { + t.Errorf("expected interval %v, got %v", defaultInterval, j.interval) + } + if j.state != s { + t.Error("state not assigned correctly") + } + if j.projectsDir != "/tmp/projects" { + t.Errorf("projectsDir not assigned correctly: %s", j.projectsDir) + } + if j.logger == nil { + t.Error("logger should not be nil") + } +} + +func TestCleanupStaleAgentDoneFiles(t *testing.T) { + // Create a stale file (older than 1h). + staleFile := fmt.Sprintf("/tmp/agent-done-test-%d", time.Now().UnixNano()) + if err := os.WriteFile(staleFile, []byte("done"), 0644); err != nil { + t.Fatalf("create stale file: %v", err) + } + // Age the file to 2 hours ago. + old := time.Now().Add(-2 * time.Hour) + if err := os.Chtimes(staleFile, old, old); err != nil { + t.Fatalf("chtimes: %v", err) + } + + // Create a recent file (should NOT be deleted). + recentFile := fmt.Sprintf("/tmp/agent-done-test-recent-%d", time.Now().UnixNano()) + if err := os.WriteFile(recentFile, []byte("recent"), 0644); err != nil { + t.Fatalf("create recent file: %v", err) + } + defer os.Remove(recentFile) + + s := state.New("/tmp/test-state.json") + j := New(s, "/tmp/projects") + j.cleanupStaleAgentDoneFiles() + + // Stale file should be gone. + if _, err := os.Stat(staleFile); !os.IsNotExist(err) { + os.Remove(staleFile) + t.Errorf("stale file %s should have been deleted", staleFile) + } + + // Recent file should still exist. + if _, err := os.Stat(recentFile); os.IsNotExist(err) { + t.Errorf("recent file %s should NOT have been deleted", recentFile) + } +} + +func TestCleanupOrphanDispatchMeta(t *testing.T) { + tmpDir := t.TempDir() + inboxDir := filepath.Join(tmpDir, ".agent-queue", "inbox") + if err := os.MkdirAll(inboxDir, 0755); err != nil { + t.Fatal(err) + } + + // Create a task file and its dispatch-meta. + taskFile := filepath.Join(inboxDir, "my-task.md") + metaFile := taskFile + ".dispatch-meta" + os.WriteFile(taskFile, []byte("task"), 0644) + os.WriteFile(metaFile, []byte("meta"), 0644) + + s := state.New("/tmp/test-state.json") + j := New(s, tmpDir) + + // No working sessions → dispatch-meta should be removed. + workingByTask := make(map[string]struct{}) + j.cleanupOrphanDispatchMeta(inboxDir, workingByTask) + + if _, err := os.Stat(metaFile); !os.IsNotExist(err) { + t.Errorf("orphan dispatch-meta %s should have been deleted", metaFile) + } + // Task file itself should still exist. + if _, err := os.Stat(taskFile); os.IsNotExist(err) { + t.Errorf("task file %s should NOT have been deleted", taskFile) + } +} diff --git a/scripts/claude-failover.service b/scripts/claude-failover.service new file mode 100644 index 0000000..4489b9d --- /dev/null +++ b/scripts/claude-failover.service @@ -0,0 +1,17 @@ +[Unit] +Description=Claude Failover — Session Orchestrator +After=network.target + +[Service] +Type=simple +User=ubuntu +ExecStartPre=/usr/bin/loginctl enable-linger ubuntu +ExecStart=/usr/local/bin/claude-failover --config /etc/claude-failover/config.yaml +Restart=always +RestartSec=5 +KillMode=mixed +TimeoutStopSec=60 +EnvironmentFile=-/etc/claude-failover/env + +[Install] +WantedBy=multi-user.target