diff --git a/VERSION.md b/VERSION.md
index 09e8e6a..a02d252 100644
--- a/VERSION.md
+++ b/VERSION.md
@@ -1,15 +1,92 @@
-# Current version: 0.3.9
+# Current version: 0.4.0
-## [0.3.9] - 2026-04-16
-**Type:** Patch: `go mod tidy` (fsnotify direct dep cleanup)
+## [0.4.0] - 2026-04-17
+**Type:** Minor: Phase 2 workstream E, multi-provider routing (delegation to secutools)
-### Changed
-- `go.mod`: `fsnotify` promoted to a direct dependency, `yaml.v3` regrouped, `golang.org/x/sys` isolated as indirect. No functional change.
+### Added
+- `internal/secutools`: HTTP client (SubmitJob, GetJob, WaitForResult)
+  for the centralized SecuAAS AI platform. The `Client` interface is
+  mockable for tests; the `HTTPClient` implementation polls every 2s,
+  applies a timeout, and propagates `context.Context`.
+- `internal/router`: chooses between ProviderClaudeCode (Phase 1) and
+  ProviderGPU / Gemini / ClaudeAPI / Auto (delegation to secutools) from
+  the new frontmatter fields `preferred_ai`, `allow_delegation`,
+  `complexity_hint`. Strict precedence: `needs_claude_code` >
+  `preferred_ai=claude-code` > `allow_delegation=false` (default) >
+  `preferred_ai=...` > fail-safe to Claude Code on an unknown value.
+- `internal/delegation`: Manager that submits non-Claude tasks to
+  secutools, writes an `inbox/.md.delegated` marker (rebuild-on-restart),
+  and runs a periodic reaper that finalizes jobs into `done/` (success)
+  or `failed/` (failure). Atomic counters (Active / CompletedTotal /
+  FailedTotal) are exposed via Snapshot().
+- Dispatcher: opt-in `WithDelegation()` method, router hooked in before
+  `findFreeSession()`, automatic fallback to Claude Code if the secutools
+  submit fails. `*.md.delegated` files are skipped in `assignNextTask`.
+- Frontmatter: `TaskFrontmatter` extended with `PreferredAI`,
+  `AllowDelegation`, `ComplexityHint`. All are optional: a .md without
+  these fields keeps exactly the Phase 1 behaviour.
+- HTTP API: new `GET /api/delegated/status` (in-flight jobs plus
+  counters); `GET /watchdog/status` now includes the delegation counters.
+  `WithDelegation()` is opt-in: `/api/delegated/status` returns 404 when
+  delegation is disabled.
+- main.go: the Manager is initialised only if `SECUTOOLS_API_KEY` is set
+  (zero change for existing deployments). `LoadFromDisk()` rehydrates
+  pending markers after a restart.
+
+### Tests added (29 new cases)
+- `internal/router` (8 tests): full decision matrix, IsDelegated
+  contracts, fail-safe on unknown provider.
+- `internal/secutools` (5 tests): httptest.Server mocking the contract,
+  HappyPath, HTTPError, polling until completed, ErrJobFailed, cancel
+  via context.
+- `internal/delegation` (7 tests): Submit + marker, rejection of a
+  non-delegated provider, reap success/fail, LoadFromDisk, Active(),
+  end-to-end.
+- `internal/api` (4 tests): /health, /api/delegated/status disabled/
+  enabled, /watchdog/status includes the counters.
+- `internal/dispatcher` (5 new tests + 8 existing ones untouched):
+  routeTask parses the new fields, dispatchProject delegates GPU,
+  dispatchProject keeps Claude Code for backward compatibility,
+  needs_claude_code bypass, end-to-end inbox→submit→reap→done/.
 ### Tests performed
-- ✅ Build OK (no code changes)
+- `go build ./...`: ok
+- `go vet ./...`: clean
+- `go test -race ./...`: all packages pass (~10s)
+- `go mod tidy`: no change needed
----
+### Added: 2026-04-17 addendum (bash wiring)
+- `cmd/ccl-delegate`: tiny Go CLI wrapper around `internal/secutools`,
+  usable from bash. Subcommands `submit`, `get`, `result`, `decide`.
+  Env `CCL_SECUTOOLS_API_KEY` (preferred) or `SECUTOOLS_API_KEY`.
+  `CCL_SECUTOOLS_URL` sets the endpoint; `CCL_SECUTOOLS_MOCK_URL`
+  overrides it for tests.
+- `internal/secutools/routing.go`: `DecideProvider(map[string]any) string`
+  (adapter signature required by the spec for use from the CLI).
+  Permissive map input (bool|string|int coerced).
+  The fail-safe returns `"local"`.
+  Table-driven tests cover 14 cases, including coercions.
+- `internal/secutools/client.go`: retries with exponential backoff (max 3
+  retries, 500ms base, x2 on each attempt). Retries on 5xx and transport
+  errors, NOT on 4xx. `SetRetryPolicy(maxRetries, baseDelay)` is exposed
+  for tests. Tests: `TestSubmitJob_RetriesOn5xx`,
+  `TestSubmitJob_DoesNotRetry4xx`.
+- `tests/phase2-E-integration.sh`: end-to-end bash test on the
+  dev-management side with a mock secutools HTTP server (Python stdlib).
+  Scenarios: decide, delegate (marker + status.json +
+  state=delegated_to_secutools), drive mock → completed, poll-reaper,
+  assertions on done/ body + footer, inbox cleanup, budget tracker,
+  rejection of a legacy task (rc=2).
+
+### Notes / decisions
+- No real network call to secutools in the tests: fakeClient +
+  httptest on the Go side, Python stdlib mock server on the integration
+  side.
+- The secutools client is gated on the `SECUTOOLS_API_KEY` env var to
+  keep a "Phase 1 only" default path.
+- A prod smoke test is still required (E2E test with a real
+  `preferred_ai: gpu`); it was not run in this workstream, as requested
+  (no push).
+- Decision: `DecideProvider(map[string]any)` lives in `secutools`
+  (CLI adapter, signature from the spec) AND `router.Decide(Task)` lives
+  in `router` (struct-typed, used by the Go dispatcher). Both share the
+  same precedence matrix; there is no duplicated logic, because the
+  router consumes a typed `TaskFrontmatter` via yaml.Unmarshal, whereas
+  the CLI parses manually to stay zero-dep.
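The precedence order stated in this changelog entry can be sketched in a few lines of Go. This is a minimal illustration, not the project's `DecideProvider` or `router.Decide`: the function name `decideSketch`, the set of known provider strings, and the handling of an empty `preferred_ai` (treated here as an unknown value, so it fails safe) are assumptions made for the example. Only the ordering of the checks and the `"local"` fail-safe value come from the entry itself.

```go
package main

import "fmt"

// decideSketch is a hypothetical illustration of the precedence order
// described in the changelog. "local" stands for a local Claude Code
// session; the list of known providers below is an assumption.
func decideSketch(fm map[string]any) string {
	// 1. needs_claude_code wins over everything else.
	if needs, _ := fm["needs_claude_code"].(bool); needs {
		return "local"
	}
	preferred, _ := fm["preferred_ai"].(string)
	// 2. An explicit claude-code preference stays local.
	if preferred == "claude-code" {
		return "local"
	}
	// 3. Delegation is opt-in: absent or false means local.
	if allow, _ := fm["allow_delegation"].(bool); !allow {
		return "local"
	}
	// 4. A known provider is honoured; 5. anything else fails safe.
	switch preferred {
	case "gpu", "gemini", "claude-api", "auto":
		return preferred
	default:
		return "local"
	}
}

func main() {
	fmt.Println(decideSketch(map[string]any{"preferred_ai": "gpu", "allow_delegation": true}))
	fmt.Println(decideSketch(map[string]any{"preferred_ai": "gpu"}))
	fmt.Println(decideSketch(map[string]any{"preferred_ai": "quantum", "allow_delegation": true}))
}
```

Ordering the checks from most to least restrictive means a task file with none of the new fields falls through to the third check and stays local, which matches the stated Phase 1 backward compatibility.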
 ## [0.3.8] - 2026-04-16
 **Type:** Patch — Bug #1 (A3 flip+ensure inconsistency) + Bug #10 (requiredShared contract test)
diff --git a/WORK_IN_PROGRESS.md b/WORK_IN_PROGRESS.md
index 7538918..8f9f00c 100644
--- a/WORK_IN_PROGRESS.md
+++ b/WORK_IN_PROGRESS.md
@@ -1,12 +1,30 @@
 # Work in Progress - claude-failover
 ## Last updated
-2026-04-16 19:00:00
+2026-04-17 02:20:00
 ## Current Version
-0.3.5 (progressing towards 0.4.0)
+0.4.0: Phase 2 workstream E delivered (multi-provider routing + bash wiring).
-## Current Request
+## Request in progress
+No active request. Branch `feat/phase2-E-multi-provider-routing` is
+ready to push (awaiting user validation).
+
+## Phase 2 / Workstream E: Status
+- [x] router (decide tree + 8 tests)
+- [x] secutools client (HTTP + 5 tests + 2 retry tests)
+- [x] secutools/routing.go DecideProvider(map) + 14 table tests
+- [x] delegation manager (submit + reaper + LoadFromDisk + 7 tests)
+- [x] dispatcher integration (routeTask + dispatchProject branch + 5 tests)
+- [x] API endpoints `/api/delegated/status` + enriched `/watchdog/status`
+- [x] main.go opt-in via `SECUTOOLS_API_KEY`
+- [x] `cmd/ccl-delegate` Go CLI for the bash wrapper
+- [x] tests/phase2-E-integration.sh end-to-end with Python mock
+- [x] tests `-race` clean
+- [ ] E2E smoke test against the real secutools API (to do after push)
+- [ ] MCP dashboard `Secuaas:orchestrator` to consume `/api/delegated/status`
+
+## Previous Request
 **Phase 1 / Workstream A: Robust failover** (spec in `ccl-platform/phases/phase1/A-failover.md`). Make the compte1 ↔ compte2 failover deterministic by folding the manual fixes (shared symlinks) into the code, adding a reliable UUID registry, and hardening tmux send-keys.
diff --git a/cmd/ccl-delegate/main.go b/cmd/ccl-delegate/main.go
new file mode 100644
index 0000000..9ce575f
--- /dev/null
+++ b/cmd/ccl-delegate/main.go
@@ -0,0 +1,267 @@
+// ccl-delegate is a thin CLI wrapper around internal/secutools.
+// It exists
+// because the dispatcher that scans agent-queue inboxes is written in
+// bash (dev-management/agent-orchestrator/dispatcher.sh). Rather than
+// duplicate the HTTP contract in bash + curl + jq, we shell out to this
+// binary for every delegation action.
+//
+// Subcommands:
+//
+//	ccl-delegate submit --prompt=... [--preferred-ai=gpu] [--priority=default]
+//	    Prints JSON to stdout: {"job_id": "...", "status": "pending"}.
+//
+//	ccl-delegate get --job-id=xyz
+//	    Prints JSON to stdout: {"job_id":"...","status":"...","provider":"..."}
+//
+//	ccl-delegate result --job-id=xyz [--timeout=5m]
+//	    Waits for completion (timeout-bounded) and prints result JSON.
+//
+//	ccl-delegate decide --frontmatter=path/to/task.md
+//	    Prints the provider chosen by secutools.DecideProvider.
+//
+// Auth: reads CCL_SECUTOOLS_API_KEY first, then SECUTOOLS_API_KEY. URL
+// override via CCL_SECUTOOLS_URL (default https://api.secutools.secuaas.ovh),
+// or CCL_SECUTOOLS_MOCK_URL for tests.
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
+)
+
+const defaultURL = "https://api.secutools.secuaas.ovh"
+
+func main() {
+	if len(os.Args) < 2 {
+		usage()
+		os.Exit(2)
+	}
+	sub := os.Args[1]
+	args := os.Args[2:]
+
+	switch sub {
+	case "submit":
+		runSubmit(args)
+	case "get":
+		runGet(args)
+	case "result":
+		runResult(args)
+	case "decide":
+		runDecide(args)
+	case "-h", "--help", "help":
+		usage()
+		os.Exit(0)
+	default:
+		fmt.Fprintf(os.Stderr, "ccl-delegate: unknown subcommand %q\n", sub)
+		usage()
+		os.Exit(2)
+	}
+}
+
+func usage() {
+	fmt.Fprint(os.Stderr, `ccl-delegate — submit/poll secutools jobs from bash
+
+usage:
+  ccl-delegate submit --prompt=... [--preferred-ai=gpu] [--priority=default] [--source=...]
+  ccl-delegate get --job-id=xyz
+  ccl-delegate result --job-id=xyz [--timeout=5m]
+  ccl-delegate decide --frontmatter=path/to/task.md
+
+env:
+  CCL_SECUTOOLS_API_KEY (preferred) or SECUTOOLS_API_KEY
+  CCL_SECUTOOLS_URL (default https://api.secutools.secuaas.ovh)
+  CCL_SECUTOOLS_MOCK_URL (overrides CCL_SECUTOOLS_URL, for tests)
+`)
+}
+
+func newClient() *secutools.HTTPClient {
+	url := os.Getenv("CCL_SECUTOOLS_MOCK_URL")
+	if url == "" {
+		url = os.Getenv("CCL_SECUTOOLS_URL")
+	}
+	if url == "" {
+		url = defaultURL
+	}
+	key := os.Getenv("CCL_SECUTOOLS_API_KEY")
+	if key == "" {
+		key = os.Getenv("SECUTOOLS_API_KEY")
+	}
+	if key == "" {
+		fmt.Fprintln(os.Stderr, "ccl-delegate: missing CCL_SECUTOOLS_API_KEY / SECUTOOLS_API_KEY")
+		os.Exit(4)
+	}
+	return secutools.NewHTTPClient(url, key, nil)
+}
+
+func runSubmit(args []string) {
+	fs := flag.NewFlagSet("submit", flag.ExitOnError)
+	prompt := fs.String("prompt", "", "prompt text (required)")
+	preferred := fs.String("preferred-ai", "", "preferred provider (gpu|gemini|claude-api|claude-opus|claude-sonnet|claude-haiku|auto)")
+	priority := fs.String("priority", "default", "priority (critical|high|default|low)")
+	jobType := fs.String("type", string(secutools.TypeAnalyze), "job type (ai:analyze|ai:batch|ai:report|ai:correlate)")
+	source := fs.String("source", "ccl-delegate", "source identifier")
+	maxTokens := fs.Int("max-tokens", 0, "max tokens (0 = server default)")
+	_ = fs.Parse(args)
+
+	if *prompt == "" {
+		fmt.Fprintln(os.Stderr, "ccl-delegate submit: --prompt is required")
+		os.Exit(2)
+	}
+	c := newClient()
+	ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
+	defer cancel()
+
+	resp, err := c.SubmitJob(ctx, &secutools.JobRequest{
+		Type:        secutools.JobType(*jobType),
+		Priority:    secutools.Priority(*priority),
+		Prompt:      *prompt,
+		PreferredAI: *preferred,
+		Source:      *source,
+		MaxTokens:   *maxTokens,
+	})
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "submit: %v\n", err)
+		os.Exit(1)
+	}
+
+	writeJSON(resp)
+}
+
+func runGet(args []string) {
+	fs := flag.NewFlagSet("get", flag.ExitOnError)
+	id := fs.String("job-id", "", "job id (required)")
+	_ = fs.Parse(args)
+	if *id == "" {
+		fmt.Fprintln(os.Stderr, "ccl-delegate get: --job-id required")
+		os.Exit(2)
+	}
+	c := newClient()
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	st, err := c.GetJob(ctx, *id)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "get: %v\n", err)
+		os.Exit(1)
+	}
+	writeJSON(st)
+}
+
+func runResult(args []string) {
+	fs := flag.NewFlagSet("result", flag.ExitOnError)
+	id := fs.String("job-id", "", "job id (required)")
+	timeout := fs.Duration("timeout", 5*time.Minute, "wait timeout")
+	_ = fs.Parse(args)
+	if *id == "" {
+		fmt.Fprintln(os.Stderr, "ccl-delegate result: --job-id required")
+		os.Exit(2)
+	}
+	c := newClient()
+	ctx := context.Background()
+	res, err := c.WaitForResult(ctx, *id, *timeout)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "result: %v\n", err)
+		os.Exit(1)
+	}
+	writeJSON(res)
+}
+
+func runDecide(args []string) {
+	fs := flag.NewFlagSet("decide", flag.ExitOnError)
+	frontmatterPath := fs.String("frontmatter", "", "path to task .md with YAML frontmatter")
+	_ = fs.Parse(args)
+	if *frontmatterPath == "" {
+		fmt.Fprintln(os.Stderr, "ccl-delegate decide: --frontmatter required")
+		os.Exit(2)
+	}
+	data, err := os.ReadFile(*frontmatterPath)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "decide: read %s: %v\n", *frontmatterPath, err)
+		os.Exit(1)
+	}
+	fm := parseFrontmatterMap(data)
+	fmt.Println(secutools.DecideProvider(fm))
+}
+
+// parseFrontmatterMap extracts a flat key/value map from a YAML frontmatter
+// block delimited by "---". Only top-level scalars; nested structures are
+// ignored. Intentionally dependency-free (no yaml unmarshal) so this CLI
+// stays tiny and predictable.
+func parseFrontmatterMap(content []byte) map[string]any {
+	out := map[string]any{}
+	s := string(content)
+	if len(s) < 4 || s[:4] != "---\n" {
+		return out
+	}
+	end := -1
+	for i := 4; i < len(s)-3; i++ {
+		if s[i] == '\n' && s[i+1:i+4] == "---" && (i+4 == len(s) || s[i+4] == '\n') {
+			end = i
+			break
+		}
+	}
+	if end < 0 {
+		return out
+	}
+	body := s[4:end]
+	start := 0
+	for i := 0; i <= len(body); i++ {
+		if i == len(body) || body[i] == '\n' {
+			line := body[start:i]
+			start = i + 1
+			// find "key: value"
+			colon := -1
+			for j := 0; j < len(line); j++ {
+				if line[j] == ':' {
+					colon = j
+					break
+				}
+			}
+			if colon <= 0 {
+				continue
+			}
+			key := trim(line[:colon])
+			val := trim(line[colon+1:])
+			if key == "" {
+				continue
+			}
+			// strip surrounding quotes
+			if len(val) >= 2 &&
+				((val[0] == '"' && val[len(val)-1] == '"') ||
+					(val[0] == '\'' && val[len(val)-1] == '\'')) {
+				val = val[1 : len(val)-1]
+			}
+			// bool coercion
+			switch val {
+			case "true":
+				out[key] = true
+			case "false":
+				out[key] = false
+			default:
+				out[key] = val
+			}
+		}
+	}
+	return out
+}
+
+func trim(s string) string {
+	start := 0
+	for start < len(s) && (s[start] == ' ' || s[start] == '\t') {
+		start++
+	}
+	end := len(s)
+	for end > start && (s[end-1] == ' ' || s[end-1] == '\t' || s[end-1] == '\r') {
+		end--
+	}
+	return s[start:end]
+}
+
+func writeJSON(v any) {
+	enc := json.NewEncoder(os.Stdout)
+	enc.SetIndent("", " ")
+	if err := enc.Encode(v); err != nil {
+		fmt.Fprintf(os.Stderr, "encode: %v\n", err)
+		os.Exit(1)
+	}
+}
diff --git a/cmd/claude-failover/main.go b/cmd/claude-failover/main.go
index 2c29f89..3f6ed80 100644
--- a/cmd/claude-failover/main.go
+++ b/cmd/claude-failover/main.go
@@ -12,18 +12,20 @@ import (
 	"forge.secuaas.ovh/olivier/claude-failover/internal/api"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/config"
+	"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/dispatcher"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/janitor"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/lifecycle"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/notify"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/quota"
+	"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/state"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/switcher"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/watcher"
 )
-const version = "0.1.0"
+const version = "0.4.0"
 func main() {
 	var cfgPath string
@@ -84,8 +86,37 @@ func main() {
 	as := switcher.New(tmuxClient, s, cfg, qm.SwitchChan(), notifier)
 	go as.Run(ctx)
+	// Phase 2 workstream E: delegation to secutools (GPU/Gemini/Claude API).
+	// Disabled when the SECUTOOLS_API_KEY env var is empty so existing
+	// deployments keep their Phase 1 behaviour with zero config change.
+	var delegMgr *delegation.Manager
+	if key := os.Getenv("SECUTOOLS_API_KEY"); key != "" {
+		url := os.Getenv("SECUTOOLS_URL")
+		if url == "" {
+			url = "https://api.secutools.secuaas.ovh"
+		}
+		client := secutools.NewHTTPClient(url, key, nil)
+		delegMgr = delegation.New(client, 30*time.Second)
+
+		// Restore in-flight markers from disk after a restart.
+		var projectDirs []string
+		for _, ds := range cfg.Pool.Dedicated {
+			projectDirs = append(projectDirs, ds.Project)
+		}
+		if err := delegMgr.LoadFromDisk(projectDirs); err != nil {
+			log.Printf("delegation LoadFromDisk warning: %v", err)
+		}
+		go delegMgr.Run(ctx)
+		log.Printf("delegation enabled: secutools=%s", url)
+	} else {
+		log.Printf("delegation disabled: SECUTOOLS_API_KEY unset (Phase 1 behaviour)")
+	}
+
 	// Dispatcher — assigns inbox tasks to idle sessions.
 	disp := dispatcher.New(tmuxClient, s, cfg, sw.DoneChan())
+	if delegMgr != nil {
+		disp.WithDelegation(delegMgr)
+	}
 	go disp.Run(ctx)
 	// Janitor — periodic cleanup of orphaned files and stale status.json.
@@ -112,6 +143,9 @@ func main() {
 		listenAddr = "127.0.0.1:9090"
 	}
 	srv := api.New(listenAddr, s)
+	if delegMgr != nil {
+		srv.WithDelegation(delegMgr)
+	}
 	go func() {
 		if err := srv.Start(); err != nil {
 			log.Printf("API server error: %v", err)
diff --git a/internal/api/server.go b/internal/api/server.go
index a1ab354..d9c4127 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -3,18 +3,30 @@ package api
 import (
+	"encoding/json"
 	"fmt"
 	"net/http"
+	"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
 	"forge.secuaas.ovh/olivier/claude-failover/internal/state"
 )
-const version = "0.1.0"
+const version = "0.2.0"
-// Server is a minimal HTTP server exposing /health and /status.
+// DelegationProvider is the slice of delegation.Manager used by the HTTP
+// server. Kept as an interface so tests don't have to spin up a real
+// secutools client.
+type DelegationProvider interface {
+	Active() []delegation.ActiveJob
+	CountersSnapshot() delegation.Snapshot
+}
+
+// Server is a minimal HTTP server exposing /health, /status,
+// /watchdog/status and /api/delegated/status.
 type Server struct {
-	addr string
-	state *state.State
+	addr       string
+	state      *state.State
+	delegation DelegationProvider
 }
 // New creates a Server listening on addr.
@@ -22,11 +34,20 @@ func New(addr string, s *state.State) *Server {
 	return &Server{addr: addr, state: s}
 }
+// WithDelegation enables /api/delegated/* endpoints. Pass nil (or skip
+// the call) to keep them disabled; those paths then return 404.
+func (s *Server) WithDelegation(d DelegationProvider) *Server {
+	s.delegation = d
+	return s
+}
+
 // Start registers routes and begins serving. Blocks until the listener fails.
 func (s *Server) Start() error {
 	mux := http.NewServeMux()
 	mux.HandleFunc("/health", s.handleHealth)
 	mux.HandleFunc("/status", s.handleStatus)
+	mux.HandleFunc("/watchdog/status", s.handleWatchdogStatus)
+	mux.HandleFunc("/api/delegated/status", s.handleDelegatedStatus)
 	return http.ListenAndServe(s.addr, mux)
 }
@@ -37,5 +58,33 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
 func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "application/json")
-	w.Write(s.state.JSON())
+	_, _ = w.Write(s.state.JSON())
+}
+
+// handleWatchdogStatus returns operational counters consumed by the
+// orchestrator dashboard. Includes delegation metrics when wired.
+func (s *Server) handleWatchdogStatus(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "application/json")
+	out := map[string]any{
+		"version": version,
+	}
+	if s.delegation != nil {
+		out["delegation"] = s.delegation.CountersSnapshot()
+	}
+	_ = json.NewEncoder(w).Encode(out)
+}
+
+// handleDelegatedStatus returns the list of in-flight delegated jobs.
+func (s *Server) handleDelegatedStatus(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "application/json")
+	if s.delegation == nil {
+		w.WriteHeader(http.StatusNotFound)
+		_, _ = w.Write([]byte(`{"error":"delegation disabled"}`))
+		return
+	}
+	out := map[string]any{
+		"active":   s.delegation.Active(),
+		"counters": s.delegation.CountersSnapshot(),
+	}
+	_ = json.NewEncoder(w).Encode(out)
 }
diff --git a/internal/api/server_test.go b/internal/api/server_test.go
new file mode 100644
index 0000000..043c817
--- /dev/null
+++ b/internal/api/server_test.go
@@ -0,0 +1,116 @@
+package api
+
+import (
+	"encoding/json"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
+	"forge.secuaas.ovh/olivier/claude-failover/internal/state"
+)
+
+// fakeDelegation implements DelegationProvider for tests.
+type fakeDelegation struct {
+	active   []delegation.ActiveJob
+	snapshot delegation.Snapshot
+}
+
+func (f *fakeDelegation) Active() []delegation.ActiveJob { return f.active }
+func (f *fakeDelegation) CountersSnapshot() delegation.Snapshot { return f.snapshot }
+
+func newTestServer(t *testing.T, deleg DelegationProvider) *httptest.Server {
+	t.Helper()
+	s := New("ignored", state.New(""))
+	if deleg != nil {
+		s.WithDelegation(deleg)
+	}
+	mux := http.NewServeMux()
+	mux.HandleFunc("/health", s.handleHealth)
+	mux.HandleFunc("/status", s.handleStatus)
+	mux.HandleFunc("/watchdog/status", s.handleWatchdogStatus)
+	mux.HandleFunc("/api/delegated/status", s.handleDelegatedStatus)
+	return httptest.NewServer(mux)
+}
+
+func TestHandleHealth(t *testing.T) {
+	srv := newTestServer(t, nil)
+	defer srv.Close()
+	resp, err := http.Get(srv.URL + "/health")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer resp.Body.Close()
+	body, _ := io.ReadAll(resp.Body)
+	if !strings.Contains(string(body), `"status":"ok"`) {
+		t.Errorf("unexpected /health body: %s", body)
+	}
+}
+
+func
TestHandleDelegatedStatus_Disabled(t *testing.T) {
+	srv := newTestServer(t, nil)
+	defer srv.Close()
+	resp, err := http.Get(srv.URL + "/api/delegated/status")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNotFound {
+		t.Errorf("expected 404 when delegation disabled, got %d", resp.StatusCode)
+	}
+}
+
+func TestHandleDelegatedStatus_Enabled(t *testing.T) {
+	deleg := &fakeDelegation{
+		active: []delegation.ActiveJob{
+			{JobID: "j1", Project: "/p/a", TaskFile: "task-1.md", Provider: "gpu", Duration: "12s"},
+		},
+		snapshot: delegation.Snapshot{Active: 1, CompletedTotal: 5, FailedTotal: 1},
+	}
+	srv := newTestServer(t, deleg)
+	defer srv.Close()
+
+	resp, err := http.Get(srv.URL + "/api/delegated/status")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		t.Fatalf("expected 200, got %d", resp.StatusCode)
+	}
+	var out struct {
+		Active   []delegation.ActiveJob `json:"active"`
+		Counters delegation.Snapshot    `json:"counters"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
+		t.Fatal(err)
+	}
+	if len(out.Active) != 1 || out.Active[0].JobID != "j1" {
+		t.Errorf("active mismatch: %+v", out.Active)
+	}
+	if out.Counters.CompletedTotal != 5 || out.Counters.Active != 1 {
+		t.Errorf("counters mismatch: %+v", out.Counters)
+	}
+}
+
+func TestHandleWatchdogStatus_IncludesDelegationCounters(t *testing.T) {
+	deleg := &fakeDelegation{snapshot: delegation.Snapshot{
+		Active: 2, CompletedTotal: 7, FailedTotal: 3,
+	}}
+	srv := newTestServer(t, deleg)
+	defer srv.Close()
+
+	resp, err := http.Get(srv.URL + "/watchdog/status")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer resp.Body.Close()
+	body, _ := io.ReadAll(resp.Body)
+	for _, want := range []string{`"delegated_active":2`, `"delegated_completed_total":7`, `"delegated_failed_total":3`} {
+		if !strings.Contains(string(body), want) {
+			t.Errorf("missing %s in body: %s", want, body)
+		}
+	}
+}
diff --git
a/internal/delegation/delegation.go b/internal/delegation/delegation.go
new file mode 100644
index 0000000..0d4d835
--- /dev/null
+++ b/internal/delegation/delegation.go
@@ -0,0 +1,408 @@
+// Package delegation submits agent-queue tasks to secutools when the router
+// decides they should not run on a local Claude Code session, and reaps
+// completed jobs back into the project's done/ or failed/ directory.
+//
+// Lifecycle of a delegated task in this package:
+//
+//	inbox/.md
+//	    │ Submit(): POST /api/v1/jobs
+//	    ▼
+//	inbox/.md.delegated (marker file: { project, task, job_id, provider })
+//	    │
+//	Reaper loop polls every :
+//	    │ GetJob() → completed
+//	    ▼
+//	done/.md (frontmatter: completed, body = AI response, footer = cost)
+//
+// or, on failure:
+//
+//	failed/.md (frontmatter: failed_reason, body = original task)
+//
+// The .delegated marker doubles as the on-disk source of truth: if the
+// daemon restarts mid-flight, the reaper rebuilds its job table from disk.
+package delegation
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"forge.secuaas.ovh/olivier/claude-failover/internal/router"
+	"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
+)
+
+// Marker is the JSON content of an inbox/*.md.delegated file.
+type Marker struct {
+	Project   string    `json:"project"`
+	TaskFile  string    `json:"task_file"`
+	JobID     string    `json:"job_id"`
+	Provider  string    `json:"provider"`
+	StartedAt time.Time `json:"started_at"`
+}
+
+// DelegatedJob is the in-memory view of a job we are tracking.
+type DelegatedJob struct {
+	Marker
+	StatusFile string // absolute path to the inbox/*.md.delegated marker
+}
+
+// Counters tracks aggregate metrics surfaced via /watchdog/status.
+type Counters struct {
+	Active         atomic.Int64
+	CompletedTotal atomic.Int64
+	FailedTotal    atomic.Int64
+}
+
+// Snapshot is a JSON-friendly counter snapshot.
+type Snapshot struct {
+	Active         int64 `json:"delegated_active"`
+	CompletedTotal int64 `json:"delegated_completed_total"`
+	FailedTotal    int64 `json:"delegated_failed_total"`
+}
+
+// Snapshot returns the current counter values.
+func (c *Counters) Snapshot() Snapshot {
+	return Snapshot{
+		Active:         c.Active.Load(),
+		CompletedTotal: c.CompletedTotal.Load(),
+		FailedTotal:    c.FailedTotal.Load(),
+	}
+}
+
+// Manager submits delegated jobs and runs the periodic reaper.
+type Manager struct {
+	client   secutools.Client
+	logger   *log.Logger
+	interval time.Duration
+	now      func() time.Time
+
+	mu   sync.RWMutex
+	jobs map[string]*DelegatedJob // key: job_id
+
+	Counters Counters
+}
+
+// New constructs a Manager. interval is how often the reaper polls
+// secutools; if zero, defaults to 30s.
+func New(client secutools.Client, interval time.Duration) *Manager {
+	if interval == 0 {
+		interval = 30 * time.Second
+	}
+	return &Manager{
+		client:   client,
+		logger:   log.Default(),
+		interval: interval,
+		now:      func() time.Time { return time.Now().UTC() },
+		jobs:     make(map[string]*DelegatedJob),
+	}
+}
+
+// SetLogger overrides the default logger (used by tests).
+func (m *Manager) SetLogger(l *log.Logger) { m.logger = l }
+
+// Submit posts the task to secutools and writes a .delegated marker next to
+// the original .md so that on restart we can resume tracking.
+//
+// taskPath is the absolute path of the inbox/.md file. prompt is the .md
+// body (post-frontmatter) sent as the prompt. projectDir is the project root
+// (for routing the result back to done/).
+func (m *Manager) Submit(ctx context.Context, projectDir, taskPath, prompt string,
+	provider router.Provider, priority secutools.Priority) (*DelegatedJob, error) {
+
+	if !provider.IsDelegated() {
+		return nil, fmt.Errorf("delegation: provider %q is not delegated", provider)
+	}
+
+	preferredAI := ""
+	if provider != router.ProviderAuto {
+		preferredAI = string(provider)
+	}
+
+	req := &secutools.JobRequest{
+		Type:        secutools.TypeAnalyze,
+		Priority:    priority,
+		Prompt:      buildPrompt(taskPath, prompt),
+		PreferredAI: preferredAI,
+		Source:      "claude-failover/dispatcher",
+	}
+
+	resp, err := m.client.SubmitJob(ctx, req)
+	if err != nil {
+		return nil, fmt.Errorf("delegation: submit %s: %w", filepath.Base(taskPath), err)
+	}
+
+	mk := Marker{
+		Project:   projectDir,
+		TaskFile:  taskPath,
+		JobID:     resp.JobID,
+		Provider:  string(provider),
+		StartedAt: m.now(),
+	}
+	markerPath := taskPath + ".delegated"
+	if err := writeMarker(markerPath, mk); err != nil {
+		return nil, fmt.Errorf("delegation: write marker: %w", err)
+	}
+
+	job := &DelegatedJob{Marker: mk, StatusFile: markerPath}
+	m.mu.Lock()
+	m.jobs[resp.JobID] = job
+	m.mu.Unlock()
+	m.Counters.Active.Add(1)
+
+	m.logger.Printf("[delegation] SUBMITTED task=%s job_id=%s provider=%s",
+		filepath.Base(taskPath), resp.JobID, provider)
+	return job, nil
+}
+
+// LoadFromDisk rebuilds the in-memory tracker from any .delegated markers
+// found under projectsDirs. Called once at daemon startup.
+func (m *Manager) LoadFromDisk(projectsDirs []string) error {
+	for _, projectDir := range projectsDirs {
+		inbox := filepath.Join(projectDir, ".agent-queue", "inbox")
+		entries, err := os.ReadDir(inbox)
+		if err != nil {
+			continue
+		}
+		for _, e := range entries {
+			if !strings.HasSuffix(e.Name(), ".md.delegated") {
+				continue
+			}
+			markerPath := filepath.Join(inbox, e.Name())
+			mk, err := readMarker(markerPath)
+			if err != nil {
+				m.logger.Printf("[delegation] WARN bad marker %s: %v", markerPath, err)
+				continue
+			}
+			m.mu.Lock()
+			m.jobs[mk.JobID] = &DelegatedJob{Marker: mk, StatusFile: markerPath}
+			m.mu.Unlock()
+			m.Counters.Active.Add(1)
+		}
+	}
+	return nil
+}
+
+// ActiveJob is the JSON view of one currently-tracked delegated job, as
+// served by the /api/delegated/status HTTP endpoint.
+type ActiveJob struct {
+	JobID    string `json:"job_id"`
+	Project  string `json:"project"`
+	TaskFile string `json:"task_file"`
+	Provider string `json:"provider"`
+	Duration string `json:"duration"`
+}
+
+// CountersSnapshot returns a JSON-friendly snapshot of the manager's
+// counters. Implements api.DelegationProvider.
+func (m *Manager) CountersSnapshot() Snapshot { return m.Counters.Snapshot() }
+
+// Active returns the list of in-flight delegated jobs.
+func (m *Manager) Active() []ActiveJob {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	out := make([]ActiveJob, 0, len(m.jobs))
+	for _, j := range m.jobs {
+		out = append(out, ActiveJob{
+			JobID:    j.JobID,
+			Project:  j.Project,
+			TaskFile: filepath.Base(j.TaskFile),
+			Provider: j.Provider,
+			Duration: m.now().Sub(j.StartedAt).Round(time.Second).String(),
+		})
+	}
+	return out
+}
+
+// Run starts the reaper loop until ctx is cancelled.
+func (m *Manager) Run(ctx context.Context) {
+	ticker := time.NewTicker(m.interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			m.reapOnce(ctx)
+		}
+	}
+}
+
+// reapOnce polls every tracked job and finalises terminal ones.
+func (m *Manager) reapOnce(ctx context.Context) {
+	m.mu.RLock()
+	snapshot := make([]*DelegatedJob, 0, len(m.jobs))
+	for _, j := range m.jobs {
+		snapshot = append(snapshot, j)
+	}
+	m.mu.RUnlock()
+
+	for _, j := range snapshot {
+		st, err := m.client.GetJob(ctx, j.JobID)
+		if err != nil {
+			m.logger.Printf("[delegation] poll job=%s: %v", j.JobID, err)
+			continue
+		}
+		switch st.Status {
+		case "completed":
+			res, err := m.fetchResult(ctx, j.JobID)
+			if err != nil {
+				m.logger.Printf("[delegation] fetch result job=%s: %v", j.JobID, err)
+				continue
+			}
+			m.finalizeSuccess(j, res)
+		case "failed", "cancelled":
+			m.finalizeFailure(j, st)
+		}
+	}
+}
+
+// fetchResult retrieves the final job result. It reuses WaitForResult,
+// through an optional interface, with a short timeout: the goal is to fetch
+// the result of an already-completed job, not to poll for completion.
+func (m *Manager) fetchResult(ctx context.Context, jobID string) (*secutools.JobResult, error) {
+	type resultGetter interface {
+		WaitForResult(ctx context.Context, id string, timeout time.Duration) (*secutools.JobResult, error)
+	}
+	rg, ok := m.client.(resultGetter)
+	if !ok {
+		return nil, errors.New("delegation: client does not implement WaitForResult")
+	}
+	// The 5s timeout acts as "fetch now". WaitForResult still polls, but the
+	// job is already in completed state, so the first poll should return
+	// immediately.
+	return rg.WaitForResult(ctx, jobID, 5*time.Second)
+}
+
+// finalizeSuccess writes the result to done/ and removes the inbox marker.
+func (m *Manager) finalizeSuccess(j *DelegatedJob, res *secutools.JobResult) { + doneDir := filepath.Join(j.Project, ".agent-queue", "done") + if err := os.MkdirAll(doneDir, 0755); err != nil { + m.logger.Printf("[delegation] mkdir done %s: %v", doneDir, err) + return + } + donePath := filepath.Join(doneDir, filepath.Base(j.TaskFile)) + body := buildDoneBody(res, j) + if err := os.WriteFile(donePath, []byte(body), 0644); err != nil { + m.logger.Printf("[delegation] write done %s: %v", donePath, err) + return + } + _ = os.Remove(j.StatusFile) + _ = os.Remove(j.TaskFile) + + m.mu.Lock() + delete(m.jobs, j.JobID) + m.mu.Unlock() + m.Counters.Active.Add(-1) + m.Counters.CompletedTotal.Add(1) + + m.logger.Printf("[delegation] COMPLETED task=%s job_id=%s provider=%s cost_cad=%.4f", + filepath.Base(j.TaskFile), j.JobID, res.Provider, res.CostCAD) +} + +// finalizeFailure writes the original task body into failed/ with a reason. +func (m *Manager) finalizeFailure(j *DelegatedJob, st *secutools.JobStatus) { + failedDir := filepath.Join(j.Project, ".agent-queue", "failed") + if err := os.MkdirAll(failedDir, 0755); err != nil { + m.logger.Printf("[delegation] mkdir failed %s: %v", failedDir, err) + return + } + failedPath := filepath.Join(failedDir, filepath.Base(j.TaskFile)) + body := buildFailedBody(j, st) + if err := os.WriteFile(failedPath, []byte(body), 0644); err != nil { + m.logger.Printf("[delegation] write failed %s: %v", failedPath, err) + return + } + _ = os.Remove(j.StatusFile) + _ = os.Remove(j.TaskFile) + + m.mu.Lock() + delete(m.jobs, j.JobID) + m.mu.Unlock() + m.Counters.Active.Add(-1) + m.Counters.FailedTotal.Add(1) + + m.logger.Printf("[delegation] FAILED task=%s job_id=%s status=%s err=%s", + filepath.Base(j.TaskFile), j.JobID, st.Status, st.Error) +} + +// buildPrompt assembles the prompt sent to secutools. 
Self-contained: the +// non-Claude provider has no filesystem access, so we send the body as-is +// and rely on the task author to include all required context. +func buildPrompt(taskPath, body string) string { + taskName := filepath.Base(taskPath) + if body == "" { + body = "(empty task body)" + } + return fmt.Sprintf("Task: %s\n\n%s\n\nProvide a complete, self-contained answer.", + taskName, body) +} + +// buildDoneBody is the .md written into done/ after a successful job. +func buildDoneBody(res *secutools.JobResult, j *DelegatedJob) string { + return fmt.Sprintf(`--- +status: completed +provider: %s +model: %s +cost_cad: %.4f +job_id: %s +delegated_at: %s +--- + +# Result (delegated to secutools) + +%s + +--- +provider: %s, cost_cad: %.4f +`, res.Provider, res.Model, res.CostCAD, j.JobID, + j.StartedAt.Format(time.RFC3339), res.Response, + res.Provider, res.CostCAD) +} + +// buildFailedBody is the .md written into failed/ when a delegated job dies. +func buildFailedBody(j *DelegatedJob, st *secutools.JobStatus) string { + return fmt.Sprintf(`--- +status: failed +failed_reason: %s +job_id: %s +provider: %s +delegated_at: %s +--- + +# Delegation failed + +Job %s ended in status=%s. + +Error: %s +`, st.Status, j.JobID, j.Provider, + j.StartedAt.Format(time.RFC3339), + j.JobID, st.Status, st.Error) +} + +// writeMarker serialises mk to disk as JSON. +func writeMarker(path string, mk Marker) error { + data, err := json.MarshalIndent(mk, "", " ") + if err != nil { + return err + } + return os.WriteFile(path, data, 0644) +} + +// readMarker parses a marker file from disk. 
+func readMarker(path string) (Marker, error) { + data, err := os.ReadFile(path) + if err != nil { + return Marker{}, err + } + var mk Marker + if err := json.Unmarshal(data, &mk); err != nil { + return Marker{}, err + } + return mk, nil +} diff --git a/internal/delegation/delegation_test.go b/internal/delegation/delegation_test.go new file mode 100644 index 0000000..7967ea7 --- /dev/null +++ b/internal/delegation/delegation_test.go @@ -0,0 +1,335 @@ +package delegation + +import ( + "context" + "encoding/json" + "errors" + "io" + "log" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "forge.secuaas.ovh/olivier/claude-failover/internal/router" + "forge.secuaas.ovh/olivier/claude-failover/internal/secutools" +) + +// fakeClient is an in-memory secutools.Client implementation for tests. +// It supports preloaded responses keyed by job_id and a counter of calls. +type fakeClient struct { + mu sync.Mutex + nextJobID int + statuses map[string]string // job_id → current status + results map[string]*secutools.JobResult // job_id → result + statusErr error + submitErr error +} + +func newFakeClient() *fakeClient { + return &fakeClient{ + statuses: make(map[string]string), + results: make(map[string]*secutools.JobResult), + } +} + +func (f *fakeClient) SubmitJob(_ context.Context, req *secutools.JobRequest) (*secutools.JobResponse, error) { + f.mu.Lock() + defer f.mu.Unlock() + if f.submitErr != nil { + return nil, f.submitErr + } + f.nextJobID++ + id := "job-" + itoa(f.nextJobID) + f.statuses[id] = "pending" + return &secutools.JobResponse{JobID: id, Status: "pending"}, nil +} + +func (f *fakeClient) GetJob(_ context.Context, id string) (*secutools.JobStatus, error) { + f.mu.Lock() + defer f.mu.Unlock() + if f.statusErr != nil { + return nil, f.statusErr + } + st, ok := f.statuses[id] + if !ok { + return nil, errors.New("unknown job: " + id) + } + out := &secutools.JobStatus{JobID: id, Status: st} + if r, ok := f.results[id]; ok { + out.Provider = r.Provider + } 
+ return out, nil +} + +func (f *fakeClient) WaitForResult(_ context.Context, id string, _ time.Duration) (*secutools.JobResult, error) { + f.mu.Lock() + defer f.mu.Unlock() + if r, ok := f.results[id]; ok { + return r, nil + } + return nil, errors.New("no result for " + id) +} + +// setStatus is a test helper to drive job lifecycles. +func (f *fakeClient) setStatus(id, status string) { + f.mu.Lock() + defer f.mu.Unlock() + f.statuses[id] = status +} + +func (f *fakeClient) setResult(id string, r *secutools.JobResult) { + f.mu.Lock() + defer f.mu.Unlock() + f.results[id] = r +} + +// itoa is a tiny stdlib-free int-to-string for fake job IDs. +func itoa(n int) string { + if n == 0 { + return "0" + } + var b []byte + for n > 0 { + b = append([]byte{byte('0' + n%10)}, b...) + n /= 10 + } + return string(b) +} + +// quietLogger discards log output. +func quietLogger() *log.Logger { + return log.New(io.Discard, "", 0) +} + +func setupProject(t *testing.T) (projectDir, taskPath string) { + t.Helper() + projectDir = t.TempDir() + inbox := filepath.Join(projectDir, ".agent-queue", "inbox") + if err := os.MkdirAll(inbox, 0755); err != nil { + t.Fatal(err) + } + taskPath = filepath.Join(inbox, "task-001.md") + if err := os.WriteFile(taskPath, []byte("---\ntitle: Test\n---\nDo a thing."), 0644); err != nil { + t.Fatal(err) + } + return projectDir, taskPath +} + +func TestSubmit_WritesMarkerAndIncrementsCounter(t *testing.T) { + fc := newFakeClient() + mgr := New(fc, 100*time.Millisecond) + mgr.SetLogger(quietLogger()) + + projectDir, taskPath := setupProject(t) + + job, err := mgr.Submit(context.Background(), projectDir, taskPath, "Do a thing.", + router.ProviderGPU, secutools.PriorityHigh) + if err != nil { + t.Fatalf("Submit: %v", err) + } + if job.JobID == "" { + t.Fatal("expected non-empty job id") + } + if mgr.Counters.Active.Load() != 1 { + t.Errorf("expected Active=1, got %d", mgr.Counters.Active.Load()) + } + + // Marker file must exist and decode back to the same 
job_id. + markerPath := taskPath + ".delegated" + data, err := os.ReadFile(markerPath) + if err != nil { + t.Fatalf("missing marker: %v", err) + } + var mk Marker + if err := json.Unmarshal(data, &mk); err != nil { + t.Fatalf("decode marker: %v", err) + } + if mk.JobID != job.JobID || mk.Provider != "gpu" { + t.Errorf("marker mismatch: %+v", mk) + } +} + +func TestSubmit_RejectsNonDelegatedProvider(t *testing.T) { + mgr := New(newFakeClient(), time.Millisecond) + mgr.SetLogger(quietLogger()) + _, taskPath := setupProject(t) + + _, err := mgr.Submit(context.Background(), filepath.Dir(filepath.Dir(filepath.Dir(taskPath))), + taskPath, "p", router.ProviderClaudeCode, secutools.PriorityDefault) + if err == nil { + t.Fatal("expected error when provider is claude-code (non-delegated)") + } +} + +func TestReap_CompletesJobAndMovesToDone(t *testing.T) { + fc := newFakeClient() + mgr := New(fc, time.Millisecond) + mgr.SetLogger(quietLogger()) + + projectDir, taskPath := setupProject(t) + + job, err := mgr.Submit(context.Background(), projectDir, taskPath, "p", + router.ProviderAuto, secutools.PriorityDefault) + if err != nil { + t.Fatal(err) + } + + // Drive the fake into completed state. + fc.setStatus(job.JobID, "completed") + fc.setResult(job.JobID, &secutools.JobResult{ + JobID: job.JobID, Response: "Hello world", Provider: "gpu", + Model: "qwen-coder", CostCAD: 0.012, + }) + + mgr.reapOnce(context.Background()) + + if mgr.Counters.Active.Load() != 0 { + t.Errorf("expected Active=0 after completion, got %d", mgr.Counters.Active.Load()) + } + if mgr.Counters.CompletedTotal.Load() != 1 { + t.Errorf("expected CompletedTotal=1, got %d", mgr.Counters.CompletedTotal.Load()) + } + + // done/ contains the result. 
+ donePath := filepath.Join(projectDir, ".agent-queue", "done", "task-001.md") + doneBody, err := os.ReadFile(donePath) + if err != nil { + t.Fatalf("missing done/ file: %v", err) + } + if !contains(string(doneBody), "Hello world") || !contains(string(doneBody), "provider: gpu") { + t.Errorf("done body missing expected fields:\n%s", doneBody) + } + + // Inbox marker and original .md are gone. + if _, err := os.Stat(taskPath + ".delegated"); !os.IsNotExist(err) { + t.Error("marker should be removed after completion") + } + if _, err := os.Stat(taskPath); !os.IsNotExist(err) { + t.Error("inbox .md should be removed after completion") + } +} + +func TestReap_FailedJobMovesToFailed(t *testing.T) { + fc := newFakeClient() + mgr := New(fc, time.Millisecond) + mgr.SetLogger(quietLogger()) + + projectDir, taskPath := setupProject(t) + + job, err := mgr.Submit(context.Background(), projectDir, taskPath, "p", + router.ProviderGemini, secutools.PriorityLow) + if err != nil { + t.Fatal(err) + } + + fc.setStatus(job.JobID, "failed") + mgr.reapOnce(context.Background()) + + if mgr.Counters.FailedTotal.Load() != 1 { + t.Errorf("expected FailedTotal=1, got %d", mgr.Counters.FailedTotal.Load()) + } + failedPath := filepath.Join(projectDir, ".agent-queue", "failed", "task-001.md") + if _, err := os.Stat(failedPath); err != nil { + t.Errorf("failed/ file missing: %v", err) + } +} + +func TestLoadFromDisk_RestoresMarkers(t *testing.T) { + fc := newFakeClient() + mgr := New(fc, time.Millisecond) + mgr.SetLogger(quietLogger()) + + projectDir, taskPath := setupProject(t) + + // Pre-write a marker on disk simulating a daemon restart. 
+ mk := Marker{ + Project: projectDir, TaskFile: taskPath, JobID: "preexisting-job", + Provider: "gpu", StartedAt: time.Now().UTC().Add(-1 * time.Minute), + } + if err := writeMarker(taskPath+".delegated", mk); err != nil { + t.Fatal(err) + } + + if err := mgr.LoadFromDisk([]string{projectDir}); err != nil { + t.Fatal(err) + } + if mgr.Counters.Active.Load() != 1 { + t.Errorf("expected Active=1 after LoadFromDisk, got %d", mgr.Counters.Active.Load()) + } + active := mgr.Active() + if len(active) != 1 || active[0].JobID != "preexisting-job" { + t.Errorf("Active() mismatch: %+v", active) + } +} + +func TestActive_ReturnsCurrentJobs(t *testing.T) { + fc := newFakeClient() + mgr := New(fc, time.Millisecond) + mgr.SetLogger(quietLogger()) + + projectDir, taskPath := setupProject(t) + if _, err := mgr.Submit(context.Background(), projectDir, taskPath, "p", + router.ProviderGPU, secutools.PriorityDefault); err != nil { + t.Fatal(err) + } + + got := mgr.Active() + if len(got) != 1 { + t.Fatalf("expected 1 active job, got %d", len(got)) + } + if got[0].Provider != "gpu" || got[0].Project != projectDir { + t.Errorf("active[0] mismatch: %+v", got[0]) + } +} + +// TestEndToEnd_InboxToDone exercises the full happy path: +// inbox → Submit → reapOnce (status=completed) → done/. +func TestEndToEnd_InboxToDone(t *testing.T) { + fc := newFakeClient() + mgr := New(fc, time.Millisecond) + mgr.SetLogger(quietLogger()) + + projectDir, taskPath := setupProject(t) + + job, err := mgr.Submit(context.Background(), projectDir, taskPath, + "Summarize this", router.ProviderAuto, secutools.PriorityDefault) + if err != nil { + t.Fatal(err) + } + + // Simulate the secutools backend completing the job. 
+ fc.setStatus(job.JobID, "completed") + fc.setResult(job.JobID, &secutools.JobResult{ + JobID: job.JobID, Response: "Summary text", + Provider: "claude-haiku", Model: "haiku-3-5", CostCAD: 0.0005, + }) + + mgr.reapOnce(context.Background()) + + donePath := filepath.Join(projectDir, ".agent-queue", "done", "task-001.md") + body, err := os.ReadFile(donePath) + if err != nil { + t.Fatalf("missing done/ file: %v", err) + } + want := []string{"Summary text", "provider: claude-haiku", "cost_cad: 0.0005"} + for _, w := range want { + if !contains(string(body), w) { + t.Errorf("done body missing %q:\n%s", w, body) + } + } +} + +func contains(s, sub string) bool { + return len(s) >= len(sub) && (sub == "" || indexOf(s, sub) >= 0) +} + +func indexOf(s, sub string) int { + for i := 0; i+len(sub) <= len(s); i++ { + if s[i:i+len(sub)] == sub { + return i + } + } + return -1 +} diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index 485fa5b..b54a098 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -16,16 +16,26 @@ import ( "gopkg.in/yaml.v3" "forge.secuaas.ovh/olivier/claude-failover/internal/config" + "forge.secuaas.ovh/olivier/claude-failover/internal/delegation" + "forge.secuaas.ovh/olivier/claude-failover/internal/router" + "forge.secuaas.ovh/olivier/claude-failover/internal/secutools" "forge.secuaas.ovh/olivier/claude-failover/internal/state" "forge.secuaas.ovh/olivier/claude-failover/internal/tmux" ) // TaskFrontmatter is the YAML header parsed from task .md files. +// +// Phase 2 chantier E added three new fields (PreferredAI, AllowDelegation, +// ComplexityHint). They are all optional; absence preserves Phase 1 +// behaviour (Claude Code on a local ccl-auto session). 
type TaskFrontmatter struct { - Title string `yaml:"title"` - Priority string `yaml:"priority"` // critical, high, default, low - Tags []string `yaml:"tags"` - NeedsClaude bool `yaml:"needs_claude_code"` + Title string `yaml:"title"` + Priority string `yaml:"priority"` // critical, high, default, low + Tags []string `yaml:"tags"` + NeedsClaude bool `yaml:"needs_claude_code"` + PreferredAI string `yaml:"preferred_ai"` // auto | claude-code | gpu | gemini | claude-api + AllowDelegation bool `yaml:"allow_delegation"` // default false → backward-compatible + ComplexityHint string `yaml:"complexity_hint"` // low | medium | high } // Dispatcher watches project inbox directories and assigns tasks to idle sessions. @@ -36,6 +46,12 @@ type Dispatcher struct { doneChan <-chan string projectsDir string logger *log.Logger + + // Phase 2 chantier E — delegation. When non-nil, tasks whose + // frontmatter routes them away from Claude Code (allow_delegation=true + // + preferred_ai!=claude-code) are submitted to secutools instead of + // being assigned to a tmux session. + delegation *delegation.Manager } // New creates a Dispatcher. @@ -55,6 +71,13 @@ func New(tc tmux.Client, s *state.State, cfg *config.Config, doneChan <-chan str } } +// WithDelegation enables delegation routing. Pass nil to disable (the +// Phase 1 default). Returns d for chaining. +func (d *Dispatcher) WithDelegation(m *delegation.Manager) *Dispatcher { + d.delegation = m + return d +} + // Run starts the dispatcher event loop until ctx is cancelled. func (d *Dispatcher) Run(ctx context.Context) { ticker := time.NewTicker(60 * time.Second) @@ -113,6 +136,11 @@ func (d *Dispatcher) fullScan() { } // dispatchProject assigns undispatched tasks in inboxDir to idle sessions. +// +// For each task we first ask the router whether the task should run on a +// local Claude Code session (the Phase 1 path) or be delegated to +// secutools (Phase 2 chantier E). 
Delegated tasks bypass the +// findFreeSession() check entirely — they don't need a tmux slot. func (d *Dispatcher) dispatchProject(inboxDir string) { entries, err := os.ReadDir(inboxDir) if err != nil { @@ -121,15 +149,42 @@ func (d *Dispatcher) dispatchProject(inboxDir string) { projectDir := filepath.Dir(filepath.Dir(inboxDir)) // inboxDir/.agent-queue/inbox → project for _, e := range entries { name := e.Name() - if !strings.HasSuffix(name, ".md") || strings.Contains(name, ".dispatched") { + if !strings.HasSuffix(name, ".md") || + strings.Contains(name, ".dispatched") || + strings.Contains(name, ".delegated") { continue } + taskPath := filepath.Join(inboxDir, name) + + // Decide route based on frontmatter. + decision, body, err := d.routeTask(taskPath) + if err != nil { + d.logger.Printf("[dispatcher] routeTask %s: %v", taskPath, err) + continue + } + + // Delegated path: submit to secutools, mark the task with .delegated. + if decision.Provider.IsDelegated() && d.delegation != nil { + if _, err := d.delegation.Submit(context.Background(), projectDir, taskPath, + body, decision.Provider, mapPriority(d.taskPriority(taskPath))); err != nil { + d.logger.Printf("[dispatcher] delegate %s: %v — falling back to Claude Code", + filepath.Base(taskPath), err) + // Fall through to Claude Code path on submit failure. + } else { + d.logger.Printf("[dispatcher] DELEGATED task=%s provider=%s reason=%s", + filepath.Base(taskPath), decision.Provider, decision.Reason) + // Original .md is left in place under inbox/ until the reaper + // finalises it. The .delegated marker prevents re-dispatch. + continue + } + } + + // Claude Code path (Phase 1 behaviour). 
session := d.findFreeSession() if session == "" { d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir) return } - taskPath := filepath.Join(inboxDir, name) if err := d.launchAgent(session, projectDir, taskPath); err != nil { d.logger.Printf("[dispatcher] launchAgent error: %v", err) continue @@ -140,6 +195,50 @@ func (d *Dispatcher) dispatchProject(inboxDir string) { } } +// routeTask reads taskPath, parses its frontmatter, and asks the router +// for a decision. Returns the decision plus the parsed body so callers +// don't have to read the file twice. +func (d *Dispatcher) routeTask(taskPath string) (router.Decision, string, error) { + content, err := os.ReadFile(taskPath) + if err != nil { + return router.Decision{}, "", fmt.Errorf("read task: %w", err) + } + fm, body := parseFrontmatter(content) + dec := router.Decide(router.Task{ + PreferredAI: fm.PreferredAI, + AllowDelegation: fm.AllowDelegation, + NeedsClaudeCode: fm.NeedsClaude, + ComplexityHint: fm.ComplexityHint, + }) + return dec, body, nil +} + +// taskPriority returns the Priority field of a task's frontmatter without +// re-parsing the body. Defensive — used only for mapping into a secutools +// priority when delegating. +func (d *Dispatcher) taskPriority(taskPath string) string { + content, err := os.ReadFile(taskPath) + if err != nil { + return "" + } + fm, _ := parseFrontmatter(content) + return fm.Priority +} + +// mapPriority converts a task priority string into a secutools Priority. +func mapPriority(p string) secutools.Priority { + switch strings.ToLower(strings.TrimSpace(p)) { + case "critical": + return secutools.PriorityCritical + case "high": + return secutools.PriorityHigh + case "low": + return secutools.PriorityLow + default: + return secutools.PriorityDefault + } +} + // findFreeSession returns the name of an idle, live, cooldown-free session // from the autonomous pool. 
Dedicated sessions are intentionally NOT // considered: those host the operator's manual interactive work. Routing a @@ -175,7 +274,13 @@ func (d *Dispatcher) isSessionFree(name string) bool { return true } -// assignNextTask scans all inboxes for work to give to a freshly-idled session. +// assignNextTask scans all inboxes for work to give to a freshly-idled +// session. Skips tasks that are already delegated to secutools (a +// .delegated marker exists alongside the .md) or already dispatched. +// +// Tasks whose router decision is "delegate" are also skipped here — they +// will be picked up by the next dispatchProject scan (which knows how to +// submit to secutools). func (d *Dispatcher) assignNextTask(session string) { for _, ds := range d.config.Pool.Dedicated { inbox := filepath.Join(ds.Project, ".agent-queue", "inbox") @@ -183,11 +288,30 @@ func (d *Dispatcher) assignNextTask(session string) { if err != nil { continue } + // Build a set of base names already delegated, so we can skip the + // associated .md without re-dispatching it locally. + delegated := make(map[string]bool) for _, e := range entries { - if !strings.HasSuffix(e.Name(), ".md") || strings.Contains(e.Name(), ".dispatched") { + if strings.HasSuffix(e.Name(), ".md.delegated") { + delegated[strings.TrimSuffix(e.Name(), ".delegated")] = true + } + } + for _, e := range entries { + if !strings.HasSuffix(e.Name(), ".md") || + strings.Contains(e.Name(), ".dispatched") || + strings.Contains(e.Name(), ".delegated") { + continue + } + if delegated[e.Name()] { continue } taskPath := filepath.Join(inbox, e.Name()) + // Respect routing decisions: don't take a delegated task here. 
+ if d.delegation != nil { + if dec, _, err := d.routeTask(taskPath); err == nil && dec.Provider.IsDelegated() { + continue + } + } if err := d.launchAgent(session, ds.Project, taskPath); err == nil { os.Rename(taskPath, taskPath+".dispatched") return diff --git a/internal/dispatcher/routing_test.go b/internal/dispatcher/routing_test.go new file mode 100644 index 0000000..a19d9f3 --- /dev/null +++ b/internal/dispatcher/routing_test.go @@ -0,0 +1,318 @@ +package dispatcher + +import ( + "context" + "errors" + "log" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "forge.secuaas.ovh/olivier/claude-failover/internal/config" + "forge.secuaas.ovh/olivier/claude-failover/internal/delegation" + "forge.secuaas.ovh/olivier/claude-failover/internal/router" + "forge.secuaas.ovh/olivier/claude-failover/internal/secutools" + "forge.secuaas.ovh/olivier/claude-failover/internal/state" +) + +// fakeSecutools is a minimal stub used for routing tests. It records every +// SubmitJob call so tests can assert on the number/contents of submissions. 
+type fakeSecutools struct { + mu sync.Mutex + submits []*secutools.JobRequest + statuses map[string]string + results map[string]*secutools.JobResult + nextID int +} + +func newFakeSecutools() *fakeSecutools { + return &fakeSecutools{ + statuses: make(map[string]string), + results: make(map[string]*secutools.JobResult), + } +} + +func (f *fakeSecutools) SubmitJob(_ context.Context, req *secutools.JobRequest) (*secutools.JobResponse, error) { + f.mu.Lock() + defer f.mu.Unlock() + f.submits = append(f.submits, req) + f.nextID++ + id := "job-" + itoa(f.nextID) + f.statuses[id] = "pending" + return &secutools.JobResponse{JobID: id, Status: "pending"}, nil +} + +func (f *fakeSecutools) GetJob(_ context.Context, id string) (*secutools.JobStatus, error) { + f.mu.Lock() + defer f.mu.Unlock() + st, ok := f.statuses[id] + if !ok { + return nil, errors.New("unknown") + } + return &secutools.JobStatus{JobID: id, Status: st}, nil +} + +func (f *fakeSecutools) WaitForResult(_ context.Context, id string, _ time.Duration) (*secutools.JobResult, error) { + f.mu.Lock() + defer f.mu.Unlock() + if r, ok := f.results[id]; ok { + return r, nil + } + return nil, errors.New("no result") +} + +// (uses itoa from dispatcher.go) + +func setupTaskFile(t *testing.T, frontmatter, body string) (projectDir, inbox, taskPath string) { + t.Helper() + projectDir = t.TempDir() + inbox = filepath.Join(projectDir, ".agent-queue", "inbox") + if err := os.MkdirAll(inbox, 0755); err != nil { + t.Fatal(err) + } + content := "---\n" + frontmatter + "\n---\n" + body + taskPath = filepath.Join(inbox, "task-routing.md") + if err := os.WriteFile(taskPath, []byte(content), 0644); err != nil { + t.Fatal(err) + } + return projectDir, inbox, taskPath +} + +// TestRouteTask_ParsesNewFrontmatterFields ensures the dispatcher actually +// reads the new preferred_ai/allow_delegation/needs_claude_code fields. 
+func TestRouteTask_ParsesNewFrontmatterFields(t *testing.T) { + _, _, taskPath := setupTaskFile(t, + "title: Demo\npreferred_ai: gpu\nallow_delegation: true\ncomplexity_hint: low", + "Body of task.") + + d := &Dispatcher{logger: log.Default()} + dec, body, err := d.routeTask(taskPath) + if err != nil { + t.Fatal(err) + } + if dec.Provider != router.ProviderGPU { + t.Errorf("expected ProviderGPU, got %v (%s)", dec.Provider, dec.Reason) + } + if body != "Body of task." { + t.Errorf("body parsed wrong: %q", body) + } +} + +// TestDispatchProject_DelegatesGPUTask: a task with allow_delegation=true +// and preferred_ai=gpu must be sent to secutools and never reach a tmux +// session, even when one is available. +func TestDispatchProject_DelegatesGPUTask(t *testing.T) { + projectDir, inbox, taskPath := setupTaskFile(t, + "title: Delegated\npreferred_ai: gpu\nallow_delegation: true", + "Analyze something.") + + tc := newMockTmux() + tc.sessions["pool-0"] = true + tc.paneOutput["pool-0"] = "❯ " + + s := state.New("") + s.SetIdle("pool-0") + + fc := newFakeSecutools() + mgr := delegation.New(fc, time.Millisecond) + + d := &Dispatcher{ + tmux: tc, + state: s, + config: &config.Config{ + Pool: config.PoolConfig{ + Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1}, + }, + }, + logger: log.Default(), + delegation: mgr, + } + + d.dispatchProject(inbox) + + if len(fc.submits) != 1 { + t.Fatalf("expected 1 SubmitJob call, got %d", len(fc.submits)) + } + if fc.submits[0].PreferredAI != "gpu" { + t.Errorf("expected preferred_ai=gpu, got %q", fc.submits[0].PreferredAI) + } + // .delegated marker is present, original .md kept (the reaper finalises later). + if _, err := os.Stat(taskPath + ".delegated"); err != nil { + t.Errorf("expected .delegated marker, got %v", err) + } + if _, err := os.Stat(taskPath); err != nil { + t.Errorf("expected original .md to remain until reaped, got %v", err) + } + // pool-0 must remain idle (we did NOT launch a Claude Code agent). 
+ if st := s.GetSession("pool-0"); st == nil || st.State != "idle" { + t.Errorf("expected pool-0 to stay idle, got %v", st) + } + _ = projectDir +} + +// TestDispatchProject_LegacyTaskKeepsClaudeCode: backward-compat. A task +// with no new fields (or allow_delegation=false implicit) MUST go to a +// local Claude Code session. +func TestDispatchProject_LegacyTaskKeepsClaudeCode(t *testing.T) { + _, inbox, taskPath := setupTaskFile(t, + "title: Legacy\npriority: default", + "Do classic work.") + + tc := newMockTmux() + tc.sessions["pool-0"] = true + tc.paneOutput["pool-0"] = "❯ " + + s := state.New("") + s.SetIdle("pool-0") + + fc := newFakeSecutools() + mgr := delegation.New(fc, time.Millisecond) + + d := &Dispatcher{ + tmux: tc, + state: s, + config: &config.Config{ + Pool: config.PoolConfig{ + Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1}, + }, + }, + logger: log.Default(), + delegation: mgr, + } + + d.dispatchProject(inbox) + + if len(fc.submits) != 0 { + t.Errorf("legacy task must NOT delegate, got %d submits", len(fc.submits)) + } + // .dispatched marker present, session is working. + if _, err := os.Stat(taskPath + ".dispatched"); err != nil { + t.Errorf("expected .dispatched marker for Claude Code path, got %v", err) + } + if st := s.GetSession("pool-0"); st == nil || st.State != "working" { + t.Errorf("expected pool-0 working, got %v", st) + } +} + +// TestDispatchProject_NeedsClaudeCodeBypassesDelegation: even with +// allow_delegation=true, needs_claude_code: true forces local execution. 
+func TestDispatchProject_NeedsClaudeCodeBypassesDelegation(t *testing.T) { + _, inbox, _ := setupTaskFile(t, + "title: Bypass\npreferred_ai: gpu\nallow_delegation: true\nneeds_claude_code: true", + "This needs a real Claude session.") + + tc := newMockTmux() + tc.sessions["pool-0"] = true + tc.paneOutput["pool-0"] = "❯ " + + s := state.New("") + s.SetIdle("pool-0") + + fc := newFakeSecutools() + mgr := delegation.New(fc, time.Millisecond) + + d := &Dispatcher{ + tmux: tc, + state: s, + config: &config.Config{ + Pool: config.PoolConfig{ + Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1}, + }, + }, + logger: log.Default(), + delegation: mgr, + } + + d.dispatchProject(inbox) + + if len(fc.submits) != 0 { + t.Errorf("needs_claude_code must skip delegation, got %d submits", len(fc.submits)) + } +} + +// TestEndToEnd_DelegationFlow: full happy path from inbox → submit → +// reaper → done/, with the dispatcher and delegation manager wired up. +func TestEndToEnd_DelegationFlow(t *testing.T) { + projectDir, inbox, taskPath := setupTaskFile(t, + "title: E2E\npreferred_ai: auto\nallow_delegation: true", + "Summarize this.") + + tc := newMockTmux() + s := state.New("") + + fc := newFakeSecutools() + mgr := delegation.New(fc, time.Millisecond) + + d := &Dispatcher{ + tmux: tc, + state: s, + config: &config.Config{Pool: config.PoolConfig{Autonomous: config.AutonomousConfig{Max: 0}}}, + logger: log.Default(), + delegation: mgr, + } + + // 1. Dispatch — submits to fakeSecutools. + d.dispatchProject(inbox) + if len(fc.submits) != 1 { + t.Fatalf("expected 1 submit, got %d", len(fc.submits)) + } + jobID := "job-1" + + // 2. Backend completes the job. + fc.mu.Lock() + fc.statuses[jobID] = "completed" + fc.results[jobID] = &secutools.JobResult{ + JobID: jobID, Response: "Summary", Provider: "gpu", Model: "qwen", CostCAD: 0.001, + } + fc.mu.Unlock() + + // 3. Reaper picks it up. 
mgr.Run drives reapOnce on each ticker tick; + // interval was set to 1ms via delegation.New(fc, time.Millisecond) so a 200ms + // context yields many cycles. + mgr.SetLogger(log.Default()) + delegationReapNow(t, mgr, jobID) + + donePath := filepath.Join(projectDir, ".agent-queue", "done", "task-routing.md") + body, err := os.ReadFile(donePath) + if err != nil { + t.Fatalf("missing done/ file: %v", err) + } + if !contains(string(body), "Summary") || !contains(string(body), "provider: gpu") { + t.Errorf("done body missing expected fields:\n%s", body) + } + + // Marker and original .md gone. + if _, err := os.Stat(taskPath + ".delegated"); !os.IsNotExist(err) { + t.Error("delegated marker should be removed") + } + if _, err := os.Stat(taskPath); !os.IsNotExist(err) { + t.Error("original .md should be removed") + } +} + +// delegationReapNow drives the reaper through mgr's public API. We need +// this because reapOnce is unexported in the delegation package and cannot +// be called from here. Instead, we run mgr.Run under a short-lived context +// and let its ticker trigger the reap cycles. The job ID is unused. +func delegationReapNow(t *testing.T, mgr *delegation.Manager, _ string) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + // Run the manager briefly; the ticker fires after `interval` (1ms in + // these tests) so within 200ms we get many reap cycles.
+ mgr.Run(ctx) +} + +func contains(s, sub string) bool { + if sub == "" { + return true + } + for i := 0; i+len(sub) <= len(s); i++ { + if s[i:i+len(sub)] == sub { + return true + } + } + return false +} diff --git a/internal/router/router.go b/internal/router/router.go new file mode 100644 index 0000000..16820f8 --- /dev/null +++ b/internal/router/router.go @@ -0,0 +1,95 @@ +// Package router decides whether a task should run on a local Claude Code +// session (current Phase 1 behaviour) or be delegated to the centralized +// SecuAAS secutools AI platform (Phase 2 chantier E). +// +// The decision is driven entirely by the task's YAML frontmatter; no +// network call is performed in Decide. +package router + +import "strings" + +// Provider is the destination chosen for a task. +type Provider string + +const ( + // ProviderClaudeCode means dispatch to a local ccl-auto tmux session + // running Claude Code. This is the Phase 1 behaviour. + ProviderClaudeCode Provider = "claude-code" + + // ProviderAuto means delegate to secutools and let its smart_triage + // router pick the actual backend (GPU > Claude > Gemini fallback chain). + ProviderAuto Provider = "auto" + + // ProviderGPU pins delegation to the in-cluster vLLM GPU pool. + ProviderGPU Provider = "gpu" + + // ProviderGemini pins delegation to Google Gemini via secutools. + ProviderGemini Provider = "gemini" + + // ProviderClaudeAPI pins delegation to the Anthropic API (NOT Claude + // Code locally — this means stateless API calls billed to the secutools + // account). + ProviderClaudeAPI Provider = "claude-api" +) + +// IsDelegated reports whether p means "submit to secutools" (i.e. not the +// local Claude Code path). 
+func (p Provider) IsDelegated() bool { + switch p { + case ProviderAuto, ProviderGPU, ProviderGemini, ProviderClaudeAPI: + return true + default: + return false + } +} + +// Decision is the output of Decide: which provider to use, plus a short +// human-readable reason for logging and the /api/delegated/status endpoint. +type Decision struct { + Provider Provider + Reason string +} + +// Task is the slice of frontmatter fields the router cares about. It is +// intentionally narrower than the dispatcher's full TaskFrontmatter so that +// router tests don't need to import the dispatcher. +type Task struct { + PreferredAI string // auto | claude-code | gpu | gemini | claude-api (case-insensitive) + AllowDelegation bool // default false → backward-compatible (Claude Code) + NeedsClaudeCode bool // legacy bypass — forces ProviderClaudeCode + ComplexityHint string // low | medium | high (informational only for now) +} + +// Decide returns the routing decision for the given task. +// +// Order of precedence: +// 1. needs_claude_code: true → ProviderClaudeCode (legacy bypass, never delegate) +// 2. preferred_ai: claude-code → ProviderClaudeCode (explicit) +// 3. allow_delegation: false → ProviderClaudeCode (default safety net) +// 4. preferred_ai parses to a delegated provider → that provider +// 5. 
preferred_ai empty or auto → ProviderAuto (secutools smart_triage decides); any other unknown value → ProviderClaudeCode (fail-safe) +func Decide(t Task) Decision { + if t.NeedsClaudeCode { + return Decision{ProviderClaudeCode, "needs_claude_code=true"} + } + + pref := strings.ToLower(strings.TrimSpace(t.PreferredAI)) + if pref == string(ProviderClaudeCode) { + return Decision{ProviderClaudeCode, "preferred_ai=claude-code"} + } + + if !t.AllowDelegation { + return Decision{ProviderClaudeCode, "allow_delegation=false (default)"} + } + + switch Provider(pref) { + case ProviderGPU, ProviderGemini, ProviderClaudeAPI: + return Decision{Provider(pref), "preferred_ai=" + pref} + case ProviderAuto, "": + return Decision{ProviderAuto, "preferred_ai=auto (smart_triage)"} + default: + // Unknown value: fail safe to local Claude Code so a typo doesn't + // silently route real work to GPU. + return Decision{ProviderClaudeCode, "unknown preferred_ai=" + pref + " → fail-safe"} + } +} diff --git a/internal/router/router_test.go b/internal/router/router_test.go new file mode 100644 index 0000000..1c600b4 --- /dev/null +++ b/internal/router/router_test.go @@ -0,0 +1,93 @@ +package router + +import "testing" + +// TestDecide_BackwardCompatible: a vanilla task (no new fields) must keep +// going through Claude Code. This is the contract that protects every +// existing inbox/*.md file. 
+func TestDecide_BackwardCompatible(t *testing.T) { + d := Decide(Task{}) + if d.Provider != ProviderClaudeCode { + t.Fatalf("default task must route to Claude Code, got %v (%s)", d.Provider, d.Reason) + } +} + +func TestDecide_NeedsClaudeCodeWinsOverEverything(t *testing.T) { + d := Decide(Task{ + NeedsClaudeCode: true, + AllowDelegation: true, + PreferredAI: "gpu", + }) + if d.Provider != ProviderClaudeCode { + t.Errorf("needs_claude_code must take precedence, got %v (%s)", d.Provider, d.Reason) + } +} + +func TestDecide_ExplicitClaudeCode(t *testing.T) { + d := Decide(Task{PreferredAI: "claude-code", AllowDelegation: true}) + if d.Provider != ProviderClaudeCode { + t.Errorf("explicit claude-code must route locally, got %v", d.Provider) + } +} + +func TestDecide_AllowDelegationFalseBlocksDelegation(t *testing.T) { + d := Decide(Task{PreferredAI: "gpu", AllowDelegation: false}) + if d.Provider != ProviderClaudeCode { + t.Errorf("allow_delegation=false must override preferred_ai, got %v (%s)", + d.Provider, d.Reason) + } +} + +func TestDecide_DelegatedProviders(t *testing.T) { + cases := []struct { + pref string + want Provider + }{ + {"gpu", ProviderGPU}, + {"GPU", ProviderGPU}, + {"gemini", ProviderGemini}, + {"claude-api", ProviderClaudeAPI}, + } + for _, c := range cases { + d := Decide(Task{PreferredAI: c.pref, AllowDelegation: true}) + if d.Provider != c.want { + t.Errorf("preferred_ai=%q want %v, got %v (%s)", + c.pref, c.want, d.Provider, d.Reason) + } + if !d.Provider.IsDelegated() { + t.Errorf("preferred_ai=%q should be delegated", c.pref) + } + } +} + +func TestDecide_AutoMeansSecutoolsTriage(t *testing.T) { + for _, pref := range []string{"auto", ""} { + d := Decide(Task{PreferredAI: pref, AllowDelegation: true}) + if d.Provider != ProviderAuto { + t.Errorf("preferred_ai=%q want ProviderAuto, got %v", pref, d.Provider) + } + } +} + +func TestDecide_UnknownProviderFailsSafe(t *testing.T) { + d := Decide(Task{PreferredAI: "claude-3-mystery", 
AllowDelegation: true}) + if d.Provider != ProviderClaudeCode { + t.Errorf("unknown provider must fail-safe to Claude Code, got %v (%s)", + d.Provider, d.Reason) + } +} + +func TestProvider_IsDelegated(t *testing.T) { + cases := map[Provider]bool{ + ProviderClaudeCode: false, + ProviderAuto: true, + ProviderGPU: true, + ProviderGemini: true, + ProviderClaudeAPI: true, + } + for p, want := range cases { + if got := p.IsDelegated(); got != want { + t.Errorf("%v.IsDelegated() = %v, want %v", p, got, want) + } + } +} diff --git a/internal/secutools/client.go b/internal/secutools/client.go new file mode 100644 index 0000000..ae81da1 --- /dev/null +++ b/internal/secutools/client.go @@ -0,0 +1,290 @@ +// Package secutools provides a minimal HTTP client for the centralized SecuAAS +// AI-batch platform (https://api.secutools.secuaas.ovh). +// +// Phase 2 — Chantier E: the dispatcher delegates non-Claude-Code-eligible +// tasks to secutools (GPU/Gemini/Claude API) instead of dispatching them to +// a local ccl-auto tmux session. This package is the Go side of that +// delegation: SubmitJob, GetJob, WaitForResult. +// +// The Client interface is intentionally narrow so tests can plug a fake +// implementation without any network dependency. +package secutools + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "time" +) + +// Client is the abstraction the rest of the daemon uses to talk to secutools. +// Real callers use HTTPClient; tests substitute a mock. +type Client interface { + SubmitJob(ctx context.Context, req *JobRequest) (*JobResponse, error) + GetJob(ctx context.Context, id string) (*JobStatus, error) + WaitForResult(ctx context.Context, id string, timeout time.Duration) (*JobResult, error) +} + +// JobType mirrors the secutools job-type enum. 
+type JobType string + +const ( + TypeAnalyze JobType = "ai:analyze" + TypeBatch JobType = "ai:batch" + TypeReport JobType = "ai:report" + TypeCorrelate JobType = "ai:correlate" +) + +// Priority mirrors the secutools priority enum. +type Priority string + +const ( + PriorityCritical Priority = "critical" + PriorityHigh Priority = "high" + PriorityDefault Priority = "default" + PriorityLow Priority = "low" +) + +// JobRequest is the body of POST /api/v1/jobs. +type JobRequest struct { + Type JobType `json:"type"` + Priority Priority `json:"priority,omitempty"` + Prompt string `json:"prompt"` + Data map[string]any `json:"data,omitempty"` + MaxTokens int `json:"max_tokens,omitempty"` + PreferredAI string `json:"preferred_ai,omitempty"` + Source string `json:"source,omitempty"` +} + +// JobResponse is the immediate reply from POST /api/v1/jobs. +type JobResponse struct { + JobID string `json:"job_id"` + Status string `json:"status"` +} + +// JobStatus is the reply from GET /api/v1/jobs/:id. +type JobStatus struct { + JobID string `json:"job_id"` + Status string `json:"status"` // pending | running | completed | failed | cancelled + Provider string `json:"provider,omitempty"` + Error string `json:"error,omitempty"` +} + +// JobResult is the reply from GET /api/v1/jobs/:id/result. +type JobResult struct { + JobID string `json:"job_id"` + Response string `json:"response"` + Provider string `json:"provider"` + Model string `json:"model"` + CostCAD float64 `json:"cost_cad"` + Tokens int `json:"tokens,omitempty"` +} + +// HTTPClient is the production implementation of Client. +type HTTPClient struct { + baseURL string + apiKey string + hc *http.Client + maxRetries int + baseDelay time.Duration +} + +// NewHTTPClient returns an HTTPClient ready to talk to secutools. +// If hc is nil, a default http.Client with a 30s timeout is used. 
+// +// The client performs up to 3 retries on transport errors and 5xx +// responses, with exponential backoff starting at 500ms (500ms, 1s, 2s). +// 4xx responses are returned as errors without retrying. +func NewHTTPClient(baseURL, apiKey string, hc *http.Client) *HTTPClient { + if hc == nil { + hc = &http.Client{Timeout: 30 * time.Second} + } + return &HTTPClient{ + baseURL: baseURL, + apiKey: apiKey, + hc: hc, + maxRetries: 3, + baseDelay: 500 * time.Millisecond, + } +} + +// SetRetryPolicy overrides the default retry policy. Useful for tests. +func (c *HTTPClient) SetRetryPolicy(maxRetries int, baseDelay time.Duration) { + c.maxRetries = maxRetries + c.baseDelay = baseDelay +} + +// doWithRetry sends req and retries on transport errors or 5xx responses +// using exponential backoff. 4xx is returned without retry. Respects ctx. +func (c *HTTPClient) doWithRetry(ctx context.Context, build func() (*http.Request, error)) (*http.Response, error) { + var lastErr error + delay := c.baseDelay + for attempt := 0; attempt <= c.maxRetries; attempt++ { + if attempt > 0 { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(delay): + } + delay *= 2 + } + req, err := build() + if err != nil { + return nil, err + } + resp, err := c.hc.Do(req) + if err != nil { + lastErr = err + continue + } + // Retry 5xx; return success or 4xx immediately. + if resp.StatusCode >= 500 && resp.StatusCode <= 599 { + raw, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + lastErr = fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(raw)) + continue + } + return resp, nil + } + if lastErr == nil { + lastErr = errors.New("secutools: unknown transport failure") + } + return nil, fmt.Errorf("after %d attempts: %w", c.maxRetries+1, lastErr) +} + +// SubmitJob POSTs req to /api/v1/jobs with retry on 5xx. 
+func (c *HTTPClient) SubmitJob(ctx context.Context, req *JobRequest) (*JobResponse, error) { + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + resp, err := c.doWithRetry(ctx, func() (*http.Request, error) { + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, + c.baseURL+"/api/v1/jobs", bytes.NewReader(body)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("X-API-Key", c.apiKey) + return httpReq, nil + }) + if err != nil { + return nil, fmt.Errorf("submit job: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + raw, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("submit job: HTTP %d: %s", resp.StatusCode, string(raw)) + } + + var out JobResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode submit response: %w", err) + } + return &out, nil +} + +// GetJob GETs /api/v1/jobs/:id with retry on 5xx. +func (c *HTTPClient) GetJob(ctx context.Context, id string) (*JobStatus, error) { + resp, err := c.doWithRetry(ctx, func() (*http.Request, error) { + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, + c.baseURL+"/api/v1/jobs/"+id, nil) + if err != nil { + return nil, err + } + httpReq.Header.Set("X-API-Key", c.apiKey) + return httpReq, nil + }) + if err != nil { + return nil, fmt.Errorf("get job: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + raw, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("get job: HTTP %d: %s", resp.StatusCode, string(raw)) + } + + var out JobStatus + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode get response: %w", err) + } + return &out, nil +} + +// getResult fetches the final payload of a completed job with retry on 5xx. 
+func (c *HTTPClient) getResult(ctx context.Context, id string) (*JobResult, error) { + resp, err := c.doWithRetry(ctx, func() (*http.Request, error) { + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, + c.baseURL+"/api/v1/jobs/"+id+"/result", nil) + if err != nil { + return nil, err + } + httpReq.Header.Set("X-API-Key", c.apiKey) + return httpReq, nil + }) + if err != nil { + return nil, fmt.Errorf("get result: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + raw, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("get result: HTTP %d: %s", resp.StatusCode, string(raw)) + } + + var out JobResult + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode result: %w", err) + } + return &out, nil +} + +// ErrJobFailed is returned by WaitForResult when secutools reports the job +// as terminally failed (no result will ever be produced). +var ErrJobFailed = errors.New("secutools: job failed") + +// ErrTimeout is returned by WaitForResult when the polling deadline elapses +// before the job reaches a terminal state. +var ErrTimeout = errors.New("secutools: wait timeout") + +// WaitForResult polls /api/v1/jobs/:id every 2s until the job reaches a +// terminal state (completed/failed/cancelled) or timeout elapses. +// On completed, fetches and returns the result. +// +// Polling cadence is intentionally fixed (not configurable) to keep the +// reaper goroutine simple. If callers need a different cadence they can +// implement it themselves on top of GetJob/getResult. 
+func (c *HTTPClient) WaitForResult(ctx context.Context, id string, timeout time.Duration) (*JobResult, error) { + deadline := time.Now().Add(timeout) + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + st, err := c.GetJob(ctx, id) + if err != nil { + return nil, err + } + switch st.Status { + case "completed": + return c.getResult(ctx, id) + case "failed", "cancelled": + return nil, fmt.Errorf("%w: status=%s err=%s", ErrJobFailed, st.Status, st.Error) + } + + if time.Now().After(deadline) { + return nil, ErrTimeout + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + } + } +} diff --git a/internal/secutools/client_test.go b/internal/secutools/client_test.go new file mode 100644 index 0000000..e20095e --- /dev/null +++ b/internal/secutools/client_test.go @@ -0,0 +1,190 @@ +package secutools + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" +) + +// TestSubmitJob_HappyPath verifies the request body and headers match the +// secutools contract and the response is decoded. 
+func TestSubmitJob_HappyPath(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/v1/jobs" { + t.Errorf("unexpected path %q", r.URL.Path) + } + if r.Method != http.MethodPost { + t.Errorf("unexpected method %q", r.Method) + } + if r.Header.Get("X-API-Key") != "key123" { + t.Errorf("missing/incorrect X-API-Key: %q", r.Header.Get("X-API-Key")) + } + var got JobRequest + if err := json.NewDecoder(r.Body).Decode(&got); err != nil { + t.Errorf("decode: %v", err) // Errorf, not Fatalf: the handler runs off the test goroutine + } + if got.Type != TypeAnalyze || got.PreferredAI != "gpu" { + t.Errorf("payload mismatch: %+v", got) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"job_id":"abc","status":"pending"}`)) + })) + defer srv.Close() + + c := NewHTTPClient(srv.URL, "key123", srv.Client()) + resp, err := c.SubmitJob(context.Background(), &JobRequest{ + Type: TypeAnalyze, + Priority: PriorityHigh, + Prompt: "hi", + PreferredAI: "gpu", + }) + if err != nil { + t.Fatalf("SubmitJob: %v", err) + } + if resp.JobID != "abc" || resp.Status != "pending" { + t.Errorf("unexpected response: %+v", resp) + } +} + +// TestSubmitJob_HTTPError surfaces non-2xx responses as errors. +func TestSubmitJob_HTTPError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("boom")) + })) + defer srv.Close() + + c := NewHTTPClient(srv.URL, "k", srv.Client()) + if _, err := c.SubmitJob(context.Background(), &JobRequest{Type: TypeAnalyze, Prompt: "p"}); err == nil { + t.Fatal("expected error on HTTP 500, got nil") + } +} + +// TestWaitForResult_PollsUntilCompleted verifies the polling loop transitions +// pending → completed and fetches the result. 
+func TestWaitForResult_PollsUntilCompleted(t *testing.T) { + var calls atomic.Int64 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.URL.Path { + case "/api/v1/jobs/job1": + n := calls.Add(1) + status := "pending" + if n >= 2 { + status = "completed" + } + _, _ = w.Write([]byte(`{"job_id":"job1","status":"` + status + `","provider":"gpu"}`)) + case "/api/v1/jobs/job1/result": + _, _ = w.Write([]byte(`{"job_id":"job1","response":"done","provider":"gpu","cost_cad":0.005}`)) + default: + t.Errorf("unexpected path %q", r.URL.Path) + } + })) + defer srv.Close() + + c := NewHTTPClient(srv.URL, "k", srv.Client()) + // The poll cadence is fixed at 2s: the first poll sees "pending" and the + // second, one tick later, sees "completed", so the test takes about 2s. + res, err := c.WaitForResult(context.Background(), "job1", 30*time.Second) + if err != nil { + t.Fatalf("WaitForResult: %v", err) + } + if res.Response != "done" || res.Provider != "gpu" { + t.Errorf("unexpected result: %+v", res) + } +} + +// TestWaitForResult_FailedJob returns ErrJobFailed when secutools reports +// terminal failure. +func TestWaitForResult_FailedJob(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"job_id":"jobX","status":"failed","error":"oom"}`)) + })) + defer srv.Close() + + c := NewHTTPClient(srv.URL, "k", srv.Client()) + _, err := c.WaitForResult(context.Background(), "jobX", 5*time.Second) + if !errors.Is(err, ErrJobFailed) { + t.Errorf("expected ErrJobFailed, got %v", err) + } +} + +// TestSubmitJob_RetriesOn5xx verifies the client retries transient 500s +// and succeeds on a later attempt. Uses a tight retry delay so the test +// runs in milliseconds. 
+func TestSubmitJob_RetriesOn5xx(t *testing.T) { + var calls atomic.Int64 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := calls.Add(1) + if n < 3 { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("transient")) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"job_id":"ok","status":"pending"}`)) + })) + defer srv.Close() + + c := NewHTTPClient(srv.URL, "k", srv.Client()) + c.SetRetryPolicy(3, 1*time.Millisecond) + + resp, err := c.SubmitJob(context.Background(), &JobRequest{Type: TypeAnalyze, Prompt: "p"}) + if err != nil { + t.Fatalf("expected success after retries, got %v (calls=%d)", err, calls.Load()) + } + if resp.JobID != "ok" { + t.Errorf("unexpected response: %+v", resp) + } + if calls.Load() != 3 { + t.Errorf("expected 3 attempts, got %d", calls.Load()) + } +} + +// TestSubmitJob_DoesNotRetry4xx ensures client errors short-circuit +// without burning retries (e.g. wrong API key). +func TestSubmitJob_DoesNotRetry4xx(t *testing.T) { + var calls atomic.Int64 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + calls.Add(1) + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte("bad key")) + })) + defer srv.Close() + + c := NewHTTPClient(srv.URL, "k", srv.Client()) + c.SetRetryPolicy(3, 1*time.Millisecond) + + _, err := c.SubmitJob(context.Background(), &JobRequest{Type: TypeAnalyze, Prompt: "p"}) + if err == nil { + t.Fatal("expected error on 401") + } + if calls.Load() != 1 { + t.Errorf("4xx must not retry, got %d calls", calls.Load()) + } +} + +// TestWaitForResult_ContextCancel exits cleanly when the parent context is +// cancelled mid-poll. 
+func TestWaitForResult_ContextCancel(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"job_id":"j","status":"pending"}`)) + })) + defer srv.Close() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + c := NewHTTPClient(srv.URL, "k", srv.Client()) + _, err := c.WaitForResult(ctx, "j", 10*time.Second) + if err == nil { + t.Fatal("expected error from cancelled context") + } +} diff --git a/internal/secutools/routing.go b/internal/secutools/routing.go new file mode 100644 index 0000000..67789bf --- /dev/null +++ b/internal/secutools/routing.go @@ -0,0 +1,86 @@ +package secutools + +import ( + "fmt" + "strings" +) + +// DecideProvider inspects a frontmatter map (as decoded from YAML) and +// returns the provider string that should handle the task. Valid return +// values: +// +// - "local" — no delegation, dispatch on a Claude Code session (Phase 1) +// - "claude-code" — accepted as input only; normalized to "local" +// - "gpu" — delegate to secutools with preferred_ai=gpu +// - "gemini" — delegate to secutools with preferred_ai=gemini +// - "claude-api" — delegate to secutools with preferred_ai=claude-api +// - "auto" — delegate to secutools, let smart_triage choose +// +// Precedence: +// 1. needs_claude_code: true → "local" +// 2. preferred_ai in {claude-code, local} → "local" +// 3. allow_delegation == false / missing → "local" +// 4. preferred_ai in {gpu,gemini,claude-api} → that provider +// 5. preferred_ai in {"", auto} → "auto" +// 6. unknown preferred_ai → "local" (fail-safe) +// +// This function is intentionally permissive on input types: YAML booleans +// may decode as bool, strings as string. It coerces common forms and +// returns "local" on malformed input rather than panicking. 
+func DecideProvider(fm map[string]any) string { + if fm == nil { + return "local" + } + + if asBool(fm["needs_claude_code"]) { + return "local" + } + + pref := strings.ToLower(strings.TrimSpace(asString(fm["preferred_ai"]))) + if pref == "claude-code" || pref == "local" { + return "local" + } + + if !asBool(fm["allow_delegation"]) { + return "local" + } + + switch pref { + case "gpu", "gemini", "claude-api": + return pref + case "", "auto": + return "auto" + default: + // Unknown provider — fail safe to local. + return "local" + } +} + +// asBool accepts bool, int (non-zero), and the strings "true"/"1"/"yes" (case-insensitive). Defaults to false. +func asBool(v any) bool { + switch t := v.(type) { + case bool: + return t + case string: + s := strings.ToLower(strings.TrimSpace(t)) + return s == "true" || s == "1" || s == "yes" + case int: + return t != 0 + default: + return false + } +} + +// asString coerces v to a string without trimming (callers trim). Returns "" for nil; other types are formatted with %v. +func asString(v any) string { + switch t := v.(type) { + case string: + return t + case fmt.Stringer: + return t.String() + case nil: + return "" + default: + return fmt.Sprintf("%v", t) + } +} diff --git a/internal/secutools/routing_test.go b/internal/secutools/routing_test.go new file mode 100644 index 0000000..5cad324 --- /dev/null +++ b/internal/secutools/routing_test.go @@ -0,0 +1,72 @@ +package secutools + +import "testing" + +// TestDecideProvider_TableDriven exercises the full decision matrix for +// the map-input adapter used by the bash-side ccl-delegate CLI. The +// richer Task-struct variant lives in internal/router. 
+func TestDecideProvider_TableDriven(t *testing.T) { + cases := []struct { + name string + fm map[string]any + want string + }{ + {"nil map falls back to local", nil, "local"}, + {"empty map → local (allow_delegation default false)", map[string]any{}, "local"}, + {"needs_claude_code wins", map[string]any{ + "needs_claude_code": true, + "allow_delegation": true, + "preferred_ai": "gpu", + }, "local"}, + {"explicit claude-code stays local", map[string]any{ + "preferred_ai": "claude-code", + "allow_delegation": true, + }, "local"}, + {"allow_delegation=false blocks even with preferred_ai=gpu", map[string]any{ + "preferred_ai": "gpu", + "allow_delegation": false, + }, "local"}, + {"gpu", map[string]any{ + "preferred_ai": "gpu", + "allow_delegation": true, + }, "gpu"}, + {"GPU (case-insensitive)", map[string]any{ + "preferred_ai": "GPU", + "allow_delegation": true, + }, "gpu"}, + {"gemini", map[string]any{ + "preferred_ai": "gemini", + "allow_delegation": true, + }, "gemini"}, + {"claude-api", map[string]any{ + "preferred_ai": "claude-api", + "allow_delegation": true, + }, "claude-api"}, + {"auto", map[string]any{ + "preferred_ai": "auto", + "allow_delegation": true, + }, "auto"}, + {"empty preferred_ai + allow_delegation → auto", map[string]any{ + "allow_delegation": true, + }, "auto"}, + {"unknown provider → fail-safe local", map[string]any{ + "preferred_ai": "claude-3-mystery", + "allow_delegation": true, + }, "local"}, + {"allow_delegation as string 'true'", map[string]any{ + "preferred_ai": "gpu", + "allow_delegation": "true", + }, "gpu"}, + {"allow_delegation as int 1", map[string]any{ + "preferred_ai": "gemini", + "allow_delegation": 1, + }, "gemini"}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := DecideProvider(c.fm); got != c.want { + t.Errorf("DecideProvider(%v) = %q, want %q", c.fm, got, c.want) + } + }) + } +} diff --git a/tests/phase2-E-integration.sh b/tests/phase2-E-integration.sh new file mode 100755 index 
0000000..c4ebe68 --- /dev/null +++ b/tests/phase2-E-integration.sh @@ -0,0 +1,214 @@ +#!/bin/bash +# phase2-E-integration.sh — Phase 2 Chantier E integration test +# +# Exercises the bash side of multi-provider delegation end-to-end, +# without hitting the real secutools API. Flow: +# +# 1. Start a local mock HTTP server that implements /api/v1/jobs{,/:id,/:id/result} +# 2. Build ccl-delegate + ccl-delegate decide against a test task +# 3. Call delegate-to-secutools.sh on the task +# 4. Assert: status.json = delegated_to_secutools, .delegated marker +# with job_id, original .md still in inbox +# 5. Drive the mock into 'completed' state +# 6. Call poll-delegated-jobs.sh +# 7. Assert: done/.md exists with result body + provider/cost +# footer, original .md + .delegated marker cleaned up +# +# Exit 0 on success, non-zero on assertion failure. + +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" +SCRIPTS_ROOT="${DEVMGMT_ORCH_DIR:-/home/ubuntu/projects/dev-management/agent-orchestrator}" +TMP_ROOT=$(mktemp -d) +MOCK_PORT=${MOCK_PORT:-18742} +MOCK_STATE="$TMP_ROOT/mock-state" +MOCK_LOG="$TMP_ROOT/mock.log" +MOCK_PID_FILE="$TMP_ROOT/mock.pid" +BIN_DIR="$TMP_ROOT/bin" +PROJECTS_BASE="$TMP_ROOT/projects" +BUDGETS_FILE="$TMP_ROOT/delegation-budgets.jsonl" + +mkdir -p "$MOCK_STATE" "$BIN_DIR" "$PROJECTS_BASE/test-proj/.agent-queue/inbox" + +cleanup() { + if [[ -f "$MOCK_PID_FILE" ]]; then + kill "$(cat "$MOCK_PID_FILE")" 2>/dev/null || true + fi + rm -rf "$TMP_ROOT" +} +trap cleanup EXIT + +echo "==> building ccl-delegate" +GOMODCACHE=/home/ubuntu/go/pkg/mod GOCACHE=/home/ubuntu/.cache/go-build HOME=/home/ubuntu GOSUMDB=off GOTOOLCHAIN=local \ + /usr/local/go/bin/go build -C "$REPO_ROOT" -o "$BIN_DIR/ccl-delegate" ./cmd/ccl-delegate +export PATH="$BIN_DIR:$PATH" +export CCL_DELEGATE_BIN="$BIN_DIR/ccl-delegate" + +# ── Mock secutools server (Python) ──────────────────────────────────── +cat > "$TMP_ROOT/mock-server.py" <<'PY' +import http.server, json, os, 
sys, threading +STATE_DIR = os.environ["MOCK_STATE"] +os.makedirs(STATE_DIR, exist_ok=True) +JOBS = {} + +def _respond(handler, code, body): + data = json.dumps(body).encode() + handler.send_response(code) + handler.send_header("Content-Type", "application/json") + handler.send_header("Content-Length", str(len(data))) + handler.end_headers() + handler.wfile.write(data) + +class H(http.server.BaseHTTPRequestHandler): + def log_message(self, fmt, *a): pass + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + body = json.loads(self.rfile.read(length).decode() or "{}") + if self.path == "/api/v1/jobs": + jid = f"job-{len(JOBS)+1}" + JOBS[jid] = {"status": "pending", "request": body} + # Persist request for the test to drive completion. + with open(os.path.join(STATE_DIR, f"{jid}.json"), "w") as f: + json.dump({"status":"pending"}, f) + return _respond(self, 200, {"job_id": jid, "status": "pending"}) + _respond(self, 404, {"error": "not found"}) + def do_GET(self): + parts = self.path.strip("/").split("/") + # /api/v1/jobs/:id or /api/v1/jobs/:id/result + if len(parts) >= 4 and parts[:3] == ["api","v1","jobs"]: + jid = parts[3] + # read status from state file (test can overwrite) + fp = os.path.join(STATE_DIR, f"{jid}.json") + if not os.path.exists(fp): + return _respond(self, 404, {"error":"unknown job"}) + with open(fp) as f: state = json.load(f) + if len(parts) == 4: + # job status + return _respond(self, 200, { + "job_id": jid, + "status": state.get("status","pending"), + "provider": state.get("provider","")}) + if len(parts) == 5 and parts[4] == "result": + if state.get("status") != "completed": + return _respond(self, 409, {"error":"not completed"}) + return _respond(self, 200, { + "job_id": jid, + "response": state.get("response","(no response)"), + "provider": state.get("provider","gpu"), + "model": state.get("model","qwen-coder"), + "cost_cad": state.get("cost_cad", 0.0123), + "tokens": state.get("tokens", 1200), + }) + _respond(self, 
404, {"error":"not found"}) + +port = int(sys.argv[1]) +srv = http.server.ThreadingHTTPServer(("127.0.0.1", port), H) +print(f"MOCK: listening on {port}", flush=True) +srv.serve_forever() +PY + +echo "==> starting mock secutools on 127.0.0.1:$MOCK_PORT" +MOCK_STATE="$MOCK_STATE" python3 "$TMP_ROOT/mock-server.py" "$MOCK_PORT" > "$MOCK_LOG" 2>&1 & +echo $! > "$MOCK_PID_FILE" +sleep 0.5 +# Wait up to 3s for the port to answer (no -f: the mock's 404 for an unknown job still proves it is up) +for _ in 1 2 3 4 5 6; do + if curl -s "http://127.0.0.1:$MOCK_PORT/api/v1/jobs/nope" >/dev/null 2>&1; then break; fi + sleep 0.5 +done + +# ── Fixture task ────────────────────────────────────────────────────── +TASK_PATH="$PROJECTS_BASE/test-proj/.agent-queue/inbox/TASK-phase2E.md" +cat > "$TASK_PATH" <<'EOF' +--- +title: Phase 2 E integration +priority: default +preferred_ai: gpu +allow_delegation: true +complexity_hint: low +--- +Summarize this paragraph in one sentence. +EOF + +# ── Prepare environment for scripts ─────────────────────────────────── +export CCL_SECUTOOLS_API_KEY="test-key" +export CCL_SECUTOOLS_MOCK_URL="http://127.0.0.1:$MOCK_PORT" +export CCL_DELEGATION_BUDGETS_FILE="$BUDGETS_FILE" +export CCL_DELEGATION_BUDGET_CAD_DAILY="10.00" # generous for test + +# ── Sanity: `ccl-delegate decide` ───────────────────────────────────── +echo "==> ccl-delegate decide" +dec=$("$BIN_DIR/ccl-delegate" decide --frontmatter="$TASK_PATH") +[[ "$dec" == "gpu" ]] || { echo "FAIL: decide expected 'gpu', got '$dec'"; exit 1; } +echo " decide=$dec OK" + +# ── Step 1: delegate ────────────────────────────────────────────────── +echo "==> delegate-to-secutools.sh" +bash "$SCRIPTS_ROOT/delegate-to-secutools.sh" "test-proj" "$TASK_PATH" + +marker="$TASK_PATH.delegated" +[[ -f "$marker" ]] || { echo "FAIL: marker missing: $marker"; exit 1; } +job_id=$(awk -F'"' '/"job_id"/ { print $4; exit }' "$marker") +[[ -n "$job_id" ]] || { echo "FAIL: no job_id in marker"; exit 1; } +echo " marker OK (job_id=$job_id)" + +# status.json 
+status_json="$PROJECTS_BASE/test-proj/.agent-queue/status.json" +[[ -f "$status_json" ]] || { echo "FAIL: status.json missing"; exit 1; } +state=$(awk -F'"' '/"state"/ { print $4; exit }' "$status_json") +[[ "$state" == "delegated_to_secutools" ]] || { echo "FAIL: state=$state"; exit 1; } +echo " status.json OK (state=delegated_to_secutools)" + +# Original .md must still be in inbox (reaper hasn't processed it yet) +[[ -f "$TASK_PATH" ]] || { echo "FAIL: original .md removed prematurely"; exit 1; } + +# ── Step 2: simulate completion in mock and reap ───────────────────── +echo "==> driving mock into 'completed' state for $job_id" +# State payload assumed from the assertions below (response, provider, cost_cad). +cat > "$MOCK_STATE/${job_id}.json" <<EOF +{"status":"completed","provider":"gpu","response":"mocked summary","cost_cad":0.0034} +EOF + +echo "==> poll-delegated-jobs.sh" +bash "$SCRIPTS_ROOT/poll-delegated-jobs.sh" --projects-base "$PROJECTS_BASE" + +done_file="$PROJECTS_BASE/test-proj/.agent-queue/done/TASK-phase2E.md" +[[ -f "$done_file" ]] || { echo "FAIL: done/ file missing"; exit 1; } +grep -q "mocked summary" "$done_file" || { echo "FAIL: response not in done body:"; cat "$done_file"; exit 1; } +grep -q "provider: gpu" "$done_file" || { echo "FAIL: provider footer missing:"; cat "$done_file"; exit 1; } +grep -q "cost_cad: 0.0034" "$done_file" || { echo "FAIL: cost_cad footer missing:"; cat "$done_file"; exit 1; } +echo " done/ OK (has response + provider + cost_cad)" + +# Marker + original .md cleaned up +[[ -f "$marker" ]] && { echo "FAIL: marker not cleaned"; exit 1; } +[[ -f "$TASK_PATH" ]] && { echo "FAIL: original .md not cleaned"; exit 1; } +echo " inbox cleanup OK" + +# Budget tracker recorded the spend +if [[ -f "$BUDGETS_FILE" ]]; then + grep -q "0.0034" "$BUDGETS_FILE" || echo "WARN: spend not found in $BUDGETS_FILE" + grep -q "test-proj" "$BUDGETS_FILE" || echo "WARN: project not found in $BUDGETS_FILE" + echo " budget tracker OK" +else + echo "WARN: no budgets file (maybe unwritable)" +fi + +# ── Step 3: assert legacy task (no allow_delegation) exits 2 ───────── +echo "==> negative test: legacy task → exit 2" 
+LEGACY="$PROJECTS_BASE/test-proj/.agent-queue/inbox/TASK-legacy.md" +cat > "$LEGACY" <<'EOF' +--- +title: Legacy +priority: default +--- +Do classic work. +EOF +set +e +bash "$SCRIPTS_ROOT/delegate-to-secutools.sh" "test-proj" "$LEGACY" >/dev/null 2>&1 +rc=$? +set -e +[[ $rc -eq 2 ]] || { echo "FAIL: legacy task expected rc=2, got $rc"; exit 1; } +echo " legacy task rc=2 OK" + +echo "" +echo "=== Phase 2 Chantier E integration: ALL ASSERTIONS PASSED ==="