claude-failover/internal/dispatcher/routing_test.go
Ubuntu 3e20085204 feat(phase2-E): multi-provider routing via secutools delegation
Adds optional delegation of agent-queue tasks to the SecuAAS secutools
AI platform (GPU / Gemini / Claude API) instead of dispatching to a
local Claude Code tmux session. Per-task opt-in via YAML frontmatter
fields preferred_ai, allow_delegation, complexity_hint — absence keeps
the Phase 1 behaviour exactly (zero breaking change).

Go side:
- internal/secutools: HTTP client with exponential-backoff retries
  (SubmitJob/GetJob/WaitForResult), DecideProvider map adapter for CLI
  use, table tests.
- internal/router: struct-typed Decide() with strict precedence
  (needs_claude_code > preferred_ai=claude-code > allow_delegation=false
  > preferred_ai > fail-safe local on unknown).
- internal/delegation: Manager submits jobs, writes .md.delegated
  markers for on-restart recovery, runs a periodic reaper that moves
  completed jobs into done/ with provider/cost footer and failed jobs
  into failed/.
- internal/dispatcher: WithDelegation() opt-in, routeTask hook before
  findFreeSession, skips .md.delegated in assignNextTask.
- internal/api: /api/delegated/status (active jobs + counters),
  /watchdog/status extended with delegation counters.
- cmd/ccl-delegate: small CLI exposing submit/get/result/decide so the
  bash dispatcher can call the same contract without duplicating logic.
- cmd/claude-failover: delegation wired opt-in via SECUTOOLS_API_KEY.

Tests:
- 29+ new unit tests across router, secutools, delegation, dispatcher,
  api packages. go test -race -count=1 clean.
- tests/phase2-E-integration.sh: bash end-to-end against a Python
  stdlib mock HTTP server, exercising the dev-management scripts.

Forward-compat with watchdog (Phase 1 B1 already ignores
state=delegated_to_secutools) so delegated tasks aren't flagged stale.
2026-04-17 02:17:19 +00:00

318 lines
9.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dispatcher
import (
	"context"
	"errors"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"forge.secuaas.ovh/olivier/claude-failover/internal/config"
	"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
	"forge.secuaas.ovh/olivier/claude-failover/internal/router"
	"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
	"forge.secuaas.ovh/olivier/claude-failover/internal/state"
)
// fakeSecutools is the in-memory stub these routing tests hand to
// delegation.New. SubmitJob appends every request to submits, so a test can
// check how many jobs were submitted and what they contained; statuses and
// results hold the canned answers served by GetJob and WaitForResult.
type fakeSecutools struct {
	// mu is taken by every method before it touches the fields below.
	mu sync.Mutex

	nextID   int                             // counter behind the "job-N" ids
	submits  []*secutools.JobRequest         // requests, in submission order
	statuses map[string]string               // job id -> status string
	results  map[string]*secutools.JobResult // job id -> canned result
}
// newFakeSecutools returns a fakeSecutools whose statuses and results maps
// are allocated and empty, so they can be written to right away.
func newFakeSecutools() *fakeSecutools {
	fake := new(fakeSecutools)
	fake.statuses = map[string]string{}
	fake.results = map[string]*secutools.JobResult{}
	return fake
}
// SubmitJob records req, hands out the next sequential id ("job-" followed
// by the counter rendered through the package's itoa helper) and registers
// that id with status "pending". It never returns an error.
func (f *fakeSecutools) SubmitJob(_ context.Context, req *secutools.JobRequest) (*secutools.JobResponse, error) {
	const initialStatus = "pending"

	f.mu.Lock()
	defer f.mu.Unlock()

	f.nextID++
	jobID := "job-" + itoa(f.nextID)

	f.submits = append(f.submits, req)
	f.statuses[jobID] = initialStatus

	resp := &secutools.JobResponse{JobID: jobID, Status: initialStatus}
	return resp, nil
}
// GetJob reports the status currently stored for id. An id that was never
// registered yields the error "unknown".
func (f *fakeSecutools) GetJob(_ context.Context, id string) (*secutools.JobStatus, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if status, known := f.statuses[id]; known {
		return &secutools.JobStatus{JobID: id, Status: status}, nil
	}
	return nil, errors.New("unknown")
}
// WaitForResult returns the result stored for id without waiting; the
// context and timeout arguments are ignored. When no result has been stored
// for id it fails with the error "no result".
func (f *fakeSecutools) WaitForResult(_ context.Context, id string, _ time.Duration) (*secutools.JobResult, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	res, found := f.results[id]
	if !found {
		return nil, errors.New("no result")
	}
	return res, nil
}
// (uses itoa from dispatcher.go)

// setupTaskFile creates a throwaway project under t.TempDir() that contains
// .agent-queue/inbox/task-routing.md. The file holds frontmatter wrapped
// between two "---" lines, immediately followed by body. It returns the
// project root, the inbox directory and the task file path; any filesystem
// error aborts the calling test through t.Fatal.
func setupTaskFile(t *testing.T, frontmatter, body string) (projectDir, inbox, taskPath string) {
	t.Helper()

	projectDir = t.TempDir()
	inbox = filepath.Join(projectDir, ".agent-queue", "inbox")
	taskPath = filepath.Join(inbox, "task-routing.md")

	if mkErr := os.MkdirAll(inbox, 0755); mkErr != nil {
		t.Fatal(mkErr)
	}

	doc := []byte("---\n" + frontmatter + "\n---\n" + body)
	if writeErr := os.WriteFile(taskPath, doc, 0644); writeErr != nil {
		t.Fatal(writeErr)
	}
	return projectDir, inbox, taskPath
}
// TestRouteTask_ParsesNewFrontmatterFields checks that routeTask reads the
// routing frontmatter: a task declaring preferred_ai=gpu together with
// allow_delegation=true must resolve to router.ProviderGPU, and the returned
// body must be exactly the text that follows the frontmatter.
func TestRouteTask_ParsesNewFrontmatterFields(t *testing.T) {
	const (
		frontmatter = "title: Demo\npreferred_ai: gpu\nallow_delegation: true\ncomplexity_hint: low"
		wantBody    = "Body of task."
	)
	_, _, taskPath := setupTaskFile(t, frontmatter, wantBody)

	// routeTask is exercised with nothing but a logger set on the dispatcher.
	d := &Dispatcher{logger: log.Default()}
	decision, gotBody, err := d.routeTask(taskPath)
	if err != nil {
		t.Fatal(err)
	}

	if got := decision.Provider; got != router.ProviderGPU {
		t.Errorf("expected ProviderGPU, got %v (%s)", got, decision.Reason)
	}
	if gotBody != wantBody {
		t.Errorf("body parsed wrong: %q", gotBody)
	}
}
// TestDispatchProject_DelegatesGPUTask verifies that a task carrying
// allow_delegation=true and preferred_ai=gpu is submitted to secutools and
// never handed to a tmux session, even though an idle one is available.
func TestDispatchProject_DelegatesGPUTask(t *testing.T) {
	// Only the inbox and the task path are used here, so the project root is
	// discarded at the call site instead of being silenced later.
	_, inbox, taskPath := setupTaskFile(t,
		"title: Delegated\npreferred_ai: gpu\nallow_delegation: true",
		"Analyze something.")

	// An idle pool-0 session exists, so a local dispatch would be possible.
	tc := newMockTmux()
	tc.sessions["pool-0"] = true
	tc.paneOutput["pool-0"] = " "
	s := state.New("")
	s.SetIdle("pool-0")

	fc := newFakeSecutools()
	mgr := delegation.New(fc, time.Millisecond)
	d := &Dispatcher{
		tmux:  tc,
		state: s,
		config: &config.Config{
			Pool: config.PoolConfig{
				Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
			},
		},
		logger:     log.Default(),
		delegation: mgr,
	}

	d.dispatchProject(inbox)

	// Exactly one job must have been submitted, asking for the gpu provider.
	if len(fc.submits) != 1 {
		t.Fatalf("expected 1 SubmitJob call, got %d", len(fc.submits))
	}
	if fc.submits[0].PreferredAI != "gpu" {
		t.Errorf("expected preferred_ai=gpu, got %q", fc.submits[0].PreferredAI)
	}

	// .delegated marker is present, original .md kept (the reaper finalises later).
	if _, err := os.Stat(taskPath + ".delegated"); err != nil {
		t.Errorf("expected .delegated marker, got %v", err)
	}
	if _, err := os.Stat(taskPath); err != nil {
		t.Errorf("expected original .md to remain until reaped, got %v", err)
	}

	// pool-0 must remain idle (we did NOT launch a Claude Code agent).
	if st := s.GetSession("pool-0"); st == nil || st.State != "idle" {
		t.Errorf("expected pool-0 to stay idle, got %v", st)
	}
}
// TestDispatchProject_LegacyTaskKeepsClaudeCode is the backward-compatibility
// check: a task whose frontmatter has none of the routing fields must not be
// submitted to secutools. Instead a .dispatched marker must appear next to
// the task and the idle pool-0 session must switch to "working".
func TestDispatchProject_LegacyTaskKeepsClaudeCode(t *testing.T) {
	const session = "pool-0"

	_, inbox, taskPath := setupTaskFile(t, "title: Legacy\npriority: default", "Do classic work.")

	// One idle session is registered with the mock tmux and the state store.
	mock := newMockTmux()
	mock.sessions[session] = true
	mock.paneOutput[session] = " "
	store := state.New("")
	store.SetIdle(session)

	fake := newFakeSecutools()
	manager := delegation.New(fake, time.Millisecond)

	cfg := &config.Config{}
	cfg.Pool.Autonomous = config.AutonomousConfig{Prefix: "pool-", Max: 1}

	d := &Dispatcher{
		tmux:       mock,
		state:      store,
		config:     cfg,
		logger:     log.Default(),
		delegation: manager,
	}
	d.dispatchProject(inbox)

	if n := len(fake.submits); n != 0 {
		t.Errorf("legacy task must NOT delegate, got %d submits", n)
	}

	// The local path leaves a .dispatched marker and a working session.
	_, statErr := os.Stat(taskPath + ".dispatched")
	if statErr != nil {
		t.Errorf("expected .dispatched marker for Claude Code path, got %v", statErr)
	}
	sess := store.GetSession(session)
	if sess == nil || sess.State != "working" {
		t.Errorf("expected pool-0 working, got %v", sess)
	}
}
// TestDispatchProject_NeedsClaudeCodeBypassesDelegation checks that
// needs_claude_code: true wins over allow_delegation=true and
// preferred_ai=gpu: after dispatching, the fake secutools backend must not
// have received any submission.
func TestDispatchProject_NeedsClaudeCodeBypassesDelegation(t *testing.T) {
	const session = "pool-0"

	_, inbox, _ := setupTaskFile(t,
		"title: Bypass\npreferred_ai: gpu\nallow_delegation: true\nneeds_claude_code: true",
		"This needs a real Claude session.")

	// One idle session is registered with the mock tmux and the state store.
	mock := newMockTmux()
	mock.sessions[session] = true
	mock.paneOutput[session] = " "
	store := state.New("")
	store.SetIdle(session)

	fake := newFakeSecutools()
	manager := delegation.New(fake, time.Millisecond)

	cfg := &config.Config{}
	cfg.Pool.Autonomous = config.AutonomousConfig{Prefix: "pool-", Max: 1}

	d := &Dispatcher{
		tmux:       mock,
		state:      store,
		config:     cfg,
		logger:     log.Default(),
		delegation: manager,
	}
	d.dispatchProject(inbox)

	if n := len(fake.submits); n != 0 {
		t.Errorf("needs_claude_code must skip delegation, got %d submits", n)
	}
}
// TestEndToEnd_DelegationFlow walks the happy path with the dispatcher and
// the delegation manager wired together: a task in the inbox is submitted,
// the fake backend marks the job completed, the manager runs for a short
// while, and the task must end up in done/ with the response and provider
// written into it while the inbox copy and its .delegated marker disappear.
func TestEndToEnd_DelegationFlow(t *testing.T) {
	projectDir, inbox, taskPath := setupTaskFile(t,
		"title: E2E\npreferred_ai: auto\nallow_delegation: true",
		"Summarize this.")

	mock := newMockTmux()
	store := state.New("")
	fake := newFakeSecutools()
	// The manager gets a 1ms interval so step 3 sees many cycles.
	manager := delegation.New(fake, time.Millisecond)

	cfg := &config.Config{}
	cfg.Pool.Autonomous.Max = 0 // no mock sessions are registered in this test

	d := &Dispatcher{
		tmux:       mock,
		state:      store,
		config:     cfg,
		logger:     log.Default(),
		delegation: manager,
	}

	// 1. Dispatch — submits to fakeSecutools.
	d.dispatchProject(inbox)
	if n := len(fake.submits); n != 1 {
		t.Fatalf("expected 1 submit, got %d", n)
	}
	// The fake numbers its jobs from 1, so the only submission is "job-1".
	const jobID = "job-1"

	// 2. Backend completes the job.
	func() {
		fake.mu.Lock()
		defer fake.mu.Unlock()
		fake.statuses[jobID] = "completed"
		fake.results[jobID] = &secutools.JobResult{
			JobID:    jobID,
			Response: "Summary",
			Provider: "gpu",
			Model:    "qwen",
			CostCAD:  0.001,
		}
	}()

	// 3. Let the manager run for a bounded window so it can pick up the
	//    completed job.
	manager.SetLogger(log.Default())
	delegationReapNow(t, manager, jobID)

	donePath := filepath.Join(projectDir, ".agent-queue", "done", "task-routing.md")
	raw, err := os.ReadFile(donePath)
	if err != nil {
		t.Fatalf("missing done/ file: %v", err)
	}
	done := string(raw)
	if !(contains(done, "Summary") && contains(done, "provider: gpu")) {
		t.Errorf("done body missing expected fields:\n%s", done)
	}

	// Marker and original .md gone.
	for _, gone := range []struct{ path, msg string }{
		{taskPath + ".delegated", "delegated marker should be removed"},
		{taskPath, "original .md should be removed"},
	} {
		if _, statErr := os.Stat(gone.path); !os.IsNotExist(statErr) {
			t.Error(gone.msg)
		}
	}
}
// delegationReapNow gives mgr a short, bounded window in which to reap.
// reapOnce is unexported in the delegation package, so this helper goes
// through the public Run method instead, passing a context that expires
// after 200ms. The job id argument is accepted but not used.
func delegationReapNow(t *testing.T, mgr *delegation.Manager, _ string) {
	t.Helper()

	const reapWindow = 200 * time.Millisecond
	ctx, stop := context.WithTimeout(context.Background(), reapWindow)
	defer stop()

	// These tests build the manager with a 1ms interval, so the window
	// should span many ticker-driven reap cycles (presumably Run returns
	// once ctx is done — confirm against the delegation package).
	mgr.Run(ctx)
}
// contains reports whether sub occurs anywhere within s; an empty sub is
// always contained. It delegates to strings.Contains and is kept as a named
// helper so existing callers in this package keep working unchanged.
func contains(s, sub string) bool {
	return strings.Contains(s, sub)
}