Compare commits
1 commit
feat/phase
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
336f1f27bb |
17 changed files with 24 additions and 2810 deletions
91
VERSION.md
91
VERSION.md
|
|
@ -1,92 +1,15 @@
|
||||||
# Version actuelle : 0.4.0
|
# Version actuelle : 0.3.9
|
||||||
|
|
||||||
## [0.4.0] - 2026-04-17
|
## [0.3.9] - 2026-04-16
|
||||||
**Type:** Minor — Phase 2 chantier E : multi-provider routing (delegation to secutools)
|
**Type:** Patch — `go mod tidy` (fsnotify direct dep cleanup)
|
||||||
|
|
||||||
### Ajouté
|
### Modifié
|
||||||
- `internal/secutools` — HTTP client (SubmitJob, GetJob, WaitForResult)
|
- `go.mod` : `fsnotify` promu en dépendance directe, `yaml.v3` regroupé, `golang.org/x/sys` isolé en indirect. Aucun changement fonctionnel.
|
||||||
pour la plateforme centralisée IA SecuAAS. Interface `Client` mockable
|
|
||||||
pour les tests, implémentation `HTTPClient` avec polling 2s + timeout
|
|
||||||
+ propagation `context.Context`.
|
|
||||||
- `internal/router` — Décide ProviderClaudeCode (Phase 1) vs ProviderGPU /
|
|
||||||
Gemini / ClaudeAPI / Auto (delegation secutools) à partir des nouveaux
|
|
||||||
champs frontmatter `preferred_ai`, `allow_delegation`, `complexity_hint`.
|
|
||||||
Précédence stricte : `needs_claude_code` > `preferred_ai=claude-code` >
|
|
||||||
`allow_delegation=false` (default) > `preferred_ai=...` > fail-safe Claude
|
|
||||||
Code sur valeur inconnue.
|
|
||||||
- `internal/delegation` — Manager qui submit les tâches non-Claude vers
|
|
||||||
secutools, écrit un marker `inbox/<id>.md.delegated` (rebuild-on-restart),
|
|
||||||
et fait tourner un reaper périodique qui finalise les jobs vers `done/`
|
|
||||||
(succès) ou `failed/` (échec). Compteurs atomiques (Active /
|
|
||||||
CompletedTotal / FailedTotal) exposés via Snapshot().
|
|
||||||
- Dispatcher: méthode `WithDelegation()` opt-in, branchement router avant
|
|
||||||
`findFreeSession()`, fallback automatique vers Claude Code si le submit
|
|
||||||
secutools échoue. Skip des `*.md.delegated` dans `assignNextTask`.
|
|
||||||
- Frontmatter: `TaskFrontmatter` étendu avec `PreferredAI`, `AllowDelegation`,
|
|
||||||
`ComplexityHint`. Tous optionnels — un .md sans ces champs garde
|
|
||||||
exactement le comportement Phase 1.
|
|
||||||
- API HTTP : nouveau `GET /api/delegated/status` (jobs en cours +
|
|
||||||
compteurs), `GET /watchdog/status` qui inclut les compteurs delegation.
|
|
||||||
`WithDelegation()` opt-in — endpoints renvoient 404 si désactivés.
|
|
||||||
- main.go : Manager initialisé seulement si `SECUTOOLS_API_KEY` est set
|
|
||||||
(zéro changement pour les déploiements existants). `LoadFromDisk()`
|
|
||||||
réhydrate les markers en attente après un restart.
|
|
||||||
|
|
||||||
### Tests ajoutés (29 nouveaux cas)
|
|
||||||
- `internal/router` (8 tests) : matrice de décision complète, contrats
|
|
||||||
IsDelegated, fail-safe sur provider inconnu.
|
|
||||||
- `internal/secutools` (5 tests) : httptest.Server qui mock le contrat,
|
|
||||||
HappyPath, HTTPError, polling jusqu'à completed, ErrJobFailed, cancel
|
|
||||||
via context.
|
|
||||||
- `internal/delegation` (7 tests) : Submit + marker, rejet provider non
|
|
||||||
délégué, reap success/fail, LoadFromDisk, Active(), end-to-end.
|
|
||||||
- `internal/api` (4 tests) : /health, /api/delegated/status désactivé/
|
|
||||||
activé, /watchdog/status inclut bien les counters.
|
|
||||||
- `internal/dispatcher` (5 nouveaux tests + 8 existants intacts) :
|
|
||||||
routeTask parse les nouveaux champs, dispatchProject délègue le GPU,
|
|
||||||
dispatchProject garde Claude Code en backward-compat, needs_claude_code
|
|
||||||
bypass, end-to-end inbox→submit→reap→done/.
|
|
||||||
|
|
||||||
### Tests effectués
|
### Tests effectués
|
||||||
- `go build ./...` : ok
|
- ✅ Build OK (aucune modification de code)
|
||||||
- `go vet ./...` : clean
|
|
||||||
- `go test -race ./...` : tous les packages passent (~10s)
|
|
||||||
- `go mod tidy` : aucun changement nécessaire
|
|
||||||
|
|
||||||
### Ajouté — complément 2026-04-17 (bash wiring)
|
---
|
||||||
- `cmd/ccl-delegate` — CLI Go tiny wrapper autour d'`internal/secutools`
|
|
||||||
utilisable depuis bash. Sous-commandes `submit`, `get`, `result`,
|
|
||||||
`decide`. Env `CCL_SECUTOOLS_API_KEY` (preferred) ou `SECUTOOLS_API_KEY`.
|
|
||||||
`CCL_SECUTOOLS_URL` ou `CCL_SECUTOOLS_MOCK_URL` pour tests.
|
|
||||||
- `internal/secutools/routing.go` — `DecideProvider(map[string]any) string`
|
|
||||||
(adapter signature requis par le spec pour usage depuis CLI). Map-input
|
|
||||||
permissif (bool|string|int coerced). Fail-safe retourne `"local"`.
|
|
||||||
Table-driven tests couvrent 14 cas incluant coercions.
|
|
||||||
- `internal/secutools/client.go` — retries exponential backoff (max 3
|
|
||||||
retries, 500ms base, x2 chaque attempt). Retry sur 5xx + transport
|
|
||||||
errors, PAS sur 4xx. `SetRetryPolicy(maxRetries, baseDelay)` exposé
|
|
||||||
pour tests. Tests : `TestSubmitJob_RetriesOn5xx`,
|
|
||||||
`TestSubmitJob_DoesNotRetry4xx`.
|
|
||||||
- `tests/phase2-E-integration.sh` — test bout-en-bout bash côté
|
|
||||||
dev-management avec mock HTTP secutools (Python stdlib).
|
|
||||||
Scénarios : decide, delegate (marker + status.json +
|
|
||||||
state=delegated_to_secutools), drive mock → completed, poll-reaper,
|
|
||||||
assertions done/ body + footer, cleanup inbox, budget tracker,
|
|
||||||
rejet tâche legacy (rc=2).
|
|
||||||
|
|
||||||
### Notes / décisions
|
|
||||||
- Pas d'appel réseau réel à secutools dans les tests — fakeClient +
|
|
||||||
httptest côté Go, serveur mock Python stdlib côté integration.
|
|
||||||
- Le client secutools est branché sur `SECUTOOLS_API_KEY` env var pour
|
|
||||||
garder un path par défaut "Phase 1 only".
|
|
||||||
- Smoke test prod restera nécessaire (test E2E avec `preferred_ai: gpu`
|
|
||||||
réel) — pas effectué dans ce chantier comme demandé (pas de push).
|
|
||||||
- Décision : `DecideProvider(map[string]any)` vit dans `secutools`
|
|
||||||
(adapter CLI, signature du spec) ET `router.Decide(Task)` vit dans
|
|
||||||
`router` (struct-typed, utilisé par le dispatcher Go). Les deux partagent
|
|
||||||
la même matrice de précédence — pas de duplication de logique car le
|
|
||||||
router consomme `TaskFrontmatter` typé via yaml.Unmarshal, alors que
|
|
||||||
la CLI parse manuellement pour rester zero-dep.
|
|
||||||
|
|
||||||
## [0.3.8] - 2026-04-16
|
## [0.3.8] - 2026-04-16
|
||||||
**Type:** Patch — Bug #1 (A3 flip+ensure inconsistency) + Bug #10 (requiredShared contract test)
|
**Type:** Patch — Bug #1 (A3 flip+ensure inconsistency) + Bug #10 (requiredShared contract test)
|
||||||
|
|
|
||||||
|
|
@ -1,30 +1,12 @@
|
||||||
# Travaux en Cours - claude-failover
|
# Travaux en Cours - claude-failover
|
||||||
|
|
||||||
## Dernière mise à jour
|
## Dernière mise à jour
|
||||||
2026-04-17 02:20:00
|
2026-04-16 19:00:00
|
||||||
|
|
||||||
## Version Actuelle
|
## Version Actuelle
|
||||||
0.4.0 — Phase 2 chantier E livré (multi-provider routing + bash wiring).
|
0.3.5 (en cours de progression vers 0.4.0)
|
||||||
|
|
||||||
## Demande en cours
|
## Demande Actuelle
|
||||||
Aucune demande active. Branche `feat/phase2-E-multi-provider-routing`
|
|
||||||
prête à push (en attente de validation utilisateur).
|
|
||||||
|
|
||||||
## Phase 2 / Chantier E — Statut
|
|
||||||
- [x] router (decide tree + 8 tests)
|
|
||||||
- [x] secutools client (HTTP + 5 tests + 2 retry tests)
|
|
||||||
- [x] secutools/routing.go DecideProvider(map) + 14 table tests
|
|
||||||
- [x] delegation manager (submit + reaper + LoadFromDisk + 7 tests)
|
|
||||||
- [x] dispatcher integration (routeTask + dispatchProject branch + 5 tests)
|
|
||||||
- [x] API endpoints `/api/delegated/status` + `/watchdog/status` enrichi
|
|
||||||
- [x] main.go opt-in via `SECUTOOLS_API_KEY`
|
|
||||||
- [x] `cmd/ccl-delegate` CLI Go pour bash wrapper
|
|
||||||
- [x] tests/phase2-E-integration.sh bout-en-bout avec mock Python
|
|
||||||
- [x] tests `-race` clean
|
|
||||||
- [ ] smoke test E2E avec vraie API secutools (à faire après push)
|
|
||||||
- [ ] dashboard MCP `Secuaas:orchestrator` consommer `/api/delegated/status`
|
|
||||||
|
|
||||||
## Demande Précédente
|
|
||||||
**Phase 1 / Chantier A — Failover robuste** (spec dans `ccl-platform/phases/phase1/A-failover.md`).
|
**Phase 1 / Chantier A — Failover robuste** (spec dans `ccl-platform/phases/phase1/A-failover.md`).
|
||||||
Rendre le failover compte1 ↔ compte2 déterministe en intégrant dans le code les fixes manuels
|
Rendre le failover compte1 ↔ compte2 déterministe en intégrant dans le code les fixes manuels
|
||||||
(symlinks partagés), en ajoutant un registre UUID fiable, et en durcissant tmux send-keys.
|
(symlinks partagés), en ajoutant un registre UUID fiable, et en durcissant tmux send-keys.
|
||||||
|
|
|
||||||
|
|
@ -1,267 +0,0 @@
|
||||||
// ccl-delegate is a thin CLI wrapper around internal/secutools. It exists
|
|
||||||
// because the dispatcher that scans agent-queue inboxes is written in
|
|
||||||
// bash (dev-management/agent-orchestrator/dispatcher.sh). Rather than
|
|
||||||
// duplicate the HTTP contract in bash + curl + jq, we shell out to this
|
|
||||||
// binary for every delegation action.
|
|
||||||
//
|
|
||||||
// Subcommands:
|
|
||||||
//
|
|
||||||
// ccl-delegate submit --prompt=... [--preferred-ai=gpu] [--priority=default]
|
|
||||||
// Prints JSON to stdout: {"job_id": "...", "status": "pending"}.
|
|
||||||
//
|
|
||||||
// ccl-delegate get --job-id=xyz
|
|
||||||
// Prints JSON to stdout: {"job_id":"...","status":"...","provider":"..."}
|
|
||||||
//
|
|
||||||
// ccl-delegate result --job-id=xyz [--timeout=5m]
|
|
||||||
// Waits for completion (timeout-bounded) and prints result JSON.
|
|
||||||
//
|
|
||||||
// Auth: reads CCL_SECUTOOLS_API_KEY first, then SECUTOOLS_API_KEY. URL
|
|
||||||
// override via CCL_SECUTOOLS_URL (default https://api.secutools.secuaas.ovh),
|
|
||||||
// or CCL_SECUTOOLS_MOCK_URL for tests.
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
|
|
||||||
)
|
|
||||||
|
|
||||||
const defaultURL = "https://api.secutools.secuaas.ovh"
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
if len(os.Args) < 2 {
|
|
||||||
usage()
|
|
||||||
os.Exit(2)
|
|
||||||
}
|
|
||||||
sub := os.Args[1]
|
|
||||||
args := os.Args[2:]
|
|
||||||
|
|
||||||
switch sub {
|
|
||||||
case "submit":
|
|
||||||
runSubmit(args)
|
|
||||||
case "get":
|
|
||||||
runGet(args)
|
|
||||||
case "result":
|
|
||||||
runResult(args)
|
|
||||||
case "decide":
|
|
||||||
runDecide(args)
|
|
||||||
case "-h", "--help", "help":
|
|
||||||
usage()
|
|
||||||
os.Exit(0)
|
|
||||||
default:
|
|
||||||
fmt.Fprintf(os.Stderr, "ccl-delegate: unknown subcommand %q\n", sub)
|
|
||||||
usage()
|
|
||||||
os.Exit(2)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func usage() {
|
|
||||||
fmt.Fprint(os.Stderr, `ccl-delegate — submit/poll secutools jobs from bash
|
|
||||||
|
|
||||||
usage:
|
|
||||||
ccl-delegate submit --prompt=... [--preferred-ai=gpu] [--priority=default] [--source=...]
|
|
||||||
ccl-delegate get --job-id=xyz
|
|
||||||
ccl-delegate result --job-id=xyz [--timeout=5m]
|
|
||||||
ccl-delegate decide --frontmatter=path/to/task.md
|
|
||||||
|
|
||||||
env:
|
|
||||||
CCL_SECUTOOLS_API_KEY (preferred) or SECUTOOLS_API_KEY
|
|
||||||
CCL_SECUTOOLS_URL (default https://api.secutools.secuaas.ovh)
|
|
||||||
CCL_SECUTOOLS_MOCK_URL (overrides CCL_SECUTOOLS_URL, for tests)
|
|
||||||
`)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newClient() *secutools.HTTPClient {
|
|
||||||
url := os.Getenv("CCL_SECUTOOLS_MOCK_URL")
|
|
||||||
if url == "" {
|
|
||||||
url = os.Getenv("CCL_SECUTOOLS_URL")
|
|
||||||
}
|
|
||||||
if url == "" {
|
|
||||||
url = defaultURL
|
|
||||||
}
|
|
||||||
key := os.Getenv("CCL_SECUTOOLS_API_KEY")
|
|
||||||
if key == "" {
|
|
||||||
key = os.Getenv("SECUTOOLS_API_KEY")
|
|
||||||
}
|
|
||||||
if key == "" {
|
|
||||||
fmt.Fprintln(os.Stderr, "ccl-delegate: missing CCL_SECUTOOLS_API_KEY / SECUTOOLS_API_KEY")
|
|
||||||
os.Exit(4)
|
|
||||||
}
|
|
||||||
return secutools.NewHTTPClient(url, key, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func runSubmit(args []string) {
|
|
||||||
fs := flag.NewFlagSet("submit", flag.ExitOnError)
|
|
||||||
prompt := fs.String("prompt", "", "prompt text (required)")
|
|
||||||
preferred := fs.String("preferred-ai", "", "preferred provider (gpu|gemini|claude-api|claude-opus|claude-sonnet|claude-haiku|auto)")
|
|
||||||
priority := fs.String("priority", "default", "priority (critical|high|default|low)")
|
|
||||||
jobType := fs.String("type", string(secutools.TypeAnalyze), "job type (ai:analyze|ai:batch|ai:report|ai:correlate)")
|
|
||||||
source := fs.String("source", "ccl-delegate", "source identifier")
|
|
||||||
maxTokens := fs.Int("max-tokens", 0, "max tokens (0 = server default)")
|
|
||||||
_ = fs.Parse(args)
|
|
||||||
|
|
||||||
if *prompt == "" {
|
|
||||||
fmt.Fprintln(os.Stderr, "ccl-delegate submit: --prompt is required")
|
|
||||||
os.Exit(2)
|
|
||||||
}
|
|
||||||
c := newClient()
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
resp, err := c.SubmitJob(ctx, &secutools.JobRequest{
|
|
||||||
Type: secutools.JobType(*jobType),
|
|
||||||
Priority: secutools.Priority(*priority),
|
|
||||||
Prompt: *prompt,
|
|
||||||
PreferredAI: *preferred,
|
|
||||||
Source: *source,
|
|
||||||
MaxTokens: *maxTokens,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "submit: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
writeJSON(resp)
|
|
||||||
}
|
|
||||||
|
|
||||||
func runGet(args []string) {
|
|
||||||
fs := flag.NewFlagSet("get", flag.ExitOnError)
|
|
||||||
id := fs.String("job-id", "", "job id (required)")
|
|
||||||
_ = fs.Parse(args)
|
|
||||||
if *id == "" {
|
|
||||||
fmt.Fprintln(os.Stderr, "ccl-delegate get: --job-id required")
|
|
||||||
os.Exit(2)
|
|
||||||
}
|
|
||||||
c := newClient()
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
st, err := c.GetJob(ctx, *id)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "get: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
writeJSON(st)
|
|
||||||
}
|
|
||||||
|
|
||||||
func runResult(args []string) {
|
|
||||||
fs := flag.NewFlagSet("result", flag.ExitOnError)
|
|
||||||
id := fs.String("job-id", "", "job id (required)")
|
|
||||||
timeout := fs.Duration("timeout", 5*time.Minute, "wait timeout")
|
|
||||||
_ = fs.Parse(args)
|
|
||||||
if *id == "" {
|
|
||||||
fmt.Fprintln(os.Stderr, "ccl-delegate result: --job-id required")
|
|
||||||
os.Exit(2)
|
|
||||||
}
|
|
||||||
c := newClient()
|
|
||||||
ctx := context.Background()
|
|
||||||
res, err := c.WaitForResult(ctx, *id, *timeout)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "result: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
writeJSON(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
func runDecide(args []string) {
|
|
||||||
fs := flag.NewFlagSet("decide", flag.ExitOnError)
|
|
||||||
frontmatterPath := fs.String("frontmatter", "", "path to task .md with YAML frontmatter")
|
|
||||||
_ = fs.Parse(args)
|
|
||||||
if *frontmatterPath == "" {
|
|
||||||
fmt.Fprintln(os.Stderr, "ccl-delegate decide: --frontmatter required")
|
|
||||||
os.Exit(2)
|
|
||||||
}
|
|
||||||
data, err := os.ReadFile(*frontmatterPath)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "decide: read %s: %v\n", *frontmatterPath, err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
fm := parseFrontmatterMap(data)
|
|
||||||
fmt.Println(secutools.DecideProvider(fm))
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseFrontmatterMap extracts a flat key/value map from a YAML frontmatter
|
|
||||||
// block delimited by "---". Only top-level scalars; nested structures are
|
|
||||||
// ignored. Intentionally dependency-free (no yaml unmarshal) so this CLI
|
|
||||||
// stays tiny and predictable.
|
|
||||||
func parseFrontmatterMap(content []byte) map[string]any {
|
|
||||||
out := map[string]any{}
|
|
||||||
s := string(content)
|
|
||||||
if len(s) < 4 || s[:4] != "---\n" {
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
end := -1
|
|
||||||
for i := 4; i < len(s)-3; i++ {
|
|
||||||
if s[i] == '\n' && s[i+1:i+4] == "---" && (i+4 == len(s) || s[i+4] == '\n') {
|
|
||||||
end = i
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if end < 0 {
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
body := s[4:end]
|
|
||||||
start := 0
|
|
||||||
for i := 0; i <= len(body); i++ {
|
|
||||||
if i == len(body) || body[i] == '\n' {
|
|
||||||
line := body[start:i]
|
|
||||||
start = i + 1
|
|
||||||
// find "key: value"
|
|
||||||
colon := -1
|
|
||||||
for j := 0; j < len(line); j++ {
|
|
||||||
if line[j] == ':' {
|
|
||||||
colon = j
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if colon <= 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
key := trim(line[:colon])
|
|
||||||
val := trim(line[colon+1:])
|
|
||||||
if key == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// strip surrounding quotes
|
|
||||||
if len(val) >= 2 &&
|
|
||||||
((val[0] == '"' && val[len(val)-1] == '"') ||
|
|
||||||
(val[0] == '\'' && val[len(val)-1] == '\'')) {
|
|
||||||
val = val[1 : len(val)-1]
|
|
||||||
}
|
|
||||||
// bool coercion
|
|
||||||
switch val {
|
|
||||||
case "true":
|
|
||||||
out[key] = true
|
|
||||||
case "false":
|
|
||||||
out[key] = false
|
|
||||||
default:
|
|
||||||
out[key] = val
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func trim(s string) string {
|
|
||||||
start := 0
|
|
||||||
for start < len(s) && (s[start] == ' ' || s[start] == '\t') {
|
|
||||||
start++
|
|
||||||
}
|
|
||||||
end := len(s)
|
|
||||||
for end > start && (s[end-1] == ' ' || s[end-1] == '\t' || s[end-1] == '\r') {
|
|
||||||
end--
|
|
||||||
}
|
|
||||||
return s[start:end]
|
|
||||||
}
|
|
||||||
|
|
||||||
func writeJSON(v any) {
|
|
||||||
enc := json.NewEncoder(os.Stdout)
|
|
||||||
enc.SetIndent("", " ")
|
|
||||||
if err := enc.Encode(v); err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "encode: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -12,20 +12,18 @@ import (
|
||||||
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/api"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/api"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/dispatcher"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/dispatcher"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/janitor"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/janitor"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/lifecycle"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/lifecycle"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/notify"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/notify"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/quota"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/quota"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/switcher"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/switcher"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/watcher"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/watcher"
|
||||||
)
|
)
|
||||||
|
|
||||||
const version = "0.4.0"
|
const version = "0.1.0"
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var cfgPath string
|
var cfgPath string
|
||||||
|
|
@ -86,37 +84,8 @@ func main() {
|
||||||
as := switcher.New(tmuxClient, s, cfg, qm.SwitchChan(), notifier)
|
as := switcher.New(tmuxClient, s, cfg, qm.SwitchChan(), notifier)
|
||||||
go as.Run(ctx)
|
go as.Run(ctx)
|
||||||
|
|
||||||
// Phase 2 chantier E — delegation to secutools (GPU/Gemini/Claude API).
|
|
||||||
// Disabled when the SECUTOOLS_API_KEY env var is empty so existing
|
|
||||||
// deployments keep their Phase 1 behaviour with zero config change.
|
|
||||||
var delegMgr *delegation.Manager
|
|
||||||
if key := os.Getenv("SECUTOOLS_API_KEY"); key != "" {
|
|
||||||
url := os.Getenv("SECUTOOLS_URL")
|
|
||||||
if url == "" {
|
|
||||||
url = "https://api.secutools.secuaas.ovh"
|
|
||||||
}
|
|
||||||
client := secutools.NewHTTPClient(url, key, nil)
|
|
||||||
delegMgr = delegation.New(client, 30*time.Second)
|
|
||||||
|
|
||||||
// Restore in-flight markers from disk after a restart.
|
|
||||||
var projectDirs []string
|
|
||||||
for _, ds := range cfg.Pool.Dedicated {
|
|
||||||
projectDirs = append(projectDirs, ds.Project)
|
|
||||||
}
|
|
||||||
if err := delegMgr.LoadFromDisk(projectDirs); err != nil {
|
|
||||||
log.Printf("delegation LoadFromDisk warning: %v", err)
|
|
||||||
}
|
|
||||||
go delegMgr.Run(ctx)
|
|
||||||
log.Printf("delegation enabled: secutools=%s", url)
|
|
||||||
} else {
|
|
||||||
log.Printf("delegation disabled: SECUTOOLS_API_KEY unset (Phase 1 behaviour)")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dispatcher — assigns inbox tasks to idle sessions.
|
// Dispatcher — assigns inbox tasks to idle sessions.
|
||||||
disp := dispatcher.New(tmuxClient, s, cfg, sw.DoneChan())
|
disp := dispatcher.New(tmuxClient, s, cfg, sw.DoneChan())
|
||||||
if delegMgr != nil {
|
|
||||||
disp.WithDelegation(delegMgr)
|
|
||||||
}
|
|
||||||
go disp.Run(ctx)
|
go disp.Run(ctx)
|
||||||
|
|
||||||
// Janitor — periodic cleanup of orphaned files and stale status.json.
|
// Janitor — periodic cleanup of orphaned files and stale status.json.
|
||||||
|
|
@ -143,9 +112,6 @@ func main() {
|
||||||
listenAddr = "127.0.0.1:9090"
|
listenAddr = "127.0.0.1:9090"
|
||||||
}
|
}
|
||||||
srv := api.New(listenAddr, s)
|
srv := api.New(listenAddr, s)
|
||||||
if delegMgr != nil {
|
|
||||||
srv.WithDelegation(delegMgr)
|
|
||||||
}
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := srv.Start(); err != nil {
|
if err := srv.Start(); err != nil {
|
||||||
log.Printf("API server error: %v", err)
|
log.Printf("API server error: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -3,30 +3,18 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||||
)
|
)
|
||||||
|
|
||||||
const version = "0.2.0"
|
const version = "0.1.0"
|
||||||
|
|
||||||
// DelegationProvider is the slice of delegation.Manager used by the HTTP
|
// Server is a minimal HTTP server exposing /health and /status.
|
||||||
// server. Kept as an interface so tests don't have to spin up a real
|
|
||||||
// secutools client.
|
|
||||||
type DelegationProvider interface {
|
|
||||||
Active() []delegation.ActiveJob
|
|
||||||
CountersSnapshot() delegation.Snapshot
|
|
||||||
}
|
|
||||||
|
|
||||||
// Server is a minimal HTTP server exposing /health, /status,
|
|
||||||
// /watchdog/status and /api/delegated/status.
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
addr string
|
addr string
|
||||||
state *state.State
|
state *state.State
|
||||||
delegation DelegationProvider
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a Server listening on addr.
|
// New creates a Server listening on addr.
|
||||||
|
|
@ -34,20 +22,11 @@ func New(addr string, s *state.State) *Server {
|
||||||
return &Server{addr: addr, state: s}
|
return &Server{addr: addr, state: s}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithDelegation enables /api/delegated/* endpoints. Pass nil (or skip
|
|
||||||
// the call) to keep them disabled — those paths return 404.
|
|
||||||
func (s *Server) WithDelegation(d DelegationProvider) *Server {
|
|
||||||
s.delegation = d
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start registers routes and begins serving. Blocks until the listener fails.
|
// Start registers routes and begins serving. Blocks until the listener fails.
|
||||||
func (s *Server) Start() error {
|
func (s *Server) Start() error {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.HandleFunc("/health", s.handleHealth)
|
mux.HandleFunc("/health", s.handleHealth)
|
||||||
mux.HandleFunc("/status", s.handleStatus)
|
mux.HandleFunc("/status", s.handleStatus)
|
||||||
mux.HandleFunc("/watchdog/status", s.handleWatchdogStatus)
|
|
||||||
mux.HandleFunc("/api/delegated/status", s.handleDelegatedStatus)
|
|
||||||
return http.ListenAndServe(s.addr, mux)
|
return http.ListenAndServe(s.addr, mux)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -58,33 +37,5 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
_, _ = w.Write(s.state.JSON())
|
w.Write(s.state.JSON())
|
||||||
}
|
|
||||||
|
|
||||||
// handleWatchdogStatus returns operational counters consumed by the
|
|
||||||
// orchestrator dashboard. Includes delegation metrics when wired.
|
|
||||||
func (s *Server) handleWatchdogStatus(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
out := map[string]any{
|
|
||||||
"version": version,
|
|
||||||
}
|
|
||||||
if s.delegation != nil {
|
|
||||||
out["delegation"] = s.delegation.CountersSnapshot()
|
|
||||||
}
|
|
||||||
_ = json.NewEncoder(w).Encode(out)
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleDelegatedStatus returns the list of in-flight delegated jobs.
|
|
||||||
func (s *Server) handleDelegatedStatus(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
if s.delegation == nil {
|
|
||||||
w.WriteHeader(http.StatusNotFound)
|
|
||||||
_, _ = w.Write([]byte(`{"error":"delegation disabled"}`))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
out := map[string]any{
|
|
||||||
"active": s.delegation.Active(),
|
|
||||||
"counters": s.delegation.CountersSnapshot(),
|
|
||||||
}
|
|
||||||
_ = json.NewEncoder(w).Encode(out)
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,116 +0,0 @@
|
||||||
package api
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
|
||||||
)
|
|
||||||
|
|
||||||
// fakeDelegation implements DelegationProvider for tests.
|
|
||||||
type fakeDelegation struct {
|
|
||||||
active []delegation.ActiveJob
|
|
||||||
snapshot delegation.Snapshot
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeDelegation) Active() []delegation.ActiveJob { return f.active }
|
|
||||||
func (f *fakeDelegation) CountersSnapshot() delegation.Snapshot { return f.snapshot }
|
|
||||||
|
|
||||||
func newTestServer(t *testing.T, deleg DelegationProvider) *httptest.Server {
|
|
||||||
t.Helper()
|
|
||||||
s := New("ignored", state.New(""))
|
|
||||||
if deleg != nil {
|
|
||||||
s.WithDelegation(deleg)
|
|
||||||
}
|
|
||||||
mux := http.NewServeMux()
|
|
||||||
mux.HandleFunc("/health", s.handleHealth)
|
|
||||||
mux.HandleFunc("/status", s.handleStatus)
|
|
||||||
mux.HandleFunc("/watchdog/status", s.handleWatchdogStatus)
|
|
||||||
mux.HandleFunc("/api/delegated/status", s.handleDelegatedStatus)
|
|
||||||
return httptest.NewServer(mux)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleHealth(t *testing.T) {
|
|
||||||
srv := newTestServer(t, nil)
|
|
||||||
defer srv.Close()
|
|
||||||
resp, err := http.Get(srv.URL + "/health")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
body, _ := io.ReadAll(resp.Body)
|
|
||||||
if !strings.Contains(string(body), `"status":"ok"`) {
|
|
||||||
t.Errorf("unexpected /health body: %s", body)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleDelegatedStatus_Disabled(t *testing.T) {
|
|
||||||
srv := newTestServer(t, nil)
|
|
||||||
defer srv.Close()
|
|
||||||
resp, err := http.Get(srv.URL + "/api/delegated/status")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
if resp.StatusCode != http.StatusNotFound {
|
|
||||||
t.Errorf("expected 404 when delegation disabled, got %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleDelegatedStatus_Enabled(t *testing.T) {
|
|
||||||
deleg := &fakeDelegation{
|
|
||||||
active: []delegation.ActiveJob{
|
|
||||||
{JobID: "j1", Project: "/p/a", TaskFile: "task-1.md", Provider: "gpu", Duration: "12s"},
|
|
||||||
},
|
|
||||||
snapshot: delegation.Snapshot{Active: 1, CompletedTotal: 5, FailedTotal: 1},
|
|
||||||
}
|
|
||||||
srv := newTestServer(t, deleg)
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
resp, err := http.Get(srv.URL + "/api/delegated/status")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
t.Fatalf("expected 200, got %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
var out struct {
|
|
||||||
Active []delegation.ActiveJob `json:"active"`
|
|
||||||
Counters delegation.Snapshot `json:"counters"`
|
|
||||||
}
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if len(out.Active) != 1 || out.Active[0].JobID != "j1" {
|
|
||||||
t.Errorf("active mismatch: %+v", out.Active)
|
|
||||||
}
|
|
||||||
if out.Counters.CompletedTotal != 5 || out.Counters.Active != 1 {
|
|
||||||
t.Errorf("counters mismatch: %+v", out.Counters)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleWatchdogStatus_IncludesDelegationCounters(t *testing.T) {
|
|
||||||
deleg := &fakeDelegation{snapshot: delegation.Snapshot{
|
|
||||||
Active: 2, CompletedTotal: 7, FailedTotal: 3,
|
|
||||||
}}
|
|
||||||
srv := newTestServer(t, deleg)
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
resp, err := http.Get(srv.URL + "/watchdog/status")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
body, _ := io.ReadAll(resp.Body)
|
|
||||||
for _, want := range []string{`"delegated_active":2`, `"delegated_completed_total":7`, `"delegated_failed_total":3`} {
|
|
||||||
if !strings.Contains(string(body), want) {
|
|
||||||
t.Errorf("missing %s in body: %s", want, body)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,408 +0,0 @@
|
||||||
// Package delegation submits agent-queue tasks to secutools when the router
|
|
||||||
// decides they should not run on a local Claude Code session, and reaps
|
|
||||||
// completed jobs back into the project's done/ or failed/ directory.
|
|
||||||
//
|
|
||||||
// Lifecycle of a delegated task in this package:
|
|
||||||
//
|
|
||||||
// inbox/<id>.md
|
|
||||||
// │ Submit() — POST /api/v1/jobs
|
|
||||||
// ▼
|
|
||||||
// inbox/<id>.md.delegated (marker file: { project, task, job_id, provider })
|
|
||||||
// │
|
|
||||||
// Reaper loop polls every <interval>:
|
|
||||||
// │ GetJob() → completed
|
|
||||||
// ▼
|
|
||||||
// done/<id>.md (frontmatter: completed, body = AI response, footer = cost)
|
|
||||||
//
|
|
||||||
// or, on failure:
|
|
||||||
//
|
|
||||||
// failed/<id>.md (frontmatter: failed_reason, body = original task)
|
|
||||||
//
|
|
||||||
// The .delegated marker doubles as the on-disk source of truth — if the
|
|
||||||
// daemon restarts mid-flight, the reaper rebuilds its job table from disk.
|
|
||||||
package delegation
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/router"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Marker is the JSON content of an inbox/*.md.delegated file.
|
|
||||||
type Marker struct {
|
|
||||||
Project string `json:"project"`
|
|
||||||
TaskFile string `json:"task_file"`
|
|
||||||
JobID string `json:"job_id"`
|
|
||||||
Provider string `json:"provider"`
|
|
||||||
StartedAt time.Time `json:"started_at"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// DelegatedJob is the in-memory view of a job we are tracking.
|
|
||||||
type DelegatedJob struct {
|
|
||||||
Marker
|
|
||||||
StatusFile string // absolute path to the inbox/*.md.delegated marker
|
|
||||||
}
|
|
||||||
|
|
||||||
// Counters tracks aggregate metrics surfaced via /watchdog/status.
|
|
||||||
type Counters struct {
|
|
||||||
Active atomic.Int64
|
|
||||||
CompletedTotal atomic.Int64
|
|
||||||
FailedTotal atomic.Int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// Snapshot is a JSON-friendly counter snapshot.
|
|
||||||
type Snapshot struct {
|
|
||||||
Active int64 `json:"delegated_active"`
|
|
||||||
CompletedTotal int64 `json:"delegated_completed_total"`
|
|
||||||
FailedTotal int64 `json:"delegated_failed_total"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Snapshot returns the current counter values.
|
|
||||||
func (c *Counters) Snapshot() Snapshot {
|
|
||||||
return Snapshot{
|
|
||||||
Active: c.Active.Load(),
|
|
||||||
CompletedTotal: c.CompletedTotal.Load(),
|
|
||||||
FailedTotal: c.FailedTotal.Load(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Manager submits delegated jobs and runs the periodic reaper.
|
|
||||||
type Manager struct {
|
|
||||||
client secutools.Client
|
|
||||||
logger *log.Logger
|
|
||||||
interval time.Duration
|
|
||||||
now func() time.Time
|
|
||||||
|
|
||||||
mu sync.RWMutex
|
|
||||||
jobs map[string]*DelegatedJob // key: job_id
|
|
||||||
|
|
||||||
Counters Counters
|
|
||||||
}
|
|
||||||
|
|
||||||
// New constructs a Manager. interval is how often the reaper polls
|
|
||||||
// secutools; if zero, defaults to 30s.
|
|
||||||
func New(client secutools.Client, interval time.Duration) *Manager {
|
|
||||||
if interval == 0 {
|
|
||||||
interval = 30 * time.Second
|
|
||||||
}
|
|
||||||
return &Manager{
|
|
||||||
client: client,
|
|
||||||
logger: log.Default(),
|
|
||||||
interval: interval,
|
|
||||||
now: func() time.Time { return time.Now().UTC() },
|
|
||||||
jobs: make(map[string]*DelegatedJob),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetLogger overrides the default logger (used by tests).
|
|
||||||
func (m *Manager) SetLogger(l *log.Logger) { m.logger = l }
|
|
||||||
|
|
||||||
// Submit posts the task to secutools and writes a .delegated marker next to
|
|
||||||
// the original .md so that on restart we can resume tracking.
|
|
||||||
//
|
|
||||||
// taskPath is the absolute path of the inbox/<id>.md file. body is the .md
|
|
||||||
// body (post-frontmatter) used as the prompt. projectDir is the project root
|
|
||||||
// (for routing the result back to done/).
|
|
||||||
func (m *Manager) Submit(ctx context.Context, projectDir, taskPath, prompt string,
|
|
||||||
provider router.Provider, priority secutools.Priority) (*DelegatedJob, error) {
|
|
||||||
|
|
||||||
if !provider.IsDelegated() {
|
|
||||||
return nil, fmt.Errorf("delegation: provider %q is not delegated", provider)
|
|
||||||
}
|
|
||||||
|
|
||||||
preferredAI := ""
|
|
||||||
if provider != router.ProviderAuto {
|
|
||||||
preferredAI = string(provider)
|
|
||||||
}
|
|
||||||
|
|
||||||
req := &secutools.JobRequest{
|
|
||||||
Type: secutools.TypeAnalyze,
|
|
||||||
Priority: priority,
|
|
||||||
Prompt: buildPrompt(taskPath, prompt),
|
|
||||||
PreferredAI: preferredAI,
|
|
||||||
Source: "claude-failover/dispatcher",
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := m.client.SubmitJob(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("delegation: submit %s: %w", filepath.Base(taskPath), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
mk := Marker{
|
|
||||||
Project: projectDir,
|
|
||||||
TaskFile: taskPath,
|
|
||||||
JobID: resp.JobID,
|
|
||||||
Provider: string(provider),
|
|
||||||
StartedAt: m.now(),
|
|
||||||
}
|
|
||||||
markerPath := taskPath + ".delegated"
|
|
||||||
if err := writeMarker(markerPath, mk); err != nil {
|
|
||||||
return nil, fmt.Errorf("delegation: write marker: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
job := &DelegatedJob{Marker: mk, StatusFile: markerPath}
|
|
||||||
m.mu.Lock()
|
|
||||||
m.jobs[resp.JobID] = job
|
|
||||||
m.mu.Unlock()
|
|
||||||
m.Counters.Active.Add(1)
|
|
||||||
|
|
||||||
m.logger.Printf("[delegation] SUBMITTED task=%s job_id=%s provider=%s",
|
|
||||||
filepath.Base(taskPath), resp.JobID, provider)
|
|
||||||
return job, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadFromDisk rebuilds the in-memory tracker from any .delegated markers
|
|
||||||
// found under projectsDirs. Called once at daemon startup.
|
|
||||||
func (m *Manager) LoadFromDisk(projectsDirs []string) error {
|
|
||||||
for _, projectDir := range projectsDirs {
|
|
||||||
inbox := filepath.Join(projectDir, ".agent-queue", "inbox")
|
|
||||||
entries, err := os.ReadDir(inbox)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, e := range entries {
|
|
||||||
if !strings.HasSuffix(e.Name(), ".md.delegated") {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
markerPath := filepath.Join(inbox, e.Name())
|
|
||||||
mk, err := readMarker(markerPath)
|
|
||||||
if err != nil {
|
|
||||||
m.logger.Printf("[delegation] WARN bad marker %s: %v", markerPath, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
m.mu.Lock()
|
|
||||||
m.jobs[mk.JobID] = &DelegatedJob{Marker: mk, StatusFile: markerPath}
|
|
||||||
m.mu.Unlock()
|
|
||||||
m.Counters.Active.Add(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Active returns a snapshot of the currently-tracked delegated jobs (for
|
|
||||||
// the /api/delegated/status HTTP endpoint).
|
|
||||||
type ActiveJob struct {
|
|
||||||
JobID string `json:"job_id"`
|
|
||||||
Project string `json:"project"`
|
|
||||||
TaskFile string `json:"task_file"`
|
|
||||||
Provider string `json:"provider"`
|
|
||||||
Duration string `json:"duration"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// CountersSnapshot returns a JSON-friendly snapshot of the manager's
|
|
||||||
// counters. Implements api.DelegationProvider.
|
|
||||||
func (m *Manager) CountersSnapshot() Snapshot { return m.Counters.Snapshot() }
|
|
||||||
|
|
||||||
// Active returns the list of in-flight delegated jobs.
|
|
||||||
func (m *Manager) Active() []ActiveJob {
|
|
||||||
m.mu.RLock()
|
|
||||||
defer m.mu.RUnlock()
|
|
||||||
out := make([]ActiveJob, 0, len(m.jobs))
|
|
||||||
for _, j := range m.jobs {
|
|
||||||
out = append(out, ActiveJob{
|
|
||||||
JobID: j.JobID,
|
|
||||||
Project: j.Project,
|
|
||||||
TaskFile: filepath.Base(j.TaskFile),
|
|
||||||
Provider: j.Provider,
|
|
||||||
Duration: m.now().Sub(j.StartedAt).Round(time.Second).String(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run starts the reaper loop until ctx is cancelled.
|
|
||||||
func (m *Manager) Run(ctx context.Context) {
|
|
||||||
ticker := time.NewTicker(m.interval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
m.reapOnce(ctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// reapOnce polls every tracked job and finalises terminal ones.
|
|
||||||
func (m *Manager) reapOnce(ctx context.Context) {
|
|
||||||
m.mu.RLock()
|
|
||||||
snapshot := make([]*DelegatedJob, 0, len(m.jobs))
|
|
||||||
for _, j := range m.jobs {
|
|
||||||
snapshot = append(snapshot, j)
|
|
||||||
}
|
|
||||||
m.mu.RUnlock()
|
|
||||||
|
|
||||||
for _, j := range snapshot {
|
|
||||||
st, err := m.client.GetJob(ctx, j.JobID)
|
|
||||||
if err != nil {
|
|
||||||
m.logger.Printf("[delegation] poll job=%s: %v", j.JobID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
switch st.Status {
|
|
||||||
case "completed":
|
|
||||||
res, err := m.fetchResult(ctx, j.JobID)
|
|
||||||
if err != nil {
|
|
||||||
m.logger.Printf("[delegation] fetch result job=%s: %v", j.JobID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
m.finalizeSuccess(j, res)
|
|
||||||
case "failed", "cancelled":
|
|
||||||
m.finalizeFailure(j, st)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetchResult retrieves the final job result. We can't use Client.WaitForResult
|
|
||||||
// here because we need to fetch only the result, not poll for completion.
|
|
||||||
func (m *Manager) fetchResult(ctx context.Context, jobID string) (*secutools.JobResult, error) {
|
|
||||||
type resultGetter interface {
|
|
||||||
WaitForResult(ctx context.Context, id string, timeout time.Duration) (*secutools.JobResult, error)
|
|
||||||
}
|
|
||||||
rg, ok := m.client.(resultGetter)
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("delegation: client does not implement WaitForResult")
|
|
||||||
}
|
|
||||||
// 0 timeout → essentially "fetch now". Real Wait still polls, but we're
|
|
||||||
// already in completed state so the first poll returns immediately.
|
|
||||||
return rg.WaitForResult(ctx, jobID, 5*time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
// finalizeSuccess writes the result to done/ and removes the inbox marker.
|
|
||||||
func (m *Manager) finalizeSuccess(j *DelegatedJob, res *secutools.JobResult) {
|
|
||||||
doneDir := filepath.Join(j.Project, ".agent-queue", "done")
|
|
||||||
if err := os.MkdirAll(doneDir, 0755); err != nil {
|
|
||||||
m.logger.Printf("[delegation] mkdir done %s: %v", doneDir, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
donePath := filepath.Join(doneDir, filepath.Base(j.TaskFile))
|
|
||||||
body := buildDoneBody(res, j)
|
|
||||||
if err := os.WriteFile(donePath, []byte(body), 0644); err != nil {
|
|
||||||
m.logger.Printf("[delegation] write done %s: %v", donePath, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_ = os.Remove(j.StatusFile)
|
|
||||||
_ = os.Remove(j.TaskFile)
|
|
||||||
|
|
||||||
m.mu.Lock()
|
|
||||||
delete(m.jobs, j.JobID)
|
|
||||||
m.mu.Unlock()
|
|
||||||
m.Counters.Active.Add(-1)
|
|
||||||
m.Counters.CompletedTotal.Add(1)
|
|
||||||
|
|
||||||
m.logger.Printf("[delegation] COMPLETED task=%s job_id=%s provider=%s cost_cad=%.4f",
|
|
||||||
filepath.Base(j.TaskFile), j.JobID, res.Provider, res.CostCAD)
|
|
||||||
}
|
|
||||||
|
|
||||||
// finalizeFailure writes the original task body into failed/ with a reason.
|
|
||||||
func (m *Manager) finalizeFailure(j *DelegatedJob, st *secutools.JobStatus) {
|
|
||||||
failedDir := filepath.Join(j.Project, ".agent-queue", "failed")
|
|
||||||
if err := os.MkdirAll(failedDir, 0755); err != nil {
|
|
||||||
m.logger.Printf("[delegation] mkdir failed %s: %v", failedDir, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
failedPath := filepath.Join(failedDir, filepath.Base(j.TaskFile))
|
|
||||||
body := buildFailedBody(j, st)
|
|
||||||
if err := os.WriteFile(failedPath, []byte(body), 0644); err != nil {
|
|
||||||
m.logger.Printf("[delegation] write failed %s: %v", failedPath, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_ = os.Remove(j.StatusFile)
|
|
||||||
_ = os.Remove(j.TaskFile)
|
|
||||||
|
|
||||||
m.mu.Lock()
|
|
||||||
delete(m.jobs, j.JobID)
|
|
||||||
m.mu.Unlock()
|
|
||||||
m.Counters.Active.Add(-1)
|
|
||||||
m.Counters.FailedTotal.Add(1)
|
|
||||||
|
|
||||||
m.logger.Printf("[delegation] FAILED task=%s job_id=%s status=%s err=%s",
|
|
||||||
filepath.Base(j.TaskFile), j.JobID, st.Status, st.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// buildPrompt assembles the prompt sent to secutools. Self-contained: the
|
|
||||||
// non-Claude provider has no filesystem access, so we send the body as-is
|
|
||||||
// and rely on the task author to include all required context.
|
|
||||||
func buildPrompt(taskPath, body string) string {
|
|
||||||
taskName := filepath.Base(taskPath)
|
|
||||||
if body == "" {
|
|
||||||
body = "(empty task body)"
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("Task: %s\n\n%s\n\nProvide a complete, self-contained answer.",
|
|
||||||
taskName, body)
|
|
||||||
}
|
|
||||||
|
|
||||||
// buildDoneBody is the .md written into done/ after a successful job.
|
|
||||||
func buildDoneBody(res *secutools.JobResult, j *DelegatedJob) string {
|
|
||||||
return fmt.Sprintf(`---
|
|
||||||
status: completed
|
|
||||||
provider: %s
|
|
||||||
model: %s
|
|
||||||
cost_cad: %.4f
|
|
||||||
job_id: %s
|
|
||||||
delegated_at: %s
|
|
||||||
---
|
|
||||||
|
|
||||||
# Result (delegated to secutools)
|
|
||||||
|
|
||||||
%s
|
|
||||||
|
|
||||||
---
|
|
||||||
provider: %s, cost_cad: %.4f
|
|
||||||
`, res.Provider, res.Model, res.CostCAD, j.JobID,
|
|
||||||
j.StartedAt.Format(time.RFC3339), res.Response,
|
|
||||||
res.Provider, res.CostCAD)
|
|
||||||
}
|
|
||||||
|
|
||||||
// buildFailedBody is the .md written into failed/ when a delegated job dies.
|
|
||||||
func buildFailedBody(j *DelegatedJob, st *secutools.JobStatus) string {
|
|
||||||
return fmt.Sprintf(`---
|
|
||||||
status: failed
|
|
||||||
failed_reason: %s
|
|
||||||
job_id: %s
|
|
||||||
provider: %s
|
|
||||||
delegated_at: %s
|
|
||||||
---
|
|
||||||
|
|
||||||
# Delegation failed
|
|
||||||
|
|
||||||
Job %s ended in status=%s.
|
|
||||||
|
|
||||||
Error: %s
|
|
||||||
`, st.Status, j.JobID, j.Provider,
|
|
||||||
j.StartedAt.Format(time.RFC3339),
|
|
||||||
j.JobID, st.Status, st.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// writeMarker serialises mk to disk as JSON.
|
|
||||||
func writeMarker(path string, mk Marker) error {
|
|
||||||
data, err := json.MarshalIndent(mk, "", " ")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return os.WriteFile(path, data, 0644)
|
|
||||||
}
|
|
||||||
|
|
||||||
// readMarker parses a marker file from disk.
|
|
||||||
func readMarker(path string) (Marker, error) {
|
|
||||||
data, err := os.ReadFile(path)
|
|
||||||
if err != nil {
|
|
||||||
return Marker{}, err
|
|
||||||
}
|
|
||||||
var mk Marker
|
|
||||||
if err := json.Unmarshal(data, &mk); err != nil {
|
|
||||||
return Marker{}, err
|
|
||||||
}
|
|
||||||
return mk, nil
|
|
||||||
}
|
|
||||||
|
|
@ -1,335 +0,0 @@
|
||||||
package delegation
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/router"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
|
|
||||||
)
|
|
||||||
|
|
||||||
// fakeClient is an in-memory secutools.Client implementation for tests.
|
|
||||||
// It supports preloaded responses keyed by job_id and a counter of calls.
|
|
||||||
type fakeClient struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
nextJobID int
|
|
||||||
statuses map[string]string // job_id → current status
|
|
||||||
results map[string]*secutools.JobResult // job_id → result
|
|
||||||
statusErr error
|
|
||||||
submitErr error
|
|
||||||
}
|
|
||||||
|
|
||||||
func newFakeClient() *fakeClient {
|
|
||||||
return &fakeClient{
|
|
||||||
statuses: make(map[string]string),
|
|
||||||
results: make(map[string]*secutools.JobResult),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeClient) SubmitJob(_ context.Context, req *secutools.JobRequest) (*secutools.JobResponse, error) {
|
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
if f.submitErr != nil {
|
|
||||||
return nil, f.submitErr
|
|
||||||
}
|
|
||||||
f.nextJobID++
|
|
||||||
id := "job-" + itoa(f.nextJobID)
|
|
||||||
f.statuses[id] = "pending"
|
|
||||||
return &secutools.JobResponse{JobID: id, Status: "pending"}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeClient) GetJob(_ context.Context, id string) (*secutools.JobStatus, error) {
|
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
if f.statusErr != nil {
|
|
||||||
return nil, f.statusErr
|
|
||||||
}
|
|
||||||
st, ok := f.statuses[id]
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("unknown job: " + id)
|
|
||||||
}
|
|
||||||
out := &secutools.JobStatus{JobID: id, Status: st}
|
|
||||||
if r, ok := f.results[id]; ok {
|
|
||||||
out.Provider = r.Provider
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeClient) WaitForResult(_ context.Context, id string, _ time.Duration) (*secutools.JobResult, error) {
|
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
if r, ok := f.results[id]; ok {
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
return nil, errors.New("no result for " + id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// setStatus is a test helper to drive job lifecycles.
|
|
||||||
func (f *fakeClient) setStatus(id, status string) {
|
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
f.statuses[id] = status
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeClient) setResult(id string, r *secutools.JobResult) {
|
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
f.results[id] = r
|
|
||||||
}
|
|
||||||
|
|
||||||
// itoa is a tiny stdlib-free int-to-string for fake job IDs.
|
|
||||||
func itoa(n int) string {
|
|
||||||
if n == 0 {
|
|
||||||
return "0"
|
|
||||||
}
|
|
||||||
var b []byte
|
|
||||||
for n > 0 {
|
|
||||||
b = append([]byte{byte('0' + n%10)}, b...)
|
|
||||||
n /= 10
|
|
||||||
}
|
|
||||||
return string(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
// quietLogger discards log output.
|
|
||||||
func quietLogger() *log.Logger {
|
|
||||||
return log.New(io.Discard, "", 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupProject(t *testing.T) (projectDir, taskPath string) {
|
|
||||||
t.Helper()
|
|
||||||
projectDir = t.TempDir()
|
|
||||||
inbox := filepath.Join(projectDir, ".agent-queue", "inbox")
|
|
||||||
if err := os.MkdirAll(inbox, 0755); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
taskPath = filepath.Join(inbox, "task-001.md")
|
|
||||||
if err := os.WriteFile(taskPath, []byte("---\ntitle: Test\n---\nDo a thing."), 0644); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return projectDir, taskPath
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSubmit_WritesMarkerAndIncrementsCounter(t *testing.T) {
|
|
||||||
fc := newFakeClient()
|
|
||||||
mgr := New(fc, 100*time.Millisecond)
|
|
||||||
mgr.SetLogger(quietLogger())
|
|
||||||
|
|
||||||
projectDir, taskPath := setupProject(t)
|
|
||||||
|
|
||||||
job, err := mgr.Submit(context.Background(), projectDir, taskPath, "Do a thing.",
|
|
||||||
router.ProviderGPU, secutools.PriorityHigh)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Submit: %v", err)
|
|
||||||
}
|
|
||||||
if job.JobID == "" {
|
|
||||||
t.Fatal("expected non-empty job id")
|
|
||||||
}
|
|
||||||
if mgr.Counters.Active.Load() != 1 {
|
|
||||||
t.Errorf("expected Active=1, got %d", mgr.Counters.Active.Load())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Marker file must exist and decode back to the same job_id.
|
|
||||||
markerPath := taskPath + ".delegated"
|
|
||||||
data, err := os.ReadFile(markerPath)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("missing marker: %v", err)
|
|
||||||
}
|
|
||||||
var mk Marker
|
|
||||||
if err := json.Unmarshal(data, &mk); err != nil {
|
|
||||||
t.Fatalf("decode marker: %v", err)
|
|
||||||
}
|
|
||||||
if mk.JobID != job.JobID || mk.Provider != "gpu" {
|
|
||||||
t.Errorf("marker mismatch: %+v", mk)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSubmit_RejectsNonDelegatedProvider(t *testing.T) {
|
|
||||||
mgr := New(newFakeClient(), time.Millisecond)
|
|
||||||
mgr.SetLogger(quietLogger())
|
|
||||||
_, taskPath := setupProject(t)
|
|
||||||
|
|
||||||
_, err := mgr.Submit(context.Background(), filepath.Dir(filepath.Dir(filepath.Dir(taskPath))),
|
|
||||||
taskPath, "p", router.ProviderClaudeCode, secutools.PriorityDefault)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("expected error when provider is claude-code (non-delegated)")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReap_CompletesJobAndMovesToDone(t *testing.T) {
|
|
||||||
fc := newFakeClient()
|
|
||||||
mgr := New(fc, time.Millisecond)
|
|
||||||
mgr.SetLogger(quietLogger())
|
|
||||||
|
|
||||||
projectDir, taskPath := setupProject(t)
|
|
||||||
|
|
||||||
job, err := mgr.Submit(context.Background(), projectDir, taskPath, "p",
|
|
||||||
router.ProviderAuto, secutools.PriorityDefault)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Drive the fake into completed state.
|
|
||||||
fc.setStatus(job.JobID, "completed")
|
|
||||||
fc.setResult(job.JobID, &secutools.JobResult{
|
|
||||||
JobID: job.JobID, Response: "Hello world", Provider: "gpu",
|
|
||||||
Model: "qwen-coder", CostCAD: 0.012,
|
|
||||||
})
|
|
||||||
|
|
||||||
mgr.reapOnce(context.Background())
|
|
||||||
|
|
||||||
if mgr.Counters.Active.Load() != 0 {
|
|
||||||
t.Errorf("expected Active=0 after completion, got %d", mgr.Counters.Active.Load())
|
|
||||||
}
|
|
||||||
if mgr.Counters.CompletedTotal.Load() != 1 {
|
|
||||||
t.Errorf("expected CompletedTotal=1, got %d", mgr.Counters.CompletedTotal.Load())
|
|
||||||
}
|
|
||||||
|
|
||||||
// done/ contains the result.
|
|
||||||
donePath := filepath.Join(projectDir, ".agent-queue", "done", "task-001.md")
|
|
||||||
doneBody, err := os.ReadFile(donePath)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("missing done/ file: %v", err)
|
|
||||||
}
|
|
||||||
if !contains(string(doneBody), "Hello world") || !contains(string(doneBody), "provider: gpu") {
|
|
||||||
t.Errorf("done body missing expected fields:\n%s", doneBody)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Inbox marker and original .md are gone.
|
|
||||||
if _, err := os.Stat(taskPath + ".delegated"); !os.IsNotExist(err) {
|
|
||||||
t.Error("marker should be removed after completion")
|
|
||||||
}
|
|
||||||
if _, err := os.Stat(taskPath); !os.IsNotExist(err) {
|
|
||||||
t.Error("inbox .md should be removed after completion")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReap_FailedJobMovesToFailed(t *testing.T) {
|
|
||||||
fc := newFakeClient()
|
|
||||||
mgr := New(fc, time.Millisecond)
|
|
||||||
mgr.SetLogger(quietLogger())
|
|
||||||
|
|
||||||
projectDir, taskPath := setupProject(t)
|
|
||||||
|
|
||||||
job, err := mgr.Submit(context.Background(), projectDir, taskPath, "p",
|
|
||||||
router.ProviderGemini, secutools.PriorityLow)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fc.setStatus(job.JobID, "failed")
|
|
||||||
mgr.reapOnce(context.Background())
|
|
||||||
|
|
||||||
if mgr.Counters.FailedTotal.Load() != 1 {
|
|
||||||
t.Errorf("expected FailedTotal=1, got %d", mgr.Counters.FailedTotal.Load())
|
|
||||||
}
|
|
||||||
failedPath := filepath.Join(projectDir, ".agent-queue", "failed", "task-001.md")
|
|
||||||
if _, err := os.Stat(failedPath); err != nil {
|
|
||||||
t.Errorf("failed/ file missing: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLoadFromDisk_RestoresMarkers(t *testing.T) {
|
|
||||||
fc := newFakeClient()
|
|
||||||
mgr := New(fc, time.Millisecond)
|
|
||||||
mgr.SetLogger(quietLogger())
|
|
||||||
|
|
||||||
projectDir, taskPath := setupProject(t)
|
|
||||||
|
|
||||||
// Pre-write a marker on disk simulating a daemon restart.
|
|
||||||
mk := Marker{
|
|
||||||
Project: projectDir, TaskFile: taskPath, JobID: "preexisting-job",
|
|
||||||
Provider: "gpu", StartedAt: time.Now().UTC().Add(-1 * time.Minute),
|
|
||||||
}
|
|
||||||
if err := writeMarker(taskPath+".delegated", mk); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := mgr.LoadFromDisk([]string{projectDir}); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if mgr.Counters.Active.Load() != 1 {
|
|
||||||
t.Errorf("expected Active=1 after LoadFromDisk, got %d", mgr.Counters.Active.Load())
|
|
||||||
}
|
|
||||||
active := mgr.Active()
|
|
||||||
if len(active) != 1 || active[0].JobID != "preexisting-job" {
|
|
||||||
t.Errorf("Active() mismatch: %+v", active)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestActive_ReturnsCurrentJobs(t *testing.T) {
|
|
||||||
fc := newFakeClient()
|
|
||||||
mgr := New(fc, time.Millisecond)
|
|
||||||
mgr.SetLogger(quietLogger())
|
|
||||||
|
|
||||||
projectDir, taskPath := setupProject(t)
|
|
||||||
if _, err := mgr.Submit(context.Background(), projectDir, taskPath, "p",
|
|
||||||
router.ProviderGPU, secutools.PriorityDefault); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
got := mgr.Active()
|
|
||||||
if len(got) != 1 {
|
|
||||||
t.Fatalf("expected 1 active job, got %d", len(got))
|
|
||||||
}
|
|
||||||
if got[0].Provider != "gpu" || got[0].Project != projectDir {
|
|
||||||
t.Errorf("active[0] mismatch: %+v", got[0])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestEndToEnd_InboxToDone exercises the full happy path:
|
|
||||||
// inbox → Submit → reapOnce (status=completed) → done/.
|
|
||||||
func TestEndToEnd_InboxToDone(t *testing.T) {
|
|
||||||
fc := newFakeClient()
|
|
||||||
mgr := New(fc, time.Millisecond)
|
|
||||||
mgr.SetLogger(quietLogger())
|
|
||||||
|
|
||||||
projectDir, taskPath := setupProject(t)
|
|
||||||
|
|
||||||
job, err := mgr.Submit(context.Background(), projectDir, taskPath,
|
|
||||||
"Summarize this", router.ProviderAuto, secutools.PriorityDefault)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simulate the secutools backend completing the job.
|
|
||||||
fc.setStatus(job.JobID, "completed")
|
|
||||||
fc.setResult(job.JobID, &secutools.JobResult{
|
|
||||||
JobID: job.JobID, Response: "Summary text",
|
|
||||||
Provider: "claude-haiku", Model: "haiku-3-5", CostCAD: 0.0005,
|
|
||||||
})
|
|
||||||
|
|
||||||
mgr.reapOnce(context.Background())
|
|
||||||
|
|
||||||
donePath := filepath.Join(projectDir, ".agent-queue", "done", "task-001.md")
|
|
||||||
body, err := os.ReadFile(donePath)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("missing done/ file: %v", err)
|
|
||||||
}
|
|
||||||
want := []string{"Summary text", "provider: claude-haiku", "cost_cad: 0.0005"}
|
|
||||||
for _, w := range want {
|
|
||||||
if !contains(string(body), w) {
|
|
||||||
t.Errorf("done body missing %q:\n%s", w, body)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func contains(s, sub string) bool {
|
|
||||||
return len(s) >= len(sub) && (sub == "" || indexOf(s, sub) >= 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func indexOf(s, sub string) int {
|
|
||||||
for i := 0; i+len(sub) <= len(s); i++ {
|
|
||||||
if s[i:i+len(sub)] == sub {
|
|
||||||
return i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
|
|
@ -16,26 +16,16 @@ import (
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/router"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
|
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TaskFrontmatter is the YAML header parsed from task .md files.
|
// TaskFrontmatter is the YAML header parsed from task .md files.
|
||||||
//
|
|
||||||
// Phase 2 chantier E added three new fields (PreferredAI, AllowDelegation,
|
|
||||||
// ComplexityHint). They are all optional; absence preserves Phase 1
|
|
||||||
// behaviour (Claude Code on a local ccl-auto session).
|
|
||||||
type TaskFrontmatter struct {
|
type TaskFrontmatter struct {
|
||||||
Title string `yaml:"title"`
|
Title string `yaml:"title"`
|
||||||
Priority string `yaml:"priority"` // critical, high, default, low
|
Priority string `yaml:"priority"` // critical, high, default, low
|
||||||
Tags []string `yaml:"tags"`
|
Tags []string `yaml:"tags"`
|
||||||
NeedsClaude bool `yaml:"needs_claude_code"`
|
NeedsClaude bool `yaml:"needs_claude_code"`
|
||||||
PreferredAI string `yaml:"preferred_ai"` // auto | claude-code | gpu | gemini | claude-api
|
|
||||||
AllowDelegation bool `yaml:"allow_delegation"` // default false → backward-compatible
|
|
||||||
ComplexityHint string `yaml:"complexity_hint"` // low | medium | high
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dispatcher watches project inbox directories and assigns tasks to idle sessions.
|
// Dispatcher watches project inbox directories and assigns tasks to idle sessions.
|
||||||
|
|
@ -46,12 +36,6 @@ type Dispatcher struct {
|
||||||
doneChan <-chan string
|
doneChan <-chan string
|
||||||
projectsDir string
|
projectsDir string
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
|
|
||||||
// Phase 2 chantier E — delegation. When non-nil, tasks whose
|
|
||||||
// frontmatter routes them away from Claude Code (allow_delegation=true
|
|
||||||
// + preferred_ai!=claude-code) are submitted to secutools instead of
|
|
||||||
// being assigned to a tmux session.
|
|
||||||
delegation *delegation.Manager
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a Dispatcher.
|
// New creates a Dispatcher.
|
||||||
|
|
@ -71,13 +55,6 @@ func New(tc tmux.Client, s *state.State, cfg *config.Config, doneChan <-chan str
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithDelegation enables delegation routing. Pass nil to disable (the
|
|
||||||
// Phase 1 default). Returns d for chaining.
|
|
||||||
func (d *Dispatcher) WithDelegation(m *delegation.Manager) *Dispatcher {
|
|
||||||
d.delegation = m
|
|
||||||
return d
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run starts the dispatcher event loop until ctx is cancelled.
|
// Run starts the dispatcher event loop until ctx is cancelled.
|
||||||
func (d *Dispatcher) Run(ctx context.Context) {
|
func (d *Dispatcher) Run(ctx context.Context) {
|
||||||
ticker := time.NewTicker(60 * time.Second)
|
ticker := time.NewTicker(60 * time.Second)
|
||||||
|
|
@ -136,11 +113,6 @@ func (d *Dispatcher) fullScan() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatchProject assigns undispatched tasks in inboxDir to idle sessions.
|
// dispatchProject assigns undispatched tasks in inboxDir to idle sessions.
|
||||||
//
|
|
||||||
// For each task we first ask the router whether the task should run on a
|
|
||||||
// local Claude Code session (the Phase 1 path) or be delegated to
|
|
||||||
// secutools (Phase 2 chantier E). Delegated tasks bypass the
|
|
||||||
// findFreeSession() check entirely — they don't need a tmux slot.
|
|
||||||
func (d *Dispatcher) dispatchProject(inboxDir string) {
|
func (d *Dispatcher) dispatchProject(inboxDir string) {
|
||||||
entries, err := os.ReadDir(inboxDir)
|
entries, err := os.ReadDir(inboxDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -149,42 +121,15 @@ func (d *Dispatcher) dispatchProject(inboxDir string) {
|
||||||
projectDir := filepath.Dir(filepath.Dir(inboxDir)) // inboxDir/.agent-queue/inbox → project
|
projectDir := filepath.Dir(filepath.Dir(inboxDir)) // inboxDir/.agent-queue/inbox → project
|
||||||
for _, e := range entries {
|
for _, e := range entries {
|
||||||
name := e.Name()
|
name := e.Name()
|
||||||
if !strings.HasSuffix(name, ".md") ||
|
if !strings.HasSuffix(name, ".md") || strings.Contains(name, ".dispatched") {
|
||||||
strings.Contains(name, ".dispatched") ||
|
|
||||||
strings.Contains(name, ".delegated") {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
taskPath := filepath.Join(inboxDir, name)
|
|
||||||
|
|
||||||
// Decide route based on frontmatter.
|
|
||||||
decision, body, err := d.routeTask(taskPath)
|
|
||||||
if err != nil {
|
|
||||||
d.logger.Printf("[dispatcher] routeTask %s: %v", taskPath, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delegated path: submit to secutools, mark the task with .delegated.
|
|
||||||
if decision.Provider.IsDelegated() && d.delegation != nil {
|
|
||||||
if _, err := d.delegation.Submit(context.Background(), projectDir, taskPath,
|
|
||||||
body, decision.Provider, mapPriority(d.taskPriority(taskPath))); err != nil {
|
|
||||||
d.logger.Printf("[dispatcher] delegate %s: %v — falling back to Claude Code",
|
|
||||||
filepath.Base(taskPath), err)
|
|
||||||
// Fall through to Claude Code path on submit failure.
|
|
||||||
} else {
|
|
||||||
d.logger.Printf("[dispatcher] DELEGATED task=%s provider=%s reason=%s",
|
|
||||||
filepath.Base(taskPath), decision.Provider, decision.Reason)
|
|
||||||
// Original .md is left in place under inbox/ until the reaper
|
|
||||||
// finalises it. The .delegated marker prevents re-dispatch.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Claude Code path (Phase 1 behaviour).
|
|
||||||
session := d.findFreeSession()
|
session := d.findFreeSession()
|
||||||
if session == "" {
|
if session == "" {
|
||||||
d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir)
|
d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
taskPath := filepath.Join(inboxDir, name)
|
||||||
if err := d.launchAgent(session, projectDir, taskPath); err != nil {
|
if err := d.launchAgent(session, projectDir, taskPath); err != nil {
|
||||||
d.logger.Printf("[dispatcher] launchAgent error: %v", err)
|
d.logger.Printf("[dispatcher] launchAgent error: %v", err)
|
||||||
continue
|
continue
|
||||||
|
|
@ -195,50 +140,6 @@ func (d *Dispatcher) dispatchProject(inboxDir string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// routeTask reads taskPath, parses its frontmatter, and asks the router
|
|
||||||
// for a decision. Returns the decision plus the parsed body so callers
|
|
||||||
// don't have to read the file twice.
|
|
||||||
func (d *Dispatcher) routeTask(taskPath string) (router.Decision, string, error) {
|
|
||||||
content, err := os.ReadFile(taskPath)
|
|
||||||
if err != nil {
|
|
||||||
return router.Decision{}, "", fmt.Errorf("read task: %w", err)
|
|
||||||
}
|
|
||||||
fm, body := parseFrontmatter(content)
|
|
||||||
dec := router.Decide(router.Task{
|
|
||||||
PreferredAI: fm.PreferredAI,
|
|
||||||
AllowDelegation: fm.AllowDelegation,
|
|
||||||
NeedsClaudeCode: fm.NeedsClaude,
|
|
||||||
ComplexityHint: fm.ComplexityHint,
|
|
||||||
})
|
|
||||||
return dec, body, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// taskPriority returns the Priority field of a task's frontmatter without
|
|
||||||
// re-parsing the body. Defensive — used only for mapping into a secutools
|
|
||||||
// priority when delegating.
|
|
||||||
func (d *Dispatcher) taskPriority(taskPath string) string {
|
|
||||||
content, err := os.ReadFile(taskPath)
|
|
||||||
if err != nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
fm, _ := parseFrontmatter(content)
|
|
||||||
return fm.Priority
|
|
||||||
}
|
|
||||||
|
|
||||||
// mapPriority converts a task priority string into a secutools Priority.
|
|
||||||
func mapPriority(p string) secutools.Priority {
|
|
||||||
switch strings.ToLower(strings.TrimSpace(p)) {
|
|
||||||
case "critical":
|
|
||||||
return secutools.PriorityCritical
|
|
||||||
case "high":
|
|
||||||
return secutools.PriorityHigh
|
|
||||||
case "low":
|
|
||||||
return secutools.PriorityLow
|
|
||||||
default:
|
|
||||||
return secutools.PriorityDefault
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// findFreeSession returns the name of an idle, live, cooldown-free session
|
// findFreeSession returns the name of an idle, live, cooldown-free session
|
||||||
// from the autonomous pool. Dedicated sessions are intentionally NOT
|
// from the autonomous pool. Dedicated sessions are intentionally NOT
|
||||||
// considered: those host the operator's manual interactive work. Routing a
|
// considered: those host the operator's manual interactive work. Routing a
|
||||||
|
|
@ -274,13 +175,7 @@ func (d *Dispatcher) isSessionFree(name string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// assignNextTask scans all inboxes for work to give to a freshly-idled
|
// assignNextTask scans all inboxes for work to give to a freshly-idled session.
|
||||||
// session. Skips tasks that are already delegated to secutools (a
|
|
||||||
// .delegated marker exists alongside the .md) or already dispatched.
|
|
||||||
//
|
|
||||||
// Tasks whose router decision is "delegate" are also skipped here — they
|
|
||||||
// will be picked up by the next dispatchProject scan (which knows how to
|
|
||||||
// submit to secutools).
|
|
||||||
func (d *Dispatcher) assignNextTask(session string) {
|
func (d *Dispatcher) assignNextTask(session string) {
|
||||||
for _, ds := range d.config.Pool.Dedicated {
|
for _, ds := range d.config.Pool.Dedicated {
|
||||||
inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
|
inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
|
||||||
|
|
@ -288,30 +183,11 @@ func (d *Dispatcher) assignNextTask(session string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Build a set of base names already delegated, so we can skip the
|
|
||||||
// associated .md without re-dispatching it locally.
|
|
||||||
delegated := make(map[string]bool)
|
|
||||||
for _, e := range entries {
|
for _, e := range entries {
|
||||||
if strings.HasSuffix(e.Name(), ".md.delegated") {
|
if !strings.HasSuffix(e.Name(), ".md") || strings.Contains(e.Name(), ".dispatched") {
|
||||||
delegated[strings.TrimSuffix(e.Name(), ".delegated")] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, e := range entries {
|
|
||||||
if !strings.HasSuffix(e.Name(), ".md") ||
|
|
||||||
strings.Contains(e.Name(), ".dispatched") ||
|
|
||||||
strings.Contains(e.Name(), ".delegated") {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if delegated[e.Name()] {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
taskPath := filepath.Join(inbox, e.Name())
|
taskPath := filepath.Join(inbox, e.Name())
|
||||||
// Respect routing decisions: don't take a delegated task here.
|
|
||||||
if d.delegation != nil {
|
|
||||||
if dec, _, err := d.routeTask(taskPath); err == nil && dec.Provider.IsDelegated() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := d.launchAgent(session, ds.Project, taskPath); err == nil {
|
if err := d.launchAgent(session, ds.Project, taskPath); err == nil {
|
||||||
os.Rename(taskPath, taskPath+".dispatched")
|
os.Rename(taskPath, taskPath+".dispatched")
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -1,318 +0,0 @@
|
||||||
package dispatcher
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/router"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
|
|
||||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
|
||||||
)
|
|
||||||
|
|
||||||
// fakeSecutools is a minimal stub used for routing tests. It records every
|
|
||||||
// SubmitJob call so tests can assert on the number/contents of submissions.
|
|
||||||
type fakeSecutools struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
submits []*secutools.JobRequest
|
|
||||||
statuses map[string]string
|
|
||||||
results map[string]*secutools.JobResult
|
|
||||||
nextID int
|
|
||||||
}
|
|
||||||
|
|
||||||
func newFakeSecutools() *fakeSecutools {
|
|
||||||
return &fakeSecutools{
|
|
||||||
statuses: make(map[string]string),
|
|
||||||
results: make(map[string]*secutools.JobResult),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeSecutools) SubmitJob(_ context.Context, req *secutools.JobRequest) (*secutools.JobResponse, error) {
|
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
f.submits = append(f.submits, req)
|
|
||||||
f.nextID++
|
|
||||||
id := "job-" + itoa(f.nextID)
|
|
||||||
f.statuses[id] = "pending"
|
|
||||||
return &secutools.JobResponse{JobID: id, Status: "pending"}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeSecutools) GetJob(_ context.Context, id string) (*secutools.JobStatus, error) {
|
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
st, ok := f.statuses[id]
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("unknown")
|
|
||||||
}
|
|
||||||
return &secutools.JobStatus{JobID: id, Status: st}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeSecutools) WaitForResult(_ context.Context, id string, _ time.Duration) (*secutools.JobResult, error) {
|
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
if r, ok := f.results[id]; ok {
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
return nil, errors.New("no result")
|
|
||||||
}
|
|
||||||
|
|
||||||
// (uses itoa from dispatcher.go)
|
|
||||||
|
|
||||||
func setupTaskFile(t *testing.T, frontmatter, body string) (projectDir, inbox, taskPath string) {
|
|
||||||
t.Helper()
|
|
||||||
projectDir = t.TempDir()
|
|
||||||
inbox = filepath.Join(projectDir, ".agent-queue", "inbox")
|
|
||||||
if err := os.MkdirAll(inbox, 0755); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
content := "---\n" + frontmatter + "\n---\n" + body
|
|
||||||
taskPath = filepath.Join(inbox, "task-routing.md")
|
|
||||||
if err := os.WriteFile(taskPath, []byte(content), 0644); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return projectDir, inbox, taskPath
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestRouteTask_ParsesNewFrontmatterFields ensures the dispatcher actually
|
|
||||||
// reads the new preferred_ai/allow_delegation/needs_claude_code fields.
|
|
||||||
func TestRouteTask_ParsesNewFrontmatterFields(t *testing.T) {
|
|
||||||
_, _, taskPath := setupTaskFile(t,
|
|
||||||
"title: Demo\npreferred_ai: gpu\nallow_delegation: true\ncomplexity_hint: low",
|
|
||||||
"Body of task.")
|
|
||||||
|
|
||||||
d := &Dispatcher{logger: log.Default()}
|
|
||||||
dec, body, err := d.routeTask(taskPath)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if dec.Provider != router.ProviderGPU {
|
|
||||||
t.Errorf("expected ProviderGPU, got %v (%s)", dec.Provider, dec.Reason)
|
|
||||||
}
|
|
||||||
if body != "Body of task." {
|
|
||||||
t.Errorf("body parsed wrong: %q", body)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestDispatchProject_DelegatesGPUTask: a task with allow_delegation=true
|
|
||||||
// and preferred_ai=gpu must be sent to secutools and never reach a tmux
|
|
||||||
// session, even when one is available.
|
|
||||||
func TestDispatchProject_DelegatesGPUTask(t *testing.T) {
|
|
||||||
projectDir, inbox, taskPath := setupTaskFile(t,
|
|
||||||
"title: Delegated\npreferred_ai: gpu\nallow_delegation: true",
|
|
||||||
"Analyze something.")
|
|
||||||
|
|
||||||
tc := newMockTmux()
|
|
||||||
tc.sessions["pool-0"] = true
|
|
||||||
tc.paneOutput["pool-0"] = "❯ "
|
|
||||||
|
|
||||||
s := state.New("")
|
|
||||||
s.SetIdle("pool-0")
|
|
||||||
|
|
||||||
fc := newFakeSecutools()
|
|
||||||
mgr := delegation.New(fc, time.Millisecond)
|
|
||||||
|
|
||||||
d := &Dispatcher{
|
|
||||||
tmux: tc,
|
|
||||||
state: s,
|
|
||||||
config: &config.Config{
|
|
||||||
Pool: config.PoolConfig{
|
|
||||||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
logger: log.Default(),
|
|
||||||
delegation: mgr,
|
|
||||||
}
|
|
||||||
|
|
||||||
d.dispatchProject(inbox)
|
|
||||||
|
|
||||||
if len(fc.submits) != 1 {
|
|
||||||
t.Fatalf("expected 1 SubmitJob call, got %d", len(fc.submits))
|
|
||||||
}
|
|
||||||
if fc.submits[0].PreferredAI != "gpu" {
|
|
||||||
t.Errorf("expected preferred_ai=gpu, got %q", fc.submits[0].PreferredAI)
|
|
||||||
}
|
|
||||||
// .delegated marker is present, original .md kept (the reaper finalises later).
|
|
||||||
if _, err := os.Stat(taskPath + ".delegated"); err != nil {
|
|
||||||
t.Errorf("expected .delegated marker, got %v", err)
|
|
||||||
}
|
|
||||||
if _, err := os.Stat(taskPath); err != nil {
|
|
||||||
t.Errorf("expected original .md to remain until reaped, got %v", err)
|
|
||||||
}
|
|
||||||
// pool-0 must remain idle (we did NOT launch a Claude Code agent).
|
|
||||||
if st := s.GetSession("pool-0"); st == nil || st.State != "idle" {
|
|
||||||
t.Errorf("expected pool-0 to stay idle, got %v", st)
|
|
||||||
}
|
|
||||||
_ = projectDir
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestDispatchProject_LegacyTaskKeepsClaudeCode: backward-compat. A task
|
|
||||||
// with no new fields (or allow_delegation=false implicit) MUST go to a
|
|
||||||
// local Claude Code session.
|
|
||||||
func TestDispatchProject_LegacyTaskKeepsClaudeCode(t *testing.T) {
|
|
||||||
_, inbox, taskPath := setupTaskFile(t,
|
|
||||||
"title: Legacy\npriority: default",
|
|
||||||
"Do classic work.")
|
|
||||||
|
|
||||||
tc := newMockTmux()
|
|
||||||
tc.sessions["pool-0"] = true
|
|
||||||
tc.paneOutput["pool-0"] = "❯ "
|
|
||||||
|
|
||||||
s := state.New("")
|
|
||||||
s.SetIdle("pool-0")
|
|
||||||
|
|
||||||
fc := newFakeSecutools()
|
|
||||||
mgr := delegation.New(fc, time.Millisecond)
|
|
||||||
|
|
||||||
d := &Dispatcher{
|
|
||||||
tmux: tc,
|
|
||||||
state: s,
|
|
||||||
config: &config.Config{
|
|
||||||
Pool: config.PoolConfig{
|
|
||||||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
logger: log.Default(),
|
|
||||||
delegation: mgr,
|
|
||||||
}
|
|
||||||
|
|
||||||
d.dispatchProject(inbox)
|
|
||||||
|
|
||||||
if len(fc.submits) != 0 {
|
|
||||||
t.Errorf("legacy task must NOT delegate, got %d submits", len(fc.submits))
|
|
||||||
}
|
|
||||||
// .dispatched marker present, session is working.
|
|
||||||
if _, err := os.Stat(taskPath + ".dispatched"); err != nil {
|
|
||||||
t.Errorf("expected .dispatched marker for Claude Code path, got %v", err)
|
|
||||||
}
|
|
||||||
if st := s.GetSession("pool-0"); st == nil || st.State != "working" {
|
|
||||||
t.Errorf("expected pool-0 working, got %v", st)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestDispatchProject_NeedsClaudeCodeBypassesDelegation: even with
|
|
||||||
// allow_delegation=true, needs_claude_code: true forces local execution.
|
|
||||||
func TestDispatchProject_NeedsClaudeCodeBypassesDelegation(t *testing.T) {
|
|
||||||
_, inbox, _ := setupTaskFile(t,
|
|
||||||
"title: Bypass\npreferred_ai: gpu\nallow_delegation: true\nneeds_claude_code: true",
|
|
||||||
"This needs a real Claude session.")
|
|
||||||
|
|
||||||
tc := newMockTmux()
|
|
||||||
tc.sessions["pool-0"] = true
|
|
||||||
tc.paneOutput["pool-0"] = "❯ "
|
|
||||||
|
|
||||||
s := state.New("")
|
|
||||||
s.SetIdle("pool-0")
|
|
||||||
|
|
||||||
fc := newFakeSecutools()
|
|
||||||
mgr := delegation.New(fc, time.Millisecond)
|
|
||||||
|
|
||||||
d := &Dispatcher{
|
|
||||||
tmux: tc,
|
|
||||||
state: s,
|
|
||||||
config: &config.Config{
|
|
||||||
Pool: config.PoolConfig{
|
|
||||||
Autonomous: config.AutonomousConfig{Prefix: "pool-", Max: 1},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
logger: log.Default(),
|
|
||||||
delegation: mgr,
|
|
||||||
}
|
|
||||||
|
|
||||||
d.dispatchProject(inbox)
|
|
||||||
|
|
||||||
if len(fc.submits) != 0 {
|
|
||||||
t.Errorf("needs_claude_code must skip delegation, got %d submits", len(fc.submits))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestEndToEnd_DelegationFlow: full happy path from inbox → submit →
|
|
||||||
// reaper → done/, with the dispatcher and delegation manager wired up.
|
|
||||||
func TestEndToEnd_DelegationFlow(t *testing.T) {
|
|
||||||
projectDir, inbox, taskPath := setupTaskFile(t,
|
|
||||||
"title: E2E\npreferred_ai: auto\nallow_delegation: true",
|
|
||||||
"Summarize this.")
|
|
||||||
|
|
||||||
tc := newMockTmux()
|
|
||||||
s := state.New("")
|
|
||||||
|
|
||||||
fc := newFakeSecutools()
|
|
||||||
mgr := delegation.New(fc, time.Millisecond)
|
|
||||||
|
|
||||||
d := &Dispatcher{
|
|
||||||
tmux: tc,
|
|
||||||
state: s,
|
|
||||||
config: &config.Config{Pool: config.PoolConfig{Autonomous: config.AutonomousConfig{Max: 0}}},
|
|
||||||
logger: log.Default(),
|
|
||||||
delegation: mgr,
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1. Dispatch — submits to fakeSecutools.
|
|
||||||
d.dispatchProject(inbox)
|
|
||||||
if len(fc.submits) != 1 {
|
|
||||||
t.Fatalf("expected 1 submit, got %d", len(fc.submits))
|
|
||||||
}
|
|
||||||
jobID := "job-1"
|
|
||||||
|
|
||||||
// 2. Backend completes the job.
|
|
||||||
fc.mu.Lock()
|
|
||||||
fc.statuses[jobID] = "completed"
|
|
||||||
fc.results[jobID] = &secutools.JobResult{
|
|
||||||
JobID: jobID, Response: "Summary", Provider: "gpu", Model: "qwen", CostCAD: 0.001,
|
|
||||||
}
|
|
||||||
fc.mu.Unlock()
|
|
||||||
|
|
||||||
// 3. Reaper picks it up. mgr.Run drives reapOnce on each ticker tick;
|
|
||||||
// interval was set to 1ms in newDelegationManager so a 200ms
|
|
||||||
// context yields many cycles.
|
|
||||||
mgr.SetLogger(log.Default())
|
|
||||||
delegationReapNow(t, mgr, jobID)
|
|
||||||
|
|
||||||
donePath := filepath.Join(projectDir, ".agent-queue", "done", "task-routing.md")
|
|
||||||
body, err := os.ReadFile(donePath)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("missing done/ file: %v", err)
|
|
||||||
}
|
|
||||||
if !contains(string(body), "Summary") || !contains(string(body), "provider: gpu") {
|
|
||||||
t.Errorf("done body missing expected fields:\n%s", body)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Marker and original .md gone.
|
|
||||||
if _, err := os.Stat(taskPath + ".delegated"); !os.IsNotExist(err) {
|
|
||||||
t.Error("delegated marker should be removed")
|
|
||||||
}
|
|
||||||
if _, err := os.Stat(taskPath); !os.IsNotExist(err) {
|
|
||||||
t.Error("original .md should be removed")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// delegationReapNow drives one reap cycle of mgr by calling its public
|
|
||||||
// API. We need this because reapOnce is unexported in the delegation
|
|
||||||
// package. The Run loop ticker is too slow for tests, so we expose a
|
|
||||||
// trivial ticker-equivalent: temporarily run with an immediate context.
|
|
||||||
func delegationReapNow(t *testing.T, mgr *delegation.Manager, _ string) {
|
|
||||||
t.Helper()
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
|
||||||
defer cancel()
|
|
||||||
// Run the manager briefly; the ticker fires after `interval` (1ms in
|
|
||||||
// these tests) so within 200ms we get many reap cycles.
|
|
||||||
mgr.Run(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func contains(s, sub string) bool {
|
|
||||||
if sub == "" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
for i := 0; i+len(sub) <= len(s); i++ {
|
|
||||||
if s[i:i+len(sub)] == sub {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
@ -1,95 +0,0 @@
|
||||||
// Package router decides whether a task should run on a local Claude Code
|
|
||||||
// session (current Phase 1 behaviour) or be delegated to the centralized
|
|
||||||
// SecuAAS secutools AI platform (Phase 2 chantier E).
|
|
||||||
//
|
|
||||||
// The decision is driven entirely by the task's YAML frontmatter; no
|
|
||||||
// network call is performed in Decide.
|
|
||||||
package router
|
|
||||||
|
|
||||||
import "strings"
|
|
||||||
|
|
||||||
// Provider is the destination chosen for a task.
|
|
||||||
type Provider string
|
|
||||||
|
|
||||||
const (
|
|
||||||
// ProviderClaudeCode means dispatch to a local ccl-auto tmux session
|
|
||||||
// running Claude Code. This is the Phase 1 behaviour.
|
|
||||||
ProviderClaudeCode Provider = "claude-code"
|
|
||||||
|
|
||||||
// ProviderAuto means delegate to secutools and let its smart_triage
|
|
||||||
// router pick the actual backend (GPU > Claude > Gemini fallback chain).
|
|
||||||
ProviderAuto Provider = "auto"
|
|
||||||
|
|
||||||
// ProviderGPU pins delegation to the in-cluster vLLM GPU pool.
|
|
||||||
ProviderGPU Provider = "gpu"
|
|
||||||
|
|
||||||
// ProviderGemini pins delegation to Google Gemini via secutools.
|
|
||||||
ProviderGemini Provider = "gemini"
|
|
||||||
|
|
||||||
// ProviderClaudeAPI pins delegation to the Anthropic API (NOT Claude
|
|
||||||
// Code locally — this means stateless API calls billed to the secutools
|
|
||||||
// account).
|
|
||||||
ProviderClaudeAPI Provider = "claude-api"
|
|
||||||
)
|
|
||||||
|
|
||||||
// IsDelegated reports whether p means "submit to secutools" (i.e. not the
|
|
||||||
// local Claude Code path).
|
|
||||||
func (p Provider) IsDelegated() bool {
|
|
||||||
switch p {
|
|
||||||
case ProviderAuto, ProviderGPU, ProviderGemini, ProviderClaudeAPI:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decision is the output of Decide: which provider to use, plus a short
|
|
||||||
// human-readable reason for logging and the /api/delegated/status endpoint.
|
|
||||||
type Decision struct {
|
|
||||||
Provider Provider
|
|
||||||
Reason string
|
|
||||||
}
|
|
||||||
|
|
||||||
// Task is the slice of frontmatter fields the router cares about. It is
|
|
||||||
// intentionally narrower than the dispatcher's full TaskFrontmatter so that
|
|
||||||
// router tests don't need to import the dispatcher.
|
|
||||||
type Task struct {
|
|
||||||
PreferredAI string // auto | claude-code | gpu | gemini | claude-api (case-insensitive)
|
|
||||||
AllowDelegation bool // default false → backward-compatible (Claude Code)
|
|
||||||
NeedsClaudeCode bool // legacy bypass — forces ProviderClaudeCode
|
|
||||||
ComplexityHint string // low | medium | high (informational only for now)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decide returns the routing decision for the given task.
|
|
||||||
//
|
|
||||||
// Order of precedence:
|
|
||||||
// 1. needs_claude_code: true → ProviderClaudeCode (legacy bypass, never delegate)
|
|
||||||
// 2. preferred_ai: claude-code → ProviderClaudeCode (explicit)
|
|
||||||
// 3. allow_delegation: false → ProviderClaudeCode (default safety net)
|
|
||||||
// 4. preferred_ai parses to a delegated provider → that provider
|
|
||||||
// 5. fallback → ProviderAuto (let secutools smart_triage decide)
|
|
||||||
func Decide(t Task) Decision {
|
|
||||||
if t.NeedsClaudeCode {
|
|
||||||
return Decision{ProviderClaudeCode, "needs_claude_code=true"}
|
|
||||||
}
|
|
||||||
|
|
||||||
pref := strings.ToLower(strings.TrimSpace(t.PreferredAI))
|
|
||||||
if pref == string(ProviderClaudeCode) {
|
|
||||||
return Decision{ProviderClaudeCode, "preferred_ai=claude-code"}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !t.AllowDelegation {
|
|
||||||
return Decision{ProviderClaudeCode, "allow_delegation=false (default)"}
|
|
||||||
}
|
|
||||||
|
|
||||||
switch Provider(pref) {
|
|
||||||
case ProviderGPU, ProviderGemini, ProviderClaudeAPI:
|
|
||||||
return Decision{Provider(pref), "preferred_ai=" + pref}
|
|
||||||
case ProviderAuto, "":
|
|
||||||
return Decision{ProviderAuto, "preferred_ai=auto (smart_triage)"}
|
|
||||||
default:
|
|
||||||
// Unknown value: fail safe to local Claude Code so a typo doesn't
|
|
||||||
// silently route real work to GPU.
|
|
||||||
return Decision{ProviderClaudeCode, "unknown preferred_ai=" + pref + " → fail-safe"}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,93 +0,0 @@
|
||||||
package router
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
// TestDecide_BackwardCompatible: a vanilla task (no new fields) must keep
|
|
||||||
// going through Claude Code. This is the contract that protects every
|
|
||||||
// existing inbox/*.md file.
|
|
||||||
func TestDecide_BackwardCompatible(t *testing.T) {
|
|
||||||
d := Decide(Task{})
|
|
||||||
if d.Provider != ProviderClaudeCode {
|
|
||||||
t.Fatalf("default task must route to Claude Code, got %v (%s)", d.Provider, d.Reason)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDecide_NeedsClaudeCodeWinsOverEverything(t *testing.T) {
|
|
||||||
d := Decide(Task{
|
|
||||||
NeedsClaudeCode: true,
|
|
||||||
AllowDelegation: true,
|
|
||||||
PreferredAI: "gpu",
|
|
||||||
})
|
|
||||||
if d.Provider != ProviderClaudeCode {
|
|
||||||
t.Errorf("needs_claude_code must take precedence, got %v (%s)", d.Provider, d.Reason)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDecide_ExplicitClaudeCode(t *testing.T) {
|
|
||||||
d := Decide(Task{PreferredAI: "claude-code", AllowDelegation: true})
|
|
||||||
if d.Provider != ProviderClaudeCode {
|
|
||||||
t.Errorf("explicit claude-code must route locally, got %v", d.Provider)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDecide_AllowDelegationFalseBlocksDelegation(t *testing.T) {
|
|
||||||
d := Decide(Task{PreferredAI: "gpu", AllowDelegation: false})
|
|
||||||
if d.Provider != ProviderClaudeCode {
|
|
||||||
t.Errorf("allow_delegation=false must override preferred_ai, got %v (%s)",
|
|
||||||
d.Provider, d.Reason)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDecide_DelegatedProviders(t *testing.T) {
|
|
||||||
cases := []struct {
|
|
||||||
pref string
|
|
||||||
want Provider
|
|
||||||
}{
|
|
||||||
{"gpu", ProviderGPU},
|
|
||||||
{"GPU", ProviderGPU},
|
|
||||||
{"gemini", ProviderGemini},
|
|
||||||
{"claude-api", ProviderClaudeAPI},
|
|
||||||
}
|
|
||||||
for _, c := range cases {
|
|
||||||
d := Decide(Task{PreferredAI: c.pref, AllowDelegation: true})
|
|
||||||
if d.Provider != c.want {
|
|
||||||
t.Errorf("preferred_ai=%q want %v, got %v (%s)",
|
|
||||||
c.pref, c.want, d.Provider, d.Reason)
|
|
||||||
}
|
|
||||||
if !d.Provider.IsDelegated() {
|
|
||||||
t.Errorf("preferred_ai=%q should be delegated", c.pref)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDecide_AutoMeansSecutoolsTriage(t *testing.T) {
|
|
||||||
for _, pref := range []string{"auto", ""} {
|
|
||||||
d := Decide(Task{PreferredAI: pref, AllowDelegation: true})
|
|
||||||
if d.Provider != ProviderAuto {
|
|
||||||
t.Errorf("preferred_ai=%q want ProviderAuto, got %v", pref, d.Provider)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDecide_UnknownProviderFailsSafe(t *testing.T) {
|
|
||||||
d := Decide(Task{PreferredAI: "claude-3-mystery", AllowDelegation: true})
|
|
||||||
if d.Provider != ProviderClaudeCode {
|
|
||||||
t.Errorf("unknown provider must fail-safe to Claude Code, got %v (%s)",
|
|
||||||
d.Provider, d.Reason)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestProvider_IsDelegated(t *testing.T) {
|
|
||||||
cases := map[Provider]bool{
|
|
||||||
ProviderClaudeCode: false,
|
|
||||||
ProviderAuto: true,
|
|
||||||
ProviderGPU: true,
|
|
||||||
ProviderGemini: true,
|
|
||||||
ProviderClaudeAPI: true,
|
|
||||||
}
|
|
||||||
for p, want := range cases {
|
|
||||||
if got := p.IsDelegated(); got != want {
|
|
||||||
t.Errorf("%v.IsDelegated() = %v, want %v", p, got, want)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,290 +0,0 @@
|
||||||
// Package secutools provides a minimal HTTP client for the centralized SecuAAS
|
|
||||||
// AI-batch platform (https://api.secutools.secuaas.ovh).
|
|
||||||
//
|
|
||||||
// Phase 2 — Chantier E: the dispatcher delegates non-Claude-Code-eligible
|
|
||||||
// tasks to secutools (GPU/Gemini/Claude API) instead of dispatching them to
|
|
||||||
// a local ccl-auto tmux session. This package is the Go side of that
|
|
||||||
// delegation: SubmitJob, GetJob, WaitForResult.
|
|
||||||
//
|
|
||||||
// The Client interface is intentionally narrow so tests can plug a fake
|
|
||||||
// implementation without any network dependency.
|
|
||||||
package secutools
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Client is the abstraction the rest of the daemon uses to talk to secutools.
|
|
||||||
// Real callers use HTTPClient; tests substitute a mock.
|
|
||||||
type Client interface {
|
|
||||||
SubmitJob(ctx context.Context, req *JobRequest) (*JobResponse, error)
|
|
||||||
GetJob(ctx context.Context, id string) (*JobStatus, error)
|
|
||||||
WaitForResult(ctx context.Context, id string, timeout time.Duration) (*JobResult, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// JobType mirrors the secutools job-type enum.
|
|
||||||
type JobType string
|
|
||||||
|
|
||||||
const (
|
|
||||||
TypeAnalyze JobType = "ai:analyze"
|
|
||||||
TypeBatch JobType = "ai:batch"
|
|
||||||
TypeReport JobType = "ai:report"
|
|
||||||
TypeCorrelate JobType = "ai:correlate"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Priority mirrors the secutools priority enum.
|
|
||||||
type Priority string
|
|
||||||
|
|
||||||
const (
|
|
||||||
PriorityCritical Priority = "critical"
|
|
||||||
PriorityHigh Priority = "high"
|
|
||||||
PriorityDefault Priority = "default"
|
|
||||||
PriorityLow Priority = "low"
|
|
||||||
)
|
|
||||||
|
|
||||||
// JobRequest is the body of POST /api/v1/jobs.
|
|
||||||
type JobRequest struct {
|
|
||||||
Type JobType `json:"type"`
|
|
||||||
Priority Priority `json:"priority,omitempty"`
|
|
||||||
Prompt string `json:"prompt"`
|
|
||||||
Data map[string]any `json:"data,omitempty"`
|
|
||||||
MaxTokens int `json:"max_tokens,omitempty"`
|
|
||||||
PreferredAI string `json:"preferred_ai,omitempty"`
|
|
||||||
Source string `json:"source,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// JobResponse is the immediate reply from POST /api/v1/jobs.
|
|
||||||
type JobResponse struct {
|
|
||||||
JobID string `json:"job_id"`
|
|
||||||
Status string `json:"status"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// JobStatus is the reply from GET /api/v1/jobs/:id.
|
|
||||||
type JobStatus struct {
|
|
||||||
JobID string `json:"job_id"`
|
|
||||||
Status string `json:"status"` // pending | running | completed | failed | cancelled
|
|
||||||
Provider string `json:"provider,omitempty"`
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// JobResult is the reply from GET /api/v1/jobs/:id/result.
|
|
||||||
type JobResult struct {
|
|
||||||
JobID string `json:"job_id"`
|
|
||||||
Response string `json:"response"`
|
|
||||||
Provider string `json:"provider"`
|
|
||||||
Model string `json:"model"`
|
|
||||||
CostCAD float64 `json:"cost_cad"`
|
|
||||||
Tokens int `json:"tokens,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// HTTPClient is the production implementation of Client.
|
|
||||||
type HTTPClient struct {
|
|
||||||
baseURL string
|
|
||||||
apiKey string
|
|
||||||
hc *http.Client
|
|
||||||
maxRetries int
|
|
||||||
baseDelay time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewHTTPClient returns an HTTPClient ready to talk to secutools.
|
|
||||||
// If hc is nil, a default http.Client with a 30s timeout is used.
|
|
||||||
//
|
|
||||||
// The client performs up to 3 retries on transport errors and 5xx
|
|
||||||
// responses, with exponential backoff starting at 500ms (500ms, 1s, 2s).
|
|
||||||
// 4xx responses are returned as errors without retrying.
|
|
||||||
func NewHTTPClient(baseURL, apiKey string, hc *http.Client) *HTTPClient {
|
|
||||||
if hc == nil {
|
|
||||||
hc = &http.Client{Timeout: 30 * time.Second}
|
|
||||||
}
|
|
||||||
return &HTTPClient{
|
|
||||||
baseURL: baseURL,
|
|
||||||
apiKey: apiKey,
|
|
||||||
hc: hc,
|
|
||||||
maxRetries: 3,
|
|
||||||
baseDelay: 500 * time.Millisecond,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetRetryPolicy overrides the default retry policy. Useful for tests.
|
|
||||||
func (c *HTTPClient) SetRetryPolicy(maxRetries int, baseDelay time.Duration) {
|
|
||||||
c.maxRetries = maxRetries
|
|
||||||
c.baseDelay = baseDelay
|
|
||||||
}
|
|
||||||
|
|
||||||
// doWithRetry sends req and retries on transport errors or 5xx responses
|
|
||||||
// using exponential backoff. 4xx is returned without retry. Respects ctx.
|
|
||||||
func (c *HTTPClient) doWithRetry(ctx context.Context, build func() (*http.Request, error)) (*http.Response, error) {
|
|
||||||
var lastErr error
|
|
||||||
delay := c.baseDelay
|
|
||||||
for attempt := 0; attempt <= c.maxRetries; attempt++ {
|
|
||||||
if attempt > 0 {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, ctx.Err()
|
|
||||||
case <-time.After(delay):
|
|
||||||
}
|
|
||||||
delay *= 2
|
|
||||||
}
|
|
||||||
req, err := build()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
resp, err := c.hc.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
lastErr = err
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Retry 5xx; return success or 4xx immediately.
|
|
||||||
if resp.StatusCode >= 500 && resp.StatusCode <= 599 {
|
|
||||||
raw, _ := io.ReadAll(resp.Body)
|
|
||||||
_ = resp.Body.Close()
|
|
||||||
lastErr = fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(raw))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
if lastErr == nil {
|
|
||||||
lastErr = errors.New("secutools: unknown transport failure")
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("after %d attempts: %w", c.maxRetries+1, lastErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubmitJob POSTs req to /api/v1/jobs with retry on 5xx.
|
|
||||||
func (c *HTTPClient) SubmitJob(ctx context.Context, req *JobRequest) (*JobResponse, error) {
|
|
||||||
body, err := json.Marshal(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("marshal request: %w", err)
|
|
||||||
}
|
|
||||||
resp, err := c.doWithRetry(ctx, func() (*http.Request, error) {
|
|
||||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
|
||||||
c.baseURL+"/api/v1/jobs", bytes.NewReader(body))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
httpReq.Header.Set("Content-Type", "application/json")
|
|
||||||
httpReq.Header.Set("X-API-Key", c.apiKey)
|
|
||||||
return httpReq, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("submit job: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode/100 != 2 {
|
|
||||||
raw, _ := io.ReadAll(resp.Body)
|
|
||||||
return nil, fmt.Errorf("submit job: HTTP %d: %s", resp.StatusCode, string(raw))
|
|
||||||
}
|
|
||||||
|
|
||||||
var out JobResponse
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
|
||||||
return nil, fmt.Errorf("decode submit response: %w", err)
|
|
||||||
}
|
|
||||||
return &out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetJob GETs /api/v1/jobs/:id with retry on 5xx.
|
|
||||||
func (c *HTTPClient) GetJob(ctx context.Context, id string) (*JobStatus, error) {
|
|
||||||
resp, err := c.doWithRetry(ctx, func() (*http.Request, error) {
|
|
||||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
|
||||||
c.baseURL+"/api/v1/jobs/"+id, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
httpReq.Header.Set("X-API-Key", c.apiKey)
|
|
||||||
return httpReq, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("get job: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode/100 != 2 {
|
|
||||||
raw, _ := io.ReadAll(resp.Body)
|
|
||||||
return nil, fmt.Errorf("get job: HTTP %d: %s", resp.StatusCode, string(raw))
|
|
||||||
}
|
|
||||||
|
|
||||||
var out JobStatus
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
|
||||||
return nil, fmt.Errorf("decode get response: %w", err)
|
|
||||||
}
|
|
||||||
return &out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getResult fetches the final payload of a completed job with retry on 5xx.
|
|
||||||
func (c *HTTPClient) getResult(ctx context.Context, id string) (*JobResult, error) {
|
|
||||||
resp, err := c.doWithRetry(ctx, func() (*http.Request, error) {
|
|
||||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
|
||||||
c.baseURL+"/api/v1/jobs/"+id+"/result", nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
httpReq.Header.Set("X-API-Key", c.apiKey)
|
|
||||||
return httpReq, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("get result: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode/100 != 2 {
|
|
||||||
raw, _ := io.ReadAll(resp.Body)
|
|
||||||
return nil, fmt.Errorf("get result: HTTP %d: %s", resp.StatusCode, string(raw))
|
|
||||||
}
|
|
||||||
|
|
||||||
var out JobResult
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
|
||||||
return nil, fmt.Errorf("decode result: %w", err)
|
|
||||||
}
|
|
||||||
return &out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrJobFailed is returned by WaitForResult when secutools reports the job
|
|
||||||
// as terminally failed (no result will ever be produced).
|
|
||||||
var ErrJobFailed = errors.New("secutools: job failed")
|
|
||||||
|
|
||||||
// ErrTimeout is returned by WaitForResult when the polling deadline elapses
|
|
||||||
// before the job reaches a terminal state.
|
|
||||||
var ErrTimeout = errors.New("secutools: wait timeout")
|
|
||||||
|
|
||||||
// WaitForResult polls /api/v1/jobs/:id every 2s until the job reaches a
|
|
||||||
// terminal state (completed/failed/cancelled) or timeout elapses.
|
|
||||||
// On completed, fetches and returns the result.
|
|
||||||
//
|
|
||||||
// Polling cadence is intentionally fixed (not configurable) to keep the
|
|
||||||
// reaper goroutine simple. If callers need a different cadence they can
|
|
||||||
// implement it themselves on top of GetJob/getResult.
|
|
||||||
func (c *HTTPClient) WaitForResult(ctx context.Context, id string, timeout time.Duration) (*JobResult, error) {
|
|
||||||
deadline := time.Now().Add(timeout)
|
|
||||||
ticker := time.NewTicker(2 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
st, err := c.GetJob(ctx, id)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
switch st.Status {
|
|
||||||
case "completed":
|
|
||||||
return c.getResult(ctx, id)
|
|
||||||
case "failed", "cancelled":
|
|
||||||
return nil, fmt.Errorf("%w: status=%s err=%s", ErrJobFailed, st.Status, st.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
if time.Now().After(deadline) {
|
|
||||||
return nil, ErrTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, ctx.Err()
|
|
||||||
case <-ticker.C:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,190 +0,0 @@
|
||||||
package secutools
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TestSubmitJob_HappyPath verifies the request body and headers match the
|
|
||||||
// secutools contract and the response is decoded.
|
|
||||||
func TestSubmitJob_HappyPath(t *testing.T) {
|
|
||||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if r.URL.Path != "/api/v1/jobs" {
|
|
||||||
t.Errorf("unexpected path %q", r.URL.Path)
|
|
||||||
}
|
|
||||||
if r.Method != http.MethodPost {
|
|
||||||
t.Errorf("unexpected method %q", r.Method)
|
|
||||||
}
|
|
||||||
if r.Header.Get("X-API-Key") != "key123" {
|
|
||||||
t.Errorf("missing/incorrect X-API-Key: %q", r.Header.Get("X-API-Key"))
|
|
||||||
}
|
|
||||||
var got JobRequest
|
|
||||||
if err := json.NewDecoder(r.Body).Decode(&got); err != nil {
|
|
||||||
t.Fatalf("decode: %v", err)
|
|
||||||
}
|
|
||||||
if got.Type != TypeAnalyze || got.PreferredAI != "gpu" {
|
|
||||||
t.Errorf("payload mismatch: %+v", got)
|
|
||||||
}
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
_, _ = w.Write([]byte(`{"job_id":"abc","status":"pending"}`))
|
|
||||||
}))
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
c := NewHTTPClient(srv.URL, "key123", srv.Client())
|
|
||||||
resp, err := c.SubmitJob(context.Background(), &JobRequest{
|
|
||||||
Type: TypeAnalyze,
|
|
||||||
Priority: PriorityHigh,
|
|
||||||
Prompt: "hi",
|
|
||||||
PreferredAI: "gpu",
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("SubmitJob: %v", err)
|
|
||||||
}
|
|
||||||
if resp.JobID != "abc" || resp.Status != "pending" {
|
|
||||||
t.Errorf("unexpected response: %+v", resp)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubmitJob_HTTPError surfaces non-2xx responses as errors.
|
|
||||||
func TestSubmitJob_HTTPError(t *testing.T) {
|
|
||||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
|
||||||
_, _ = w.Write([]byte("boom"))
|
|
||||||
}))
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
c := NewHTTPClient(srv.URL, "k", srv.Client())
|
|
||||||
if _, err := c.SubmitJob(context.Background(), &JobRequest{Type: TypeAnalyze, Prompt: "p"}); err == nil {
|
|
||||||
t.Fatal("expected error on HTTP 500, got nil")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestWaitForResult_PollsUntilCompleted verifies the polling loop transitions
|
|
||||||
// pending → running → completed and fetches the result.
|
|
||||||
func TestWaitForResult_PollsUntilCompleted(t *testing.T) {
|
|
||||||
var calls atomic.Int64
|
|
||||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
switch r.URL.Path {
|
|
||||||
case "/api/v1/jobs/job1":
|
|
||||||
n := calls.Add(1)
|
|
||||||
status := "pending"
|
|
||||||
if n >= 2 {
|
|
||||||
status = "completed"
|
|
||||||
}
|
|
||||||
_, _ = w.Write([]byte(`{"job_id":"job1","status":"` + status + `","provider":"gpu"}`))
|
|
||||||
case "/api/v1/jobs/job1/result":
|
|
||||||
_, _ = w.Write([]byte(`{"job_id":"job1","response":"done","provider":"gpu","cost_cad":0.005}`))
|
|
||||||
default:
|
|
||||||
t.Errorf("unexpected path %q", r.URL.Path)
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
c := NewHTTPClient(srv.URL, "k", srv.Client())
|
|
||||||
// Override poll cadence indirectly: short timeout proves we don't spin
|
|
||||||
// 2s per poll; the test runs in well under 10s real time.
|
|
||||||
res, err := c.WaitForResult(context.Background(), "job1", 30*time.Second)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("WaitForResult: %v", err)
|
|
||||||
}
|
|
||||||
if res.Response != "done" || res.Provider != "gpu" {
|
|
||||||
t.Errorf("unexpected result: %+v", res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestWaitForResult_FailedJob returns ErrJobFailed when secutools reports
|
|
||||||
// terminal failure.
|
|
||||||
func TestWaitForResult_FailedJob(t *testing.T) {
|
|
||||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
_, _ = w.Write([]byte(`{"job_id":"jobX","status":"failed","error":"oom"}`))
|
|
||||||
}))
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
c := NewHTTPClient(srv.URL, "k", srv.Client())
|
|
||||||
_, err := c.WaitForResult(context.Background(), "jobX", 5*time.Second)
|
|
||||||
if !errors.Is(err, ErrJobFailed) {
|
|
||||||
t.Errorf("expected ErrJobFailed, got %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubmitJob_RetriesOn5xx verifies the client retries transient 500s
|
|
||||||
// and succeeds on a later attempt. Uses a tight retry delay so the test
|
|
||||||
// runs in milliseconds.
|
|
||||||
func TestSubmitJob_RetriesOn5xx(t *testing.T) {
|
|
||||||
var calls atomic.Int64
|
|
||||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
n := calls.Add(1)
|
|
||||||
if n < 3 {
|
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
|
||||||
_, _ = w.Write([]byte("transient"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
_, _ = w.Write([]byte(`{"job_id":"ok","status":"pending"}`))
|
|
||||||
}))
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
c := NewHTTPClient(srv.URL, "k", srv.Client())
|
|
||||||
c.SetRetryPolicy(3, 1*time.Millisecond)
|
|
||||||
|
|
||||||
resp, err := c.SubmitJob(context.Background(), &JobRequest{Type: TypeAnalyze, Prompt: "p"})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("expected success after retries, got %v (calls=%d)", err, calls.Load())
|
|
||||||
}
|
|
||||||
if resp.JobID != "ok" {
|
|
||||||
t.Errorf("unexpected response: %+v", resp)
|
|
||||||
}
|
|
||||||
if calls.Load() != 3 {
|
|
||||||
t.Errorf("expected 3 attempts, got %d", calls.Load())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubmitJob_DoesNotRetry4xx ensures client errors short-circuit
|
|
||||||
// without burning retries (e.g. wrong API key).
|
|
||||||
func TestSubmitJob_DoesNotRetry4xx(t *testing.T) {
|
|
||||||
var calls atomic.Int64
|
|
||||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
calls.Add(1)
|
|
||||||
w.WriteHeader(http.StatusUnauthorized)
|
|
||||||
_, _ = w.Write([]byte("bad key"))
|
|
||||||
}))
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
c := NewHTTPClient(srv.URL, "k", srv.Client())
|
|
||||||
c.SetRetryPolicy(3, 1*time.Millisecond)
|
|
||||||
|
|
||||||
_, err := c.SubmitJob(context.Background(), &JobRequest{Type: TypeAnalyze, Prompt: "p"})
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("expected error on 401")
|
|
||||||
}
|
|
||||||
if calls.Load() != 1 {
|
|
||||||
t.Errorf("4xx must not retry, got %d calls", calls.Load())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestWaitForResult_ContextCancel exits cleanly when the parent context is
|
|
||||||
// cancelled mid-poll.
|
|
||||||
func TestWaitForResult_ContextCancel(t *testing.T) {
|
|
||||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
_, _ = w.Write([]byte(`{"job_id":"j","status":"pending"}`))
|
|
||||||
}))
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
cancel() // cancel immediately
|
|
||||||
|
|
||||||
c := NewHTTPClient(srv.URL, "k", srv.Client())
|
|
||||||
_, err := c.WaitForResult(ctx, "j", 10*time.Second)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("expected error from cancelled context")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,86 +0,0 @@
|
||||||
package secutools
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
)
|
|
||||||
|
|
||||||
// DecideProvider inspects a frontmatter map (as decoded from YAML) and
|
|
||||||
// returns the provider string that should handle the task. Valid return
|
|
||||||
// values:
|
|
||||||
//
|
|
||||||
// - "local" — no delegation, dispatch on a Claude Code session (Phase 1)
|
|
||||||
// - "claude-code" — explicit local dispatch (alias of "local")
|
|
||||||
// - "gpu" — delegate to secutools with preferred_ai=gpu
|
|
||||||
// - "gemini" — delegate to secutools with preferred_ai=gemini
|
|
||||||
// - "claude-api" — delegate to secutools with preferred_ai=claude-api
|
|
||||||
// - "auto" — delegate to secutools, let smart_triage choose
|
|
||||||
//
|
|
||||||
// Precedence:
|
|
||||||
// 1. needs_claude_code: true → "local"
|
|
||||||
// 2. preferred_ai in {claude-code} → "local"
|
|
||||||
// 3. allow_delegation == false / missing → "local"
|
|
||||||
// 4. preferred_ai in {gpu,gemini,claude-api} → that provider
|
|
||||||
// 5. preferred_ai in {"", auto} → "auto"
|
|
||||||
// 6. unknown preferred_ai → "local" (fail-safe)
|
|
||||||
//
|
|
||||||
// This function is intentionally permissive on input types: YAML booleans
|
|
||||||
// may decode as bool, strings as string. It coerces common forms and
|
|
||||||
// returns "local" on malformed input rather than panicking.
|
|
||||||
func DecideProvider(fm map[string]any) string {
|
|
||||||
if fm == nil {
|
|
||||||
return "local"
|
|
||||||
}
|
|
||||||
|
|
||||||
if asBool(fm["needs_claude_code"]) {
|
|
||||||
return "local"
|
|
||||||
}
|
|
||||||
|
|
||||||
pref := strings.ToLower(strings.TrimSpace(asString(fm["preferred_ai"])))
|
|
||||||
if pref == "claude-code" || pref == "local" {
|
|
||||||
return "local"
|
|
||||||
}
|
|
||||||
|
|
||||||
if !asBool(fm["allow_delegation"]) {
|
|
||||||
return "local"
|
|
||||||
}
|
|
||||||
|
|
||||||
switch pref {
|
|
||||||
case "gpu", "gemini", "claude-api":
|
|
||||||
return pref
|
|
||||||
case "", "auto":
|
|
||||||
return "auto"
|
|
||||||
default:
|
|
||||||
// Unknown provider — fail safe to local.
|
|
||||||
return "local"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// asBool accepts bool, "true"/"false", "1"/"0". Defaults to false.
|
|
||||||
func asBool(v any) bool {
|
|
||||||
switch t := v.(type) {
|
|
||||||
case bool:
|
|
||||||
return t
|
|
||||||
case string:
|
|
||||||
s := strings.ToLower(strings.TrimSpace(t))
|
|
||||||
return s == "true" || s == "1" || s == "yes"
|
|
||||||
case int:
|
|
||||||
return t != 0
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// asString coerces v to a trimmed string. Returns "" for nil/unknown types.
|
|
||||||
func asString(v any) string {
|
|
||||||
switch t := v.(type) {
|
|
||||||
case string:
|
|
||||||
return t
|
|
||||||
case fmt.Stringer:
|
|
||||||
return t.String()
|
|
||||||
case nil:
|
|
||||||
return ""
|
|
||||||
default:
|
|
||||||
return fmt.Sprintf("%v", t)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,72 +0,0 @@
|
||||||
package secutools
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
// TestDecideProvider_TableDriven exercises the full decision matrix for
|
|
||||||
// the map-input adapter used by the bash-side ccl-delegate CLI. The
|
|
||||||
// richer Task-struct variant lives in internal/router.
|
|
||||||
func TestDecideProvider_TableDriven(t *testing.T) {
|
|
||||||
cases := []struct {
|
|
||||||
name string
|
|
||||||
fm map[string]any
|
|
||||||
want string
|
|
||||||
}{
|
|
||||||
{"nil map falls back to local", nil, "local"},
|
|
||||||
{"empty map → local (allow_delegation default false)", map[string]any{}, "local"},
|
|
||||||
{"needs_claude_code wins", map[string]any{
|
|
||||||
"needs_claude_code": true,
|
|
||||||
"allow_delegation": true,
|
|
||||||
"preferred_ai": "gpu",
|
|
||||||
}, "local"},
|
|
||||||
{"explicit claude-code stays local", map[string]any{
|
|
||||||
"preferred_ai": "claude-code",
|
|
||||||
"allow_delegation": true,
|
|
||||||
}, "local"},
|
|
||||||
{"allow_delegation=false blocks even with preferred_ai=gpu", map[string]any{
|
|
||||||
"preferred_ai": "gpu",
|
|
||||||
"allow_delegation": false,
|
|
||||||
}, "local"},
|
|
||||||
{"gpu", map[string]any{
|
|
||||||
"preferred_ai": "gpu",
|
|
||||||
"allow_delegation": true,
|
|
||||||
}, "gpu"},
|
|
||||||
{"GPU (case-insensitive)", map[string]any{
|
|
||||||
"preferred_ai": "GPU",
|
|
||||||
"allow_delegation": true,
|
|
||||||
}, "gpu"},
|
|
||||||
{"gemini", map[string]any{
|
|
||||||
"preferred_ai": "gemini",
|
|
||||||
"allow_delegation": true,
|
|
||||||
}, "gemini"},
|
|
||||||
{"claude-api", map[string]any{
|
|
||||||
"preferred_ai": "claude-api",
|
|
||||||
"allow_delegation": true,
|
|
||||||
}, "claude-api"},
|
|
||||||
{"auto", map[string]any{
|
|
||||||
"preferred_ai": "auto",
|
|
||||||
"allow_delegation": true,
|
|
||||||
}, "auto"},
|
|
||||||
{"empty preferred_ai + allow_delegation → auto", map[string]any{
|
|
||||||
"allow_delegation": true,
|
|
||||||
}, "auto"},
|
|
||||||
{"unknown provider → fail-safe local", map[string]any{
|
|
||||||
"preferred_ai": "claude-3-mystery",
|
|
||||||
"allow_delegation": true,
|
|
||||||
}, "local"},
|
|
||||||
{"allow_delegation as string 'true'", map[string]any{
|
|
||||||
"preferred_ai": "gpu",
|
|
||||||
"allow_delegation": "true",
|
|
||||||
}, "gpu"},
|
|
||||||
{"allow_delegation as int 1", map[string]any{
|
|
||||||
"preferred_ai": "gemini",
|
|
||||||
"allow_delegation": 1,
|
|
||||||
}, "gemini"},
|
|
||||||
}
|
|
||||||
for _, c := range cases {
|
|
||||||
t.Run(c.name, func(t *testing.T) {
|
|
||||||
if got := DecideProvider(c.fm); got != c.want {
|
|
||||||
t.Errorf("DecideProvider(%v) = %q, want %q", c.fm, got, c.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,214 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
# phase2-E-integration.sh — Phase 2 Chantier E integration test
|
|
||||||
#
|
|
||||||
# Exercises the bash side of multi-provider delegation end-to-end,
|
|
||||||
# without hitting the real secutools API. Flow:
|
|
||||||
#
|
|
||||||
# 1. Start a local mock HTTP server that implements /api/v1/jobs{,/:id,/:id/result}
|
|
||||||
# 2. Build ccl-delegate + ccl-delegate decide against a test task
|
|
||||||
# 3. Call delegate-to-secutools.sh on the task
|
|
||||||
# 4. Assert: status.json = delegated_to_secutools, .delegated marker
|
|
||||||
# with job_id, original .md still in inbox
|
|
||||||
# 5. Drive the mock into 'completed' state
|
|
||||||
# 6. Call poll-delegated-jobs.sh
|
|
||||||
# 7. Assert: done/<task>.md exists with result body + provider/cost
|
|
||||||
# footer, original .md + .delegated marker cleaned up
|
|
||||||
#
|
|
||||||
# Exit 0 on success, non-zero on assertion failure.
|
|
||||||
|
|
||||||
set -euo pipefail
|
|
||||||
|
|
||||||
REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
|
|
||||||
SCRIPTS_ROOT="${DEVMGMT_ORCH_DIR:-/home/ubuntu/projects/dev-management/agent-orchestrator}"
|
|
||||||
TMP_ROOT=$(mktemp -d)
|
|
||||||
MOCK_PORT=${MOCK_PORT:-18742}
|
|
||||||
MOCK_STATE="$TMP_ROOT/mock-state"
|
|
||||||
MOCK_LOG="$TMP_ROOT/mock.log"
|
|
||||||
MOCK_PID_FILE="$TMP_ROOT/mock.pid"
|
|
||||||
BIN_DIR="$TMP_ROOT/bin"
|
|
||||||
PROJECTS_BASE="$TMP_ROOT/projects"
|
|
||||||
BUDGETS_FILE="$TMP_ROOT/delegation-budgets.jsonl"
|
|
||||||
|
|
||||||
mkdir -p "$MOCK_STATE" "$BIN_DIR" "$PROJECTS_BASE/test-proj/.agent-queue/inbox"
|
|
||||||
|
|
||||||
cleanup() {
|
|
||||||
if [[ -f "$MOCK_PID_FILE" ]]; then
|
|
||||||
kill "$(cat "$MOCK_PID_FILE")" 2>/dev/null || true
|
|
||||||
fi
|
|
||||||
rm -rf "$TMP_ROOT"
|
|
||||||
}
|
|
||||||
trap cleanup EXIT
|
|
||||||
|
|
||||||
echo "==> building ccl-delegate"
|
|
||||||
GOMODCACHE=/home/ubuntu/go/pkg/mod GOCACHE=/home/ubuntu/.cache/go-build HOME=/home/ubuntu GOSUMDB=off GOTOOLCHAIN=local \
|
|
||||||
/usr/local/go/bin/go build -C "$REPO_ROOT" -o "$BIN_DIR/ccl-delegate" ./cmd/ccl-delegate
|
|
||||||
export PATH="$BIN_DIR:$PATH"
|
|
||||||
export CCL_DELEGATE_BIN="$BIN_DIR/ccl-delegate"
|
|
||||||
|
|
||||||
# ── Mock secutools server (Python) ────────────────────────────────────
|
|
||||||
cat > "$TMP_ROOT/mock-server.py" <<'PY'
|
|
||||||
import http.server, json, os, sys, threading
|
|
||||||
STATE_DIR = os.environ["MOCK_STATE"]
|
|
||||||
os.makedirs(STATE_DIR, exist_ok=True)
|
|
||||||
JOBS = {}
|
|
||||||
|
|
||||||
def _respond(handler, code, body):
|
|
||||||
data = json.dumps(body).encode()
|
|
||||||
handler.send_response(code)
|
|
||||||
handler.send_header("Content-Type", "application/json")
|
|
||||||
handler.send_header("Content-Length", str(len(data)))
|
|
||||||
handler.end_headers()
|
|
||||||
handler.wfile.write(data)
|
|
||||||
|
|
||||||
class H(http.server.BaseHTTPRequestHandler):
|
|
||||||
def log_message(self, fmt, *a): pass
|
|
||||||
def do_POST(self):
|
|
||||||
length = int(self.headers.get("Content-Length", "0"))
|
|
||||||
body = json.loads(self.rfile.read(length).decode() or "{}")
|
|
||||||
if self.path == "/api/v1/jobs":
|
|
||||||
jid = f"job-{len(JOBS)+1}"
|
|
||||||
JOBS[jid] = {"status": "pending", "request": body}
|
|
||||||
# Persist request for the test to drive completion.
|
|
||||||
with open(os.path.join(STATE_DIR, f"{jid}.json"), "w") as f:
|
|
||||||
json.dump({"status":"pending"}, f)
|
|
||||||
return _respond(self, 200, {"job_id": jid, "status": "pending"})
|
|
||||||
_respond(self, 404, {"error": "not found"})
|
|
||||||
def do_GET(self):
|
|
||||||
parts = self.path.strip("/").split("/")
|
|
||||||
# /api/v1/jobs/:id or /api/v1/jobs/:id/result
|
|
||||||
if len(parts) >= 4 and parts[:3] == ["api","v1","jobs"]:
|
|
||||||
jid = parts[3]
|
|
||||||
# read status from state file (test can overwrite)
|
|
||||||
fp = os.path.join(STATE_DIR, f"{jid}.json")
|
|
||||||
if not os.path.exists(fp):
|
|
||||||
return _respond(self, 404, {"error":"unknown job"})
|
|
||||||
with open(fp) as f: state = json.load(f)
|
|
||||||
if len(parts) == 4:
|
|
||||||
# job status
|
|
||||||
return _respond(self, 200, {
|
|
||||||
"job_id": jid,
|
|
||||||
"status": state.get("status","pending"),
|
|
||||||
"provider": state.get("provider","")})
|
|
||||||
if len(parts) == 5 and parts[4] == "result":
|
|
||||||
if state.get("status") != "completed":
|
|
||||||
return _respond(self, 409, {"error":"not completed"})
|
|
||||||
return _respond(self, 200, {
|
|
||||||
"job_id": jid,
|
|
||||||
"response": state.get("response","(no response)"),
|
|
||||||
"provider": state.get("provider","gpu"),
|
|
||||||
"model": state.get("model","qwen-coder"),
|
|
||||||
"cost_cad": state.get("cost_cad", 0.0123),
|
|
||||||
"tokens": state.get("tokens", 1200),
|
|
||||||
})
|
|
||||||
_respond(self, 404, {"error":"not found"})
|
|
||||||
|
|
||||||
port = int(sys.argv[1])
|
|
||||||
srv = http.server.ThreadingHTTPServer(("127.0.0.1", port), H)
|
|
||||||
print(f"MOCK: listening on {port}", flush=True)
|
|
||||||
srv.serve_forever()
|
|
||||||
PY
|
|
||||||
|
|
||||||
echo "==> starting mock secutools on 127.0.0.1:$MOCK_PORT"
|
|
||||||
MOCK_STATE="$MOCK_STATE" python3 "$TMP_ROOT/mock-server.py" "$MOCK_PORT" > "$MOCK_LOG" 2>&1 &
|
|
||||||
echo $! > "$MOCK_PID_FILE"
|
|
||||||
sleep 0.5
|
|
||||||
# Wait up to 3s for the port to be ready
|
|
||||||
for _ in 1 2 3 4 5 6; do
|
|
||||||
if curl -sf "http://127.0.0.1:$MOCK_PORT/api/v1/jobs/nope" >/dev/null 2>&1; then break; fi
|
|
||||||
sleep 0.5
|
|
||||||
done
|
|
||||||
|
|
||||||
# ── Fixture task ──────────────────────────────────────────────────────
|
|
||||||
TASK_PATH="$PROJECTS_BASE/test-proj/.agent-queue/inbox/TASK-phase2E.md"
|
|
||||||
cat > "$TASK_PATH" <<'EOF'
|
|
||||||
---
|
|
||||||
title: Phase 2 E integration
|
|
||||||
priority: default
|
|
||||||
preferred_ai: gpu
|
|
||||||
allow_delegation: true
|
|
||||||
complexity_hint: low
|
|
||||||
---
|
|
||||||
Summarize this paragraph in one sentence.
|
|
||||||
EOF
|
|
||||||
|
|
||||||
# ── Prepare environment for scripts ───────────────────────────────────
|
|
||||||
export CCL_SECUTOOLS_API_KEY="test-key"
|
|
||||||
export CCL_SECUTOOLS_MOCK_URL="http://127.0.0.1:$MOCK_PORT"
|
|
||||||
export CCL_DELEGATION_BUDGETS_FILE="$BUDGETS_FILE"
|
|
||||||
export CCL_DELEGATION_BUDGET_CAD_DAILY="10.00" # generous for test
|
|
||||||
|
|
||||||
# ── Sanity: `ccl-delegate decide` ─────────────────────────────────────
|
|
||||||
echo "==> ccl-delegate decide"
|
|
||||||
dec=$("$BIN_DIR/ccl-delegate" decide --frontmatter="$TASK_PATH")
|
|
||||||
[[ "$dec" == "gpu" ]] || { echo "FAIL: decide expected 'gpu', got '$dec'"; exit 1; }
|
|
||||||
echo " decide=$dec OK"
|
|
||||||
|
|
||||||
# ── Step 1: delegate ──────────────────────────────────────────────────
|
|
||||||
echo "==> delegate-to-secutools.sh"
|
|
||||||
bash "$SCRIPTS_ROOT/delegate-to-secutools.sh" "test-proj" "$TASK_PATH"
|
|
||||||
|
|
||||||
marker="$TASK_PATH.delegated"
|
|
||||||
[[ -f "$marker" ]] || { echo "FAIL: marker missing: $marker"; exit 1; }
|
|
||||||
job_id=$(awk -F'"' '/"job_id"/ { print $4; exit }' "$marker")
|
|
||||||
[[ -n "$job_id" ]] || { echo "FAIL: no job_id in marker"; exit 1; }
|
|
||||||
echo " marker OK (job_id=$job_id)"
|
|
||||||
|
|
||||||
# status.json
|
|
||||||
status_json="$PROJECTS_BASE/test-proj/.agent-queue/status.json"
|
|
||||||
[[ -f "$status_json" ]] || { echo "FAIL: status.json missing"; exit 1; }
|
|
||||||
state=$(awk -F'"' '/"state"/ { print $4; exit }' "$status_json")
|
|
||||||
[[ "$state" == "delegated_to_secutools" ]] || { echo "FAIL: state=$state"; exit 1; }
|
|
||||||
echo " status.json OK (state=delegated_to_secutools)"
|
|
||||||
|
|
||||||
# Original .md must still be in inbox (reaper hasn't processed it yet)
|
|
||||||
[[ -f "$TASK_PATH" ]] || { echo "FAIL: original .md removed prematurely"; exit 1; }
|
|
||||||
|
|
||||||
# ── Step 2: simulate completion in mock and reap ─────────────────────
|
|
||||||
echo "==> driving mock into 'completed' state for $job_id"
|
|
||||||
cat > "$MOCK_STATE/${job_id}.json" <<EOF
|
|
||||||
{"status":"completed","provider":"gpu","model":"qwen-coder","response":"In one sentence: mocked summary.","cost_cad":0.0034,"tokens":321}
|
|
||||||
EOF
|
|
||||||
|
|
||||||
echo "==> poll-delegated-jobs.sh"
|
|
||||||
bash "$SCRIPTS_ROOT/poll-delegated-jobs.sh" --projects-base "$PROJECTS_BASE"
|
|
||||||
|
|
||||||
done_file="$PROJECTS_BASE/test-proj/.agent-queue/done/TASK-phase2E.md"
|
|
||||||
[[ -f "$done_file" ]] || { echo "FAIL: done/ file missing"; exit 1; }
|
|
||||||
grep -q "mocked summary" "$done_file" || { echo "FAIL: response not in done body:"; cat "$done_file"; exit 1; }
|
|
||||||
grep -q "provider: gpu" "$done_file" || { echo "FAIL: provider footer missing:"; cat "$done_file"; exit 1; }
|
|
||||||
grep -q "cost_cad: 0.0034" "$done_file" || { echo "FAIL: cost_cad footer missing:"; cat "$done_file"; exit 1; }
|
|
||||||
echo " done/ OK (has response + provider + cost_cad)"
|
|
||||||
|
|
||||||
# Marker + original .md cleaned up
|
|
||||||
[[ -f "$marker" ]] && { echo "FAIL: marker not cleaned"; exit 1; }
|
|
||||||
[[ -f "$TASK_PATH" ]] && { echo "FAIL: original .md not cleaned"; exit 1; }
|
|
||||||
echo " inbox cleanup OK"
|
|
||||||
|
|
||||||
# Budget tracker recorded the spend
|
|
||||||
if [[ -f "$BUDGETS_FILE" ]]; then
|
|
||||||
grep -q "0.0034" "$BUDGETS_FILE" || echo "WARN: spend not found in $BUDGETS_FILE"
|
|
||||||
grep -q "test-proj" "$BUDGETS_FILE" || echo "WARN: project not found in $BUDGETS_FILE"
|
|
||||||
echo " budget tracker OK"
|
|
||||||
else
|
|
||||||
echo "WARN: no budgets file (maybe unwritable)"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# ── Step 3: assert legacy task (no allow_delegation) exits 2 ─────────
|
|
||||||
echo "==> negative test: legacy task → exit 2"
|
|
||||||
LEGACY="$PROJECTS_BASE/test-proj/.agent-queue/inbox/TASK-legacy.md"
|
|
||||||
cat > "$LEGACY" <<'EOF'
|
|
||||||
---
|
|
||||||
title: Legacy
|
|
||||||
priority: default
|
|
||||||
---
|
|
||||||
Do classic work.
|
|
||||||
EOF
|
|
||||||
set +e
|
|
||||||
bash "$SCRIPTS_ROOT/delegate-to-secutools.sh" "test-proj" "$LEGACY" >/dev/null 2>&1
|
|
||||||
rc=$?
|
|
||||||
set -e
|
|
||||||
[[ $rc -eq 2 ]] || { echo "FAIL: legacy task expected rc=2, got $rc"; exit 1; }
|
|
||||||
echo " legacy task rc=2 OK"
|
|
||||||
|
|
||||||
echo ""
|
|
||||||
echo "=== Phase 2 Chantier E integration: ALL ASSERTIONS PASSED ==="
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue