Adds optional delegation of agent-queue tasks to the SecuAAS secutools AI platform (GPU / Gemini / Claude API) instead of dispatching to a local Claude Code tmux session. Per-task opt-in via YAML frontmatter fields preferred_ai, allow_delegation, complexity_hint — absence keeps the Phase 1 behaviour exactly (zero breaking change). Go side: - internal/secutools: HTTP client with exponential-backoff retries (SubmitJob/GetJob/WaitForResult), DecideProvider map adapter for CLI use, table tests. - internal/router: struct-typed Decide() with strict precedence (needs_claude_code > preferred_ai=claude-code > allow_delegation=false > preferred_ai > fail-safe local on unknown). - internal/delegation: Manager submits jobs, writes .md.delegated markers for on-restart recovery, runs a periodic reaper that moves completed jobs into done/ with provider/cost footer and failed jobs into failed/. - internal/dispatcher: WithDelegation() opt-in, routeTask hook before findFreeSession, skips .md.delegated in assignNextTask. - internal/api: /api/delegated/status (active jobs + counters), /watchdog/status extended with delegation counters. - cmd/ccl-delegate: small CLI exposing submit/get/result/decide so the bash dispatcher can call the same contract without duplicating logic. - cmd/claude-failover: delegation wired opt-in via SECUTOOLS_API_KEY. Tests: - 29+ new unit tests across router, secutools, delegation, dispatcher, api packages. go test -race -count=1 clean. - tests/phase2-E-integration.sh: bash end-to-end against a Python stdlib mock HTTP server, exercising the dev-management scripts. Forward-compat with watchdog (Phase 1 B1 already ignores state=delegated_to_secutools) so delegated tasks aren't flagged stale.
335 lines
9 KiB
Go
335 lines
9 KiB
Go
package delegation
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"forge.secuaas.ovh/olivier/claude-failover/internal/router"
|
|
"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
|
|
)
|
|
|
|
// fakeClient is an in-memory secutools.Client implementation for tests.
|
|
// It supports preloaded responses keyed by job_id and a counter of calls.
|
|
type fakeClient struct {
|
|
mu sync.Mutex
|
|
nextJobID int
|
|
statuses map[string]string // job_id → current status
|
|
results map[string]*secutools.JobResult // job_id → result
|
|
statusErr error
|
|
submitErr error
|
|
}
|
|
|
|
func newFakeClient() *fakeClient {
|
|
return &fakeClient{
|
|
statuses: make(map[string]string),
|
|
results: make(map[string]*secutools.JobResult),
|
|
}
|
|
}
|
|
|
|
func (f *fakeClient) SubmitJob(_ context.Context, req *secutools.JobRequest) (*secutools.JobResponse, error) {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
if f.submitErr != nil {
|
|
return nil, f.submitErr
|
|
}
|
|
f.nextJobID++
|
|
id := "job-" + itoa(f.nextJobID)
|
|
f.statuses[id] = "pending"
|
|
return &secutools.JobResponse{JobID: id, Status: "pending"}, nil
|
|
}
|
|
|
|
func (f *fakeClient) GetJob(_ context.Context, id string) (*secutools.JobStatus, error) {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
if f.statusErr != nil {
|
|
return nil, f.statusErr
|
|
}
|
|
st, ok := f.statuses[id]
|
|
if !ok {
|
|
return nil, errors.New("unknown job: " + id)
|
|
}
|
|
out := &secutools.JobStatus{JobID: id, Status: st}
|
|
if r, ok := f.results[id]; ok {
|
|
out.Provider = r.Provider
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (f *fakeClient) WaitForResult(_ context.Context, id string, _ time.Duration) (*secutools.JobResult, error) {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
if r, ok := f.results[id]; ok {
|
|
return r, nil
|
|
}
|
|
return nil, errors.New("no result for " + id)
|
|
}
|
|
|
|
// setStatus is a test helper to drive job lifecycles.
|
|
func (f *fakeClient) setStatus(id, status string) {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
f.statuses[id] = status
|
|
}
|
|
|
|
func (f *fakeClient) setResult(id string, r *secutools.JobResult) {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
f.results[id] = r
|
|
}
|
|
|
|
// itoa is a tiny stdlib-free int-to-string for fake job IDs.
|
|
func itoa(n int) string {
|
|
if n == 0 {
|
|
return "0"
|
|
}
|
|
var b []byte
|
|
for n > 0 {
|
|
b = append([]byte{byte('0' + n%10)}, b...)
|
|
n /= 10
|
|
}
|
|
return string(b)
|
|
}
|
|
|
|
// quietLogger discards log output.
|
|
func quietLogger() *log.Logger {
|
|
return log.New(io.Discard, "", 0)
|
|
}
|
|
|
|
func setupProject(t *testing.T) (projectDir, taskPath string) {
|
|
t.Helper()
|
|
projectDir = t.TempDir()
|
|
inbox := filepath.Join(projectDir, ".agent-queue", "inbox")
|
|
if err := os.MkdirAll(inbox, 0755); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
taskPath = filepath.Join(inbox, "task-001.md")
|
|
if err := os.WriteFile(taskPath, []byte("---\ntitle: Test\n---\nDo a thing."), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
return projectDir, taskPath
|
|
}
|
|
|
|
func TestSubmit_WritesMarkerAndIncrementsCounter(t *testing.T) {
|
|
fc := newFakeClient()
|
|
mgr := New(fc, 100*time.Millisecond)
|
|
mgr.SetLogger(quietLogger())
|
|
|
|
projectDir, taskPath := setupProject(t)
|
|
|
|
job, err := mgr.Submit(context.Background(), projectDir, taskPath, "Do a thing.",
|
|
router.ProviderGPU, secutools.PriorityHigh)
|
|
if err != nil {
|
|
t.Fatalf("Submit: %v", err)
|
|
}
|
|
if job.JobID == "" {
|
|
t.Fatal("expected non-empty job id")
|
|
}
|
|
if mgr.Counters.Active.Load() != 1 {
|
|
t.Errorf("expected Active=1, got %d", mgr.Counters.Active.Load())
|
|
}
|
|
|
|
// Marker file must exist and decode back to the same job_id.
|
|
markerPath := taskPath + ".delegated"
|
|
data, err := os.ReadFile(markerPath)
|
|
if err != nil {
|
|
t.Fatalf("missing marker: %v", err)
|
|
}
|
|
var mk Marker
|
|
if err := json.Unmarshal(data, &mk); err != nil {
|
|
t.Fatalf("decode marker: %v", err)
|
|
}
|
|
if mk.JobID != job.JobID || mk.Provider != "gpu" {
|
|
t.Errorf("marker mismatch: %+v", mk)
|
|
}
|
|
}
|
|
|
|
func TestSubmit_RejectsNonDelegatedProvider(t *testing.T) {
|
|
mgr := New(newFakeClient(), time.Millisecond)
|
|
mgr.SetLogger(quietLogger())
|
|
_, taskPath := setupProject(t)
|
|
|
|
_, err := mgr.Submit(context.Background(), filepath.Dir(filepath.Dir(filepath.Dir(taskPath))),
|
|
taskPath, "p", router.ProviderClaudeCode, secutools.PriorityDefault)
|
|
if err == nil {
|
|
t.Fatal("expected error when provider is claude-code (non-delegated)")
|
|
}
|
|
}
|
|
|
|
func TestReap_CompletesJobAndMovesToDone(t *testing.T) {
|
|
fc := newFakeClient()
|
|
mgr := New(fc, time.Millisecond)
|
|
mgr.SetLogger(quietLogger())
|
|
|
|
projectDir, taskPath := setupProject(t)
|
|
|
|
job, err := mgr.Submit(context.Background(), projectDir, taskPath, "p",
|
|
router.ProviderAuto, secutools.PriorityDefault)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Drive the fake into completed state.
|
|
fc.setStatus(job.JobID, "completed")
|
|
fc.setResult(job.JobID, &secutools.JobResult{
|
|
JobID: job.JobID, Response: "Hello world", Provider: "gpu",
|
|
Model: "qwen-coder", CostCAD: 0.012,
|
|
})
|
|
|
|
mgr.reapOnce(context.Background())
|
|
|
|
if mgr.Counters.Active.Load() != 0 {
|
|
t.Errorf("expected Active=0 after completion, got %d", mgr.Counters.Active.Load())
|
|
}
|
|
if mgr.Counters.CompletedTotal.Load() != 1 {
|
|
t.Errorf("expected CompletedTotal=1, got %d", mgr.Counters.CompletedTotal.Load())
|
|
}
|
|
|
|
// done/ contains the result.
|
|
donePath := filepath.Join(projectDir, ".agent-queue", "done", "task-001.md")
|
|
doneBody, err := os.ReadFile(donePath)
|
|
if err != nil {
|
|
t.Fatalf("missing done/ file: %v", err)
|
|
}
|
|
if !contains(string(doneBody), "Hello world") || !contains(string(doneBody), "provider: gpu") {
|
|
t.Errorf("done body missing expected fields:\n%s", doneBody)
|
|
}
|
|
|
|
// Inbox marker and original .md are gone.
|
|
if _, err := os.Stat(taskPath + ".delegated"); !os.IsNotExist(err) {
|
|
t.Error("marker should be removed after completion")
|
|
}
|
|
if _, err := os.Stat(taskPath); !os.IsNotExist(err) {
|
|
t.Error("inbox .md should be removed after completion")
|
|
}
|
|
}
|
|
|
|
func TestReap_FailedJobMovesToFailed(t *testing.T) {
|
|
fc := newFakeClient()
|
|
mgr := New(fc, time.Millisecond)
|
|
mgr.SetLogger(quietLogger())
|
|
|
|
projectDir, taskPath := setupProject(t)
|
|
|
|
job, err := mgr.Submit(context.Background(), projectDir, taskPath, "p",
|
|
router.ProviderGemini, secutools.PriorityLow)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
fc.setStatus(job.JobID, "failed")
|
|
mgr.reapOnce(context.Background())
|
|
|
|
if mgr.Counters.FailedTotal.Load() != 1 {
|
|
t.Errorf("expected FailedTotal=1, got %d", mgr.Counters.FailedTotal.Load())
|
|
}
|
|
failedPath := filepath.Join(projectDir, ".agent-queue", "failed", "task-001.md")
|
|
if _, err := os.Stat(failedPath); err != nil {
|
|
t.Errorf("failed/ file missing: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestLoadFromDisk_RestoresMarkers(t *testing.T) {
|
|
fc := newFakeClient()
|
|
mgr := New(fc, time.Millisecond)
|
|
mgr.SetLogger(quietLogger())
|
|
|
|
projectDir, taskPath := setupProject(t)
|
|
|
|
// Pre-write a marker on disk simulating a daemon restart.
|
|
mk := Marker{
|
|
Project: projectDir, TaskFile: taskPath, JobID: "preexisting-job",
|
|
Provider: "gpu", StartedAt: time.Now().UTC().Add(-1 * time.Minute),
|
|
}
|
|
if err := writeMarker(taskPath+".delegated", mk); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if err := mgr.LoadFromDisk([]string{projectDir}); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if mgr.Counters.Active.Load() != 1 {
|
|
t.Errorf("expected Active=1 after LoadFromDisk, got %d", mgr.Counters.Active.Load())
|
|
}
|
|
active := mgr.Active()
|
|
if len(active) != 1 || active[0].JobID != "preexisting-job" {
|
|
t.Errorf("Active() mismatch: %+v", active)
|
|
}
|
|
}
|
|
|
|
func TestActive_ReturnsCurrentJobs(t *testing.T) {
|
|
fc := newFakeClient()
|
|
mgr := New(fc, time.Millisecond)
|
|
mgr.SetLogger(quietLogger())
|
|
|
|
projectDir, taskPath := setupProject(t)
|
|
if _, err := mgr.Submit(context.Background(), projectDir, taskPath, "p",
|
|
router.ProviderGPU, secutools.PriorityDefault); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
got := mgr.Active()
|
|
if len(got) != 1 {
|
|
t.Fatalf("expected 1 active job, got %d", len(got))
|
|
}
|
|
if got[0].Provider != "gpu" || got[0].Project != projectDir {
|
|
t.Errorf("active[0] mismatch: %+v", got[0])
|
|
}
|
|
}
|
|
|
|
// TestEndToEnd_InboxToDone exercises the full happy path:
|
|
// inbox → Submit → reapOnce (status=completed) → done/.
|
|
func TestEndToEnd_InboxToDone(t *testing.T) {
|
|
fc := newFakeClient()
|
|
mgr := New(fc, time.Millisecond)
|
|
mgr.SetLogger(quietLogger())
|
|
|
|
projectDir, taskPath := setupProject(t)
|
|
|
|
job, err := mgr.Submit(context.Background(), projectDir, taskPath,
|
|
"Summarize this", router.ProviderAuto, secutools.PriorityDefault)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Simulate the secutools backend completing the job.
|
|
fc.setStatus(job.JobID, "completed")
|
|
fc.setResult(job.JobID, &secutools.JobResult{
|
|
JobID: job.JobID, Response: "Summary text",
|
|
Provider: "claude-haiku", Model: "haiku-3-5", CostCAD: 0.0005,
|
|
})
|
|
|
|
mgr.reapOnce(context.Background())
|
|
|
|
donePath := filepath.Join(projectDir, ".agent-queue", "done", "task-001.md")
|
|
body, err := os.ReadFile(donePath)
|
|
if err != nil {
|
|
t.Fatalf("missing done/ file: %v", err)
|
|
}
|
|
want := []string{"Summary text", "provider: claude-haiku", "cost_cad: 0.0005"}
|
|
for _, w := range want {
|
|
if !contains(string(body), w) {
|
|
t.Errorf("done body missing %q:\n%s", w, body)
|
|
}
|
|
}
|
|
}
|
|
|
|
func contains(s, sub string) bool {
|
|
return len(s) >= len(sub) && (sub == "" || indexOf(s, sub) >= 0)
|
|
}
|
|
|
|
func indexOf(s, sub string) int {
|
|
for i := 0; i+len(sub) <= len(s); i++ {
|
|
if s[i:i+len(sub)] == sub {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|