Before claiming a session for a task, the dispatcher now: 1. parses the task's frontmatter; 2. if `depends_on: [project:task_id]` is non-empty, checks each entry against `<projectsDir>/<project>/.agent-queue/done/<task_id>.md`; 3. if any dependency is unresolved, skips the task and writes a `<task>.md.blocked` marker next to it. The watchdog (G1) resolves this marker on its next tick. The `.blocked` marker is idempotent: re-running the dispatcher does not refresh its mtime, so the watchdog can compute the blocked-since timestamp from the FIRST detection (for an accurate timeout). Path-traversal hardening: project and task_id segments must match `[A-Za-z0-9._-]+` and cannot be `.` or `..`, so a malicious entry such as `depends_on: ["../../tmp:foo"]` is rejected before any filesystem lookup. assignNextTask (the doneChan path) applies the same gate, so a session freed mid-cycle cannot bypass enforcement. Tests (-race clean): - DependsOnUnresolved -> .blocked marker, no dispatch - DependsOnResolved -> normal dispatch, no marker - PartialResolution -> stays blocked - RejectPathTraversal -> blocked, not dispatched - BlockedMarker idempotent (mtime stable across passes) - NoDependsOn regression guard
449 lines
14 KiB
Go
449 lines
14 KiB
Go
// Package dispatcher watches project inbox directories and assigns tasks to
|
||
// idle tmux sessions. One task per session — batch dispatch is intentionally
|
||
// unsupported.
|
||
package dispatcher
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/fsnotify/fsnotify"
|
||
"gopkg.in/yaml.v3"
|
||
|
||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
|
||
)
|
||
|
||
// TaskFrontmatter is the YAML header parsed from the top of a task .md file
// (see parseFrontmatter). Keys absent from the header keep their zero value.
type TaskFrontmatter struct {
	// Title is decoded from `title`.
	Title string `yaml:"title"`

	// Priority is one of: critical, high, default, low. modelForPriority
	// selects "opus" for "critical" (case-insensitive) and "sonnet" otherwise.
	Priority string `yaml:"priority"`

	// Tags is decoded from `tags`.
	Tags []string `yaml:"tags"`

	// NeedsClaude is decoded from `needs_claude_code`.
	NeedsClaude bool `yaml:"needs_claude_code"`

	// DependsOn (Phase 2/G2) lists cross-project dependencies, each written
	// as "project:task_id" (e.g. "filesecure:FIX-0123"). The dispatcher
	// will not launch a task until every entry is present in the target
	// project's done/ directory. Until then it drops a `<task>.md.blocked`
	// marker, so the watchdog (Phase 2/G1) can resolve it later.
	DependsOn []string `yaml:"depends_on"`
}
|
||
|
||
// Dispatcher watches project inbox directories and assigns tasks to idle sessions.
|
||
type Dispatcher struct {
|
||
tmux tmux.Client
|
||
state *state.State
|
||
config *config.Config
|
||
doneChan <-chan string
|
||
projectsDir string
|
||
logger *log.Logger
|
||
}
|
||
|
||
// New creates a Dispatcher.
|
||
// doneChan receives session names when they become idle (from SessionWatcher).
|
||
func New(tc tmux.Client, s *state.State, cfg *config.Config, doneChan <-chan string) *Dispatcher {
|
||
projectsDir := cfg.Dispatcher.ProjectsDir
|
||
if projectsDir == "" {
|
||
projectsDir = cfg.Pool.SharedProjectsDir
|
||
}
|
||
return &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
config: cfg,
|
||
doneChan: doneChan,
|
||
projectsDir: projectsDir,
|
||
logger: log.Default(),
|
||
}
|
||
}
|
||
|
||
// Run starts the dispatcher event loop until ctx is cancelled.
|
||
func (d *Dispatcher) Run(ctx context.Context) {
|
||
ticker := time.NewTicker(60 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
fw, err := fsnotify.NewWatcher()
|
||
if err != nil {
|
||
d.logger.Printf("[dispatcher] fsnotify unavailable: %v — poll-only mode", err)
|
||
} else {
|
||
defer fw.Close()
|
||
d.watchInboxDirs(fw)
|
||
}
|
||
|
||
var fwEvents <-chan fsnotify.Event
|
||
if fw != nil {
|
||
fwEvents = fw.Events
|
||
}
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case evt, ok := <-fwEvents:
|
||
if !ok {
|
||
return
|
||
}
|
||
if evt.Op&fsnotify.Create != 0 &&
|
||
strings.HasSuffix(evt.Name, ".md") &&
|
||
!strings.HasSuffix(evt.Name, ".dispatched") {
|
||
d.dispatchProject(filepath.Dir(evt.Name))
|
||
}
|
||
case session := <-d.doneChan:
|
||
d.assignNextTask(session)
|
||
case <-ticker.C:
|
||
d.fullScan()
|
||
}
|
||
}
|
||
}
|
||
|
||
// watchInboxDirs registers all known project inbox dirs with the fsnotify watcher.
|
||
func (d *Dispatcher) watchInboxDirs(fw *fsnotify.Watcher) {
|
||
for _, ds := range d.config.Pool.Dedicated {
|
||
inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
|
||
if err := fw.Add(inbox); err != nil {
|
||
d.logger.Printf("[dispatcher] watch %s: %v", inbox, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// fullScan dispatches pending tasks in all project inboxes.
|
||
func (d *Dispatcher) fullScan() {
|
||
for _, ds := range d.config.Pool.Dedicated {
|
||
inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
|
||
d.dispatchProject(inbox)
|
||
}
|
||
}
|
||
|
||
// dispatchProject assigns undispatched tasks in inboxDir to idle sessions.
|
||
func (d *Dispatcher) dispatchProject(inboxDir string) {
|
||
entries, err := os.ReadDir(inboxDir)
|
||
if err != nil {
|
||
return
|
||
}
|
||
projectDir := filepath.Dir(filepath.Dir(inboxDir)) // inboxDir/.agent-queue/inbox → project
|
||
for _, e := range entries {
|
||
name := e.Name()
|
||
// Skip non-md files, .dispatched markers, .blocked markers, and any
|
||
// other sidecar (.stuck, .tmp, etc).
|
||
if !strings.HasSuffix(name, ".md") {
|
||
continue
|
||
}
|
||
taskPath := filepath.Join(inboxDir, name)
|
||
|
||
// Phase 2/G2: enforce depends_on before claiming a session. The
|
||
// session pool is a precious resource — we don't want to burn an
|
||
// idle slot on a task that can't proceed.
|
||
if d.taskBlocked(taskPath) {
|
||
d.touchBlockedMarker(taskPath)
|
||
continue
|
||
}
|
||
|
||
session := d.findFreeSession()
|
||
if session == "" {
|
||
d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir)
|
||
return
|
||
}
|
||
if err := d.launchAgent(session, projectDir, taskPath); err != nil {
|
||
d.logger.Printf("[dispatcher] launchAgent error: %v", err)
|
||
continue
|
||
}
|
||
if err := os.Rename(taskPath, taskPath+".dispatched"); err != nil {
|
||
d.logger.Printf("[dispatcher] rename .dispatched: %v", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// taskBlocked returns true when taskPath declares `depends_on` entries that
|
||
// are not yet present in the target project's `.agent-queue/done/`. The
|
||
// frontmatter is parsed best-effort; on parse failure we treat the task as
|
||
// non-blocked (current behaviour preserved — bad frontmatter is the
|
||
// agent's problem, not the dispatcher's).
|
||
func (d *Dispatcher) taskBlocked(taskPath string) bool {
|
||
content, err := os.ReadFile(taskPath)
|
||
if err != nil {
|
||
return false
|
||
}
|
||
fm, _ := parseFrontmatter(content)
|
||
if len(fm.DependsOn) == 0 {
|
||
return false
|
||
}
|
||
for _, dep := range fm.DependsOn {
|
||
if !d.dependencyResolved(dep) {
|
||
d.logger.Printf("[dispatcher] task %s blocked by unresolved dep %q",
|
||
filepath.Base(taskPath), dep)
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// dependencyResolved checks whether `project:task_id` is in
|
||
// `<projectsDir>/<project>/.agent-queue/done/<task_id>.md`. Accepts an
|
||
// exact match or a `<task_id>*.md` prefix match (some queues append a
|
||
// timestamp). Path segments are validated (no `..`, no `/` inside the
|
||
// segment) — see isSafeSegment.
|
||
func (d *Dispatcher) dependencyResolved(dep string) bool {
|
||
parts := strings.SplitN(dep, ":", 2)
|
||
if len(parts) != 2 {
|
||
d.logger.Printf("[dispatcher] malformed depends_on %q (expected 'project:task_id')", dep)
|
||
return false
|
||
}
|
||
project, taskID := parts[0], parts[1]
|
||
if !isSafeSegment(project) || !isSafeSegment(taskID) {
|
||
d.logger.Printf("[dispatcher] unsafe depends_on segment %q — refusing to look up", dep)
|
||
return false
|
||
}
|
||
doneDir := filepath.Join(d.projectsDir, project, ".agent-queue", "done")
|
||
exact := filepath.Join(doneDir, taskID+".md")
|
||
if _, err := os.Stat(exact); err == nil {
|
||
return true
|
||
}
|
||
entries, err := os.ReadDir(doneDir)
|
||
if err != nil {
|
||
return false
|
||
}
|
||
for _, e := range entries {
|
||
if e.IsDir() {
|
||
continue
|
||
}
|
||
name := e.Name()
|
||
if strings.HasPrefix(name, taskID) && strings.HasSuffix(name, ".md") &&
|
||
!strings.HasSuffix(name, ".dispatched") {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// touchBlockedMarker creates `<taskPath>.blocked` (or refreshes its mtime)
|
||
// so the watchdog (Phase 2/G1) sees the task as blocked and tracks its
|
||
// timeout. We use mtime as the "blocked since" timestamp; we do NOT
|
||
// refresh it on subsequent ticks — the operator wants the timeout to be
|
||
// counted from the FIRST detection, not the last.
|
||
func (d *Dispatcher) touchBlockedMarker(taskPath string) {
|
||
marker := taskPath + ".blocked"
|
||
if _, err := os.Stat(marker); err == nil {
|
||
return // marker already exists, leave mtime alone
|
||
}
|
||
if err := os.WriteFile(marker, []byte(""), 0o644); err != nil {
|
||
d.logger.Printf("[dispatcher] write .blocked marker for %s: %v",
|
||
filepath.Base(taskPath), err)
|
||
return
|
||
}
|
||
d.logger.Printf("[dispatcher] task %s marked .blocked (waiting on depends_on)",
|
||
filepath.Base(taskPath))
|
||
}
|
||
|
||
// isSafeSegment guards the project / task_id pair against path traversal.
// A segment is accepted only if it is non-empty, is not "." or "..", and
// consists solely of ASCII letters, digits, '_', '-' and '.'. This is the
// same rule as the watchdog (`[A-Za-z0-9._-]+`). Any other rune is rejected,
// including '/', whitespace, non-ASCII and invalid UTF-8.
func isSafeSegment(s string) bool {
	switch s {
	case "", ".", "..":
		return false
	}
	for _, r := range s {
		alnum := 'a' <= r && r <= 'z' || 'A' <= r && r <= 'Z' || '0' <= r && r <= '9'
		if !alnum && r != '_' && r != '-' && r != '.' {
			return false
		}
	}
	return true
}
|
||
|
||
// findFreeSession returns the name of an idle, live, cooldown-free session
|
||
// from the autonomous pool. Dedicated sessions are intentionally NOT
|
||
// considered: those host the operator's manual interactive work. Routing a
|
||
// background dispatch into them would (a) hijack a Claude instance the
|
||
// operator is using and (b) trigger the watcher's /exit recycle at task
|
||
// end, kicking the operator out mid-conversation.
|
||
func (d *Dispatcher) findFreeSession() string {
|
||
prefix := d.config.Pool.Autonomous.Prefix
|
||
if prefix == "" {
|
||
prefix = "ccl-auto-"
|
||
}
|
||
start := d.config.Pool.Autonomous.StartIndex
|
||
for i := start; i < start+d.config.Pool.Autonomous.Max; i++ {
|
||
if d.isSessionFree(sessionName(prefix, i)) {
|
||
return sessionName(prefix, i)
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// isSessionFree returns true when the session is idle, alive, and past its cooldown.
|
||
func (d *Dispatcher) isSessionFree(name string) bool {
|
||
if !d.tmux.HasSession(name) {
|
||
return false
|
||
}
|
||
sess := d.state.GetSession(name)
|
||
if sess == nil || sess.State != "idle" {
|
||
return false
|
||
}
|
||
if sess.LastFail != nil && time.Since(*sess.LastFail) < 5*time.Minute {
|
||
return false
|
||
}
|
||
return true
|
||
}
|
||
|
||
// assignNextTask scans all inboxes for work to give to a freshly-idled session.
|
||
func (d *Dispatcher) assignNextTask(session string) {
|
||
for _, ds := range d.config.Pool.Dedicated {
|
||
inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
|
||
entries, err := os.ReadDir(inbox)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
for _, e := range entries {
|
||
name := e.Name()
|
||
if !strings.HasSuffix(name, ".md") {
|
||
continue
|
||
}
|
||
taskPath := filepath.Join(inbox, name)
|
||
// Phase 2/G2: respect depends_on here too, otherwise a
|
||
// session freed mid-cycle would still bypass the gate.
|
||
if d.taskBlocked(taskPath) {
|
||
d.touchBlockedMarker(taskPath)
|
||
continue
|
||
}
|
||
if err := d.launchAgent(session, ds.Project, taskPath); err == nil {
|
||
os.Rename(taskPath, taskPath+".dispatched")
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// launchAgent starts Claude Code in session for the given task file.
|
||
func (d *Dispatcher) launchAgent(session, projectDir, taskFile string) error {
|
||
content, err := os.ReadFile(taskFile)
|
||
if err != nil {
|
||
return fmt.Errorf("read task %s: %w", taskFile, err)
|
||
}
|
||
fm, body := parseFrontmatter(content)
|
||
model := modelForPriority(fm.Priority)
|
||
|
||
// Change to project directory.
|
||
if err := d.tmux.SendKeys(session, "cd "+projectDir); err != nil {
|
||
return err
|
||
}
|
||
time.Sleep(300 * time.Millisecond)
|
||
|
||
// Build and send the claude command, with optional --resume UUID.
|
||
cmd := fmt.Sprintf("claude --model %s --dangerously-skip-permissions", model)
|
||
resumeFile := filepath.Join(d.resumeDir(), session+"-resume-id.txt")
|
||
if data, ferr := os.ReadFile(resumeFile); ferr == nil {
|
||
if uuid := strings.TrimSpace(string(data)); uuid != "" {
|
||
cmd += " --resume " + uuid
|
||
os.Remove(resumeFile)
|
||
}
|
||
}
|
||
if err := d.tmux.SendKeys(session, cmd); err != nil {
|
||
return err
|
||
}
|
||
|
||
// Wait for the ❯ prompt before sending the task message.
|
||
promptTimeout := d.config.Dispatcher.PromptTimeout.Duration
|
||
if promptTimeout == 0 {
|
||
promptTimeout = 30 * time.Second
|
||
}
|
||
if !d.waitForPrompt(session, promptTimeout) {
|
||
return fmt.Errorf("claude not ready in %q after %v", session, promptTimeout)
|
||
}
|
||
|
||
// Send the task message. Multi-line task bodies render in Claude's TUI
|
||
// as "[Pasted text #N +M lines]" — the trailing Enter that SendKeys
|
||
// appends is consumed as part of the paste, so the message would stay
|
||
// unsubmitted in the input buffer forever. Wait for the paste to
|
||
// register, then send a lone Enter to actually submit.
|
||
msg := buildTaskMessage(body, taskFile)
|
||
if err := d.tmux.SendKeys(session, msg); err != nil {
|
||
return err
|
||
}
|
||
time.Sleep(500 * time.Millisecond)
|
||
if err := d.tmux.SendEnter(session); err != nil {
|
||
return err
|
||
}
|
||
|
||
d.state.SetWorking(session, filepath.Base(taskFile))
|
||
d.logger.Printf("[dispatcher] DISPATCHED session=%q task=%s model=%s",
|
||
session, filepath.Base(taskFile), model)
|
||
return nil
|
||
}
|
||
|
||
// waitForPrompt polls for the Claude ❯ prompt up to timeout.
|
||
func (d *Dispatcher) waitForPrompt(session string, timeout time.Duration) bool {
|
||
deadline := time.Now().Add(timeout)
|
||
for time.Now().Before(deadline) {
|
||
tail, err := d.tmux.CapturePaneTail(session, 5)
|
||
if err == nil && strings.Contains(tail, "❯") {
|
||
return true
|
||
}
|
||
time.Sleep(1 * time.Second)
|
||
}
|
||
return false
|
||
}
|
||
|
||
// resumeDir returns the directory containing per-session resume UUIDs.
|
||
func (d *Dispatcher) resumeDir() string {
|
||
home, _ := os.UserHomeDir()
|
||
return filepath.Join(home, ".claude-context")
|
||
}
|
||
|
||
// parseFrontmatter splits YAML frontmatter (between --- markers) from the body.
|
||
// Returns an empty TaskFrontmatter and the full content if no header is found.
|
||
func parseFrontmatter(content []byte) (TaskFrontmatter, string) {
|
||
s := string(content)
|
||
if !strings.HasPrefix(s, "---\n") {
|
||
return TaskFrontmatter{}, s
|
||
}
|
||
end := strings.Index(s[4:], "\n---\n")
|
||
if end < 0 {
|
||
return TaskFrontmatter{}, s
|
||
}
|
||
yamlBlock := s[4 : end+4]
|
||
body := strings.TrimSpace(s[end+4+5:]) // skip "\n---\n"
|
||
var fm TaskFrontmatter
|
||
yaml.Unmarshal([]byte(yamlBlock), &fm) //nolint:errcheck // best-effort parse
|
||
return fm, body
|
||
}
|
||
|
||
// modelForPriority maps a task priority to a Claude model name. Only
// "critical" (compared case-insensitively) selects "opus". Every other
// value, including "" and unknown priorities, selects "sonnet".
func modelForPriority(priority string) string {
	model := "sonnet"
	if strings.EqualFold(priority, "critical") {
		model = "opus"
	}
	return model
}
|
||
|
||
// buildTaskMessage constructs the dispatch message sent to Claude Code:
// "[<task file base name>] <body>", followed by a blank line and a fixed
// reminder to actually execute the task. An empty body is replaced with a
// fixed pointer to .agent-queue/inbox/.
func buildTaskMessage(body, taskFile string) string {
	const (
		fallback = "Verifie .agent-queue/inbox/ — 1 tache assignee."
		layout   = "[%s] %s\n\nIMPORTANT: Tu dois EXECUTER les actions demandées, pas seulement les décrire."
	)
	text := fallback
	if body != "" {
		text = body
	}
	return fmt.Sprintf(layout, filepath.Base(taskFile), text)
}
|
||
|
||
func sessionName(prefix string, i int) string {
|
||
return prefix + itoa(i)
|
||
}
|
||
|
||
// itoa returns the base-10 representation of n, including a leading '-'
// for negative values. The previous hand-rolled loop returned "" for any
// n < 0, which made every negative index map to the bare prefix in
// sessionName. It also built the string by re-prepending bytes, which is
// quadratic in the digit count.
func itoa(n int) string {
	return fmt.Sprintf("%d", n)
}
|