Before claiming a session for a task, the dispatcher now:

1. Parses the task's frontmatter.
2. If `depends_on: [project:task_id]` is non-empty, checks each entry against `<projectsDir>/<project>/.agent-queue/done/<task_id>.md`.
3. If any dependency is unresolved, skips the task and writes `<task>.md.blocked` next to it. The watchdog (G1) will resolve this marker on its next tick.

The `.blocked` marker is idempotent: re-running the dispatcher does not refresh its mtime, so the watchdog can compute the blocked-since timestamp from the FIRST detection (timeout precision).

Path-traversal hardening: project / task_id segments must match `[A-Za-z0-9._-]+` and cannot be `.` or `..`. A malicious frontmatter like `depends_on: ../../tmp:foo` is rejected before any filesystem lookup.

assignNextTask (the doneChan path) applies the same gate so that a session freed mid-cycle cannot bypass enforcement.

Tests (-race clean):
- DependsOnUnresolved -> .blocked marker, no dispatch
- DependsOnResolved -> normal dispatch, no marker
- PartialResolution -> stay blocked
- RejectPathTraversal -> blocked, not dispatched
- BlockedMarker idempotent (mtime stable across passes)
- NoDependsOn regression guard
521 lines
15 KiB
Go
521 lines
15 KiB
Go
package dispatcher
|
||
|
||
import (
|
||
"log"
|
||
"os"
|
||
"path/filepath"
|
||
"testing"
|
||
"time"
|
||
|
||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||
)
|
||
|
||
// mockTmux is a minimal in-memory tmux.Client for tests.
//
// It remembers which sessions exist, serves canned pane output per session,
// and records every key sequence it is asked to send so tests can inspect
// them afterwards. It performs no synchronization of its own.
type mockTmux struct {
	sessions   map[string]bool   // session name -> present
	paneOutput map[string]string // session name -> text returned by CapturePaneTail
	sentKeys   []string          // arguments of SendKeys / SendEnter, in call order
}

// newMockTmux returns a mockTmux with both maps initialised and no sessions.
func newMockTmux() *mockTmux {
	mock := new(mockTmux)
	mock.sessions = map[string]bool{}
	mock.paneOutput = map[string]string{}
	return mock
}

// HasSession reports whether name has been registered as a session.
func (mt *mockTmux) HasSession(name string) bool {
	present := mt.sessions[name]
	return present
}

// CreateSession registers name as an existing session. The second argument
// is ignored and the call never fails.
func (mt *mockTmux) CreateSession(name, _ string) error {
	mt.sessions[name] = true
	return nil
}

// KillSession is a no-op: the session stays registered and nil is returned.
func (mt *mockTmux) KillSession(_ string) error {
	return nil
}

// SendKeys records keys in sentKeys; the target session is ignored.
func (mt *mockTmux) SendKeys(_, keys string) error {
	mt.sentKeys = append(mt.sentKeys, keys)
	return nil
}

// SendEnter records the literal token "<ENTER>" in sentKeys; the target
// session is ignored.
func (mt *mockTmux) SendEnter(_ string) error {
	const enterToken = "<ENTER>"
	mt.sentKeys = append(mt.sentKeys, enterToken)
	return nil
}

// CapturePaneTail returns the canned output stored for session (the empty
// string when none was set). The line-count argument is ignored.
func (mt *mockTmux) CapturePaneTail(session string, _ int) (string, error) {
	out := mt.paneOutput[session]
	return out, nil
}
|
||
|
||
// TestParseFrontmatter verifies YAML frontmatter extraction.
|
||
func TestParseFrontmatter(t *testing.T) {
|
||
input := "---\ntitle: Fix bug\npriority: critical\n---\nDo the fix."
|
||
fm, body := parseFrontmatter([]byte(input))
|
||
if fm.Title != "Fix bug" {
|
||
t.Errorf("expected title 'Fix bug', got %q", fm.Title)
|
||
}
|
||
if fm.Priority != "critical" {
|
||
t.Errorf("expected priority critical, got %q", fm.Priority)
|
||
}
|
||
if body != "Do the fix." {
|
||
t.Errorf("expected body 'Do the fix.', got %q", body)
|
||
}
|
||
}
|
||
|
||
// TestParseFrontmatterNoHeader handles files without a YAML header.
|
||
func TestParseFrontmatterNoHeader(t *testing.T) {
|
||
input := "Just plain content."
|
||
fm, body := parseFrontmatter([]byte(input))
|
||
if fm.Title != "" {
|
||
t.Errorf("expected empty title, got %q", fm.Title)
|
||
}
|
||
if body != "Just plain content." {
|
||
t.Errorf("expected full body, got %q", body)
|
||
}
|
||
}
|
||
|
||
// TestModelForPriority maps priority strings to model names.
|
||
func TestModelForPriority(t *testing.T) {
|
||
cases := []struct{ priority, want string }{
|
||
{"critical", "opus"},
|
||
{"CRITICAL", "opus"},
|
||
{"high", "sonnet"},
|
||
{"default", "sonnet"},
|
||
{"", "sonnet"},
|
||
}
|
||
for _, c := range cases {
|
||
if got := modelForPriority(c.priority); got != c.want {
|
||
t.Errorf("modelForPriority(%q) = %q, want %q", c.priority, got, c.want)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestFindFreeSessionSkipsFailed verifies that recently-failed sessions are skipped.
|
||
func TestFindFreeSessionSkipsFailed(t *testing.T) {
|
||
tc := newMockTmux()
|
||
tc.sessions["sess-0"] = true
|
||
tc.sessions["sess-1"] = true
|
||
|
||
s := state.New("")
|
||
s.SetFailed("sess-0")
|
||
s.SetIdle("sess-1")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Prefix: "sess-", Max: 2},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
got := d.findFreeSession()
|
||
if got != "sess-1" {
|
||
t.Errorf("expected sess-1, got %q", got)
|
||
}
|
||
}
|
||
|
||
// TestFindFreeSessionMissingTmux skips sessions not in tmux.
|
||
func TestFindFreeSessionMissingTmux(t *testing.T) {
|
||
tc := newMockTmux()
|
||
// sess-0 missing from tmux, sess-1 present and idle.
|
||
tc.sessions["sess-1"] = true
|
||
|
||
s := state.New("")
|
||
s.SetIdle("sess-0")
|
||
s.SetIdle("sess-1")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Prefix: "sess-", Max: 2},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
got := d.findFreeSession()
|
||
if got != "sess-1" {
|
||
t.Errorf("expected sess-1, got %q", got)
|
||
}
|
||
}
|
||
|
||
// TestFindFreeSessionSkipsDedicated verifies that dedicated sessions are
|
||
// NEVER returned by the auto-dispatch path, even when idle. Those host the
|
||
// operator's manual interactive work and must stay untouched.
|
||
func TestFindFreeSessionSkipsDedicated(t *testing.T) {
|
||
tc := newMockTmux()
|
||
tc.sessions["ccl-1-conformvault"] = true
|
||
tc.sessions["sess-0"] = true
|
||
|
||
s := state.New("")
|
||
s.SetIdle("ccl-1-conformvault")
|
||
s.SetIdle("sess-0")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Dedicated: []config.DedicatedSession{{Name: "ccl-1-conformvault"}},
|
||
Autonomous: config.AutonomousConfig{Prefix: "sess-", Max: 1},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
got := d.findFreeSession()
|
||
if got != "sess-0" {
|
||
t.Errorf("expected pool sess-0 (dedicated must be skipped), got %q", got)
|
||
}
|
||
}
|
||
|
||
// TestDispatchProject creates a task file, dispatches it, and checks state + rename.
|
||
func TestDispatchProject(t *testing.T) {
|
||
dir := t.TempDir()
|
||
inbox := filepath.Join(dir, ".agent-queue", "inbox")
|
||
os.MkdirAll(inbox, 0755)
|
||
|
||
taskContent := "---\ntitle: My Task\npriority: high\n---\nDo the work."
|
||
taskPath := filepath.Join(inbox, "task-001.md")
|
||
os.WriteFile(taskPath, []byte(taskContent), 0644)
|
||
|
||
tc := newMockTmux()
|
||
tc.sessions["pool-0"] = true
|
||
// Return ❯ prompt on first CapturePaneTail call (Claude is ready).
|
||
tc.paneOutput["pool-0"] = "❯ "
|
||
|
||
s := state.New("")
|
||
s.SetIdle("pool-0")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
d.dispatchProject(inbox)
|
||
|
||
if st := s.GetSession("pool-0"); st == nil || st.State != "working" {
|
||
t.Errorf("expected session working after dispatch, got %v", st)
|
||
}
|
||
|
||
// Original file renamed to .dispatched.
|
||
if _, err := os.Stat(taskPath + ".dispatched"); os.IsNotExist(err) {
|
||
t.Error("expected .dispatched marker")
|
||
}
|
||
if _, err := os.Stat(taskPath); !os.IsNotExist(err) {
|
||
t.Error("expected original task file to be renamed")
|
||
}
|
||
}
|
||
|
||
// TestDispatchProjectNoFreeSession leaves the task untouched when no session is available.
|
||
func TestDispatchProjectNoFreeSession(t *testing.T) {
|
||
dir := t.TempDir()
|
||
inbox := filepath.Join(dir, ".agent-queue", "inbox")
|
||
os.MkdirAll(inbox, 0755)
|
||
|
||
taskPath := filepath.Join(inbox, "task-002.md")
|
||
os.WriteFile(taskPath, []byte("content"), 0644)
|
||
|
||
tc := newMockTmux() // no sessions
|
||
s := state.New("")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Max: 0},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
d.dispatchProject(inbox)
|
||
|
||
// File must remain unchanged.
|
||
if _, err := os.Stat(taskPath); os.IsNotExist(err) {
|
||
t.Error("task file should remain when no session is free")
|
||
}
|
||
}
|
||
|
||
// ─── Phase 2 / G2 — depends_on enforcement tests ────────────────────────────
|
||
|
||
// makeProjectInbox creates `<root>/<project>/.agent-queue/inbox/` and the
// sibling `done/` directory, then returns the inbox path. The test is
// aborted via t.Fatalf if either directory cannot be created.
func makeProjectInbox(t *testing.T, root, project string) string {
	t.Helper()

	queueDir := filepath.Join(root, project, ".agent-queue")

	inboxDir := filepath.Join(queueDir, "inbox")
	if err := os.MkdirAll(inboxDir, 0o755); err != nil {
		t.Fatalf("mkdir inbox: %v", err)
	}

	doneDir := filepath.Join(queueDir, "done")
	if err := os.MkdirAll(doneDir, 0o755); err != nil {
		t.Fatalf("mkdir done: %v", err)
	}

	return inboxDir
}
|
||
|
||
// putDoneTaskFailover seeds <root>/<project>/.agent-queue/done/<id>.md with
// the content "done\n", creating the done/ directory if needed. The test is
// aborted via t.Fatalf if the directory or the file cannot be written.
func putDoneTaskFailover(t *testing.T, root, project, id string) {
	t.Helper()
	doneDir := filepath.Join(root, project, ".agent-queue", "done")
	// Report the mkdir failure itself instead of letting it surface as a
	// less specific WriteFile error below.
	if err := os.MkdirAll(doneDir, 0o755); err != nil {
		t.Fatalf("mkdir done: %v", err)
	}
	if err := os.WriteFile(filepath.Join(doneDir, id+".md"), []byte("done\n"), 0o644); err != nil {
		t.Fatalf("write done: %v", err)
	}
}
|
||
|
||
// TestDispatcher_DependsOnUnresolved_DropsBlockedMarker verifies that a task
|
||
// whose `depends_on` is not satisfied is NOT dispatched and that a
|
||
// `.md.blocked` marker is created next to it.
|
||
func TestDispatcher_DependsOnUnresolved_DropsBlockedMarker(t *testing.T) {
|
||
root := t.TempDir()
|
||
inboxA := makeProjectInbox(t, root, "projA")
|
||
makeProjectInbox(t, root, "projB") // empty done/
|
||
|
||
taskContent := "---\nid: TASK-A1\npriority: default\ndepends_on:\n - projB:DEP-1\n---\n\nbody\n"
|
||
taskPath := filepath.Join(inboxA, "TASK-A1.md")
|
||
if err := os.WriteFile(taskPath, []byte(taskContent), 0o644); err != nil {
|
||
t.Fatalf("write task: %v", err)
|
||
}
|
||
|
||
tc := newMockTmux()
|
||
tc.sessions["pool-0"] = true
|
||
tc.paneOutput["pool-0"] = "❯ "
|
||
s := state.New("")
|
||
s.SetIdle("pool-0")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
projectsDir: root,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
d.dispatchProject(inboxA)
|
||
|
||
if _, err := os.Stat(taskPath + ".blocked"); err != nil {
|
||
t.Errorf("expected .blocked marker: %v", err)
|
||
}
|
||
if _, err := os.Stat(taskPath + ".dispatched"); !os.IsNotExist(err) {
|
||
t.Errorf("task should NOT be dispatched while blocked")
|
||
}
|
||
if _, err := os.Stat(taskPath); err != nil {
|
||
t.Errorf("task .md should remain in inbox: %v", err)
|
||
}
|
||
if st := s.GetSession("pool-0"); st == nil || st.State != "idle" {
|
||
t.Errorf("session should stay idle when only task is blocked, got %v", st)
|
||
}
|
||
}
|
||
|
||
// TestDispatcher_DependsOnResolved_Dispatches verifies normal flow when the
|
||
// upstream task is in done/.
|
||
func TestDispatcher_DependsOnResolved_Dispatches(t *testing.T) {
|
||
root := t.TempDir()
|
||
inboxA := makeProjectInbox(t, root, "projA")
|
||
makeProjectInbox(t, root, "projB")
|
||
putDoneTaskFailover(t, root, "projB", "DEP-1")
|
||
|
||
taskContent := "---\nid: TASK-A2\npriority: default\ndepends_on:\n - projB:DEP-1\n---\n\nbody\n"
|
||
taskPath := filepath.Join(inboxA, "TASK-A2.md")
|
||
_ = os.WriteFile(taskPath, []byte(taskContent), 0o644)
|
||
|
||
tc := newMockTmux()
|
||
tc.sessions["pool-0"] = true
|
||
tc.paneOutput["pool-0"] = "❯ "
|
||
s := state.New("")
|
||
s.SetIdle("pool-0")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
projectsDir: root,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
d.dispatchProject(inboxA)
|
||
|
||
if _, err := os.Stat(taskPath + ".dispatched"); err != nil {
|
||
t.Errorf("expected .dispatched: %v", err)
|
||
}
|
||
if _, err := os.Stat(taskPath + ".blocked"); !os.IsNotExist(err) {
|
||
t.Errorf(".blocked marker should NOT be present when dep is resolved")
|
||
}
|
||
if st := s.GetSession("pool-0"); st == nil || st.State != "working" {
|
||
t.Errorf("expected working session, got %v", st)
|
||
}
|
||
}
|
||
|
||
// TestDispatcher_DependsOn_PartialResolution treats partial resolution as
|
||
// blocked.
|
||
func TestDispatcher_DependsOn_PartialResolution(t *testing.T) {
|
||
root := t.TempDir()
|
||
inboxA := makeProjectInbox(t, root, "projA")
|
||
makeProjectInbox(t, root, "projB")
|
||
makeProjectInbox(t, root, "projC")
|
||
putDoneTaskFailover(t, root, "projB", "DEP-X")
|
||
|
||
content := "---\nid: TASK-A3\npriority: default\ndepends_on:\n - projB:DEP-X\n - projC:DEP-Y\n---\n\nbody\n"
|
||
taskPath := filepath.Join(inboxA, "TASK-A3.md")
|
||
_ = os.WriteFile(taskPath, []byte(content), 0o644)
|
||
|
||
tc := newMockTmux()
|
||
tc.sessions["pool-0"] = true
|
||
tc.paneOutput["pool-0"] = "❯ "
|
||
s := state.New("")
|
||
s.SetIdle("pool-0")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
projectsDir: root,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
d.dispatchProject(inboxA)
|
||
|
||
if _, err := os.Stat(taskPath + ".blocked"); err != nil {
|
||
t.Errorf(".blocked marker expected for partial resolution: %v", err)
|
||
}
|
||
if _, err := os.Stat(taskPath + ".dispatched"); !os.IsNotExist(err) {
|
||
t.Errorf("must not dispatch with one unresolved dep")
|
||
}
|
||
}
|
||
|
||
// TestDispatcher_DependsOn_RejectPathTraversal blocks malicious depends_on
|
||
// segments containing `..` or `/`.
|
||
func TestDispatcher_DependsOn_RejectPathTraversal(t *testing.T) {
|
||
root := t.TempDir()
|
||
inboxA := makeProjectInbox(t, root, "projA")
|
||
|
||
content := "---\nid: TASK-evil\npriority: default\ndepends_on:\n - ../../tmp:foo\n---\n\nbody\n"
|
||
taskPath := filepath.Join(inboxA, "TASK-evil.md")
|
||
_ = os.WriteFile(taskPath, []byte(content), 0o644)
|
||
|
||
tc := newMockTmux()
|
||
tc.sessions["pool-0"] = true
|
||
s := state.New("")
|
||
s.SetIdle("pool-0")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
projectsDir: root,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
d.dispatchProject(inboxA)
|
||
if _, err := os.Stat(taskPath + ".blocked"); err != nil {
|
||
t.Errorf("malicious dep must result in .blocked, not dispatch: %v", err)
|
||
}
|
||
if _, err := os.Stat(taskPath + ".dispatched"); !os.IsNotExist(err) {
|
||
t.Errorf("malicious dep must NOT be dispatched")
|
||
}
|
||
}
|
||
|
||
// TestDispatcher_BlockedMarker_Idempotent verifies running the dispatch
|
||
// twice on the same blocked task does not refresh the marker mtime (so the
|
||
// watchdog timeout counts from FIRST detection).
|
||
func TestDispatcher_BlockedMarker_Idempotent(t *testing.T) {
|
||
root := t.TempDir()
|
||
inboxA := makeProjectInbox(t, root, "projA")
|
||
makeProjectInbox(t, root, "projB")
|
||
|
||
content := "---\nid: TASK-idem\npriority: default\ndepends_on:\n - projB:NEVER\n---\n\nbody\n"
|
||
taskPath := filepath.Join(inboxA, "TASK-idem.md")
|
||
_ = os.WriteFile(taskPath, []byte(content), 0o644)
|
||
|
||
tc := newMockTmux()
|
||
s := state.New("")
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
projectsDir: root,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 0},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
d.dispatchProject(inboxA)
|
||
marker := taskPath + ".blocked"
|
||
st1, err := os.Stat(marker)
|
||
if err != nil {
|
||
t.Fatalf("expected marker after first run: %v", err)
|
||
}
|
||
mtime1 := st1.ModTime()
|
||
|
||
// Sleep enough to detect mtime drift on most filesystems (1s
|
||
// granularity). The marker mtime must be stable so the watchdog
|
||
// timeout counts from FIRST detection.
|
||
time.Sleep(1100 * time.Millisecond)
|
||
|
||
d.dispatchProject(inboxA)
|
||
st2, err := os.Stat(marker)
|
||
if err != nil {
|
||
t.Fatalf("expected marker after second run: %v", err)
|
||
}
|
||
if !st2.ModTime().Equal(mtime1) {
|
||
t.Errorf("blocked marker mtime should be stable across dispatch passes (was %v, now %v)",
|
||
mtime1, st2.ModTime())
|
||
}
|
||
}
|
||
|
||
// TestDispatcher_NoDependsOn_DispatchesNormally is a regression guard:
|
||
// tasks without depends_on must keep working exactly as before G2.
|
||
func TestDispatcher_NoDependsOn_DispatchesNormally(t *testing.T) {
|
||
root := t.TempDir()
|
||
inboxA := makeProjectInbox(t, root, "projA")
|
||
|
||
taskContent := "---\nid: TASK-plain\npriority: default\n---\nbody\n"
|
||
taskPath := filepath.Join(inboxA, "TASK-plain.md")
|
||
_ = os.WriteFile(taskPath, []byte(taskContent), 0o644)
|
||
|
||
tc := newMockTmux()
|
||
tc.sessions["pool-0"] = true
|
||
tc.paneOutput["pool-0"] = "❯ "
|
||
s := state.New("")
|
||
s.SetIdle("pool-0")
|
||
|
||
d := &Dispatcher{
|
||
tmux: tc,
|
||
state: s,
|
||
projectsDir: root,
|
||
config: &config.Config{
|
||
Pool: config.PoolConfig{
|
||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
|
||
},
|
||
},
|
||
logger: log.Default(),
|
||
}
|
||
|
||
d.dispatchProject(inboxA)
|
||
if _, err := os.Stat(taskPath + ".dispatched"); err != nil {
|
||
t.Errorf("plain task should be dispatched: %v", err)
|
||
}
|
||
}
|