claude-failover/internal/delegation/delegation.go

409 lines
12 KiB
Go
Raw Normal View History

feat(phase2-E): multi-provider routing via secutools delegation Adds optional delegation of agent-queue tasks to the SecuAAS secutools AI platform (GPU / Gemini / Claude API) instead of dispatching to a local Claude Code tmux session. Per-task opt-in via YAML frontmatter fields preferred_ai, allow_delegation, complexity_hint — absence keeps the Phase 1 behaviour exactly (zero breaking change). Go side: - internal/secutools: HTTP client with exponential-backoff retries (SubmitJob/GetJob/WaitForResult), DecideProvider map adapter for CLI use, table tests. - internal/router: struct-typed Decide() with strict precedence (needs_claude_code > preferred_ai=claude-code > allow_delegation=false > preferred_ai > fail-safe local on unknown). - internal/delegation: Manager submits jobs, writes .md.delegated markers for on-restart recovery, runs a periodic reaper that moves completed jobs into done/ with provider/cost footer and failed jobs into failed/. - internal/dispatcher: WithDelegation() opt-in, routeTask hook before findFreeSession, skips .md.delegated in assignNextTask. - internal/api: /api/delegated/status (active jobs + counters), /watchdog/status extended with delegation counters. - cmd/ccl-delegate: small CLI exposing submit/get/result/decide so the bash dispatcher can call the same contract without duplicating logic. - cmd/claude-failover: delegation wired opt-in via SECUTOOLS_API_KEY. Tests: - 29+ new unit tests across router, secutools, delegation, dispatcher, api packages. go test -race -count=1 clean. - tests/phase2-E-integration.sh: bash end-to-end against a Python stdlib mock HTTP server, exercising the dev-management scripts. Forward-compat with watchdog (Phase 1 B1 already ignores state=delegated_to_secutools) so delegated tasks aren't flagged stale.
2026-04-17 02:17:19 +00:00
// Package delegation submits agent-queue tasks to secutools when the router
// decides they should not run on a local Claude Code session, and reaps
// completed jobs back into the project's done/ or failed/ directory.
//
// Lifecycle of a delegated task in this package:
//
//	inbox/<id>.md
//	   │  Submit() — POST /api/v1/jobs
//	   ▼
//	inbox/<id>.md.delegated   (marker file: { project, task_file, job_id, provider, started_at })
//	   │
//	   │  Reaper loop polls every <interval>:
//	   │  GetJob() → completed
//	   ▼
//	done/<id>.md              (frontmatter: completed, body = AI response, footer = cost)
//
// or, on failure:
//
//	failed/<id>.md            (frontmatter: failed_reason, body = original task)
//
// The .delegated marker doubles as the on-disk source of truth — if the
// daemon restarts mid-flight, the reaper rebuilds its job table from disk.
package delegation
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"forge.secuaas.ovh/olivier/claude-failover/internal/router"
"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
)
// Marker is the JSON document stored in an inbox/*.md.delegated file. It is
// written by Submit and read back by LoadFromDisk to resume tracking.
type Marker struct {
	// Project is the project root directory the task belongs to.
	Project string `json:"project"`

	// TaskFile is the path of the inbox task file that was delegated.
	TaskFile string `json:"task_file"`

	// JobID is the identifier secutools returned for the submitted job.
	JobID string `json:"job_id"`

	// Provider is the string form of the router provider used at submit time.
	Provider string `json:"provider"`

	// StartedAt is when the job was submitted.
	StartedAt time.Time `json:"started_at"`
}
// DelegatedJob is the in-memory record of a job the Manager is tracking: the
// persisted Marker plus the location of the marker file it came from.
type DelegatedJob struct {
	// Marker is embedded, so its fields (JobID, Project, ...) are promoted.
	Marker

	// StatusFile is the absolute path to the inbox/*.md.delegated marker.
	StatusFile string
}
// Counters holds the aggregate delegation metrics surfaced via
// /watchdog/status. Every field is an atomic, so the counters can be updated
// and read without holding the Manager's mutex.
type Counters struct {
	// Active is the number of delegated jobs currently being tracked.
	Active atomic.Int64

	// CompletedTotal counts jobs finalised into done/.
	CompletedTotal atomic.Int64

	// FailedTotal counts jobs finalised into failed/.
	FailedTotal atomic.Int64
}
// Snapshot is a plain-value, JSON-serialisable copy of Counters.
type Snapshot struct {
	// Active mirrors Counters.Active.
	Active int64 `json:"delegated_active"`

	// CompletedTotal mirrors Counters.CompletedTotal.
	CompletedTotal int64 `json:"delegated_completed_total"`

	// FailedTotal mirrors Counters.FailedTotal.
	FailedTotal int64 `json:"delegated_failed_total"`
}
// Snapshot copies the current counter values into a plain Snapshot.
//
// Each counter is loaded atomically but independently, so the three values
// are not guaranteed to come from the same instant.
func (c *Counters) Snapshot() Snapshot {
	var snap Snapshot
	snap.Active = c.Active.Load()
	snap.CompletedTotal = c.CompletedTotal.Load()
	snap.FailedTotal = c.FailedTotal.Load()
	return snap
}
// Manager submits tasks to secutools, tracks the resulting jobs, and runs the
// periodic reaper that finalises them. Construct one with New.
type Manager struct {
	// client is the secutools API used to submit and poll jobs.
	client secutools.Client

	// logger receives the "[delegation] ..." progress and warning lines.
	logger *log.Logger

	// interval is the reaper's polling period used by Run.
	interval time.Duration

	// now returns the current time; New sets it to time.Now in UTC.
	now func() time.Time

	// mu guards jobs.
	mu sync.RWMutex

	// jobs holds the in-flight delegated jobs, keyed by job_id.
	jobs map[string]*DelegatedJob

	// Counters holds the aggregate metrics; see CountersSnapshot.
	Counters Counters
}
// New constructs a Manager that talks to secutools through client.
//
// interval is how often the reaper started by Run polls secutools. A zero or
// negative interval falls back to 30s: time.NewTicker panics on non-positive
// durations, so Run must never be handed one.
//
// The returned Manager logs to log.Default() (override with SetLogger),
// timestamps jobs in UTC, and starts with an empty job table.
func New(client secutools.Client, interval time.Duration) *Manager {
	if interval <= 0 {
		interval = 30 * time.Second
	}
	return &Manager{
		client:   client,
		logger:   log.Default(),
		interval: interval,
		now:      func() time.Time { return time.Now().UTC() },
		jobs:     make(map[string]*DelegatedJob),
	}
}
// SetLogger replaces the logger installed by New (used by tests).
//
// The assignment is not synchronised with the Manager's other methods, so
// call it before the Manager is shared between goroutines.
func (m *Manager) SetLogger(l *log.Logger) {
	m.logger = l
}
// Submit posts a task to secutools and records it as a delegated job.
//
// Parameters:
//   - projectDir: the project root, stored in the marker so the result can be
//     routed back to that project's done/ or failed/ directory.
//   - taskPath: path of the inbox/<id>.md file; the marker is written next to
//     it as taskPath + ".delegated".
//   - prompt: the task text (the .md body after the frontmatter) sent as the
//     job prompt, wrapped by buildPrompt.
//   - provider: must satisfy IsDelegated(); for router.ProviderAuto the
//     request's PreferredAI is left empty.
//   - priority: forwarded unchanged in the job request.
//
// On success the job is added to the in-memory table, the Active counter is
// incremented, and the tracked job is returned. An error is returned when the
// provider is not a delegated one, when the submit call fails, or when the
// marker cannot be written. In that last case the job has already been
// accepted by secutools but is neither tracked nor recorded on disk.
func (m *Manager) Submit(ctx context.Context, projectDir, taskPath, prompt string,
	provider router.Provider, priority secutools.Priority) (*DelegatedJob, error) {
	if !provider.IsDelegated() {
		return nil, fmt.Errorf("delegation: provider %q is not delegated", provider)
	}

	taskName := filepath.Base(taskPath)

	// Only an explicit provider is forwarded as a preference.
	var preferredAI string
	if provider != router.ProviderAuto {
		preferredAI = string(provider)
	}

	submitted, err := m.client.SubmitJob(ctx, &secutools.JobRequest{
		Type:        secutools.TypeAnalyze,
		Priority:    priority,
		Prompt:      buildPrompt(taskPath, prompt),
		PreferredAI: preferredAI,
		Source:      "claude-failover/dispatcher",
	})
	if err != nil {
		return nil, fmt.Errorf("delegation: submit %s: %w", taskName, err)
	}

	// Persist the marker before tracking the job in memory, so the on-disk
	// state is what a restarted daemon rebuilds from.
	tracked := &DelegatedJob{
		Marker: Marker{
			Project:   projectDir,
			TaskFile:  taskPath,
			JobID:     submitted.JobID,
			Provider:  string(provider),
			StartedAt: m.now(),
		},
		StatusFile: taskPath + ".delegated",
	}
	if err := writeMarker(tracked.StatusFile, tracked.Marker); err != nil {
		return nil, fmt.Errorf("delegation: write marker: %w", err)
	}

	m.mu.Lock()
	m.jobs[submitted.JobID] = tracked
	m.mu.Unlock()
	m.Counters.Active.Add(1)

	m.logger.Printf("[delegation] SUBMITTED task=%s job_id=%s provider=%s",
		taskName, submitted.JobID, provider)
	return tracked, nil
}
// LoadFromDisk rebuilds the in-memory job table from the *.md.delegated
// markers found in <project>/.agent-queue/inbox for every entry of
// projectsDirs. It is meant to be called once at daemon startup.
//
// Problems are logged and skipped rather than aborting the scan:
//   - a missing inbox directory is ignored silently (nothing to resume);
//   - any other directory read error is logged;
//   - unreadable or unparsable markers, and markers without a job_id, are
//     logged as bad markers.
//
// A job_id that is already tracked is overwritten with the marker just read
// but is not counted a second time, so repeated calls or duplicate markers
// cannot inflate the Active counter. The returned error is always nil.
func (m *Manager) LoadFromDisk(projectsDirs []string) error {
	for _, projectDir := range projectsDirs {
		inbox := filepath.Join(projectDir, ".agent-queue", "inbox")
		entries, err := os.ReadDir(inbox)
		if err != nil {
			if !errors.Is(err, os.ErrNotExist) {
				m.logger.Printf("[delegation] WARN read inbox %s: %v", inbox, err)
			}
			continue
		}
		for _, e := range entries {
			if !strings.HasSuffix(e.Name(), ".md.delegated") {
				continue
			}
			markerPath := filepath.Join(inbox, e.Name())
			mk, err := readMarker(markerPath)
			if err != nil {
				m.logger.Printf("[delegation] WARN bad marker %s: %v", markerPath, err)
				continue
			}
			// An empty job_id cannot be polled and would make every such
			// marker collide on the same map key.
			if mk.JobID == "" {
				m.logger.Printf("[delegation] WARN bad marker %s: empty job_id", markerPath)
				continue
			}

			m.mu.Lock()
			_, tracked := m.jobs[mk.JobID]
			m.jobs[mk.JobID] = &DelegatedJob{Marker: mk, StatusFile: markerPath}
			m.mu.Unlock()
			if !tracked {
				m.Counters.Active.Add(1)
			}
		}
	}
	return nil
}
// ActiveJob is the JSON view of one in-flight delegated job, as returned by
// Manager.Active for the /api/delegated/status HTTP endpoint.
type ActiveJob struct {
	// JobID is the secutools job identifier.
	JobID string `json:"job_id"`

	// Project is the project directory recorded in the marker.
	Project string `json:"project"`

	// TaskFile is the base name of the delegated task file.
	TaskFile string `json:"task_file"`

	// Provider is the provider recorded at submit time.
	Provider string `json:"provider"`

	// Duration is the elapsed time since submission, rounded to seconds and
	// formatted with time.Duration.String.
	Duration string `json:"duration"`
}
// CountersSnapshot returns the manager's counters as a JSON-friendly
// Snapshot. Implements api.DelegationProvider.
func (m *Manager) CountersSnapshot() Snapshot {
	return m.Counters.Snapshot()
}
// Active lists the delegated jobs currently being tracked.
//
// The job table is read under the read lock. The result is never nil (an
// empty table yields an empty slice) and its order follows map iteration, so
// it is not stable between calls. TaskFile is reduced to its base name and
// Duration is the time since StartedAt, rounded to the second.
func (m *Manager) Active() []ActiveJob {
	m.mu.RLock()
	defer m.mu.RUnlock()

	listing := make([]ActiveJob, 0, len(m.jobs))
	for _, tracked := range m.jobs {
		elapsed := m.now().Sub(tracked.StartedAt).Round(time.Second)
		listing = append(listing, ActiveJob{
			JobID:    tracked.JobID,
			Project:  tracked.Project,
			TaskFile: filepath.Base(tracked.TaskFile),
			Provider: tracked.Provider,
			Duration: elapsed.String(),
		})
	}
	return listing
}
// Run drives the reaper: it calls reapOnce once per interval tick and blocks
// until ctx is cancelled. The first reap happens one full interval after Run
// starts, not immediately. The ticker is stopped on return.
func (m *Manager) Run(ctx context.Context) {
	tick := time.NewTicker(m.interval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-tick.C:
			m.reapOnce(ctx)
		}
	}
}
// reapOnce polls every tracked job once and finalises those that reached a
// terminal state.
//
// The job table is copied under the read lock and polled without holding it,
// so the finalize* methods can take the write lock to delete entries.
//
// Per job:
//   - "completed": the result is fetched and written to done/;
//   - "failed" or "cancelled": a failure report is written to failed/;
//   - any other status: the job is left alone until the next tick.
//
// Poll and fetch errors are logged and the job is retried on the next tick.
// The loop stops as soon as ctx is cancelled, instead of issuing one doomed
// request (and one log line) per remaining job during shutdown.
func (m *Manager) reapOnce(ctx context.Context) {
	m.mu.RLock()
	snapshot := make([]*DelegatedJob, 0, len(m.jobs))
	for _, j := range m.jobs {
		snapshot = append(snapshot, j)
	}
	m.mu.RUnlock()

	for _, j := range snapshot {
		if ctx.Err() != nil {
			return
		}
		st, err := m.client.GetJob(ctx, j.JobID)
		if err != nil {
			m.logger.Printf("[delegation] poll job=%s: %v", j.JobID, err)
			continue
		}
		switch st.Status {
		case "completed":
			res, err := m.fetchResult(ctx, j.JobID)
			if err != nil {
				m.logger.Printf("[delegation] fetch result job=%s: %v", j.JobID, err)
				continue
			}
			m.finalizeSuccess(j, res)
		case "failed", "cancelled":
			m.finalizeFailure(j, st)
		}
	}
}
// fetchResult retrieves the final result of a job.
//
// The only result accessor used here is WaitForResult, which the client must
// provide; otherwise an error is returned. reapOnce calls this only after
// GetJob reported the job as completed, so the wait is presumably satisfied
// by its first poll. The 5s timeout is just an upper bound in case it is not.
func (m *Manager) fetchResult(ctx context.Context, jobID string) (*secutools.JobResult, error) {
	const fetchTimeout = 5 * time.Second

	type resultWaiter interface {
		WaitForResult(ctx context.Context, id string, timeout time.Duration) (*secutools.JobResult, error)
	}

	if waiter, ok := m.client.(resultWaiter); ok {
		return waiter.WaitForResult(ctx, jobID, fetchTimeout)
	}
	return nil, errors.New("delegation: client does not implement WaitForResult")
}
// finalizeSuccess writes the job result to <project>/.agent-queue/done/ under
// the task's base name, removes the task and its marker from the inbox, stops
// tracking the job, and updates the counters.
//
// If the done directory cannot be created or the file cannot be written, the
// error is logged and nothing else changes, so the job stays tracked and is
// retried on the next reaper tick.
//
// The task file is removed before its marker. If the process dies between
// the two removals, the leftover marker is reloaded by LoadFromDisk and the
// job is simply finalised again. The opposite order would leave a task with
// no marker, which presumably looks like a fresh, undelegated task to the
// dispatcher. Removal failures other than "already gone" are logged, because
// a leftover file changes what a restarted daemon sees.
func (m *Manager) finalizeSuccess(j *DelegatedJob, res *secutools.JobResult) {
	doneDir := filepath.Join(j.Project, ".agent-queue", "done")
	if err := os.MkdirAll(doneDir, 0755); err != nil {
		m.logger.Printf("[delegation] mkdir done %s: %v", doneDir, err)
		return
	}
	donePath := filepath.Join(doneDir, filepath.Base(j.TaskFile))
	body := buildDoneBody(res, j)
	if err := os.WriteFile(donePath, []byte(body), 0644); err != nil {
		m.logger.Printf("[delegation] write done %s: %v", donePath, err)
		return
	}

	for _, p := range []string{j.TaskFile, j.StatusFile} {
		if err := os.Remove(p); err != nil && !errors.Is(err, os.ErrNotExist) {
			m.logger.Printf("[delegation] WARN remove %s: %v", p, err)
		}
	}

	m.mu.Lock()
	delete(m.jobs, j.JobID)
	m.mu.Unlock()
	m.Counters.Active.Add(-1)
	m.Counters.CompletedTotal.Add(1)
	m.logger.Printf("[delegation] COMPLETED task=%s job_id=%s provider=%s cost_cad=%.4f",
		filepath.Base(j.TaskFile), j.JobID, res.Provider, res.CostCAD)
}
// finalizeFailure writes a failure report to <project>/.agent-queue/failed/
// under the task's base name, removes the task and its marker from the inbox,
// stops tracking the job, and updates the counters.
//
// The report is the output of buildFailedBody followed by the original task
// text under an "Original task" heading. The inbox copy is deleted here, so
// without this the task content would be lost. If the task file is already
// gone the report is written without it.
//
// If the failed directory cannot be created or the file cannot be written,
// the error is logged and nothing else changes, so the job stays tracked and
// is retried on the next reaper tick.
//
// The task file is removed before its marker. If the process dies between
// the two removals, the leftover marker is reloaded by LoadFromDisk and the
// job is simply finalised again. Removal failures other than "already gone"
// are logged.
func (m *Manager) finalizeFailure(j *DelegatedJob, st *secutools.JobStatus) {
	failedDir := filepath.Join(j.Project, ".agent-queue", "failed")
	if err := os.MkdirAll(failedDir, 0755); err != nil {
		m.logger.Printf("[delegation] mkdir failed %s: %v", failedDir, err)
		return
	}
	failedPath := filepath.Join(failedDir, filepath.Base(j.TaskFile))

	body := buildFailedBody(j, st)
	if original, err := os.ReadFile(j.TaskFile); err == nil {
		body += "\n## Original task\n\n" + string(original)
	} else if !errors.Is(err, os.ErrNotExist) {
		m.logger.Printf("[delegation] WARN read task %s: %v", j.TaskFile, err)
	}

	if err := os.WriteFile(failedPath, []byte(body), 0644); err != nil {
		m.logger.Printf("[delegation] write failed %s: %v", failedPath, err)
		return
	}

	for _, p := range []string{j.TaskFile, j.StatusFile} {
		if err := os.Remove(p); err != nil && !errors.Is(err, os.ErrNotExist) {
			m.logger.Printf("[delegation] WARN remove %s: %v", p, err)
		}
	}

	m.mu.Lock()
	delete(m.jobs, j.JobID)
	m.mu.Unlock()
	m.Counters.Active.Add(-1)
	m.Counters.FailedTotal.Add(1)
	m.logger.Printf("[delegation] FAILED task=%s job_id=%s status=%s err=%s",
		filepath.Base(j.TaskFile), j.JobID, st.Status, st.Error)
}
// buildPrompt renders the prompt sent to secutools for a task.
//
// The prompt is the task's base file name, the task body, and a closing
// instruction asking for a self-contained answer. The body is sent as-is: the
// non-Claude provider has no filesystem access, so the task author must have
// included all required context. An empty body is replaced by the placeholder
// "(empty task body)".
func buildPrompt(taskPath, body string) string {
	const layout = "Task: %s\n\n%s\n\nProvide a complete, self-contained answer."

	content := body
	if content == "" {
		content = "(empty task body)"
	}
	return fmt.Sprintf(layout, filepath.Base(taskPath), content)
}
// buildDoneBody renders the Markdown document written into done/ after a
// successful job.
//
// The document consists of a frontmatter block (status, provider, model,
// cost_cad, job_id, delegated_at), a heading, the provider's response, and a
// footer line repeating the provider and cost. delegated_at is the job's
// StartedAt formatted as RFC 3339, and costs are printed with four decimals.
// All values are interpolated verbatim, with no YAML or Markdown escaping.
// res and j must be non-nil.
func buildDoneBody(res *secutools.JobResult, j *DelegatedJob) string {
return fmt.Sprintf(`---
status: completed
provider: %s
model: %s
cost_cad: %.4f
job_id: %s
delegated_at: %s
---
# Result (delegated to secutools)
%s
---
provider: %s, cost_cad: %.4f
`, res.Provider, res.Model, res.CostCAD, j.JobID,
j.StartedAt.Format(time.RFC3339), res.Response,
res.Provider, res.CostCAD)
}
// buildFailedBody renders the Markdown failure report written into failed/
// when a delegated job dies.
//
// The document consists of a frontmatter block (status: failed, failed_reason
// set to the job's terminal status, job_id, provider, delegated_at) followed
// by a short summary giving the job ID, its status, and st.Error.
// delegated_at is the job's StartedAt formatted as RFC 3339. All values are
// interpolated verbatim, with no YAML or Markdown escaping. The text produced
// here does not contain the original task. j and st must be non-nil.
func buildFailedBody(j *DelegatedJob, st *secutools.JobStatus) string {
return fmt.Sprintf(`---
status: failed
failed_reason: %s
job_id: %s
provider: %s
delegated_at: %s
---
# Delegation failed
Job %s ended in status=%s.
Error: %s
`, st.Status, j.JobID, j.Provider,
j.StartedAt.Format(time.RFC3339),
j.JobID, st.Status, st.Error)
}
// writeMarker serialises mk as indented JSON and stores it at path.
//
// The marker is the on-disk source of truth used to resume tracking after a
// restart, so it is written to a sibling temporary file (path + ".tmp") and
// then renamed into place. A crash mid-write therefore leaves either no
// marker or a complete one, never a truncated file that LoadFromDisk would
// reject. The temporary name does not end in ".md.delegated", so a stray
// temp file is not mistaken for a marker.
//
// It returns the marshalling, write, or rename error, if any; on write or
// rename failure the temporary file is removed on a best-effort basis.
func writeMarker(path string, mk Marker) error {
	data, err := json.MarshalIndent(mk, "", " ")
	if err != nil {
		return err
	}

	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		_ = os.Remove(tmp) // best-effort cleanup; the write error is what matters
		return err
	}
	if err := os.Rename(tmp, path); err != nil {
		_ = os.Remove(tmp) // best-effort cleanup; the rename error is what matters
		return err
	}
	return nil
}
// readMarker loads the file at path and decodes it as a JSON Marker.
//
// It returns the zero Marker together with the error when the file cannot be
// read or does not contain valid JSON for a Marker. No field validation is
// performed beyond what JSON decoding enforces.
func readMarker(path string) (Marker, error) {
	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		return Marker{}, readErr
	}

	parsed := new(Marker)
	if decodeErr := json.Unmarshal(raw, parsed); decodeErr != nil {
		// Discard anything decoded before the failure.
		return Marker{}, decodeErr
	}
	return *parsed, nil
}