claude-failover/internal/dispatcher/dispatcher_test.go
Ubuntu 5cfb58c202 feat(dispatcher): enforce depends_on with .blocked marker (Phase 2/G2)
Before claiming a session for a task, the dispatcher now:
1. Parses the task's frontmatter
2. If `depends_on: [project:task_id]` is non-empty, checks each entry
   against `<projectsDir>/<project>/.agent-queue/done/<task_id>.md`
3. If any dep is unresolved -> skip the task and write
   `<task>.md.blocked` next to it. The watchdog (G1) will resolve
   this marker on its next tick.

The `.blocked` marker is idempotent: re-running the dispatcher does not
refresh its mtime, so the watchdog can compute the blocked-since
timestamp from the FIRST detection and apply its timeout accurately.

Path-traversal hardening: project / task_id segments must match
`[A-Za-z0-9._-]+` and cannot be `.` or `..`. A malicious frontmatter
like `depends_on: ../../tmp:foo` is rejected before any filesystem
lookup.

assignNextTask (the doneChan path) applies the same gate so that a
session freed mid-cycle cannot bypass enforcement.

Tests (-race clean):
- DependsOnUnresolved -> .blocked marker, no dispatch
- DependsOnResolved -> normal dispatch, no marker
- PartialResolution -> stay blocked
- RejectPathTraversal -> blocked, not dispatched
- BlockedMarker idempotent (mtime stable across passes)
- NoDependsOn regression guard
2026-04-16 20:30:17 +00:00

521 lines
15 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dispatcher
import (
"log"
"os"
"path/filepath"
"testing"
"time"
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
)
// mockTmux is a minimal in-memory tmux.Client for tests. Nothing talks to a
// real tmux server: session existence and pane contents come from maps the
// test fills in, and keystrokes are recorded for later inspection.
type mockTmux struct {
	// sessions marks which session names exist; HasSession reads it and
	// CreateSession sets entries to true.
	sessions map[string]bool
	// paneOutput holds the canned text CapturePaneTail returns per session.
	paneOutput map[string]string
	// sentKeys records every SendKeys payload, plus "<ENTER>" for each
	// SendEnter call, in call order.
	sentKeys []string
}

// newMockTmux returns a mockTmux whose session and pane maps are allocated
// and ready to be written by the test.
func newMockTmux() *mockTmux {
	mt := new(mockTmux)
	mt.sessions = map[string]bool{}
	mt.paneOutput = map[string]string{}
	return mt
}

// HasSession reports whether name has been registered in sessions.
func (mt *mockTmux) HasSession(name string) bool {
	exists := mt.sessions[name]
	return exists
}

// CreateSession registers name as existing; the second argument is ignored.
func (mt *mockTmux) CreateSession(name, _ string) error {
	mt.sessions[name] = true
	return nil
}

// KillSession is a no-op: the session stays registered.
func (mt *mockTmux) KillSession(_ string) error {
	return nil
}

// SendKeys records keys; the target session is ignored.
func (mt *mockTmux) SendKeys(_, keys string) error {
	mt.sentKeys = append(mt.sentKeys, keys)
	return nil
}

// SendEnter records the "<ENTER>" sentinel; the target session is ignored.
func (mt *mockTmux) SendEnter(_ string) error {
	mt.sentKeys = append(mt.sentKeys, "<ENTER>")
	return nil
}

// CapturePaneTail returns the canned output for session (empty when unset).
// The line count is ignored and the error is always nil.
func (mt *mockTmux) CapturePaneTail(session string, _ int) (string, error) {
	out := mt.paneOutput[session]
	return out, nil
}
// TestParseFrontmatter checks that a leading "---" YAML header is split off
// from the body and that its title and priority fields are decoded.
func TestParseFrontmatter(t *testing.T) {
	const raw = "---\ntitle: Fix bug\npriority: critical\n---\nDo the fix."

	header, content := parseFrontmatter([]byte(raw))

	if got, want := header.Title, "Fix bug"; got != want {
		t.Errorf("expected title 'Fix bug', got %q", got)
	}
	if got, want := header.Priority, "critical"; got != want {
		t.Errorf("expected priority critical, got %q", got)
	}
	if got, want := content, "Do the fix."; got != want {
		t.Errorf("expected body 'Do the fix.', got %q", got)
	}
}
// TestParseFrontmatterNoHeader checks that input with no "---" header yields
// an empty title and is handed back unchanged as the body.
func TestParseFrontmatterNoHeader(t *testing.T) {
	const plain = "Just plain content."

	header, content := parseFrontmatter([]byte(plain))

	if header.Title != "" {
		t.Errorf("expected empty title, got %q", header.Title)
	}
	if content != plain {
		t.Errorf("expected full body, got %q", content)
	}
}
// TestModelForPriority pins the priority -> model mapping: "critical" (also
// in upper case) selects opus, while "high", "default" and the empty string
// all select sonnet.
func TestModelForPriority(t *testing.T) {
	table := []struct {
		priority string
		want     string
	}{
		{priority: "critical", want: "opus"},
		{priority: "CRITICAL", want: "opus"},
		{priority: "high", want: "sonnet"},
		{priority: "default", want: "sonnet"},
		{priority: "", want: "sonnet"},
	}
	for i := range table {
		in, expected := table[i].priority, table[i].want
		if got := modelForPriority(in); got != expected {
			t.Errorf("modelForPriority(%q) = %q, want %q", in, got, expected)
		}
	}
}
// TestFindFreeSessionSkipsFailed checks that a session marked failed in state
// is passed over even though it exists in tmux, so the idle one is chosen.
func TestFindFreeSessionSkipsFailed(t *testing.T) {
	mock := newMockTmux()
	for _, name := range []string{"sess-0", "sess-1"} {
		mock.sessions[name] = true
	}

	st := state.New("")
	st.SetFailed("sess-0")
	st.SetIdle("sess-1")

	cfg := &config.Config{
		Pool: config.PoolConfig{
			Autonomous: config.AutonomousConfig{Prefix: "sess-", Max: 2},
		},
	}
	d := &Dispatcher{tmux: mock, state: st, config: cfg, logger: log.Default()}

	if got := d.findFreeSession(); got != "sess-1" {
		t.Errorf("expected sess-1, got %q", got)
	}
}
// TestFindFreeSessionMissingTmux skips sessions not in tmux.
func TestFindFreeSessionMissingTmux(t *testing.T) {
tc := newMockTmux()
// sess-0 missing from tmux, sess-1 present and idle.
tc.sessions["sess-1"] = true
s := state.New("")
s.SetIdle("sess-0")
s.SetIdle("sess-1")
d := &Dispatcher{
tmux: tc,
state: s,
config: &config.Config{
Pool: config.PoolConfig{
Autonomous: config.AutonomousConfig{Prefix: "sess-", Max: 2},
},
},
logger: log.Default(),
}
got := d.findFreeSession()
if got != "sess-1" {
t.Errorf("expected sess-1, got %q", got)
}
}
// TestFindFreeSessionSkipsDedicated checks that a session listed under
// Pool.Dedicated is never returned by the auto-dispatch lookup, even when it
// is idle and present in tmux. Dedicated sessions host the operator's manual
// interactive work and must stay untouched; the pool session is picked instead.
func TestFindFreeSessionSkipsDedicated(t *testing.T) {
	const dedicated, pooled = "ccl-1-conformvault", "sess-0"

	mock := newMockTmux()
	st := state.New("")
	for _, name := range []string{dedicated, pooled} {
		mock.sessions[name] = true
		st.SetIdle(name)
	}

	cfg := &config.Config{
		Pool: config.PoolConfig{
			Dedicated:  []config.DedicatedSession{{Name: dedicated}},
			Autonomous: config.AutonomousConfig{Prefix: "sess-", Max: 1},
		},
	}
	d := &Dispatcher{tmux: mock, state: st, config: cfg, logger: log.Default()}

	if got := d.findFreeSession(); got != pooled {
		t.Errorf("expected pool sess-0 (dedicated must be skipped), got %q", got)
	}
}
// TestDispatchProject writes one task into an inbox, dispatches it onto the
// only pool session, and checks that the session becomes "working" and that
// the task file is renamed to <task>.dispatched. Setup failures abort the
// test immediately instead of surfacing as misleading assertion failures.
func TestDispatchProject(t *testing.T) {
	dir := t.TempDir()
	inbox := filepath.Join(dir, ".agent-queue", "inbox")
	if err := os.MkdirAll(inbox, 0o755); err != nil {
		t.Fatalf("mkdir inbox: %v", err)
	}
	taskContent := "---\ntitle: My Task\npriority: high\n---\nDo the work."
	taskPath := filepath.Join(inbox, "task-001.md")
	if err := os.WriteFile(taskPath, []byte(taskContent), 0o644); err != nil {
		t.Fatalf("write task: %v", err)
	}
	tc := newMockTmux()
	tc.sessions["pool-0"] = true
	// Return prompt on first CapturePaneTail call (Claude is ready).
	// NOTE(review): the repository viewer reports ambiguous Unicode in this
	// file; if this literal is not a plain ASCII space, spell it as an
	// explicit escape so it cannot be lost in copy/paste — TODO confirm.
	tc.paneOutput["pool-0"] = " "
	s := state.New("")
	s.SetIdle("pool-0")
	d := &Dispatcher{
		tmux:  tc,
		state: s,
		config: &config.Config{
			Pool: config.PoolConfig{
				Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
			},
		},
		logger: log.Default(),
	}
	d.dispatchProject(inbox)
	if st := s.GetSession("pool-0"); st == nil || st.State != "working" {
		t.Errorf("expected session working after dispatch, got %v", st)
	}
	// Original file renamed to .dispatched. Any Stat error (not only
	// NotExist) means the marker is unusable, so fail on all of them.
	if _, err := os.Stat(taskPath + ".dispatched"); err != nil {
		t.Errorf("expected .dispatched marker: %v", err)
	}
	if _, err := os.Stat(taskPath); !os.IsNotExist(err) {
		t.Error("expected original task file to be renamed")
	}
}
// TestDispatchProjectNoFreeSession checks that a task is left untouched in
// the inbox when the pool has no sessions at all (Max: 0, empty tmux).
// Setup errors are fatal so a missing fixture cannot masquerade as success.
func TestDispatchProjectNoFreeSession(t *testing.T) {
	dir := t.TempDir()
	inbox := filepath.Join(dir, ".agent-queue", "inbox")
	if err := os.MkdirAll(inbox, 0o755); err != nil {
		t.Fatalf("mkdir inbox: %v", err)
	}
	taskPath := filepath.Join(inbox, "task-002.md")
	if err := os.WriteFile(taskPath, []byte("content"), 0o644); err != nil {
		t.Fatalf("write task: %v", err)
	}
	tc := newMockTmux() // no sessions
	s := state.New("")
	d := &Dispatcher{
		tmux:  tc,
		state: s,
		config: &config.Config{
			Pool: config.PoolConfig{
				Autonomous: config.AutonomousConfig{Max: 0},
			},
		},
		logger: log.Default(),
	}
	d.dispatchProject(inbox)
	// File must remain unchanged; any Stat error means it is not readable
	// where the dispatcher should have left it.
	if _, err := os.Stat(taskPath); err != nil {
		t.Errorf("task file should remain when no session is free: %v", err)
	}
}
// ─── Phase 2 / G2 — depends_on enforcement tests ────────────────────────────
// makeProjectInbox creates <root>/<project>/.agent-queue/inbox together with
// its sibling done/ directory, failing the test on any error, and returns the
// inbox path.
func makeProjectInbox(t *testing.T, root, project string) string {
	t.Helper()
	queue := filepath.Join(root, project, ".agent-queue")
	inbox := filepath.Join(queue, "inbox")
	for _, dir := range []struct{ path, label string }{
		{inbox, "inbox"},
		{filepath.Join(queue, "done"), "done"},
	} {
		if err := os.MkdirAll(dir.path, 0o755); err != nil {
			t.Fatalf("mkdir %s: %v", dir.label, err)
		}
	}
	return inbox
}
// putDoneTaskFailover seeds <root>/<project>/.agent-queue/done/<id>.md so a
// depends_on entry "<project>:<id>" counts as resolved. The done/ directory
// is created if missing; any filesystem error fails the test, matching
// makeProjectInbox.
func putDoneTaskFailover(t *testing.T, root, project, id string) {
	t.Helper()
	doneDir := filepath.Join(root, project, ".agent-queue", "done")
	if err := os.MkdirAll(doneDir, 0o755); err != nil {
		t.Fatalf("mkdir done: %v", err)
	}
	if err := os.WriteFile(filepath.Join(doneDir, id+".md"), []byte("done\n"), 0o644); err != nil {
		t.Fatalf("write done: %v", err)
	}
}
// TestDispatcher_DependsOnUnresolved_DropsBlockedMarker checks that a task
// whose depends_on entry has no matching done/ file stays in the inbox, gets
// a <task>.md.blocked marker, is not renamed to .dispatched, and leaves the
// idle session unclaimed.
func TestDispatcher_DependsOnUnresolved_DropsBlockedMarker(t *testing.T) {
	root := t.TempDir()
	inboxA := makeProjectInbox(t, root, "projA")
	makeProjectInbox(t, root, "projB") // done/ exists but stays empty

	const taskContent = "---\nid: TASK-A1\npriority: default\ndepends_on:\n - projB:DEP-1\n---\n\nbody\n"
	taskPath := filepath.Join(inboxA, "TASK-A1.md")
	if err := os.WriteFile(taskPath, []byte(taskContent), 0o644); err != nil {
		t.Fatalf("write task: %v", err)
	}

	sessions := state.New("")
	sessions.SetIdle("pool-0")
	mock := newMockTmux()
	mock.sessions["pool-0"] = true
	mock.paneOutput["pool-0"] = " "

	d := &Dispatcher{
		tmux:        mock,
		state:       sessions,
		projectsDir: root,
		config: &config.Config{
			Pool: config.PoolConfig{
				Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
			},
		},
		logger: log.Default(),
	}
	d.dispatchProject(inboxA)

	blocked, dispatched := taskPath+".blocked", taskPath+".dispatched"
	if _, err := os.Stat(blocked); err != nil {
		t.Errorf("expected .blocked marker: %v", err)
	}
	if _, err := os.Stat(dispatched); !os.IsNotExist(err) {
		t.Errorf("task should NOT be dispatched while blocked")
	}
	if _, err := os.Stat(taskPath); err != nil {
		t.Errorf("task .md should remain in inbox: %v", err)
	}
	if st := sessions.GetSession("pool-0"); st == nil || st.State != "idle" {
		t.Errorf("session should stay idle when only task is blocked, got %v", st)
	}
}
// TestDispatcher_DependsOnResolved_Dispatches checks the normal flow when the
// upstream task already sits in projB's done/: the task is renamed to
// .dispatched, no .blocked marker is written, and the session is working.
func TestDispatcher_DependsOnResolved_Dispatches(t *testing.T) {
	root := t.TempDir()
	inboxA := makeProjectInbox(t, root, "projA")
	makeProjectInbox(t, root, "projB")
	putDoneTaskFailover(t, root, "projB", "DEP-1")
	taskContent := "---\nid: TASK-A2\npriority: default\ndepends_on:\n - projB:DEP-1\n---\n\nbody\n"
	taskPath := filepath.Join(inboxA, "TASK-A2.md")
	// A silently failed write would make every assertion below fail for the
	// wrong reason, so abort on it.
	if err := os.WriteFile(taskPath, []byte(taskContent), 0o644); err != nil {
		t.Fatalf("write task: %v", err)
	}
	tc := newMockTmux()
	tc.sessions["pool-0"] = true
	tc.paneOutput["pool-0"] = " "
	s := state.New("")
	s.SetIdle("pool-0")
	d := &Dispatcher{
		tmux:        tc,
		state:       s,
		projectsDir: root,
		config: &config.Config{
			Pool: config.PoolConfig{
				Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
			},
		},
		logger: log.Default(),
	}
	d.dispatchProject(inboxA)
	if _, err := os.Stat(taskPath + ".dispatched"); err != nil {
		t.Errorf("expected .dispatched: %v", err)
	}
	if _, err := os.Stat(taskPath + ".blocked"); !os.IsNotExist(err) {
		t.Errorf(".blocked marker should NOT be present when dep is resolved")
	}
	if st := s.GetSession("pool-0"); st == nil || st.State != "working" {
		t.Errorf("expected working session, got %v", st)
	}
}
// TestDispatcher_DependsOn_PartialResolution checks that a task with two
// dependencies, only one of which is in done/, is treated as blocked: it gets
// a .blocked marker and is not dispatched.
func TestDispatcher_DependsOn_PartialResolution(t *testing.T) {
	root := t.TempDir()
	inboxA := makeProjectInbox(t, root, "projA")
	makeProjectInbox(t, root, "projB")
	makeProjectInbox(t, root, "projC")
	putDoneTaskFailover(t, root, "projB", "DEP-X") // projC:DEP-Y stays unresolved
	content := "---\nid: TASK-A3\npriority: default\ndepends_on:\n - projB:DEP-X\n - projC:DEP-Y\n---\n\nbody\n"
	taskPath := filepath.Join(inboxA, "TASK-A3.md")
	if err := os.WriteFile(taskPath, []byte(content), 0o644); err != nil {
		t.Fatalf("write task: %v", err)
	}
	tc := newMockTmux()
	tc.sessions["pool-0"] = true
	tc.paneOutput["pool-0"] = " "
	s := state.New("")
	s.SetIdle("pool-0")
	d := &Dispatcher{
		tmux:        tc,
		state:       s,
		projectsDir: root,
		config: &config.Config{
			Pool: config.PoolConfig{
				Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
			},
		},
		logger: log.Default(),
	}
	d.dispatchProject(inboxA)
	if _, err := os.Stat(taskPath + ".blocked"); err != nil {
		t.Errorf(".blocked marker expected for partial resolution: %v", err)
	}
	if _, err := os.Stat(taskPath + ".dispatched"); !os.IsNotExist(err) {
		t.Errorf("must not dispatch with one unresolved dep")
	}
}
// TestDispatcher_DependsOn_RejectPathTraversal checks that depends_on entries
// whose project or task_id segment is not a plain name (for example one
// containing ".." or "/") leave the task blocked instead of dispatching it.
//
// The "escapes_to_real_done_file" case seeds the very file a lexical
// traversal would resolve to (projA/../projB -> projB). A dispatcher that
// merely joined the segments into a path would find it and dispatch, so this
// case only passes when the entry is rejected before any filesystem lookup.
// That is something the plain "../../tmp:foo" case cannot tell apart from an
// ordinary missing dependency.
func TestDispatcher_DependsOn_RejectPathTraversal(t *testing.T) {
	cases := []struct {
		name string
		dep  string
		seed func(t *testing.T, root string)
	}{
		{name: "dotdot_outside_root", dep: "../../tmp:foo"},
		{
			name: "escapes_to_real_done_file",
			dep:  "projA/../projB:DEP-1",
			seed: func(t *testing.T, root string) {
				makeProjectInbox(t, root, "projB")
				putDoneTaskFailover(t, root, "projB", "DEP-1")
			},
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			root := t.TempDir()
			inboxA := makeProjectInbox(t, root, "projA")
			if c.seed != nil {
				c.seed(t, root)
			}
			content := "---\nid: TASK-evil\npriority: default\ndepends_on:\n - " + c.dep + "\n---\n\nbody\n"
			taskPath := filepath.Join(inboxA, "TASK-evil.md")
			if err := os.WriteFile(taskPath, []byte(content), 0o644); err != nil {
				t.Fatalf("write task: %v", err)
			}
			mock := newMockTmux()
			mock.sessions["pool-0"] = true
			s := state.New("")
			s.SetIdle("pool-0")
			d := &Dispatcher{
				tmux:        mock,
				state:       s,
				projectsDir: root,
				config: &config.Config{
					Pool: config.PoolConfig{
						Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
					},
				},
				logger: log.Default(),
			}
			d.dispatchProject(inboxA)
			if _, err := os.Stat(taskPath + ".blocked"); err != nil {
				t.Errorf("malicious dep must result in .blocked, not dispatch: %v", err)
			}
			if _, err := os.Stat(taskPath + ".dispatched"); !os.IsNotExist(err) {
				t.Errorf("malicious dep must NOT be dispatched")
			}
		})
	}
}
// TestDispatcher_BlockedMarker_Idempotent checks that a second dispatch pass
// over an already-blocked task leaves the existing .blocked marker's mtime
// untouched, so the watchdog timeout counts from the FIRST detection.
//
// Instead of sleeping past the filesystem's timestamp granularity, the test
// backdates the marker with os.Chtimes. Any rewrite or touch by the second
// pass would then move the mtime a full minute forward, which is detectable
// on every filesystem and costs no wall-clock time.
func TestDispatcher_BlockedMarker_Idempotent(t *testing.T) {
	root := t.TempDir()
	inboxA := makeProjectInbox(t, root, "projA")
	makeProjectInbox(t, root, "projB")
	content := "---\nid: TASK-idem\npriority: default\ndepends_on:\n - projB:NEVER\n---\n\nbody\n"
	taskPath := filepath.Join(inboxA, "TASK-idem.md")
	if err := os.WriteFile(taskPath, []byte(content), 0o644); err != nil {
		t.Fatalf("write task: %v", err)
	}
	tc := newMockTmux()
	s := state.New("")
	d := &Dispatcher{
		tmux:        tc,
		state:       s,
		projectsDir: root,
		config: &config.Config{
			Pool: config.PoolConfig{
				Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 0},
			},
		},
		logger: log.Default(),
	}
	d.dispatchProject(inboxA)
	marker := taskPath + ".blocked"
	if _, err := os.Stat(marker); err != nil {
		t.Fatalf("expected marker after first run: %v", err)
	}
	past := time.Now().Add(-time.Minute)
	if err := os.Chtimes(marker, past, past); err != nil {
		t.Fatalf("backdate marker: %v", err)
	}
	// Read the mtime back rather than using `past` directly, so the
	// comparison uses the filesystem-rounded value.
	st1, err := os.Stat(marker)
	if err != nil {
		t.Fatalf("stat backdated marker: %v", err)
	}
	mtime1 := st1.ModTime()
	d.dispatchProject(inboxA)
	st2, err := os.Stat(marker)
	if err != nil {
		t.Fatalf("expected marker after second run: %v", err)
	}
	if !st2.ModTime().Equal(mtime1) {
		t.Errorf("blocked marker mtime should be stable across dispatch passes (was %v, now %v)",
			mtime1, st2.ModTime())
	}
}
// TestDispatcher_NoDependsOn_DispatchesNormally is a regression guard: tasks
// without depends_on must keep working exactly as before G2. The task file is
// renamed to .dispatched and the session moves to "working", as in
// TestDispatchProject.
func TestDispatcher_NoDependsOn_DispatchesNormally(t *testing.T) {
	root := t.TempDir()
	inboxA := makeProjectInbox(t, root, "projA")
	taskContent := "---\nid: TASK-plain\npriority: default\n---\nbody\n"
	taskPath := filepath.Join(inboxA, "TASK-plain.md")
	if err := os.WriteFile(taskPath, []byte(taskContent), 0o644); err != nil {
		t.Fatalf("write task: %v", err)
	}
	tc := newMockTmux()
	tc.sessions["pool-0"] = true
	tc.paneOutput["pool-0"] = " "
	s := state.New("")
	s.SetIdle("pool-0")
	d := &Dispatcher{
		tmux:        tc,
		state:       s,
		projectsDir: root,
		config: &config.Config{
			Pool: config.PoolConfig{
				Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
			},
		},
		logger: log.Default(),
	}
	d.dispatchProject(inboxA)
	if _, err := os.Stat(taskPath + ".dispatched"); err != nil {
		t.Errorf("plain task should be dispatched: %v", err)
	}
	if _, err := os.Stat(taskPath); !os.IsNotExist(err) {
		t.Error("expected original task file to be renamed")
	}
	if st := s.GetSession("pool-0"); st == nil || st.State != "working" {
		t.Errorf("expected working session, got %v", st)
	}
}