claude-failover/internal/dispatcher/dispatcher.go

319 lines
8.9 KiB
Go
Raw Normal View History

// Package dispatcher watches project inbox directories and assigns tasks to
// idle tmux sessions. One task per session — batch dispatch is intentionally
// unsupported.
package dispatcher
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"gopkg.in/yaml.v3"
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
)
// TaskFrontmatter is the YAML header parsed from task .md files.
type TaskFrontmatter struct {
Title string `yaml:"title"`
Priority string `yaml:"priority"` // critical, high, default, low
Tags []string `yaml:"tags"`
NeedsClaude bool `yaml:"needs_claude_code"`
}
// Dispatcher watches project inbox directories and assigns tasks to idle sessions.
type Dispatcher struct {
tmux tmux.Client
state *state.State
config *config.Config
doneChan <-chan string
projectsDir string
logger *log.Logger
}
// New creates a Dispatcher.
// doneChan receives session names when they become idle (from SessionWatcher).
func New(tc tmux.Client, s *state.State, cfg *config.Config, doneChan <-chan string) *Dispatcher {
projectsDir := cfg.Dispatcher.ProjectsDir
if projectsDir == "" {
projectsDir = cfg.Pool.SharedProjectsDir
}
return &Dispatcher{
tmux: tc,
state: s,
config: cfg,
doneChan: doneChan,
projectsDir: projectsDir,
logger: log.Default(),
}
}
// Run starts the dispatcher event loop until ctx is cancelled.
func (d *Dispatcher) Run(ctx context.Context) {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
fw, err := fsnotify.NewWatcher()
if err != nil {
d.logger.Printf("[dispatcher] fsnotify unavailable: %v — poll-only mode", err)
} else {
defer fw.Close()
d.watchInboxDirs(fw)
}
var fwEvents <-chan fsnotify.Event
if fw != nil {
fwEvents = fw.Events
}
for {
select {
case <-ctx.Done():
return
case evt, ok := <-fwEvents:
if !ok {
return
}
if evt.Op&fsnotify.Create != 0 &&
strings.HasSuffix(evt.Name, ".md") &&
!strings.HasSuffix(evt.Name, ".dispatched") {
d.dispatchProject(filepath.Dir(evt.Name))
}
case session := <-d.doneChan:
d.assignNextTask(session)
case <-ticker.C:
d.fullScan()
}
}
}
// watchInboxDirs registers all known project inbox dirs with the fsnotify watcher.
func (d *Dispatcher) watchInboxDirs(fw *fsnotify.Watcher) {
for _, ds := range d.config.Pool.Dedicated {
inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
if err := fw.Add(inbox); err != nil {
d.logger.Printf("[dispatcher] watch %s: %v", inbox, err)
}
}
}
// fullScan dispatches pending tasks in all project inboxes.
func (d *Dispatcher) fullScan() {
for _, ds := range d.config.Pool.Dedicated {
inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
d.dispatchProject(inbox)
}
}
// dispatchProject assigns undispatched tasks in inboxDir to idle sessions.
func (d *Dispatcher) dispatchProject(inboxDir string) {
entries, err := os.ReadDir(inboxDir)
if err != nil {
return
}
projectDir := filepath.Dir(filepath.Dir(inboxDir)) // inboxDir/.agent-queue/inbox → project
for _, e := range entries {
name := e.Name()
if !strings.HasSuffix(name, ".md") || strings.Contains(name, ".dispatched") {
continue
}
session := d.findFreeSession()
if session == "" {
d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir)
return
}
taskPath := filepath.Join(inboxDir, name)
if err := d.launchAgent(session, projectDir, taskPath); err != nil {
d.logger.Printf("[dispatcher] launchAgent error: %v", err)
continue
}
if err := os.Rename(taskPath, taskPath+".dispatched"); err != nil {
d.logger.Printf("[dispatcher] rename .dispatched: %v", err)
}
}
}
// findFreeSession returns the name of an idle, live, cooldown-free session.
// Returns "" if no session is available.
func (d *Dispatcher) findFreeSession() string {
for _, ds := range d.config.Pool.Dedicated {
if d.isSessionFree(ds.Name) {
return ds.Name
}
}
prefix := d.config.Pool.Autonomous.Prefix
if prefix == "" {
prefix = "ccl-auto-"
}
start := d.config.Pool.Autonomous.StartIndex
for i := start; i < start+d.config.Pool.Autonomous.Max; i++ {
if d.isSessionFree(sessionName(prefix, i)) {
return sessionName(prefix, i)
}
}
return ""
}
// isSessionFree returns true when the session is idle, alive, and past its cooldown.
func (d *Dispatcher) isSessionFree(name string) bool {
if !d.tmux.HasSession(name) {
return false
}
sess := d.state.GetSession(name)
if sess == nil || sess.State != "idle" {
return false
}
if sess.LastFail != nil && time.Since(*sess.LastFail) < 5*time.Minute {
return false
}
return true
}
// assignNextTask scans all inboxes for work to give to a freshly-idled session.
func (d *Dispatcher) assignNextTask(session string) {
for _, ds := range d.config.Pool.Dedicated {
inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
entries, err := os.ReadDir(inbox)
if err != nil {
continue
}
for _, e := range entries {
if !strings.HasSuffix(e.Name(), ".md") || strings.Contains(e.Name(), ".dispatched") {
continue
}
taskPath := filepath.Join(inbox, e.Name())
if err := d.launchAgent(session, ds.Project, taskPath); err == nil {
os.Rename(taskPath, taskPath+".dispatched")
return
}
}
}
}
// launchAgent starts Claude Code in session for the given task file.
func (d *Dispatcher) launchAgent(session, projectDir, taskFile string) error {
content, err := os.ReadFile(taskFile)
if err != nil {
return fmt.Errorf("read task %s: %w", taskFile, err)
}
fm, body := parseFrontmatter(content)
model := modelForPriority(fm.Priority)
// Change to project directory.
if err := d.tmux.SendKeys(session, "cd "+projectDir); err != nil {
return err
}
time.Sleep(300 * time.Millisecond)
// Build and send the claude command, with optional --resume UUID.
cmd := fmt.Sprintf("claude --model %s --dangerously-skip-permissions", model)
resumeFile := filepath.Join(d.resumeDir(), session+"-resume-id.txt")
if data, ferr := os.ReadFile(resumeFile); ferr == nil {
if uuid := strings.TrimSpace(string(data)); uuid != "" {
cmd += " --resume " + uuid
os.Remove(resumeFile)
}
}
if err := d.tmux.SendKeys(session, cmd); err != nil {
return err
}
// Wait for the prompt before sending the task message.
promptTimeout := d.config.Dispatcher.PromptTimeout.Duration
if promptTimeout == 0 {
promptTimeout = 30 * time.Second
}
if !d.waitForPrompt(session, promptTimeout) {
return fmt.Errorf("claude not ready in %q after %v", session, promptTimeout)
}
// Send the task message.
msg := buildTaskMessage(body, taskFile)
if err := d.tmux.SendKeys(session, msg); err != nil {
return err
}
d.state.SetWorking(session, filepath.Base(taskFile))
d.logger.Printf("[dispatcher] DISPATCHED session=%q task=%s model=%s",
session, filepath.Base(taskFile), model)
return nil
}
// waitForPrompt polls for the Claude prompt up to timeout.
func (d *Dispatcher) waitForPrompt(session string, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
tail, err := d.tmux.CapturePaneTail(session, 5)
if err == nil && strings.Contains(tail, "") {
return true
}
time.Sleep(1 * time.Second)
}
return false
}
// resumeDir returns the directory containing per-session resume UUIDs.
func (d *Dispatcher) resumeDir() string {
home, _ := os.UserHomeDir()
return filepath.Join(home, ".claude-context")
}
// parseFrontmatter splits YAML frontmatter (between --- markers) from the body.
// Returns an empty TaskFrontmatter and the full content if no header is found.
func parseFrontmatter(content []byte) (TaskFrontmatter, string) {
s := string(content)
if !strings.HasPrefix(s, "---\n") {
return TaskFrontmatter{}, s
}
end := strings.Index(s[4:], "\n---\n")
if end < 0 {
return TaskFrontmatter{}, s
}
yamlBlock := s[4 : end+4]
body := strings.TrimSpace(s[end+4+5:]) // skip "\n---\n"
var fm TaskFrontmatter
yaml.Unmarshal([]byte(yamlBlock), &fm) //nolint:errcheck // best-effort parse
return fm, body
}
// modelForPriority maps a task priority to a Claude model name.
func modelForPriority(priority string) string {
if strings.EqualFold(priority, "critical") {
return "opus"
}
return "sonnet"
}
// buildTaskMessage constructs the dispatch message sent to Claude Code.
func buildTaskMessage(body, taskFile string) string {
taskName := filepath.Base(taskFile)
if body == "" {
body = "Verifie .agent-queue/inbox/ — 1 tache assignee."
}
return fmt.Sprintf("[%s] %s\n\nIMPORTANT: Tu dois EXECUTER les actions demandées, pas seulement les décrire.",
taskName, body)
}
func sessionName(prefix string, i int) string {
return prefix + itoa(i)
}
func itoa(n int) string {
if n == 0 {
return "0"
}
b := make([]byte, 0, 10)
for n > 0 {
b = append([]byte{byte('0' + n%10)}, b...)
n /= 10
}
return string(b)
}