feat: Phase 2.7+3 — full integration, config update, systemd unit
- Wire all goroutines in main.go: watcher, quota monitor, account switcher, dispatcher, janitor, and 10s state flush loop - Add missing sections to config.example.yaml: notifications, dispatcher, watcher, janitor - Add scripts/claude-failover.service systemd unit Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d4260f71b4
commit
28f82a66c8
5 changed files with 390 additions and 1 deletions
|
|
@ -8,12 +8,19 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/api"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/dispatcher"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/janitor"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/lifecycle"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/notify"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/quota"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/switcher"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/watcher"
|
||||
)
|
||||
|
||||
const version = "0.1.0"
|
||||
|
|
@ -53,6 +60,43 @@ func main() {
|
|||
|
||||
go lm.Run(ctx)
|
||||
|
||||
// Notifier — reads credentials from environment variables.
|
||||
notifier := notify.New(cfg)
|
||||
|
||||
// Session Watcher — detects when sessions finish their tasks.
|
||||
sw := watcher.New(tmuxClient, s, cfg)
|
||||
go sw.Run(ctx)
|
||||
|
||||
// Quota Monitor — polls panes for quota exhaustion signals.
|
||||
qm := quota.New(tmuxClient, s, cfg)
|
||||
go qm.Run(ctx)
|
||||
|
||||
// Account Switcher — orchestrates account failover on quota exhaustion.
|
||||
as := switcher.New(tmuxClient, s, cfg, qm.SwitchChan(), notifier)
|
||||
go as.Run(ctx)
|
||||
|
||||
// Dispatcher — assigns inbox tasks to idle sessions.
|
||||
disp := dispatcher.New(tmuxClient, s, cfg, sw.DoneChan())
|
||||
go disp.Run(ctx)
|
||||
|
||||
// Janitor — periodic cleanup of orphaned files and stale status.json.
|
||||
jan := janitor.New(s, cfg.Dispatcher.ProjectsDir)
|
||||
go jan.Run(ctx)
|
||||
|
||||
// State flush loop — persists state to disk every 10 seconds.
|
||||
go func() {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.Flush() //nolint:errcheck
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Start HTTP API server.
|
||||
listenAddr := cfg.MCPHTTP.Listen
|
||||
if listenAddr == "" {
|
||||
|
|
@ -65,7 +109,8 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
}()
|
||||
log.Printf("claude-failover v%s started, API on %s", version, listenAddr)
|
||||
|
||||
log.Printf("claude-failover v%s — all goroutines running", version)
|
||||
|
||||
<-ctx.Done()
|
||||
log.Printf("shutdown signal received — flushing state and exiting")
|
||||
|
|
|
|||
|
|
@ -82,3 +82,35 @@ mcp_http:
|
|||
bearer_token_env: CLAUDE_FAILOVER_BEARER
|
||||
# Paths exposed (all read-only except explicitly listed mutating routes).
|
||||
enable_trigger: true # allow /trigger/dispatch, /trigger/swap
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# notifications
|
||||
# ---------------------------------------------------------------------------
|
||||
notifications:
|
||||
telegram_token_env: TELEGRAM_BOT_TOKEN
|
||||
telegram_chat_id_env: TELEGRAM_CHAT_ID
|
||||
resend_api_key_env: RESEND_API_KEY
|
||||
notify_email_env: CLAUDE_FAILOVER_NOTIFY_EMAIL
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# dispatcher
|
||||
# ---------------------------------------------------------------------------
|
||||
dispatcher:
|
||||
projects_dir: /home/ubuntu/projects
|
||||
idle_timeout: 60m
|
||||
prompt_timeout: 30s
|
||||
max_dispatch_per_task: 3
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# watcher
|
||||
# ---------------------------------------------------------------------------
|
||||
watcher:
|
||||
interval: 30s
|
||||
done_signal_dir: /tmp
|
||||
idle_timeout: 60m
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# janitor
|
||||
# ---------------------------------------------------------------------------
|
||||
janitor:
|
||||
interval: 5m
|
||||
|
|
|
|||
199
internal/janitor/janitor.go
Normal file
199
internal/janitor/janitor.go
Normal file
|
|
@ -0,0 +1,199 @@
|
|||
// Package janitor provides a periodic cleanup goroutine for the claude-failover
|
||||
// daemon. It removes orphaned dispatch-meta files, rewrites agent-queue
|
||||
// status.json with accurate filesystem counters, and deletes stale
|
||||
// /tmp/agent-done-* files.
|
||||
package janitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||
)
|
||||
|
||||
const defaultInterval = 5 * time.Minute
|
||||
|
||||
// agentQueueStatus mirrors the JSON written to .agent-queue/status.json.
|
||||
// Only the counter fields and state/current_task are managed here.
|
||||
type agentQueueStatus struct {
|
||||
State string `json:"state"`
|
||||
CurrentTask *string `json:"current_task"`
|
||||
LastHB string `json:"last_heartbeat"`
|
||||
InboxCount int `json:"inbox_count"`
|
||||
DoneCount int `json:"done_count"`
|
||||
FailedCount int `json:"failed_count"`
|
||||
}
|
||||
|
||||
// Janitor runs periodic filesystem cleanup tasks.
|
||||
type Janitor struct {
|
||||
state *state.State
|
||||
projectsDir string
|
||||
interval time.Duration
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// New returns a Janitor with the default 5-minute interval.
|
||||
func New(s *state.State, projectsDir string) *Janitor {
|
||||
return &Janitor{
|
||||
state: s,
|
||||
projectsDir: projectsDir,
|
||||
interval: defaultInterval,
|
||||
logger: log.New(os.Stderr, "[janitor] ", log.LstdFlags),
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the cleanup loop. It blocks until ctx is cancelled.
|
||||
func (j *Janitor) Run(ctx context.Context) {
|
||||
ticker := time.NewTicker(j.interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
j.cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup performs a single cleanup pass.
|
||||
func (j *Janitor) cleanup() {
|
||||
j.cleanupProjects()
|
||||
j.cleanupStaleAgentDoneFiles()
|
||||
}
|
||||
|
||||
// cleanupProjects iterates every project sub-directory inside projectsDir.
|
||||
func (j *Janitor) cleanupProjects() {
|
||||
entries, err := os.ReadDir(j.projectsDir)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
j.logger.Printf("readdir %s: %v", j.projectsDir, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Build the set of session names currently working, keyed by task ID.
|
||||
workingByTask := make(map[string]struct{})
|
||||
j.state.ForEachWorking(func(_ string, sess *state.SessionState) {
|
||||
if sess.Task != nil {
|
||||
workingByTask[*sess.Task] = struct{}{}
|
||||
}
|
||||
})
|
||||
|
||||
for _, e := range entries {
|
||||
if !e.IsDir() {
|
||||
continue
|
||||
}
|
||||
projectDir := filepath.Join(j.projectsDir, e.Name())
|
||||
inboxDir := filepath.Join(projectDir, ".agent-queue", "inbox")
|
||||
doneDir := filepath.Join(projectDir, ".agent-queue", "done")
|
||||
failedDir := filepath.Join(projectDir, ".agent-queue", "failed")
|
||||
|
||||
j.cleanupOrphanDispatchMeta(inboxDir, workingByTask)
|
||||
j.rewriteStatusJSON(projectDir, inboxDir, doneDir, failedDir)
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupOrphanDispatchMeta removes *.md.dispatch-meta files whose associated
|
||||
// task is no longer tracked by any working session.
|
||||
func (j *Janitor) cleanupOrphanDispatchMeta(inboxDir string, workingByTask map[string]struct{}) {
|
||||
metas, err := filepath.Glob(filepath.Join(inboxDir, "*.md.dispatch-meta"))
|
||||
if err != nil || len(metas) == 0 {
|
||||
return
|
||||
}
|
||||
for _, metaPath := range metas {
|
||||
// Derive the task name from the meta filename: strip the .dispatch-meta suffix.
|
||||
taskFile := metaPath[:len(metaPath)-len(".dispatch-meta")]
|
||||
taskName := filepath.Base(taskFile)
|
||||
// Strip .md suffix to get bare task ID.
|
||||
if len(taskName) > 3 {
|
||||
taskName = taskName[:len(taskName)-3] // remove ".md"
|
||||
}
|
||||
if _, active := workingByTask[taskName]; !active {
|
||||
j.logger.Printf("removing orphaned dispatch-meta: %s", metaPath)
|
||||
if err := os.Remove(metaPath); err != nil && !os.IsNotExist(err) {
|
||||
j.logger.Printf("remove %s: %v", metaPath, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// rewriteStatusJSON recalculates real filesystem counters and rewrites
|
||||
// .agent-queue/status.json, preserving state/current_task when agent is working.
|
||||
func (j *Janitor) rewriteStatusJSON(projectDir, inboxDir, doneDir, failedDir string) {
|
||||
statusPath := filepath.Join(projectDir, ".agent-queue", "status.json")
|
||||
|
||||
inboxCount := countMDFiles(inboxDir)
|
||||
doneCount := countMDFiles(doneDir)
|
||||
failedCount := countMDFiles(failedDir)
|
||||
|
||||
// Read existing status to preserve agent-owned fields.
|
||||
existing := agentQueueStatus{
|
||||
State: "idle",
|
||||
CurrentTask: nil,
|
||||
}
|
||||
if data, err := os.ReadFile(statusPath); err == nil {
|
||||
_ = json.Unmarshal(data, &existing)
|
||||
}
|
||||
|
||||
// Only update counters; do NOT touch state/current_task if agent is working.
|
||||
existing.InboxCount = inboxCount
|
||||
existing.DoneCount = doneCount
|
||||
existing.FailedCount = failedCount
|
||||
existing.LastHB = time.Now().UTC().Format(time.RFC3339)
|
||||
|
||||
data, err := json.Marshal(existing)
|
||||
if err != nil {
|
||||
j.logger.Printf("marshal status.json for %s: %v", projectDir, err)
|
||||
return
|
||||
}
|
||||
tmp := statusPath + ".tmp"
|
||||
if err := os.WriteFile(tmp, data, 0644); err != nil {
|
||||
j.logger.Printf("write tmp status.json for %s: %v", projectDir, err)
|
||||
return
|
||||
}
|
||||
if err := os.Rename(tmp, statusPath); err != nil {
|
||||
j.logger.Printf("rename status.json for %s: %v", projectDir, err)
|
||||
}
|
||||
}
|
||||
|
||||
// countMDFiles counts *.md files in dir that are NOT *.dispatched.
|
||||
func countMDFiles(dir string) int {
|
||||
entries, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
count := 0
|
||||
for _, e := range entries {
|
||||
name := e.Name()
|
||||
if filepath.Ext(name) == ".md" {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// cleanupStaleAgentDoneFiles deletes /tmp/agent-done-* files older than 1 hour.
|
||||
func (j *Janitor) cleanupStaleAgentDoneFiles() {
|
||||
matches, err := filepath.Glob("/tmp/agent-done-*")
|
||||
if err != nil || len(matches) == 0 {
|
||||
return
|
||||
}
|
||||
cutoff := time.Now().Add(-1 * time.Hour)
|
||||
for _, path := range matches {
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if info.ModTime().Before(cutoff) {
|
||||
j.logger.Printf("removing stale agent-done file: %s", path)
|
||||
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
|
||||
j.logger.Printf("remove %s: %v", path, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
96
internal/janitor/janitor_test.go
Normal file
96
internal/janitor/janitor_test.go
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
package janitor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||
)
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
s := state.New("/tmp/test-state.json")
|
||||
j := New(s, "/tmp/projects")
|
||||
|
||||
if j == nil {
|
||||
t.Fatal("New() returned nil")
|
||||
}
|
||||
if j.interval != defaultInterval {
|
||||
t.Errorf("expected interval %v, got %v", defaultInterval, j.interval)
|
||||
}
|
||||
if j.state != s {
|
||||
t.Error("state not assigned correctly")
|
||||
}
|
||||
if j.projectsDir != "/tmp/projects" {
|
||||
t.Errorf("projectsDir not assigned correctly: %s", j.projectsDir)
|
||||
}
|
||||
if j.logger == nil {
|
||||
t.Error("logger should not be nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupStaleAgentDoneFiles(t *testing.T) {
|
||||
// Create a stale file (older than 1h).
|
||||
staleFile := fmt.Sprintf("/tmp/agent-done-test-%d", time.Now().UnixNano())
|
||||
if err := os.WriteFile(staleFile, []byte("done"), 0644); err != nil {
|
||||
t.Fatalf("create stale file: %v", err)
|
||||
}
|
||||
// Age the file to 2 hours ago.
|
||||
old := time.Now().Add(-2 * time.Hour)
|
||||
if err := os.Chtimes(staleFile, old, old); err != nil {
|
||||
t.Fatalf("chtimes: %v", err)
|
||||
}
|
||||
|
||||
// Create a recent file (should NOT be deleted).
|
||||
recentFile := fmt.Sprintf("/tmp/agent-done-test-recent-%d", time.Now().UnixNano())
|
||||
if err := os.WriteFile(recentFile, []byte("recent"), 0644); err != nil {
|
||||
t.Fatalf("create recent file: %v", err)
|
||||
}
|
||||
defer os.Remove(recentFile)
|
||||
|
||||
s := state.New("/tmp/test-state.json")
|
||||
j := New(s, "/tmp/projects")
|
||||
j.cleanupStaleAgentDoneFiles()
|
||||
|
||||
// Stale file should be gone.
|
||||
if _, err := os.Stat(staleFile); !os.IsNotExist(err) {
|
||||
os.Remove(staleFile)
|
||||
t.Errorf("stale file %s should have been deleted", staleFile)
|
||||
}
|
||||
|
||||
// Recent file should still exist.
|
||||
if _, err := os.Stat(recentFile); os.IsNotExist(err) {
|
||||
t.Errorf("recent file %s should NOT have been deleted", recentFile)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupOrphanDispatchMeta(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
inboxDir := filepath.Join(tmpDir, ".agent-queue", "inbox")
|
||||
if err := os.MkdirAll(inboxDir, 0755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Create a task file and its dispatch-meta.
|
||||
taskFile := filepath.Join(inboxDir, "my-task.md")
|
||||
metaFile := taskFile + ".dispatch-meta"
|
||||
os.WriteFile(taskFile, []byte("task"), 0644)
|
||||
os.WriteFile(metaFile, []byte("meta"), 0644)
|
||||
|
||||
s := state.New("/tmp/test-state.json")
|
||||
j := New(s, tmpDir)
|
||||
|
||||
// No working sessions → dispatch-meta should be removed.
|
||||
workingByTask := make(map[string]struct{})
|
||||
j.cleanupOrphanDispatchMeta(inboxDir, workingByTask)
|
||||
|
||||
if _, err := os.Stat(metaFile); !os.IsNotExist(err) {
|
||||
t.Errorf("orphan dispatch-meta %s should have been deleted", metaFile)
|
||||
}
|
||||
// Task file itself should still exist.
|
||||
if _, err := os.Stat(taskFile); os.IsNotExist(err) {
|
||||
t.Errorf("task file %s should NOT have been deleted", taskFile)
|
||||
}
|
||||
}
|
||||
17
scripts/claude-failover.service
Normal file
17
scripts/claude-failover.service
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
[Unit]
|
||||
Description=Claude Failover — Session Orchestrator
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=ubuntu
|
||||
ExecStartPre=/usr/bin/loginctl enable-linger ubuntu
|
||||
ExecStart=/usr/local/bin/claude-failover --config /etc/claude-failover/config.yaml
|
||||
Restart=always
|
||||
RestartSec=5
|
||||
KillMode=mixed
|
||||
TimeoutStopSec=60
|
||||
EnvironmentFile=-/etc/claude-failover/env
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
Loading…
Add table
Add a link
Reference in a new issue