claude-failover/internal/dispatcher/dispatcher.go
Ubuntu 6b109ed1bc fix(dispatcher): send a lone Enter after the task paste to submit it
Multi-line task bodies arrived in Claude Code as "[Pasted text #N +M lines]"
and sat in the input buffer forever — the trailing Enter that SendKeys
appends to the paste is consumed as a newline inside the paste, not as a
submit. Observed live on ccl-auto-11 (secumon) and ccl-auto-12 (secuops):
prompt visible, agent idle.

- tmux.Client grows a SendEnter(session) method. ExecClient runs
  `tmux send-keys -t <sess> Enter` (no preceding text), which Claude's
  TUI accepts as the explicit submit action after a paste.
- Dispatcher: after SendKeys(msg), sleep 500ms for the paste to register,
  then SendEnter. Same sequence a human would perform.
- Five mockTmux implementations updated (quota, dispatcher, switcher,
  lifecycle, watcher tests).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 20:49:59 +00:00

326 lines
9.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package dispatcher watches project inbox directories and assigns tasks to
// idle tmux sessions. One task per session — batch dispatch is intentionally
// unsupported.
package dispatcher
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"gopkg.in/yaml.v3"
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
)
// TaskFrontmatter is the YAML header parsed from task .md files.
// It is filled in by parseFrontmatter from the block between the leading
// "---" markers; keys that are absent leave the corresponding field at its
// zero value.
type TaskFrontmatter struct {
// Title is read from the "title" key.
Title string `yaml:"title"`
// Priority is read from the "priority" key and selects the model via
// modelForPriority.
Priority string `yaml:"priority"` // critical, high, default, low
// Tags is read from the "tags" key.
Tags []string `yaml:"tags"`
// NeedsClaude is read from the "needs_claude_code" key.
NeedsClaude bool `yaml:"needs_claude_code"`
}
// Dispatcher watches project inbox directories and assigns tasks to idle sessions.
// Build one with New and drive it with Run.
type Dispatcher struct {
// tmux is the client used to probe sessions and send keystrokes to them.
tmux tmux.Client
// state holds per-session status; it is queried with GetSession and
// updated with SetWorking.
state *state.State
// config supplies the session pool definition and the dispatcher settings.
config *config.Config
// doneChan delivers the names of sessions that have become idle.
doneChan <-chan string
// projectsDir is resolved once in New.
// NOTE(review): no method visible in this file reads it afterwards.
projectsDir string
// logger receives every "[dispatcher]" log line.
logger *log.Logger
}
// New builds a Dispatcher wired to the given tmux client, state store and
// configuration.
//
// doneChan carries the names of sessions that have just become idle (it is
// fed by the SessionWatcher). The projects directory comes from
// cfg.Dispatcher.ProjectsDir and falls back to cfg.Pool.SharedProjectsDir
// when that setting is empty. Logging goes to the standard library's default
// logger.
func New(tc tmux.Client, s *state.State, cfg *config.Config, doneChan <-chan string) *Dispatcher {
	disp := &Dispatcher{
		tmux:        tc,
		state:       s,
		config:      cfg,
		doneChan:    doneChan,
		projectsDir: cfg.Dispatcher.ProjectsDir,
		logger:      log.Default(),
	}
	if disp.projectsDir == "" {
		disp.projectsDir = cfg.Pool.SharedProjectsDir
	}
	return disp
}
// Run drives the dispatcher event loop and returns when ctx is cancelled.
//
// The loop reacts to four sources:
//   - fsnotify Create events for ".md" files, which trigger a dispatch of the
//     directory containing the new file;
//   - fsnotify errors, which are logged;
//   - session names arriving on doneChan, which are handed the next pending
//     task;
//   - a 60-second ticker, which triggers a full scan of every inbox.
//
// If the fsnotify watcher cannot be created the loop keeps running on the
// ticker and doneChan alone.
func (d *Dispatcher) Run(ctx context.Context) {
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()

	// A nil channel blocks forever in a select, so leaving these unset simply
	// disables the corresponding cases.
	var (
		fwEvents <-chan fsnotify.Event
		fwErrors <-chan error
	)
	fw, err := fsnotify.NewWatcher()
	if err != nil {
		d.logger.Printf("[dispatcher] fsnotify unavailable: %v — poll-only mode", err)
	} else {
		defer fw.Close()
		d.watchInboxDirs(fw)
		fwEvents = fw.Events
		fwErrors = fw.Errors
	}

	// Local copy so the case can be disabled if the sender closes the channel.
	doneChan := d.doneChan

	for {
		select {
		case <-ctx.Done():
			return
		case evt, ok := <-fwEvents:
			if !ok {
				return
			}
			// A name ending in ".md" cannot also end in ".dispatched", so the
			// suffix test alone filters out already-dispatched files.
			if evt.Op&fsnotify.Create != 0 && strings.HasSuffix(evt.Name, ".md") {
				d.dispatchProject(filepath.Dir(evt.Name))
			}
		case werr, ok := <-fwErrors:
			// The watcher's Errors channel has to be received from alongside
			// Events (as fsnotify's own examples do); otherwise the watcher
			// can block while trying to report an error and stop delivering
			// events.
			if !ok {
				fwErrors = nil
				continue
			}
			d.logger.Printf("[dispatcher] fsnotify error: %v", werr)
		case session, ok := <-doneChan:
			if !ok {
				// A closed channel would otherwise yield "" on every
				// iteration and spin the loop.
				doneChan = nil
				continue
			}
			d.assignNextTask(session)
		case <-ticker.C:
			d.fullScan()
		}
	}
}
// watchInboxDirs adds the ".agent-queue/inbox" directory of every dedicated
// pool entry to fw. A directory that cannot be watched is logged and skipped;
// the remaining ones are still registered.
func (d *Dispatcher) watchInboxDirs(fw *fsnotify.Watcher) {
	for _, dedicated := range d.config.Pool.Dedicated {
		inboxPath := filepath.Join(dedicated.Project, ".agent-queue", "inbox")
		addErr := fw.Add(inboxPath)
		if addErr == nil {
			continue
		}
		d.logger.Printf("[dispatcher] watch %s: %v", inboxPath, addErr)
	}
}
// fullScan walks every dedicated pool entry and runs dispatchProject on its
// ".agent-queue/inbox" directory.
func (d *Dispatcher) fullScan() {
	for _, dedicated := range d.config.Pool.Dedicated {
		d.dispatchProject(filepath.Join(dedicated.Project, ".agent-queue", "inbox"))
	}
}
// dispatchProject hands each pending task file in inboxDir to a free session.
//
// A file is pending when its name ends in ".md" and does not contain
// ".dispatched". For every pending file a free session is looked up; if none
// is available the scan stops. After a successful launch the file is renamed
// with a ".dispatched" suffix. A launch failure is logged and the scan moves
// on to the next file. An unreadable inboxDir is silently ignored.
func (d *Dispatcher) dispatchProject(inboxDir string) {
	entries, readErr := os.ReadDir(inboxDir)
	if readErr != nil {
		return
	}

	// The inbox lives at <project>/.agent-queue/inbox, so the project root is
	// two levels up.
	projectDir := filepath.Dir(filepath.Dir(inboxDir))

	for _, entry := range entries {
		fileName := entry.Name()
		pending := strings.HasSuffix(fileName, ".md") && !strings.Contains(fileName, ".dispatched")
		if !pending {
			continue
		}

		session := d.findFreeSession()
		if session == "" {
			d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir)
			return
		}

		taskPath := filepath.Join(inboxDir, fileName)
		launchErr := d.launchAgent(session, projectDir, taskPath)
		if launchErr != nil {
			d.logger.Printf("[dispatcher] launchAgent error: %v", launchErr)
			continue
		}

		dispatchedPath := taskPath + ".dispatched"
		if renameErr := os.Rename(taskPath, dispatchedPath); renameErr != nil {
			d.logger.Printf("[dispatcher] rename .dispatched: %v", renameErr)
		}
	}
}
// findFreeSession picks the first session that isSessionFree accepts.
//
// Dedicated sessions are tried first, in configuration order. Autonomous
// sessions follow: their names are the configured prefix ("ccl-auto-" when
// unset) followed by an index running from StartIndex up to, but excluding,
// StartIndex+Max. The result is "" when nothing is available.
func (d *Dispatcher) findFreeSession() string {
	for _, dedicated := range d.config.Pool.Dedicated {
		if name := dedicated.Name; d.isSessionFree(name) {
			return name
		}
	}

	prefix := "ccl-auto-"
	if configured := d.config.Pool.Autonomous.Prefix; configured != "" {
		prefix = configured
	}

	first := d.config.Pool.Autonomous.StartIndex
	limit := first + d.config.Pool.Autonomous.Max
	for idx := first; idx < limit; idx++ {
		candidate := sessionName(prefix, idx)
		if d.isSessionFree(candidate) {
			return candidate
		}
	}
	return ""
}
// isSessionFree reports whether the named session can take a new task.
//
// All of the following must hold: the tmux session exists, the state store
// has an entry for it whose State is "idle", and either it has never failed
// or its last failure is at least five minutes old.
func (d *Dispatcher) isSessionFree(name string) bool {
	const cooldown = 5 * time.Minute

	if alive := d.tmux.HasSession(name); !alive {
		return false
	}

	sess := d.state.GetSession(name)
	switch {
	case sess == nil:
		return false
	case sess.State != "idle":
		return false
	case sess.LastFail == nil:
		return true
	default:
		return time.Since(*sess.LastFail) >= cooldown
	}
}
// assignNextTask gives a freshly-idled session the first pending task found
// in any dedicated project's inbox.
//
// Inboxes are visited in configuration order. A file is pending when its name
// ends in ".md" and does not contain ".dispatched". The first task that
// launches successfully is renamed with a ".dispatched" suffix and the search
// stops; a task whose launch fails is logged and the next candidate is tried.
// Unreadable inbox directories are skipped.
//
// NOTE(review): session is used as given; unlike dispatchProject, nothing
// here re-checks isSessionFree. Confirm the sender guarantees the session is
// still idle when its name is received.
func (d *Dispatcher) assignNextTask(session string) {
	for _, ds := range d.config.Pool.Dedicated {
		inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
		entries, err := os.ReadDir(inbox)
		if err != nil {
			continue
		}
		for _, e := range entries {
			name := e.Name()
			if !strings.HasSuffix(name, ".md") || strings.Contains(name, ".dispatched") {
				continue
			}
			taskPath := filepath.Join(inbox, name)
			if err := d.launchAgent(session, ds.Project, taskPath); err != nil {
				// Same reporting as dispatchProject, so a failing task does
				// not disappear without a trace.
				d.logger.Printf("[dispatcher] launchAgent error: %v", err)
				continue
			}
			// The task is already running at this point; a failed rename
			// means the file still looks pending, so make that visible.
			if err := os.Rename(taskPath, taskPath+".dispatched"); err != nil {
				d.logger.Printf("[dispatcher] rename .dispatched: %v", err)
			}
			return
		}
	}
}
// launchAgent starts Claude Code in session and submits the task stored in
// taskFile.
//
// Steps, in order:
//  1. read taskFile and split it into frontmatter and body;
//  2. send "cd <projectDir>" to the session;
//  3. send the claude command, with the model chosen from the task priority
//     and, when a resume-id file exists for the session, "--resume <uuid>";
//  4. wait for the prompt (Dispatcher.PromptTimeout, 30s when unset);
//  5. send the task message, pause, then send a lone Enter to submit it;
//  6. mark the session as working on the task in the state store.
//
// Any failing step aborts the launch and returns an error that names the
// step and wraps the underlying cause.
func (d *Dispatcher) launchAgent(session, projectDir, taskFile string) error {
	content, err := os.ReadFile(taskFile)
	if err != nil {
		return fmt.Errorf("read task %s: %w", taskFile, err)
	}
	fm, body := parseFrontmatter(content)
	model := modelForPriority(fm.Priority)

	// Change to project directory.
	if err := d.tmux.SendKeys(session, "cd "+projectDir); err != nil {
		return fmt.Errorf("cd to %s in %q: %w", projectDir, session, err)
	}
	// Presumably gives the shell a moment to process the cd before the next
	// command is typed.
	time.Sleep(300 * time.Millisecond)

	// Build and send the claude command, with optional --resume UUID.
	cmd := fmt.Sprintf("claude --model %s --dangerously-skip-permissions", model)
	resumeFile := filepath.Join(d.resumeDir(), session+"-resume-id.txt")
	resumed := false
	if data, ferr := os.ReadFile(resumeFile); ferr == nil {
		if uuid := strings.TrimSpace(string(data)); uuid != "" {
			cmd += " --resume " + uuid
			resumed = true
		}
	}
	if err := d.tmux.SendKeys(session, cmd); err != nil {
		return fmt.Errorf("start claude in %q: %w", session, err)
	}
	// Consume the resume id only once the command carrying it has been sent,
	// so a failed send leaves it available for the next attempt.
	if resumed {
		if rerr := os.Remove(resumeFile); rerr != nil {
			d.logger.Printf("[dispatcher] remove %s: %v", resumeFile, rerr)
		}
	}

	// Wait for the prompt before sending the task message.
	promptTimeout := d.config.Dispatcher.PromptTimeout.Duration
	if promptTimeout == 0 {
		promptTimeout = 30 * time.Second
	}
	if !d.waitForPrompt(session, promptTimeout) {
		return fmt.Errorf("claude not ready in %q after %v", session, promptTimeout)
	}

	// Send the task message. Multi-line task bodies render in Claude's TUI
	// as "[Pasted text #N +M lines]" — the trailing Enter that SendKeys
	// appends is consumed as part of the paste, so the message would stay
	// unsubmitted in the input buffer forever. Wait for the paste to
	// register, then send a lone Enter to actually submit.
	msg := buildTaskMessage(body, taskFile)
	if err := d.tmux.SendKeys(session, msg); err != nil {
		return fmt.Errorf("send task to %q: %w", session, err)
	}
	time.Sleep(500 * time.Millisecond)
	if err := d.tmux.SendEnter(session); err != nil {
		return fmt.Errorf("submit task in %q: %w", session, err)
	}

	d.state.SetWorking(session, filepath.Base(taskFile))
	d.logger.Printf("[dispatcher] DISPATCHED session=%q task=%s model=%s",
		session, filepath.Base(taskFile), model)
	return nil
}
// waitForPrompt polls the session's pane roughly once per second until the
// Claude prompt marker shows up in the captured tail or timeout has elapsed.
// It returns true as soon as the marker is seen and false once the deadline
// passes; a timeout of zero or less returns false without polling.
func (d *Dispatcher) waitForPrompt(session string, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
// Presumably captures the last 5 lines of the pane — confirm against
// tmux.Client.CapturePaneTail.
tail, err := d.tmux.CapturePaneTail(session, 5)
// NOTE(review): the marker literal below is empty as seen here, and
// strings.Contains(s, "") is true for every s, so the first capture that
// succeeds ends the wait regardless of what is on screen. A non-ASCII
// prompt glyph was probably lost from this copy of the file — confirm the
// intended marker against the repository.
if err == nil && strings.Contains(tail, "") {
return true
}
// Capture errors are not reported; the loop just retries after the pause.
time.Sleep(1 * time.Second)
}
return false
}
// resumeDir gives the directory in which per-session resume UUID files are
// kept: ".claude-context" under the current user's home directory.
func (d *Dispatcher) resumeDir() string {
	const dirName = ".claude-context"

	// The lookup error is dropped: when the home directory is unknown the
	// empty string is joined instead, which yields the relative path
	// ".claude-context".
	homeDir, _ := os.UserHomeDir()
	return filepath.Join(homeDir, dirName)
}
// parseFrontmatter splits YAML frontmatter (between --- markers) from the body.
//
// The header is recognised when content starts with "---\n" and a later line
// consisting of "---" closes it. The closing marker may be followed by a
// newline or sit at the very end of the content, and the header itself may be
// empty. The returned body has surrounding whitespace trimmed.
//
// When no header is found the result is an empty TaskFrontmatter and the
// full, untrimmed content. YAML decoding is best-effort: a malformed header
// leaves the affected fields at their zero values.
func parseFrontmatter(content []byte) (TaskFrontmatter, string) {
	const (
		opener     = "---\n"
		closer     = "\n---\n"
		closerAtEO = "\n---" // closing marker with no newline after it
	)

	s := string(content)
	if !strings.HasPrefix(s, opener) {
		return TaskFrontmatter{}, s
	}

	// Start the search at the newline that ends the opening marker so that an
	// empty header ("---\n---\n") is matched as well.
	searchFrom := len(opener) - 1

	var closeAt, bodyAt int
	if i := strings.Index(s[searchFrom:], closer); i >= 0 {
		closeAt = searchFrom + i
		bodyAt = closeAt + len(closer)
	} else if strings.HasSuffix(s[searchFrom:], closerAtEO) {
		// Header-only file whose last line is the closing marker: without
		// this case the raw header would be returned as the task body.
		closeAt = len(s) - len(closerAtEO)
		bodyAt = len(s)
	} else {
		return TaskFrontmatter{}, s
	}

	// closeAt is the index of the newline preceding the closing marker; for
	// an empty header it falls before the end of the opener.
	yamlBlock := ""
	if closeAt > len(opener) {
		yamlBlock = s[len(opener):closeAt]
	}
	body := strings.TrimSpace(s[bodyAt:])

	var fm TaskFrontmatter
	yaml.Unmarshal([]byte(yamlBlock), &fm) //nolint:errcheck // best-effort parse
	return fm, body
}
// modelForPriority picks the Claude model for a task priority: "opus" when
// the priority equals "critical" (compared case-insensitively), "sonnet" for
// every other value, including the empty string.
func modelForPriority(priority string) string {
	const (
		criticalModel = "opus"
		defaultModel  = "sonnet"
	)

	model := defaultModel
	if strings.EqualFold("critical", priority) {
		model = criticalModel
	}
	return model
}
// buildTaskMessage renders the text typed into Claude Code for a task: the
// task file's base name in square brackets, the task body, a blank line, and
// a fixed instruction telling the agent to carry out the actions rather than
// describe them. An empty body is replaced by a short pointer to the inbox.
func buildTaskMessage(body, taskFile string) string {
	text := body
	if text == "" {
		text = "Verifie .agent-queue/inbox/ — 1 tache assignee."
	}
	return fmt.Sprintf("[%s] %s\n\nIMPORTANT: Tu dois EXECUTER les actions demandées, pas seulement les décrire.",
		filepath.Base(taskFile), text)
}
// sessionName builds a session name by appending the decimal form of i to
// prefix.
func sessionName(prefix string, i int) string {
	suffix := itoa(i)
	return prefix + suffix
}
// itoa returns the base-10 representation of n, with a leading "-" for
// negative values.
func itoa(n int) string {
	if n == 0 {
		return "0"
	}

	// 20 bytes hold the sign plus the 19 digits of the largest 64-bit
	// magnitude. Digits are written from the end so nothing is shifted.
	var buf [20]byte
	pos := len(buf)

	// Work on the unsigned magnitude: negating in unsigned arithmetic is
	// well-defined even for the most negative int, which has no positive
	// counterpart.
	negative := n < 0
	mag := uint(n)
	if negative {
		mag = -mag
	}

	for mag > 0 {
		pos--
		buf[pos] = byte('0' + mag%10)
		mag /= 10
	}
	if negative {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}