claude-failover/internal/dispatcher/dispatcher.go
Ubuntu 5cfb58c202 feat(dispatcher): enforce depends_on with .blocked marker (Phase 2/G2)
Before claiming a session for a task, the dispatcher now:
1. Parses the task's frontmatter
2. If `depends_on: [project:task_id]` is non-empty, checks each entry
   against `<projectsDir>/<project>/.agent-queue/done/<task_id>.md`
3. If any dep is unresolved -> skip the task and write
   `<task>.md.blocked` next to it. The watchdog (G1) will resolve
   this marker on its next tick.

The `.blocked` marker is idempotent: re-running the dispatcher does not
refresh its mtime, so the watchdog can compute the blocked-since
timestamp from the FIRST detection (timeout precision).

Path-traversal hardening: project / task_id segments must match
`[A-Za-z0-9._-]+` and cannot be `.` or `..`. A malicious frontmatter
like `depends_on: ../../tmp:foo` is rejected before any filesystem
lookup.

assignNextTask (the doneChan path) applies the same gate so that a
session freed mid-cycle cannot bypass enforcement.

Tests (-race clean):
- DependsOnUnresolved -> .blocked marker, no dispatch
- DependsOnResolved -> normal dispatch, no marker
- PartialResolution -> stay blocked
- RejectPathTraversal -> blocked, not dispatched
- BlockedMarker idempotent (mtime stable across passes)
- NoDependsOn regression guard
2026-04-16 20:30:17 +00:00

449 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package dispatcher watches project inbox directories and assigns tasks to
// idle tmux sessions. One task per session — batch dispatch is intentionally
// unsupported.
package dispatcher
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"gopkg.in/yaml.v3"
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
)
// TaskFrontmatter is the YAML header parsed from task .md files by
// parseFrontmatter. Keys absent from the header leave their field at the
// zero value.
type TaskFrontmatter struct {
	// Title is the task title. It is not read anywhere in this file.
	Title string `yaml:"title"`
	// Priority selects the Claude model through modelForPriority: only
	// "critical" (any letter case) runs on opus; everything else on sonnet.
	Priority string `yaml:"priority"` // critical, high, default, low
	// Tags is free-form metadata. It is not read anywhere in this file.
	Tags []string `yaml:"tags"`
	// NeedsClaude mirrors the needs_claude_code key. It is not read
	// anywhere in this file.
	NeedsClaude bool `yaml:"needs_claude_code"`
	// Phase 2/G2: cross-project dependencies. Each entry is
	// "project:task_id" (e.g. "filesecure:FIX-0123"). The dispatcher
	// refuses to launch a task whose deps aren't all in the target
	// project's done/, and drops a `<task>.md.blocked` marker so the
	// watchdog can resolve it later (Phase 2/G1).
	//
	// NOTE(review): this expects a YAML sequence. A scalar such as
	// `depends_on: filesecure:FIX-0123` most likely fails to decode into
	// []string, and parseFrontmatter discards YAML errors, so the field
	// would stay empty and the task would NOT be blocked — confirm against
	// yaml.v3 and consider accepting the scalar form.
	DependsOn []string `yaml:"depends_on"`
}
// Dispatcher watches project inbox directories and assigns tasks to idle sessions.
//
// Run drives every dispatch from a single select loop; nothing in this file
// guards the fields below with locks.
type Dispatcher struct {
	tmux     tmux.Client    // drives sessions: HasSession, SendKeys, SendEnter, CapturePaneTail
	state    *state.State   // per-session status (State, LastFail) and SetWorking bookkeeping
	config   *config.Config // pool layout (dedicated/autonomous) and dispatcher settings
	doneChan <-chan string  // names of sessions reported idle (fed by SessionWatcher)
	// projectsDir is the root under which depends_on entries are resolved:
	// <projectsDir>/<project>/.agent-queue/done/.
	projectsDir string
	logger      *log.Logger // destination for all [dispatcher] log lines
}
// New builds a Dispatcher around the given tmux client, shared session state
// and configuration. doneChan delivers the names of sessions that have just
// become idle (fed by SessionWatcher).
//
// depends_on lookups are rooted at cfg.Dispatcher.ProjectsDir; when that is
// empty, cfg.Pool.SharedProjectsDir is used instead. Logging goes to the
// standard library's default logger.
func New(tc tmux.Client, s *state.State, cfg *config.Config, doneChan <-chan string) *Dispatcher {
	d := &Dispatcher{
		tmux:        tc,
		state:       s,
		config:      cfg,
		doneChan:    doneChan,
		projectsDir: cfg.Dispatcher.ProjectsDir,
		logger:      log.Default(),
	}
	if d.projectsDir == "" {
		d.projectsDir = cfg.Pool.SharedProjectsDir
	}
	return d
}
// Run is the dispatcher event loop. It blocks until ctx is cancelled and
// reacts to three sources:
//   - fsnotify Create events for "*.md" files in a watched inbox, which
//     trigger a dispatch pass over that inbox;
//   - session names on doneChan, which offer the freed session a task;
//   - a 60-second ticker, which rescans every inbox as a safety net.
//
// If fsnotify cannot be initialised, or its event stream closes, the loop
// keeps running in poll-only mode instead of returning. Watcher errors are
// drained and logged: fsnotify delivers errors on a channel that must be
// read, and leaving it unread can stall the watcher's delivery goroutine.
// A closed doneChan is disabled rather than received from repeatedly, which
// would otherwise spin the loop calling assignNextTask("").
func (d *Dispatcher) Run(ctx context.Context) {
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()

	// A nil channel is never ready in a select, so leaving these nil
	// disables the corresponding case.
	var (
		fwEvents <-chan fsnotify.Event
		fwErrors <-chan error
	)
	fw, err := fsnotify.NewWatcher()
	if err != nil {
		d.logger.Printf("[dispatcher] fsnotify unavailable: %v — poll-only mode", err)
	} else {
		defer fw.Close()
		d.watchInboxDirs(fw)
		fwEvents, fwErrors = fw.Events, fw.Errors
	}
	idle := d.doneChan

	for {
		select {
		case <-ctx.Done():
			return
		case evt, ok := <-fwEvents:
			if !ok {
				d.logger.Printf("[dispatcher] fsnotify event stream closed — poll-only mode")
				fwEvents, fwErrors = nil, nil
				continue
			}
			// Sidecars such as ".md.dispatched" / ".md.blocked" do not end
			// in ".md", so this suffix test alone selects task files.
			if evt.Op&fsnotify.Create != 0 && strings.HasSuffix(evt.Name, ".md") {
				d.dispatchProject(filepath.Dir(evt.Name))
			}
		case werr, ok := <-fwErrors:
			if !ok {
				fwErrors = nil
				continue
			}
			d.logger.Printf("[dispatcher] fsnotify error: %v", werr)
		case session, ok := <-idle:
			if !ok {
				idle = nil
				continue
			}
			d.assignNextTask(session)
		case <-ticker.C:
			d.fullScan()
		}
	}
}
// watchInboxDirs subscribes fw to the .agent-queue/inbox directory of every
// dedicated project. An inbox that cannot be watched is logged and skipped;
// it is still covered by the periodic fullScan.
func (d *Dispatcher) watchInboxDirs(fw *fsnotify.Watcher) {
	for _, project := range d.config.Pool.Dedicated {
		dir := filepath.Join(project.Project, ".agent-queue", "inbox")
		err := fw.Add(dir)
		if err == nil {
			continue
		}
		d.logger.Printf("[dispatcher] watch %s: %v", dir, err)
	}
}
// fullScan runs a dispatch pass over the inbox of every dedicated project.
// Run calls it on every ticker tick as a safety net for missed fsnotify
// events.
func (d *Dispatcher) fullScan() {
	for _, project := range d.config.Pool.Dedicated {
		d.dispatchProject(filepath.Join(project.Project, ".agent-queue", "inbox"))
	}
}
// dispatchProject hands pending tasks in inboxDir to free autonomous
// sessions, one task per session.
//
// A task is a regular file whose name ends in ".md". Sidecars such as
// "<task>.md.dispatched" and "<task>.md.blocked" do not end in ".md" and are
// skipped. A task with unresolved depends_on entries gets a .blocked marker
// and stays in the inbox without consuming a session. A launched task is
// renamed to "<task>.md.dispatched" so later passes ignore it.
//
// The pass stops when no session is free, or when a launch fails.
// launchAgent only marks the session as working on success, so after a
// failure the same session would still look free. The next task's keystrokes
// would then be typed into whatever the aborted launch left running. The
// failed task stays in the inbox and is retried on a later scan.
func (d *Dispatcher) dispatchProject(inboxDir string) {
	entries, err := os.ReadDir(inboxDir)
	if err != nil {
		return
	}
	// inboxDir is <project>/.agent-queue/inbox; strip the last two elements.
	projectDir := filepath.Dir(filepath.Dir(inboxDir))
	for _, e := range entries {
		name := e.Name()
		if e.IsDir() || !strings.HasSuffix(name, ".md") {
			continue
		}
		taskPath := filepath.Join(inboxDir, name)
		// Phase 2/G2: enforce depends_on before claiming a session so an
		// idle slot is never burned on a task that cannot proceed.
		if d.taskBlocked(taskPath) {
			d.touchBlockedMarker(taskPath)
			continue
		}
		session := d.findFreeSession()
		if session == "" {
			d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir)
			return
		}
		if err := d.launchAgent(session, projectDir, taskPath); err != nil {
			d.logger.Printf("[dispatcher] launchAgent error: %v", err)
			return
		}
		if err := os.Rename(taskPath, taskPath+".dispatched"); err != nil {
			d.logger.Printf("[dispatcher] rename .dispatched: %v", err)
		}
	}
}
// taskBlocked reports whether the task file at taskPath lists depends_on
// entries that dependencyResolved does not yet consider done. The first
// unresolved entry is logged.
//
// An unreadable file, a file without frontmatter, or an empty depends_on
// list is never blocked. Frontmatter is parsed best-effort (parseFrontmatter
// discards YAML errors), so malformed frontmatter counts as non-blocking —
// bad frontmatter is the agent's problem, not the dispatcher's.
func (d *Dispatcher) taskBlocked(taskPath string) bool {
	raw, readErr := os.ReadFile(taskPath)
	if readErr != nil {
		return false
	}
	header, _ := parseFrontmatter(raw)
	for _, dep := range header.DependsOn {
		if d.dependencyResolved(dep) {
			continue
		}
		d.logger.Printf("[dispatcher] task %s blocked by unresolved dep %q",
			filepath.Base(taskPath), dep)
		return true
	}
	return false
}
// dependencyResolved reports whether dep, written as "project:task_id", has
// a completion record in <projectsDir>/<project>/.agent-queue/done/.
//
// It accepts either of two regular files:
//   - the exact file <task_id>.md;
//   - a file named <task_id><sep>*.md, where <sep> is '.', '_' or '-'
//     (some queues append a timestamp).
//
// Requiring a separator stops "FIX-1" from being satisfied by "FIX-10.md"
// or "FIX-1b.md". A short ID such as "FIX" is still satisfied by
// "FIX-0123.md".
//
// Whitespace around either segment is ignored. Both segments must pass
// isSafeSegment (no "/", not "." or ".."), so a crafted entry cannot make
// the lookup escape projectsDir. Malformed or unsafe entries are logged and
// reported as unresolved, which keeps the dependent task blocked.
func (d *Dispatcher) dependencyResolved(dep string) bool {
	parts := strings.SplitN(dep, ":", 2)
	if len(parts) != 2 {
		d.logger.Printf("[dispatcher] malformed depends_on %q (expected 'project:task_id')", dep)
		return false
	}
	project, taskID := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
	if !isSafeSegment(project) || !isSafeSegment(taskID) {
		d.logger.Printf("[dispatcher] unsafe depends_on segment %q — refusing to look up", dep)
		return false
	}
	doneDir := filepath.Join(d.projectsDir, project, ".agent-queue", "done")
	if fi, err := os.Stat(filepath.Join(doneDir, taskID+".md")); err == nil && !fi.IsDir() {
		return true
	}
	entries, err := os.ReadDir(doneDir)
	if err != nil {
		return false
	}
	for _, e := range entries {
		name := e.Name()
		// The length guard keeps name[len(taskID)] in range.
		if e.IsDir() || len(name) <= len(taskID) ||
			!strings.HasPrefix(name, taskID) || !strings.HasSuffix(name, ".md") {
			continue
		}
		switch name[len(taskID)] {
		case '.', '_', '-':
			return true
		}
	}
	return false
}
// touchBlockedMarker creates an empty `<taskPath>.blocked` marker so the
// watchdog (Phase 2/G1) sees the task as blocked and tracks its timeout.
//
// The marker's mtime is the "blocked since" timestamp, so an existing marker
// is never rewritten. The operator wants the timeout counted from the FIRST
// detection, not the last.
//
// Creation uses O_CREATE|O_EXCL, which makes "create only if absent" a single
// atomic step. A separate Stat followed by a truncating write would re-stamp
// a marker that appeared in between.
func (d *Dispatcher) touchBlockedMarker(taskPath string) {
	marker := taskPath + ".blocked"
	f, err := os.OpenFile(marker, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644)
	if err != nil {
		if os.IsExist(err) {
			return // already marked: leave its mtime alone
		}
		d.logger.Printf("[dispatcher] write .blocked marker for %s: %v",
			filepath.Base(taskPath), err)
		return
	}
	if err := f.Close(); err != nil {
		d.logger.Printf("[dispatcher] write .blocked marker for %s: %v",
			filepath.Base(taskPath), err)
		return
	}
	d.logger.Printf("[dispatcher] task %s marked .blocked (waiting on depends_on)",
		filepath.Base(taskPath))
}
// isSafeSegment reports whether s may be used as a single path component for
// a depends_on project or task_id. s must be non-empty and consist only of
// ASCII letters, digits, '_', '-' and '.'. The exact names "." and ".." are
// rejected to prevent path traversal. This is the same rule the watchdog
// uses.
func isSafeSegment(s string) bool {
	switch s {
	case "", ".", "..":
		return false
	}
	for _, c := range s {
		allowed := ('a' <= c && c <= 'z') ||
			('A' <= c && c <= 'Z') ||
			('0' <= c && c <= '9') ||
			c == '_' || c == '-' || c == '.'
		if !allowed {
			return false
		}
	}
	return true
}
// findFreeSession scans the autonomous pool, indexes StartIndex through
// StartIndex+Max-1 in order, and returns the name of the first session that
// isSessionFree accepts. It returns "" when none is free. The session-name
// prefix defaults to "ccl-auto-".
//
// Dedicated sessions are intentionally never returned: they host the
// operator's manual interactive work. Routing a background dispatch into them
// would (a) hijack a Claude instance the operator is using and (b) trigger
// the watcher's /exit recycle at task end, kicking the operator out
// mid-conversation.
func (d *Dispatcher) findFreeSession() string {
	prefix := d.config.Pool.Autonomous.Prefix
	if len(prefix) == 0 {
		prefix = "ccl-auto-"
	}
	first := d.config.Pool.Autonomous.StartIndex
	for idx := first; idx < first+d.config.Pool.Autonomous.Max; idx++ {
		candidate := sessionName(prefix, idx)
		if d.isSessionFree(candidate) {
			return candidate
		}
	}
	return ""
}
// isSessionFree reports whether the named session can take a task. All three
// conditions must hold:
//   - its tmux session exists;
//   - the shared state records it as "idle";
//   - its last recorded failure, if any, is at least five minutes old.
func (d *Dispatcher) isSessionFree(name string) bool {
	if !d.tmux.HasSession(name) {
		return false
	}
	info := d.state.GetSession(name)
	if info == nil || info.State != "idle" {
		return false
	}
	// Cool-down: a session that failed recently is left alone for 5 minutes.
	return info.LastFail == nil || time.Since(*info.LastFail) >= 5*time.Minute
}
// assignNextTask offers a session that SessionWatcher just reported idle the
// first dispatchable task found across the dedicated projects' inboxes.
//
// It enforces the same rules as dispatchProject/findFreeSession:
//   - only autonomous-pool sessions (the ones findFreeSession could return)
//     receive background work; dedicated sessions host the operator's
//     interactive Claude;
//   - the session must still be free, because an fsnotify event or the
//     ticker may already have handed it a task after the idle signal was
//     queued;
//   - Phase 2/G2: tasks with unresolved depends_on are marked .blocked and
//     skipped, so a session freed mid-cycle cannot bypass the gate.
//
// A launch failure is logged and ends the attempt, leaving the task in the
// inbox for a later scan. The session may be half-started, so no other task
// is tried on it.
func (d *Dispatcher) assignNextTask(session string) {
	prefix := d.config.Pool.Autonomous.Prefix
	if prefix == "" {
		prefix = "ccl-auto-"
	}
	start := d.config.Pool.Autonomous.StartIndex
	inPool := false
	for i := start; i < start+d.config.Pool.Autonomous.Max && !inPool; i++ {
		inPool = sessionName(prefix, i) == session
	}
	if !inPool || !d.isSessionFree(session) {
		return
	}

	for _, ds := range d.config.Pool.Dedicated {
		inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
		entries, err := os.ReadDir(inbox)
		if err != nil {
			continue
		}
		for _, e := range entries {
			name := e.Name()
			if e.IsDir() || !strings.HasSuffix(name, ".md") {
				continue
			}
			taskPath := filepath.Join(inbox, name)
			if d.taskBlocked(taskPath) {
				d.touchBlockedMarker(taskPath)
				continue
			}
			if err := d.launchAgent(session, ds.Project, taskPath); err != nil {
				d.logger.Printf("[dispatcher] launchAgent error: %v", err)
				return
			}
			if err := os.Rename(taskPath, taskPath+".dispatched"); err != nil {
				d.logger.Printf("[dispatcher] rename .dispatched: %v", err)
			}
			return
		}
	}
}
// launchAgent starts Claude Code in session for the given task file.
//
// It types the following into the session, in order:
//  1. a cd into projectDir;
//  2. the claude command, with the model chosen from the task's priority and
//     --resume <id> added when <resumeDir>/<session>-resume-id.txt holds a
//     usable id;
//  3. once waitForPrompt succeeds, the task message, followed by a separate
//     Enter to submit a multi-line paste.
//
// On success the session is marked working in state.
//
// projectDir is single-quoted for the shell, so spaces or metacharacters in
// the path cannot split or extend the command. The resume id is inserted
// unquoted, so it must pass isSafeSegment. An invalid id is logged, its file
// removed, and a fresh conversation started. A valid id file is consumed
// only after the claude command has been sent, so a failed send does not
// lose it.
//
// Errors from reading the task, sending keys, or waiting for the prompt are
// returned with context. The session may then be left partially launched.
func (d *Dispatcher) launchAgent(session, projectDir, taskFile string) error {
	content, err := os.ReadFile(taskFile)
	if err != nil {
		return fmt.Errorf("read task %s: %w", taskFile, err)
	}
	fm, body := parseFrontmatter(content)
	model := modelForPriority(fm.Priority)

	// POSIX single-quoting: close the quote, emit an escaped ', reopen.
	quotedDir := "'" + strings.ReplaceAll(projectDir, "'", `'\''`) + "'"
	if err := d.tmux.SendKeys(session, "cd "+quotedDir); err != nil {
		return fmt.Errorf("cd to project in %q: %w", session, err)
	}
	time.Sleep(300 * time.Millisecond)

	// Build and send the claude command, with optional --resume UUID.
	cmd := fmt.Sprintf("claude --model %s --dangerously-skip-permissions", model)
	resumeFile := filepath.Join(d.resumeDir(), session+"-resume-id.txt")
	consumeResume := false
	if data, ferr := os.ReadFile(resumeFile); ferr == nil {
		switch uuid := strings.TrimSpace(string(data)); {
		case uuid == "":
			// Empty file: start a fresh conversation and leave the file alone.
		case isSafeSegment(uuid):
			cmd += " --resume " + uuid
			consumeResume = true
		default:
			d.logger.Printf("[dispatcher] ignoring malformed resume id in %s", resumeFile)
			os.Remove(resumeFile) //nolint:errcheck // best-effort cleanup
		}
	}
	if err := d.tmux.SendKeys(session, cmd); err != nil {
		return fmt.Errorf("start claude in %q: %w", session, err)
	}
	if consumeResume {
		os.Remove(resumeFile) //nolint:errcheck // best-effort cleanup
	}

	// Wait for the prompt before sending the task message.
	promptTimeout := d.config.Dispatcher.PromptTimeout.Duration
	if promptTimeout == 0 {
		promptTimeout = 30 * time.Second
	}
	if !d.waitForPrompt(session, promptTimeout) {
		return fmt.Errorf("claude not ready in %q after %v", session, promptTimeout)
	}

	// Multi-line task bodies render in Claude's TUI as
	// "[Pasted text #N +M lines]". The trailing Enter that SendKeys appends
	// is consumed as part of the paste, so the message would stay unsubmitted
	// in the input buffer. Wait for the paste to register, then send a lone
	// Enter to submit.
	msg := buildTaskMessage(body, taskFile)
	if err := d.tmux.SendKeys(session, msg); err != nil {
		return fmt.Errorf("send task to %q: %w", session, err)
	}
	time.Sleep(500 * time.Millisecond)
	if err := d.tmux.SendEnter(session); err != nil {
		return fmt.Errorf("submit task in %q: %w", session, err)
	}
	d.state.SetWorking(session, filepath.Base(taskFile))
	d.logger.Printf("[dispatcher] DISPATCHED session=%q task=%s model=%s",
		session, filepath.Base(taskFile), model)
	return nil
}
// waitForPrompt polls session's pane for the Claude prompt until timeout
// elapses.
//
// Once per second it calls CapturePaneTail(session, 5) — presumably the last
// 5 lines of the pane; confirm against the tmux client. It returns true as
// soon as a capture succeeds and contains the prompt marker, or false once
// the deadline passes. Capture errors are ignored and retried on the next
// poll.
//
// NOTE(review): if the marker literal below is really the empty string,
// strings.Contains(tail, "") is always true. The function then returns on
// the first successful capture without waiting for Claude at all. If the
// literal instead holds an invisible or non-ASCII prompt character, document
// which one, so the check is not mistaken for a no-op.
func (d *Dispatcher) waitForPrompt(session string, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		tail, err := d.tmux.CapturePaneTail(session, 5)
		if err == nil && strings.Contains(tail, "") {
			return true
		}
		time.Sleep(1 * time.Second)
	}
	return false
}
// resumeDir returns the directory that holds per-session resume ids
// ("<session>-resume-id.txt"), i.e. $HOME/.claude-context. If the home
// directory cannot be determined, the relative path ".claude-context" is
// returned instead.
func (d *Dispatcher) resumeDir() string {
	var base string
	if home, err := os.UserHomeDir(); err == nil {
		base = home
	}
	return filepath.Join(base, ".claude-context")
}
// parseFrontmatter splits a task file into its YAML frontmatter and body.
//
// The header must open on the very first line with "---" and ends at the
// next line consisting solely of "---". That closing line may also be the
// last line of the file, and an empty header ("---" twice in a row) is
// accepted. Windows CRLF line endings are normalised to LF first, so such
// files are recognised too. Otherwise their depends_on would silently be
// ignored. When a header is found, the returned body is whitespace-trimmed.
//
// If there is no header, it returns an empty TaskFrontmatter and the
// (LF-normalised) content. YAML errors are discarded: the result holds
// whatever yaml.Unmarshal managed to decode (best effort).
func parseFrontmatter(content []byte) (TaskFrontmatter, string) {
	const (
		opening      = "---\n"
		closing      = "\n---\n"
		closingAtEOF = "\n---"
	)
	s := strings.ReplaceAll(string(content), "\r\n", "\n")
	if !strings.HasPrefix(s, opening) {
		return TaskFrontmatter{}, s
	}
	rest := s[len(opening):]

	var header, body string
	end := strings.Index(rest, closing)
	switch {
	case rest == "---" || strings.HasPrefix(rest, "---\n"):
		// Empty header: the closing delimiter directly follows the opening one.
		body = rest[len("---"):]
	case end >= 0:
		header, body = rest[:end], rest[end+len(closing):]
	case strings.HasSuffix(rest, closingAtEOF):
		// Header-only file: the closing delimiter is the last line.
		header = strings.TrimSuffix(rest, closingAtEOF)
	default:
		return TaskFrontmatter{}, s
	}

	var fm TaskFrontmatter
	yaml.Unmarshal([]byte(header), &fm) //nolint:errcheck // best-effort parse
	return fm, strings.TrimSpace(body)
}
// modelForPriority picks the Claude model for a task priority: "critical"
// (any letter case, compared with strings.EqualFold) runs on "opus".
// Every other value, including "", runs on "sonnet".
func modelForPriority(priority string) string {
	model := "sonnet"
	if strings.EqualFold(priority, "critical") {
		model = "opus"
	}
	return model
}
// buildTaskMessage renders the message typed into Claude Code for a task:
// "[<task file name>] <body>" followed by a French instruction telling the
// agent to EXECUTE the requested actions rather than only describe them.
// An empty body is replaced by a generic (French) pointer to the inbox.
func buildTaskMessage(body, taskFile string) string {
	const fallbackBody = "Verifie .agent-queue/inbox/ — 1 tache assignee."
	if body == "" {
		body = fallbackBody
	}
	return fmt.Sprintf("[%s] %s\n\nIMPORTANT: Tu dois EXECUTER les actions demandées, pas seulement les décrire.",
		filepath.Base(taskFile), body)
}
// sessionName builds a pool session name by appending the decimal index i to
// prefix, e.g. sessionName("ccl-auto-", 3) == "ccl-auto-3".
func sessionName(prefix string, i int) string {
	index := itoa(i)
	return prefix + index
}
// itoa formats n in base 10, including the sign for negative values.
//
// fmt is used instead of a hand-rolled digit loop, which returned "" for
// every n < 0 and so made distinct negative session indexes collide.
func itoa(n int) string {
	return fmt.Sprintf("%d", n)
}