claude-failover/internal/dispatcher/dispatcher.go
Ubuntu 4cbdcf143a fix(dispatcher+watcher): never auto-dispatch into dedicated sessions
Observed: tasks from filesecure/.agent-queue/inbox and SecuScan/
.agent-queue/inbox were being routed into ccl-1-conformvault and
ccl-2-scanyze whenever those sessions happened to be idle. Those are
the operator's manual interactive Claude sessions, not dispatch
targets — the auto-dispatch was (a) hijacking a Claude instance the
operator was using and (b) triggering /exit via the watcher's
completion path when the side-task finished, kicking the operator out
mid-conversation.

findFreeSession was iterating Pool.Dedicated before the autonomous
pool, so any idle dedicated session was the first candidate.

- Dispatcher.findFreeSession: remove the Dedicated loop entirely.
  Auto-dispatch is now pool-only (ccl-auto-11..20).
- Watcher.completeSession: defense-in-depth — even if a dedicated
  session ever ends up in "working" state, it is no longer /exit'd;
  just marked idle. Pool /exit behaviour unchanged (context recycle).
- Tests: new TestFindFreeSessionSkipsDedicated proves the routing;
  3 existing tests rewritten to use the autonomous pool instead of
  relying on Dedicated as a fake pool.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 13:30:26 +00:00

325 lines
9.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package dispatcher watches project inbox directories and assigns tasks to
// idle tmux sessions. One task per session — batch dispatch is intentionally
// unsupported.
package dispatcher
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"gopkg.in/yaml.v3"
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
)
// TaskFrontmatter is the YAML header parsed from task .md files.
// It is produced by parseFrontmatter; when a file has no recognisable
// header every field keeps its zero value.
type TaskFrontmatter struct {
// Title is read from the "title" key.
Title string `yaml:"title"`
// Priority is read from the "priority" key. modelForPriority only
// distinguishes "critical" (compared case-insensitively) from everything else.
Priority string `yaml:"priority"` // critical, high, default, low
// Tags is read from the "tags" key.
Tags []string `yaml:"tags"`
// NeedsClaude is read from the "needs_claude_code" key.
NeedsClaude bool `yaml:"needs_claude_code"`
}
// Dispatcher watches project inbox directories and assigns tasks to idle sessions.
type Dispatcher struct {
// tmux is used to probe sessions, send keystrokes and capture pane output.
tmux tmux.Client
// state is queried for per-session status and updated when a task is launched.
state *state.State
// config supplies the pool layout (dedicated projects, autonomous pool
// prefix/range) and dispatcher settings such as the prompt timeout.
config *config.Config
// doneChan delivers names of sessions that have become idle (see New).
doneChan <-chan string
// projectsDir is resolved in New from cfg.Dispatcher.ProjectsDir, falling
// back to cfg.Pool.SharedProjectsDir when that is empty.
projectsDir string
// logger receives all dispatcher log lines; New sets it to log.Default().
logger *log.Logger
}
// New builds a Dispatcher wired to the given tmux client, state store and
// configuration.
//
// doneChan receives session names when they become idle (from SessionWatcher).
// The projects directory is taken from cfg.Dispatcher.ProjectsDir and falls
// back to cfg.Pool.SharedProjectsDir when the former is empty. Logging goes to
// the standard library's default logger.
func New(tc tmux.Client, s *state.State, cfg *config.Config, doneChan <-chan string) *Dispatcher {
	disp := &Dispatcher{
		tmux:        tc,
		state:       s,
		config:      cfg,
		doneChan:    doneChan,
		projectsDir: cfg.Dispatcher.ProjectsDir,
		logger:      log.Default(),
	}
	if disp.projectsDir == "" {
		disp.projectsDir = cfg.Pool.SharedProjectsDir
	}
	return disp
}
// Run starts the dispatcher event loop and blocks until ctx is cancelled
// (or the fsnotify event channel is closed).
//
// Work is triggered from three sources:
//   - fsnotify Create events for "*.md" files in the watched inboxes,
//   - session names arriving on the done channel,
//   - a 60-second ticker that rescans every inbox.
//
// If the fsnotify watcher cannot be created the loop still runs, relying on
// the ticker and the done channel only.
func (d *Dispatcher) Run(ctx context.Context) {
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()

	// Nil channels block forever in a select, which conveniently disables
	// the corresponding case when the watcher is unavailable.
	var (
		fwEvents <-chan fsnotify.Event
		fwErrors <-chan error
	)
	fw, err := fsnotify.NewWatcher()
	if err != nil {
		d.logger.Printf("[dispatcher] fsnotify unavailable: %v — poll-only mode", err)
	} else {
		defer fw.Close()
		d.watchInboxDirs(fw)
		fwEvents = fw.Events
		fwErrors = fw.Errors
	}

	// Local copy so the case can be disabled if the producer closes the
	// channel, without mutating the Dispatcher.
	done := d.doneChan

	for {
		select {
		case <-ctx.Done():
			return
		case evt, ok := <-fwEvents:
			if !ok {
				return
			}
			if evt.Op&fsnotify.Create != 0 &&
				strings.HasSuffix(evt.Name, ".md") &&
				!strings.HasSuffix(evt.Name, ".dispatched") {
				d.dispatchProject(filepath.Dir(evt.Name))
			}
		case werr, ok := <-fwErrors:
			// The watcher's error channel must be drained; otherwise a
			// pending error can stall delivery of further events.
			if !ok {
				fwErrors = nil
				continue
			}
			d.logger.Printf("[dispatcher] fsnotify error: %v", werr)
		case session, ok := <-done:
			if !ok {
				// A closed channel would otherwise yield "" on every
				// iteration and spin this loop.
				done = nil
				continue
			}
			d.assignNextTask(session)
		case <-ticker.C:
			d.fullScan()
		}
	}
}
// watchInboxDirs adds the "<project>/.agent-queue/inbox" directory of every
// configured dedicated-session project to fw. A directory that cannot be
// watched is logged and skipped; the remaining ones are still registered.
func (d *Dispatcher) watchInboxDirs(fw *fsnotify.Watcher) {
	dedicated := d.config.Pool.Dedicated
	for i := range dedicated {
		inboxPath := filepath.Join(dedicated[i].Project, ".agent-queue", "inbox")
		addErr := fw.Add(inboxPath)
		if addErr == nil {
			continue
		}
		d.logger.Printf("[dispatcher] watch %s: %v", inboxPath, addErr)
	}
}
// fullScan walks the inbox of every configured dedicated-session project and
// hands each one to dispatchProject, in configuration order.
func (d *Dispatcher) fullScan() {
	dedicated := d.config.Pool.Dedicated
	for i := range dedicated {
		d.dispatchProject(filepath.Join(dedicated[i].Project, ".agent-queue", "inbox"))
	}
}
// dispatchProject launches every pending task found in inboxDir on a free
// session.
//
// A task is pending when its file name ends in ".md" and does not contain
// ".dispatched". After a successful launch the file is renamed with a
// ".dispatched" suffix. Scanning stops as soon as no free session is left;
// a task whose launch fails is logged and left in place. An unreadable
// inbox is silently ignored.
func (d *Dispatcher) dispatchProject(inboxDir string) {
	entries, err := os.ReadDir(inboxDir)
	if err != nil {
		return
	}

	// <project>/.agent-queue/inbox → <project>
	projectDir := filepath.Dir(filepath.Dir(inboxDir))

	for _, entry := range entries {
		fileName := entry.Name()
		pending := strings.HasSuffix(fileName, ".md") && !strings.Contains(fileName, ".dispatched")
		if !pending {
			continue
		}

		target := d.findFreeSession()
		if target == "" {
			d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir)
			return
		}

		taskPath := filepath.Join(inboxDir, fileName)
		launchErr := d.launchAgent(target, projectDir, taskPath)
		if launchErr != nil {
			d.logger.Printf("[dispatcher] launchAgent error: %v", launchErr)
			continue
		}

		renameErr := os.Rename(taskPath, taskPath+".dispatched")
		if renameErr != nil {
			d.logger.Printf("[dispatcher] rename .dispatched: %v", renameErr)
		}
	}
}
// findFreeSession scans the autonomous pool in index order and returns the
// first session name for which isSessionFree reports true, or "" when none
// qualifies.
//
// Pool names are "<prefix><index>" for index in
// [StartIndex, StartIndex+Max); the prefix defaults to "ccl-auto-".
//
// Dedicated sessions are deliberately never candidates: they host the
// operator's manual interactive work. Dispatching a background task into one
// would hijack the Claude instance the operator is using, and the watcher's
// /exit recycle at task end would kick the operator out mid-conversation.
func (d *Dispatcher) findFreeSession() string {
	namePrefix := d.config.Pool.Autonomous.Prefix
	if namePrefix == "" {
		namePrefix = "ccl-auto-"
	}

	first := d.config.Pool.Autonomous.StartIndex
	limit := first + d.config.Pool.Autonomous.Max
	for idx := first; idx < limit; idx++ {
		if candidate := sessionName(namePrefix, idx); d.isSessionFree(candidate) {
			return candidate
		}
	}
	return ""
}
// isSessionFree reports whether a task may be dispatched to the named session.
// All of the following must hold:
//   - tmux reports that the session exists,
//   - the state store has a record for it and that record is in state "idle",
//   - its last recorded failure, if any, is at least five minutes old.
func (d *Dispatcher) isSessionFree(name string) bool {
	if alive := d.tmux.HasSession(name); !alive {
		return false
	}

	info := d.state.GetSession(name)
	switch {
	case info == nil:
		return false
	case info.State != "idle":
		return false
	case info.LastFail != nil && time.Since(*info.LastFail) < 5*time.Minute:
		// Still inside the post-failure cooldown window.
		return false
	}
	return true
}
// assignNextTask gives a freshly-idled session the first pending task found
// across the inboxes of all configured dedicated-session projects.
//
// Inboxes are visited in configuration order. A task is pending when its
// file name ends in ".md" and does not contain ".dispatched". The first task
// that launches successfully is renamed with a ".dispatched" suffix and the
// search stops. Launch and rename failures are logged, matching
// dispatchProject; a failed launch moves on to the next candidate task.
// Unreadable inboxes are skipped. An empty session name is ignored because it
// cannot identify a session.
func (d *Dispatcher) assignNextTask(session string) {
	if session == "" {
		return
	}
	for _, ds := range d.config.Pool.Dedicated {
		inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
		entries, err := os.ReadDir(inbox)
		if err != nil {
			continue
		}
		for _, e := range entries {
			name := e.Name()
			if !strings.HasSuffix(name, ".md") || strings.Contains(name, ".dispatched") {
				continue
			}
			taskPath := filepath.Join(inbox, name)
			if err := d.launchAgent(session, ds.Project, taskPath); err != nil {
				d.logger.Printf("[dispatcher] launchAgent error: %v", err)
				continue
			}
			// The task is already running at this point; a failed rename
			// only means the file may be picked up again, so log it
			// rather than hiding it.
			if err := os.Rename(taskPath, taskPath+".dispatched"); err != nil {
				d.logger.Printf("[dispatcher] rename .dispatched: %v", err)
			}
			return
		}
	}
}
// launchAgent starts Claude Code inside the given tmux session and feeds it
// the task stored in taskFile.
//
// Steps, in order:
//  1. Read the task file and pick the model from its frontmatter priority.
//  2. Send "cd <projectDir>" to the session (the path is sent as-is).
//  3. Send the claude command line. If
//     "<resumeDir>/<session>-resume-id.txt" holds a non-blank id, it is
//     appended as "--resume <id>" and the file is removed.
//  4. Wait for the prompt, up to the configured prompt timeout (30s when
//     the configured value is zero).
//  5. Send the task message, then a separate Enter to submit it.
//  6. Mark the session as working on the task in the state store.
//
// It returns an error when the task cannot be read, when any tmux call
// fails, or when the prompt does not appear in time.
func (d *Dispatcher) launchAgent(session, projectDir, taskFile string) error {
	raw, readErr := os.ReadFile(taskFile)
	if readErr != nil {
		return fmt.Errorf("read task %s: %w", taskFile, readErr)
	}
	meta, taskBody := parseFrontmatter(raw)
	model := modelForPriority(meta.Priority)

	// Move the shell into the project before starting Claude.
	if err := d.tmux.SendKeys(session, "cd "+projectDir); err != nil {
		return err
	}
	time.Sleep(300 * time.Millisecond)

	// Assemble the launch command; a stored resume id is consumed once.
	launch := "claude --model " + model + " --dangerously-skip-permissions"
	resumePath := filepath.Join(d.resumeDir(), session+"-resume-id.txt")
	if saved, err := os.ReadFile(resumePath); err == nil {
		if id := strings.TrimSpace(string(saved)); id != "" {
			launch += " --resume " + id
			os.Remove(resumePath)
		}
	}
	if err := d.tmux.SendKeys(session, launch); err != nil {
		return err
	}

	// Claude must be showing its prompt before the task text is typed.
	wait := d.config.Dispatcher.PromptTimeout.Duration
	if wait == 0 {
		wait = 30 * time.Second
	}
	if ready := d.waitForPrompt(session, wait); !ready {
		return fmt.Errorf("claude not ready in %q after %v", session, wait)
	}

	// Multi-line bodies show up in Claude's TUI as
	// "[Pasted text #N +M lines]" and the Enter appended by SendKeys is
	// swallowed by the paste, leaving the message unsubmitted. Give the
	// paste time to register, then submit with a standalone Enter.
	if err := d.tmux.SendKeys(session, buildTaskMessage(taskBody, taskFile)); err != nil {
		return err
	}
	time.Sleep(500 * time.Millisecond)
	if err := d.tmux.SendEnter(session); err != nil {
		return err
	}

	taskName := filepath.Base(taskFile)
	d.state.SetWorking(session, taskName)
	d.logger.Printf("[dispatcher] DISPATCHED session=%q task=%s model=%s",
		session, taskName, model)
	return nil
}
// waitForPrompt polls for the Claude prompt up to timeout.
// It captures the tail of the session's pane (5 is passed as the second
// argument — presumably a line count) and returns true as soon as a capture
// succeeds and contains the prompt marker. Between attempts it sleeps one
// second. It returns false when the deadline passes without a match.
//
// NOTE(review): the marker literal given to strings.Contains below appears
// empty in this copy of the file (possibly a non-ASCII prompt glyph that was
// lost). With an empty needle strings.Contains is always true, so the function
// would report "ready" on the first successful capture regardless of what the
// pane shows — confirm the intended marker.
func (d *Dispatcher) waitForPrompt(session string, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
tail, err := d.tmux.CapturePaneTail(session, 5)
if err == nil && strings.Contains(tail, "") {
return true
}
// Capture failed or marker not visible yet: retry after a pause.
time.Sleep(1 * time.Second)
}
return false
}
// resumeDir returns "<home>/.claude-context", the directory holding the
// per-session resume id files read by launchAgent. The error from
// os.UserHomeDir is deliberately discarded; whatever home value it returned
// is joined as-is.
func (d *Dispatcher) resumeDir() string {
	const contextDirName = ".claude-context"
	base, _ := os.UserHomeDir()
	return filepath.Join(base, contextDirName)
}
// parseFrontmatter separates a leading YAML header from the task body.
//
// A header is recognised only when content starts with "---\n" and a later
// "\n---\n" closes it; the closing marker therefore needs a trailing newline.
// On a match, the text between the markers is decoded into a TaskFrontmatter
// (decode errors are ignored — best effort) and the remainder is returned
// with surrounding whitespace trimmed. Otherwise an empty TaskFrontmatter and
// the full, untrimmed content are returned.
func parseFrontmatter(content []byte) (TaskFrontmatter, string) {
	const (
		opener = "---\n"
		closer = "\n---\n"
	)

	text := string(content)
	if !strings.HasPrefix(text, opener) {
		return TaskFrontmatter{}, text
	}

	afterOpener := text[len(opener):]
	closeAt := strings.Index(afterOpener, closer)
	if closeAt < 0 {
		return TaskFrontmatter{}, text
	}

	header := afterOpener[:closeAt]
	remainder := afterOpener[closeAt+len(closer):]

	var meta TaskFrontmatter
	// Best-effort parse: a malformed header must not block dispatch.
	_ = yaml.Unmarshal([]byte(header), &meta)
	return meta, strings.TrimSpace(remainder)
}
// modelForPriority selects the Claude model for a task priority:
// "critical" (in any letter case) yields "opus", every other value —
// including the empty string — yields "sonnet".
func modelForPriority(priority string) string {
	switch {
	case strings.EqualFold(priority, "critical"):
		return "opus"
	default:
		return "sonnet"
	}
}
// buildTaskMessage renders the text typed into Claude Code for a task.
//
// The result is "[<task file base name>] <body>" followed by a blank line
// and a fixed French reminder that the requested actions must be executed,
// not merely described. When body is empty, a fixed French fallback asking
// Claude to check the inbox is used in its place.
func buildTaskMessage(body, taskFile string) string {
	const (
		fallbackBody = "Verifie .agent-queue/inbox/ — 1 tache assignee."
		layout       = "[%s] %s\n\nIMPORTANT: Tu dois EXECUTER les actions demandées, pas seulement les décrire."
	)

	instructions := body
	if instructions == "" {
		instructions = fallbackBody
	}
	return fmt.Sprintf(layout, filepath.Base(taskFile), instructions)
}
// sessionName returns the pool session name formed by appending the decimal
// rendering of i (as produced by itoa) to prefix.
func sessionName(prefix string, i int) string {
	suffix := itoa(i)
	return prefix + suffix
}
// itoa returns the base-10 representation of n, with a leading "-" for
// negative values (the same output strconv.Itoa gives). It is kept local so
// the file's import set stays unchanged.
func itoa(n int) string {
	if n == 0 {
		return "0"
	}

	negative := n < 0
	// Work on the magnitude as an unsigned value so that the most negative
	// int can be negated without overflow.
	mag := uint64(n)
	if negative {
		mag = -mag
	}

	// 19 digits plus an optional sign is the longest possible 64-bit result.
	var buf [20]byte
	pos := len(buf)
	for mag > 0 {
		pos--
		buf[pos] = byte('0' + mag%10)
		mag /= 10
	}
	if negative {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}