diff --git a/go.mod b/go.mod index a9e40d0..e30b815 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,8 @@ module forge.secuaas.ovh/olivier/claude-failover go 1.22 require gopkg.in/yaml.v3 v3.0.1 + +require ( + github.com/fsnotify/fsnotify v1.9.0 // indirect + golang.org/x/sys v0.13.0 // indirect +) diff --git a/go.sum b/go.sum index a62c313..c88ad8e 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go new file mode 100644 index 0000000..e434949 --- /dev/null +++ b/internal/dispatcher/dispatcher.go @@ -0,0 +1,317 @@ +// Package dispatcher watches project inbox directories and assigns tasks to +// idle tmux sessions. One task per session — batch dispatch is intentionally +// unsupported. +package dispatcher + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "time" + + "github.com/fsnotify/fsnotify" + "gopkg.in/yaml.v3" + + "forge.secuaas.ovh/olivier/claude-failover/internal/config" + "forge.secuaas.ovh/olivier/claude-failover/internal/state" + "forge.secuaas.ovh/olivier/claude-failover/internal/tmux" +) + +// TaskFrontmatter is the YAML header parsed from task .md files. 
+type TaskFrontmatter struct { + Title string `yaml:"title"` + Priority string `yaml:"priority"` // critical, high, default, low + Tags []string `yaml:"tags"` + NeedsClaude bool `yaml:"needs_claude_code"` +} + +// Dispatcher watches project inbox directories and assigns tasks to idle sessions. +type Dispatcher struct { + tmux tmux.Client + state *state.State + config *config.Config + doneChan <-chan string + projectsDir string + logger *log.Logger +} + +// New creates a Dispatcher. +// doneChan receives session names when they become idle (from SessionWatcher). +func New(tc tmux.Client, s *state.State, cfg *config.Config, doneChan <-chan string) *Dispatcher { + projectsDir := cfg.Dispatcher.ProjectsDir + if projectsDir == "" { + projectsDir = cfg.Pool.SharedProjectsDir + } + return &Dispatcher{ + tmux: tc, + state: s, + config: cfg, + doneChan: doneChan, + projectsDir: projectsDir, + logger: log.Default(), + } +} + +// Run starts the dispatcher event loop until ctx is cancelled. +func (d *Dispatcher) Run(ctx context.Context) { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + + fw, err := fsnotify.NewWatcher() + if err != nil { + d.logger.Printf("[dispatcher] fsnotify unavailable: %v — poll-only mode", err) + } else { + defer fw.Close() + d.watchInboxDirs(fw) + } + + var fwEvents <-chan fsnotify.Event + if fw != nil { + fwEvents = fw.Events + } + + for { + select { + case <-ctx.Done(): + return + case evt, ok := <-fwEvents: + if !ok { + return + } + if evt.Op&fsnotify.Create != 0 && + strings.HasSuffix(evt.Name, ".md") && + !strings.HasSuffix(evt.Name, ".dispatched") { + d.dispatchProject(filepath.Dir(evt.Name)) + } + case session := <-d.doneChan: + d.assignNextTask(session) + case <-ticker.C: + d.fullScan() + } + } +} + +// watchInboxDirs registers all known project inbox dirs with the fsnotify watcher. 
+func (d *Dispatcher) watchInboxDirs(fw *fsnotify.Watcher) { + for _, ds := range d.config.Pool.Dedicated { + inbox := filepath.Join(ds.Project, ".agent-queue", "inbox") + if err := fw.Add(inbox); err != nil { + d.logger.Printf("[dispatcher] watch %s: %v", inbox, err) + } + } +} + +// fullScan dispatches pending tasks in all project inboxes. +func (d *Dispatcher) fullScan() { + for _, ds := range d.config.Pool.Dedicated { + inbox := filepath.Join(ds.Project, ".agent-queue", "inbox") + d.dispatchProject(inbox) + } +} + +// dispatchProject assigns undispatched tasks in inboxDir to idle sessions. +func (d *Dispatcher) dispatchProject(inboxDir string) { + entries, err := os.ReadDir(inboxDir) + if err != nil { + return + } + projectDir := filepath.Dir(filepath.Dir(inboxDir)) // inboxDir/.agent-queue/inbox → project + for _, e := range entries { + name := e.Name() + if !strings.HasSuffix(name, ".md") || strings.Contains(name, ".dispatched") { + continue + } + session := d.findFreeSession() + if session == "" { + d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir) + return + } + taskPath := filepath.Join(inboxDir, name) + if err := d.launchAgent(session, projectDir, taskPath); err != nil { + d.logger.Printf("[dispatcher] launchAgent error: %v", err) + continue + } + if err := os.Rename(taskPath, taskPath+".dispatched"); err != nil { + d.logger.Printf("[dispatcher] rename .dispatched: %v", err) + } + } +} + +// findFreeSession returns the name of an idle, live, cooldown-free session. +// Returns "" if no session is available. 
+func (d *Dispatcher) findFreeSession() string { + for _, ds := range d.config.Pool.Dedicated { + if d.isSessionFree(ds.Name) { + return ds.Name + } + } + prefix := d.config.Pool.Autonomous.Prefix + if prefix == "" { + prefix = "ccl-auto-" + } + for i := 0; i < d.config.Pool.Autonomous.Max; i++ { + if d.isSessionFree(sessionName(prefix, i)) { + return sessionName(prefix, i) + } + } + return "" +} + +// isSessionFree returns true when the session is idle, alive, and past its cooldown. +func (d *Dispatcher) isSessionFree(name string) bool { + if !d.tmux.HasSession(name) { + return false + } + sess := d.state.GetSession(name) + if sess == nil || sess.State != "idle" { + return false + } + if sess.LastFail != nil && time.Since(*sess.LastFail) < 5*time.Minute { + return false + } + return true +} + +// assignNextTask scans all inboxes for work to give to a freshly-idled session. +func (d *Dispatcher) assignNextTask(session string) { + for _, ds := range d.config.Pool.Dedicated { + inbox := filepath.Join(ds.Project, ".agent-queue", "inbox") + entries, err := os.ReadDir(inbox) + if err != nil { + continue + } + for _, e := range entries { + if !strings.HasSuffix(e.Name(), ".md") || strings.Contains(e.Name(), ".dispatched") { + continue + } + taskPath := filepath.Join(inbox, e.Name()) + if err := d.launchAgent(session, ds.Project, taskPath); err == nil { + os.Rename(taskPath, taskPath+".dispatched") + return + } + } + } +} + +// launchAgent starts Claude Code in session for the given task file. +func (d *Dispatcher) launchAgent(session, projectDir, taskFile string) error { + content, err := os.ReadFile(taskFile) + if err != nil { + return fmt.Errorf("read task %s: %w", taskFile, err) + } + fm, body := parseFrontmatter(content) + model := modelForPriority(fm.Priority) + + // Change to project directory. 
+ if err := d.tmux.SendKeys(session, "cd "+projectDir); err != nil { + return err + } + time.Sleep(300 * time.Millisecond) + + // Build and send the claude command, with optional --resume UUID. + cmd := fmt.Sprintf("claude --model %s --dangerously-skip-permissions", model) + resumeFile := filepath.Join(d.resumeDir(), session+"-resume-id.txt") + if data, ferr := os.ReadFile(resumeFile); ferr == nil { + if uuid := strings.TrimSpace(string(data)); uuid != "" { + cmd += " --resume " + uuid + os.Remove(resumeFile) + } + } + if err := d.tmux.SendKeys(session, cmd); err != nil { + return err + } + + // Wait for the ❯ prompt before sending the task message. + promptTimeout := d.config.Dispatcher.PromptTimeout.Duration + if promptTimeout == 0 { + promptTimeout = 30 * time.Second + } + if !d.waitForPrompt(session, promptTimeout) { + return fmt.Errorf("claude not ready in %q after %v", session, promptTimeout) + } + + // Send the task message. + msg := buildTaskMessage(body, taskFile) + if err := d.tmux.SendKeys(session, msg); err != nil { + return err + } + + d.state.SetWorking(session, filepath.Base(taskFile)) + d.logger.Printf("[dispatcher] DISPATCHED session=%q task=%s model=%s", + session, filepath.Base(taskFile), model) + return nil +} + +// waitForPrompt polls for the Claude ❯ prompt up to timeout. +func (d *Dispatcher) waitForPrompt(session string, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + tail, err := d.tmux.CapturePaneTail(session, 5) + if err == nil && strings.Contains(tail, "❯") { + return true + } + time.Sleep(1 * time.Second) + } + return false +} + +// resumeDir returns the directory containing per-session resume UUIDs. +func (d *Dispatcher) resumeDir() string { + home, _ := os.UserHomeDir() + return filepath.Join(home, ".claude-context") +} + +// parseFrontmatter splits YAML frontmatter (between --- markers) from the body. 
+// Returns an empty TaskFrontmatter and the full content if no header is found. +// The header must start at byte 0 with "---\n" and be closed by a "\n---\n" sequence; the returned body is whitespace-trimmed. +// The yaml.Unmarshal error is discarded and fm is returned with whatever it holds at that point. +// NOTE(review): a closing "---" at end of file without a trailing newline, and CRLF line endings, do not match these markers, so such files are returned whole as body — confirm this is acceptable. +func parseFrontmatter(content []byte) (TaskFrontmatter, string) { + s := string(content) + if !strings.HasPrefix(s, "---\n") { + return TaskFrontmatter{}, s + } + end := strings.Index(s[4:], "\n---\n") + if end < 0 { + return TaskFrontmatter{}, s + } + yamlBlock := s[4 : end+4] // end is relative to s[4:], hence the +4 + body := strings.TrimSpace(s[end+4+5:]) // skip "\n---\n" + var fm TaskFrontmatter + yaml.Unmarshal([]byte(yamlBlock), &fm) //nolint:errcheck // best-effort parse + return fm, body +} + +// modelForPriority maps a task priority to a Claude model name. +// Only "critical" (compared case-insensitively) selects "opus"; every other value, including the empty string, selects "sonnet". +func modelForPriority(priority string) string { + if strings.EqualFold(priority, "critical") { + return "opus" + } + return "sonnet" +} + +// buildTaskMessage constructs the dispatch message sent to Claude Code. +// The message is "[<base name of taskFile>] <body>", a blank line, then a fixed French instruction line; an empty body is replaced by a fixed French placeholder sentence. +func buildTaskMessage(body, taskFile string) string { + taskName := filepath.Base(taskFile) + if body == "" { + body = "Verifie .agent-queue/inbox/ — 1 tache assignee." + } + return fmt.Sprintf("[%s] %s\n\nIMPORTANT: Tu dois EXECUTER les actions demandées, pas seulement les décrire.", + taskName, body) +} + +// sessionName returns prefix immediately followed by the decimal form of i. +func sessionName(prefix string, i int) string { + return prefix + itoa(i) +} + +// itoa formats n in base 10 by prepending one digit per loop iteration; 0 yields "0" and a negative n yields "" because the digit loop only runs while n > 0. +func itoa(n int) string { + if n == 0 { + return "0" + } + b := make([]byte, 0, 10) + for n > 0 { + b = append([]byte{byte('0' + n%10)}, b...) + n /= 10 + } + return string(b) +} diff --git a/internal/dispatcher/dispatcher_test.go b/internal/dispatcher/dispatcher_test.go new file mode 100644 index 0000000..d790feb --- /dev/null +++ b/internal/dispatcher/dispatcher_test.go @@ -0,0 +1,211 @@ +package dispatcher + +import ( + "log" + "os" + "path/filepath" + "testing" + + "forge.secuaas.ovh/olivier/claude-failover/internal/config" + "forge.secuaas.ovh/olivier/claude-failover/internal/state" +) + +// mockTmux is a minimal in-memory tmux.Client for tests.
+type mockTmux struct { + sessions map[string]bool + paneOutput map[string]string + sentKeys []string +} + +func newMockTmux() *mockTmux { + return &mockTmux{ + sessions: make(map[string]bool), + paneOutput: make(map[string]string), + } +} + +func (m *mockTmux) HasSession(name string) bool { return m.sessions[name] } +func (m *mockTmux) CreateSession(name, _ string) error { m.sessions[name] = true; return nil } +func (m *mockTmux) KillSession(_ string) error { return nil } +func (m *mockTmux) SendKeys(_, keys string) error { + m.sentKeys = append(m.sentKeys, keys) + return nil +} +func (m *mockTmux) CapturePaneTail(session string, _ int) (string, error) { + return m.paneOutput[session], nil +} + +// TestParseFrontmatter verifies YAML frontmatter extraction. +func TestParseFrontmatter(t *testing.T) { + input := "---\ntitle: Fix bug\npriority: critical\n---\nDo the fix." + fm, body := parseFrontmatter([]byte(input)) + if fm.Title != "Fix bug" { + t.Errorf("expected title 'Fix bug', got %q", fm.Title) + } + if fm.Priority != "critical" { + t.Errorf("expected priority critical, got %q", fm.Priority) + } + if body != "Do the fix." { + t.Errorf("expected body 'Do the fix.', got %q", body) + } +} + +// TestParseFrontmatterNoHeader handles files without a YAML header. +func TestParseFrontmatterNoHeader(t *testing.T) { + input := "Just plain content." + fm, body := parseFrontmatter([]byte(input)) + if fm.Title != "" { + t.Errorf("expected empty title, got %q", fm.Title) + } + if body != "Just plain content." { + t.Errorf("expected full body, got %q", body) + } +} + +// TestModelForPriority maps priority strings to model names. 
+func TestModelForPriority(t *testing.T) { + cases := []struct{ priority, want string }{ + {"critical", "opus"}, + {"CRITICAL", "opus"}, + {"high", "sonnet"}, + {"default", "sonnet"}, + {"", "sonnet"}, + } + for _, c := range cases { + if got := modelForPriority(c.priority); got != c.want { + t.Errorf("modelForPriority(%q) = %q, want %q", c.priority, got, c.want) + } + } +} + +// TestFindFreeSessionSkipsFailed verifies that recently-failed sessions are skipped. +func TestFindFreeSessionSkipsFailed(t *testing.T) { + tc := newMockTmux() + tc.sessions["sess-1"] = true + tc.sessions["sess-2"] = true + + s := state.New("") + s.SetIdle("sess-1") + s.SetFailed("sess-2") + + d := &Dispatcher{ + tmux: tc, + state: s, + config: &config.Config{ + Pool: config.PoolConfig{ + Dedicated: []config.DedicatedSession{{Name: "sess-1"}, {Name: "sess-2"}}, + Autonomous: config.AutonomousConfig{Max: 0}, + }, + }, + logger: log.Default(), + } + + got := d.findFreeSession() + if got != "sess-1" { + t.Errorf("expected sess-1, got %q", got) + } +} + +// TestFindFreeSessionMissingTmux skips sessions not in tmux. +func TestFindFreeSessionMissingTmux(t *testing.T) { + tc := newMockTmux() + // sess-1 missing from tmux, sess-2 present and idle. + tc.sessions["sess-2"] = true + + s := state.New("") + s.SetIdle("sess-1") + s.SetIdle("sess-2") + + d := &Dispatcher{ + tmux: tc, + state: s, + config: &config.Config{ + Pool: config.PoolConfig{ + Dedicated: []config.DedicatedSession{{Name: "sess-1"}, {Name: "sess-2"}}, + Autonomous: config.AutonomousConfig{Max: 0}, + }, + }, + logger: log.Default(), + } + + got := d.findFreeSession() + if got != "sess-2" { + t.Errorf("expected sess-2, got %q", got) + } +} + +// TestDispatchProject creates a task file, dispatches it, and checks state + rename. 
+func TestDispatchProject(t *testing.T) { + dir := t.TempDir() + inbox := filepath.Join(dir, ".agent-queue", "inbox") + os.MkdirAll(inbox, 0755) + + taskContent := "---\ntitle: My Task\npriority: high\n---\nDo the work." + taskPath := filepath.Join(inbox, "task-001.md") + os.WriteFile(taskPath, []byte(taskContent), 0644) + + tc := newMockTmux() + tc.sessions["free-sess"] = true + // Return ❯ prompt on first CapturePaneTail call (Claude is ready). + tc.paneOutput["free-sess"] = "❯ " + + s := state.New("") + s.SetIdle("free-sess") + + d := &Dispatcher{ + tmux: tc, + state: s, + config: &config.Config{ + Pool: config.PoolConfig{ + Dedicated: []config.DedicatedSession{{Name: "free-sess", Project: dir}}, + Autonomous: config.AutonomousConfig{Max: 0}, + }, + }, + logger: log.Default(), + } + + d.dispatchProject(inbox) + + if st := s.GetSession("free-sess"); st == nil || st.State != "working" { + t.Errorf("expected session working after dispatch, got %v", st) + } + + // Original file renamed to .dispatched. + if _, err := os.Stat(taskPath + ".dispatched"); os.IsNotExist(err) { + t.Error("expected .dispatched marker") + } + if _, err := os.Stat(taskPath); !os.IsNotExist(err) { + t.Error("expected original task file to be renamed") + } +} + +// TestDispatchProjectNoFreeSession leaves the task untouched when no session is available. +func TestDispatchProjectNoFreeSession(t *testing.T) { + dir := t.TempDir() + inbox := filepath.Join(dir, ".agent-queue", "inbox") + os.MkdirAll(inbox, 0755) + + taskPath := filepath.Join(inbox, "task-002.md") + os.WriteFile(taskPath, []byte("content"), 0644) + + tc := newMockTmux() // no sessions + s := state.New("") + + d := &Dispatcher{ + tmux: tc, + state: s, + config: &config.Config{ + Pool: config.PoolConfig{ + Autonomous: config.AutonomousConfig{Max: 0}, + }, + }, + logger: log.Default(), + } + + d.dispatchProject(inbox) + + // File must remain unchanged. 
+ if _, err := os.Stat(taskPath); os.IsNotExist(err) { + t.Error("task file should remain when no session is free") + } +}