claude-failover/internal/dispatcher/dispatcher.go
Ubuntu 3e20085204 feat(phase2-E): multi-provider routing via secutools delegation
Adds optional delegation of agent-queue tasks to the SecuAAS secutools
AI platform (GPU / Gemini / Claude API) instead of dispatching to a
local Claude Code tmux session. Per-task opt-in via YAML frontmatter
fields preferred_ai, allow_delegation, complexity_hint — absence keeps
the Phase 1 behaviour exactly (zero breaking change).

Go side:
- internal/secutools: HTTP client with exponential-backoff retries
  (SubmitJob/GetJob/WaitForResult), DecideProvider map adapter for CLI
  use, table tests.
- internal/router: struct-typed Decide() with strict precedence
  (needs_claude_code > preferred_ai=claude-code > allow_delegation=false
  > preferred_ai > fail-safe local on unknown).
- internal/delegation: Manager submits jobs, writes .md.delegated
  markers for on-restart recovery, runs a periodic reaper that moves
  completed jobs into done/ with provider/cost footer and failed jobs
  into failed/.
- internal/dispatcher: WithDelegation() opt-in, routeTask hook before
  findFreeSession, skips .md.delegated in assignNextTask.
- internal/api: /api/delegated/status (active jobs + counters),
  /watchdog/status extended with delegation counters.
- cmd/ccl-delegate: small CLI exposing submit/get/result/decide so the
  bash dispatcher can call the same contract without duplicating logic.
- cmd/claude-failover: delegation wired opt-in via SECUTOOLS_API_KEY.

Tests:
- 29+ new unit tests across router, secutools, delegation, dispatcher,
  api packages. go test -race -count=1 clean.
- tests/phase2-E-integration.sh: bash end-to-end against a Python
  stdlib mock HTTP server, exercising the dev-management scripts.

Forward-compat with watchdog (Phase 1 B1 already ignores
state=delegated_to_secutools) so delegated tasks aren't flagged stale.
2026-04-17 02:17:19 +00:00

449 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package dispatcher watches project inbox directories and assigns tasks to
// idle tmux sessions. One task per session — batch dispatch is intentionally
// unsupported.
package dispatcher
import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/fsnotify/fsnotify"
	"gopkg.in/yaml.v3"

	"forge.secuaas.ovh/olivier/claude-failover/internal/config"
	"forge.secuaas.ovh/olivier/claude-failover/internal/delegation"
	"forge.secuaas.ovh/olivier/claude-failover/internal/router"
	"forge.secuaas.ovh/olivier/claude-failover/internal/secutools"
	"forge.secuaas.ovh/olivier/claude-failover/internal/state"
	"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
)
// TaskFrontmatter holds the optional YAML header of a task .md file, i.e.
// the block between the opening and closing "---" lines.
//
// PreferredAI, AllowDelegation and ComplexityHint were introduced in Phase 2
// (chantier E) for secutools routing. All three are optional; a task that
// omits them keeps the Phase 1 behaviour and runs in Claude Code on a local
// ccl-auto session.
type TaskFrontmatter struct {
	// Title is the task title.
	Title string `yaml:"title"`

	// Priority: critical, high, default or low.
	Priority string `yaml:"priority"`

	// Tags lists the task's tags.
	Tags []string `yaml:"tags"`

	// NeedsClaude comes from needs_claude_code.
	NeedsClaude bool `yaml:"needs_claude_code"`

	// PreferredAI: auto | claude-code | gpu | gemini | claude-api.
	PreferredAI string `yaml:"preferred_ai"`

	// AllowDelegation defaults to false, which is the backward-compatible
	// (local) behaviour.
	AllowDelegation bool `yaml:"allow_delegation"`

	// ComplexityHint: low | medium | high.
	ComplexityHint string `yaml:"complexity_hint"`
}
// Dispatcher watches project inbox directories and hands pending tasks
// either to idle tmux sessions or, when delegation is enabled, to secutools.
type Dispatcher struct {
	tmux        tmux.Client
	state       *state.State
	config      *config.Config
	doneChan    <-chan string // names of sessions that just became idle
	projectsDir string
	logger      *log.Logger

	// delegation is the Phase 2 (chantier E) hook. When it is non-nil,
	// tasks whose frontmatter routes them away from Claude Code
	// (allow_delegation=true and preferred_ai!=claude-code) are submitted
	// to secutools rather than assigned to a tmux session. nil keeps the
	// Phase 1 behaviour.
	delegation *delegation.Manager
}
// New builds a Dispatcher with delegation disabled and log.Default() as its
// logger.
//
// doneChan delivers the names of sessions that have become idle (fed by the
// SessionWatcher). The projects directory is taken from
// cfg.Dispatcher.ProjectsDir, falling back to cfg.Pool.SharedProjectsDir
// when that is empty.
func New(tc tmux.Client, s *state.State, cfg *config.Config, doneChan <-chan string) *Dispatcher {
	d := &Dispatcher{
		tmux:        tc,
		state:       s,
		config:      cfg,
		doneChan:    doneChan,
		projectsDir: cfg.Dispatcher.ProjectsDir,
		logger:      log.Default(),
	}
	if d.projectsDir == "" {
		d.projectsDir = cfg.Pool.SharedProjectsDir
	}
	return d
}
// WithDelegation installs m as the delegation manager and returns the same
// Dispatcher so the call can be chained after New. Passing nil switches
// delegation off, which is the Phase 1 default where every task runs on a
// local Claude Code session.
func (d *Dispatcher) WithDelegation(m *delegation.Manager) *Dispatcher {
	d.delegation = m
	return d
}
// Run runs the dispatcher event loop and blocks until ctx is cancelled.
//
// Work is triggered by three sources:
//   - fsnotify Create events for "*.md" files in a watched inbox, which run
//     dispatchProject on that inbox;
//   - session names received on doneChan, which run assignNextTask;
//   - a 60-second ticker, which runs fullScan. This is also the only trigger
//     when fsnotify is unavailable.
//
// A closed fsnotify channel degrades to poll-only mode instead of stopping
// the dispatcher. A closed doneChan is disabled rather than read in a tight
// loop, because a receive on a closed channel returns "" immediately,
// forever.
func (d *Dispatcher) Run(ctx context.Context) {
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()

	var (
		fwEvents <-chan fsnotify.Event
		fwErrors <-chan error
	)
	fw, err := fsnotify.NewWatcher()
	if err != nil {
		d.logger.Printf("[dispatcher] fsnotify unavailable: %v — poll-only mode", err)
	} else {
		defer fw.Close()
		d.watchInboxDirs(fw)
		fwEvents = fw.Events
		// fsnotify expects both Events and Errors to be read; an unread
		// error can block the watcher and stall event delivery.
		fwErrors = fw.Errors
	}
	done := d.doneChan

	for {
		select {
		case <-ctx.Done():
			return

		case evt, ok := <-fwEvents:
			if !ok {
				d.logger.Printf("[dispatcher] fsnotify events closed — poll-only mode")
				fwEvents = nil // a nil channel is never selected
				continue
			}
			// Only new "*.md" files are tasks. Renamed "*.md.dispatched" and
			// "*.md.delegated" names do not end in ".md".
			if evt.Op&fsnotify.Create != 0 && strings.HasSuffix(evt.Name, ".md") {
				d.dispatchProject(filepath.Dir(evt.Name))
			}

		case werr, ok := <-fwErrors:
			if !ok {
				fwErrors = nil
				continue
			}
			d.logger.Printf("[dispatcher] fsnotify error: %v", werr)

		case session, ok := <-done:
			if !ok {
				d.logger.Printf("[dispatcher] done channel closed — relying on periodic scans")
				done = nil
				continue
			}
			d.assignNextTask(session)

		case <-ticker.C:
			d.fullScan()
		}
	}
}
// watchInboxDirs adds the ".agent-queue/inbox" directory of every dedicated
// project in the pool config to fw. A directory that cannot be watched is
// logged and skipped, so the remaining ones are still registered.
func (d *Dispatcher) watchInboxDirs(fw *fsnotify.Watcher) {
	for _, dedicated := range d.config.Pool.Dedicated {
		dir := filepath.Join(dedicated.Project, ".agent-queue", "inbox")
		addErr := fw.Add(dir)
		if addErr == nil {
			continue
		}
		d.logger.Printf("[dispatcher] watch %s: %v", dir, addErr)
	}
}
// fullScan runs dispatchProject over the inbox of every dedicated project in
// the pool config.
func (d *Dispatcher) fullScan() {
	projects := d.config.Pool.Dedicated
	for i := range projects {
		d.dispatchProject(filepath.Join(projects[i].Project, ".agent-queue", "inbox"))
	}
}
// dispatchProject dispatches the pending tasks of a single inbox directory
// (<project>/.agent-queue/inbox).
//
// A task is pending when it is a "*.md" file whose name contains neither
// ".dispatched" nor ".delegated" and that has no sibling
// "<name>.delegated" marker. For each pending task the router chooses one of
// two paths:
//
//   - delegation: if the decision names a delegated provider and a
//     delegation manager is configured, the task is submitted to secutools.
//     The .md stays in inbox/ until the reaper finalises it; the marker check
//     above is what stops later scans from submitting it again (the marker is
//     presumably written by delegation.Manager.Submit — verify). A failed
//     submit falls back to the local path.
//   - local Claude Code (Phase 1): the task is launched on the first free
//     autonomous session and renamed to "<name>.dispatched".
//
// Once no session is free, remaining local tasks wait for a later scan, but
// delegated tasks further down the listing are still submitted. They need no
// tmux slot. Without a delegation manager, the scan stops at that point.
func (d *Dispatcher) dispatchProject(inboxDir string) {
	entries, err := os.ReadDir(inboxDir)
	if err != nil {
		return
	}
	projectDir := filepath.Dir(filepath.Dir(inboxDir)) // inboxDir/.agent-queue/inbox → project

	// Base names ("x.md") that already have an "x.md.delegated" marker.
	delegated := make(map[string]bool)
	for _, e := range entries {
		if strings.HasSuffix(e.Name(), ".md.delegated") {
			delegated[strings.TrimSuffix(e.Name(), ".delegated")] = true
		}
	}

	noFreeSession := false
	for _, e := range entries {
		name := e.Name()
		if !strings.HasSuffix(name, ".md") ||
			strings.Contains(name, ".dispatched") ||
			strings.Contains(name, ".delegated") ||
			delegated[name] {
			continue
		}
		taskPath := filepath.Join(inboxDir, name)

		// Decide route based on frontmatter.
		decision, body, err := d.routeTask(taskPath)
		if err != nil {
			d.logger.Printf("[dispatcher] routeTask %s: %v", taskPath, err)
			continue
		}

		// Delegated path: submit to secutools.
		if decision.Provider.IsDelegated() && d.delegation != nil {
			if _, err := d.delegation.Submit(context.Background(), projectDir, taskPath,
				body, decision.Provider, mapPriority(d.taskPriority(taskPath))); err != nil {
				d.logger.Printf("[dispatcher] delegate %s: %v — falling back to Claude Code",
					filepath.Base(taskPath), err)
				// Fall through to the Claude Code path on submit failure.
			} else {
				d.logger.Printf("[dispatcher] DELEGATED task=%s provider=%s reason=%s",
					filepath.Base(taskPath), decision.Provider, decision.Reason)
				continue
			}
		}

		// Claude Code path (Phase 1 behaviour).
		if noFreeSession {
			continue
		}
		session := d.findFreeSession()
		if session == "" {
			d.logger.Printf("[dispatcher] no free session for task in %s", inboxDir)
			if d.delegation == nil {
				return // nothing else in this inbox can make progress
			}
			noFreeSession = true
			continue
		}
		if err := d.launchAgent(session, projectDir, taskPath); err != nil {
			d.logger.Printf("[dispatcher] launchAgent error: %v", err)
			continue
		}
		if err := os.Rename(taskPath, taskPath+".dispatched"); err != nil {
			d.logger.Printf("[dispatcher] rename .dispatched: %v", err)
		}
	}
}
// routeTask loads the task at taskPath, parses its frontmatter and asks
// router.Decide where the task should run. The parsed body is returned
// alongside the decision, which saves the caller from reading the file
// again. The only error is a failure to read the file.
func (d *Dispatcher) routeTask(taskPath string) (router.Decision, string, error) {
	raw, readErr := os.ReadFile(taskPath)
	if readErr != nil {
		return router.Decision{}, "", fmt.Errorf("read task: %w", readErr)
	}

	header, body := parseFrontmatter(raw)
	task := router.Task{
		PreferredAI:     header.PreferredAI,
		AllowDelegation: header.AllowDelegation,
		NeedsClaudeCode: header.NeedsClaude,
		ComplexityHint:  header.ComplexityHint,
	}
	return router.Decide(task), body, nil
}
// taskPriority re-reads the task at taskPath and returns the Priority field
// of its frontmatter. The whole file is read and parsed again. It returns ""
// when the file cannot be read or has no priority. It is only used to map a
// priority for secutools when delegating.
func (d *Dispatcher) taskPriority(taskPath string) string {
	var priority string
	if raw, err := os.ReadFile(taskPath); err == nil {
		header, _ := parseFrontmatter(raw)
		priority = header.Priority
	}
	return priority
}
// mapPriority translates a task priority ("critical", "high", "low",
// case-insensitive, surrounding whitespace ignored) into a
// secutools.Priority. Any other value, including "", maps to
// secutools.PriorityDefault.
func mapPriority(p string) secutools.Priority {
	level := strings.ToLower(strings.TrimSpace(p))
	if level == "critical" {
		return secutools.PriorityCritical
	}
	if level == "high" {
		return secutools.PriorityHigh
	}
	if level == "low" {
		return secutools.PriorityLow
	}
	return secutools.PriorityDefault
}
// findFreeSession returns the first autonomous-pool session, by index order,
// that isSessionFree accepts, or "" if there is none. Session names are the
// pool prefix (default "ccl-auto-") followed by an index in
// [StartIndex, StartIndex+Max).
//
// Dedicated sessions are deliberately excluded: they host the operator's
// interactive work. A background dispatch there would (a) hijack a Claude
// instance the operator is using and (b) trigger the watcher's /exit
// recycle at task end, kicking the operator out mid-conversation.
func (d *Dispatcher) findFreeSession() string {
	pool := d.config.Pool.Autonomous
	prefix := pool.Prefix
	if prefix == "" {
		prefix = "ccl-auto-"
	}
	for i, end := pool.StartIndex, pool.StartIndex+pool.Max; i < end; i++ {
		if name := sessionName(prefix, i); d.isSessionFree(name) {
			return name
		}
	}
	return ""
}
// isSessionFree reports whether the named session can take a task. The tmux
// session must exist, its tracked state must be "idle", and, if it has a
// recorded failure, at least five minutes must have passed since then.
func (d *Dispatcher) isSessionFree(name string) bool {
	const failCooldown = 5 * time.Minute

	if !d.tmux.HasSession(name) {
		return false
	}
	sess := d.state.GetSession(name)
	switch {
	case sess == nil:
		return false
	case sess.State != "idle":
		return false
	case sess.LastFail == nil:
		return true
	default:
		return time.Since(*sess.LastFail) >= failCooldown
	}
}
// assignNextTask gives at most one pending task to session, a session that
// has just been reported idle on doneChan.
//
// Only autonomous-pool sessions (names starting with the pool prefix, default
// "ccl-auto-") are used. As in findFreeSession, background work is never
// routed into a dedicated (operator) session.
//
// Each dedicated project's inbox is scanned. The following are skipped:
//   - names containing ".dispatched" or ".delegated";
//   - tasks with a "<name>.md.delegated" marker, since they are already with
//     secutools;
//   - when delegation is enabled, tasks the router would delegate. The next
//     dispatchProject scan submits those.
//
// The first remaining task is launched and renamed to "<name>.dispatched".
//
// If launchAgent fails, no further task is tried on this session in this
// call. launchAgent may already have typed into the session, and a second
// "cd"/"claude" sequence on top of a half-started Claude would land in its
// input box. Pending tasks are retried by the periodic full scan.
func (d *Dispatcher) assignNextTask(session string) {
	prefix := d.config.Pool.Autonomous.Prefix
	if prefix == "" {
		prefix = "ccl-auto-"
	}
	if !strings.HasPrefix(session, prefix) {
		// Not an autonomous-pool session (possibly a dedicated one, or ""):
		// leave it alone.
		return
	}

	for _, ds := range d.config.Pool.Dedicated {
		inbox := filepath.Join(ds.Project, ".agent-queue", "inbox")
		entries, err := os.ReadDir(inbox)
		if err != nil {
			continue
		}

		// Base names ("x.md") that already have an "x.md.delegated" marker.
		delegated := make(map[string]bool)
		for _, e := range entries {
			if strings.HasSuffix(e.Name(), ".md.delegated") {
				delegated[strings.TrimSuffix(e.Name(), ".delegated")] = true
			}
		}

		for _, e := range entries {
			name := e.Name()
			if !strings.HasSuffix(name, ".md") ||
				strings.Contains(name, ".dispatched") ||
				strings.Contains(name, ".delegated") ||
				delegated[name] {
				continue
			}
			taskPath := filepath.Join(inbox, name)

			// Respect routing decisions: don't take a delegated task here.
			if d.delegation != nil {
				if dec, _, err := d.routeTask(taskPath); err == nil && dec.Provider.IsDelegated() {
					continue
				}
			}

			if err := d.launchAgent(session, ds.Project, taskPath); err != nil {
				d.logger.Printf("[dispatcher] launchAgent error: %v", err)
				return
			}
			if err := os.Rename(taskPath, taskPath+".dispatched"); err != nil {
				d.logger.Printf("[dispatcher] rename .dispatched: %v", err)
			}
			return
		}
	}
}
// launchAgent starts Claude Code in session and hands it the task stored in
// taskFile. It runs these steps in order:
//  1. Read and parse the task.
//  2. cd into projectDir.
//  3. Start `claude --model <model> --dangerously-skip-permissions`, with
//     `--resume <id>` when the per-session resume file holds an id. The file
//     is single-use and is removed.
//  4. Wait for the prompt.
//  5. Paste the task message and submit it with a separate Enter.
//  6. Mark the session as working in state.
//
// projectDir and the resume id are single-quoted before being typed into the
// session's shell. Paths with spaces or shell metacharacters therefore reach
// cd/claude as one literal argument.
//
// It returns an error when the task cannot be read, a tmux call fails, or
// the prompt does not appear within Dispatcher.PromptTimeout (default 30s).
// An error may occur after keys have already been sent to the session.
func (d *Dispatcher) launchAgent(session, projectDir, taskFile string) error {
	content, err := os.ReadFile(taskFile)
	if err != nil {
		return fmt.Errorf("read task %s: %w", taskFile, err)
	}
	fm, body := parseFrontmatter(content)
	model := modelForPriority(fm.Priority)

	// shellQuote wraps s in single quotes, escaping embedded quotes as '\''.
	shellQuote := func(s string) string {
		return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'"
	}

	// Change to project directory.
	if err := d.tmux.SendKeys(session, "cd "+shellQuote(projectDir)); err != nil {
		return fmt.Errorf("cd in session %q: %w", session, err)
	}
	time.Sleep(300 * time.Millisecond)

	// Build and send the claude command, with optional --resume UUID.
	cmd := fmt.Sprintf("claude --model %s --dangerously-skip-permissions", model)
	resumeFile := filepath.Join(d.resumeDir(), session+"-resume-id.txt")
	if data, ferr := os.ReadFile(resumeFile); ferr == nil {
		if uuid := strings.TrimSpace(string(data)); uuid != "" {
			cmd += " --resume " + shellQuote(uuid)
			// The id is single-use. If removal fails, the next launch in this
			// session would resume the same conversation again, so report it.
			if rerr := os.Remove(resumeFile); rerr != nil {
				d.logger.Printf("[dispatcher] remove %s: %v", resumeFile, rerr)
			}
		}
	}
	if err := d.tmux.SendKeys(session, cmd); err != nil {
		return fmt.Errorf("start claude in session %q: %w", session, err)
	}

	// Wait for the prompt before sending the task message.
	promptTimeout := d.config.Dispatcher.PromptTimeout.Duration
	if promptTimeout == 0 {
		promptTimeout = 30 * time.Second
	}
	if !d.waitForPrompt(session, promptTimeout) {
		return fmt.Errorf("claude not ready in %q after %v", session, promptTimeout)
	}

	// Send the task message. Multi-line task bodies render in Claude's TUI
	// as "[Pasted text #N +M lines]". The trailing Enter that SendKeys
	// appends is consumed as part of the paste, so the message would stay
	// unsubmitted in the input buffer. Wait for the paste to register, then
	// send a lone Enter to actually submit.
	msg := buildTaskMessage(body, taskFile)
	if err := d.tmux.SendKeys(session, msg); err != nil {
		return fmt.Errorf("send task to session %q: %w", session, err)
	}
	time.Sleep(500 * time.Millisecond)
	if err := d.tmux.SendEnter(session); err != nil {
		return fmt.Errorf("submit task in session %q: %w", session, err)
	}

	d.state.SetWorking(session, filepath.Base(taskFile))
	d.logger.Printf("[dispatcher] DISPATCHED session=%q task=%s model=%s",
		session, filepath.Base(taskFile), model)
	return nil
}
// waitForPrompt polls the last 5 lines of session's pane, about once per
// second, until they contain the prompt marker or timeout elapses. It
// returns true on the first successful capture that matches and false once
// the deadline has passed. Capture errors are ignored and simply retried.
// Because of the sleep after each attempt, the total wait can exceed timeout
// by up to about a second.
//
// NOTE(review): as it appears here, the marker passed to strings.Contains is
// the empty string. strings.Contains(s, "") is always true, so this returns
// true on the first successful capture without waiting for Claude's prompt.
// The literal may contain an invisible or ambiguous Unicode character that
// is not visible in this copy. Confirm the intended prompt marker.
func (d *Dispatcher) waitForPrompt(session string, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		tail, err := d.tmux.CapturePaneTail(session, 5)
		if err == nil && strings.Contains(tail, "") {
			return true
		}
		time.Sleep(1 * time.Second)
	}
	return false
}
// resumeDir returns ~/.claude-context, where per-session resume ids are
// stored as "<session>-resume-id.txt". If the home directory cannot be
// determined, the result is the relative path ".claude-context".
func (d *Dispatcher) resumeDir() string {
	home, err := os.UserHomeDir()
	if err != nil {
		home = ""
	}
	return filepath.Join(home, ".claude-context")
}
// parseFrontmatter splits an optional YAML header off a task file.
//
// The header starts with a "---" first line and ends at the next line that
// is exactly "---". CRLF line endings are accepted. The header may be empty
// ("---\n---\n"), and the closing "---" may be the file's last line. The
// returned body is the text after the closing delimiter, with surrounding
// whitespace trimmed.
//
// When no complete header is found, it returns a zero TaskFrontmatter and
// the unmodified content. YAML decode errors are deliberately ignored
// (best-effort parse).
func parseFrontmatter(content []byte) (TaskFrontmatter, string) {
	s := string(content)
	// Normalise CRLF so files edited on Windows are recognised too.
	norm := strings.ReplaceAll(s, "\r\n", "\n")
	if !strings.HasPrefix(norm, "---\n") {
		return TaskFrontmatter{}, s
	}
	rest := norm[len("---\n"):]

	var yamlBlock, body string
	if rest == "---" || strings.HasPrefix(rest, "---\n") {
		// Empty header: the closing delimiter follows immediately.
		body = rest[len("---"):]
	} else if i := strings.Index(rest, "\n---\n"); i >= 0 {
		yamlBlock, body = rest[:i], rest[i+len("\n---\n"):]
	} else if strings.HasSuffix(rest, "\n---") {
		// Closing delimiter is the last line; there is no body.
		yamlBlock = strings.TrimSuffix(rest, "\n---")
	} else {
		return TaskFrontmatter{}, s
	}

	var fm TaskFrontmatter
	yaml.Unmarshal([]byte(yamlBlock), &fm) //nolint:errcheck // best-effort parse
	return fm, strings.TrimSpace(body)
}
// modelForPriority chooses the Claude model for a task. A priority equal to
// "critical" (case-insensitive, no trimming) selects "opus"; every other
// value selects "sonnet".
func modelForPriority(priority string) string {
	model := "sonnet"
	if strings.EqualFold(priority, "critical") {
		model = "opus"
	}
	return model
}
// buildTaskMessage builds the text typed into Claude Code for a task. The
// message is "[<task file base name>] <body>" followed by a fixed instruction
// to actually execute the requested actions. An empty body is replaced by a
// default message that points Claude at .agent-queue/inbox/.
func buildTaskMessage(body, taskFile string) string {
	text := body
	if len(text) == 0 {
		text = "Verifie .agent-queue/inbox/ — 1 tache assignee."
	}
	const instruction = "\n\nIMPORTANT: Tu dois EXECUTER les actions demandées, pas seulement les décrire."
	return "[" + filepath.Base(taskFile) + "] " + text + instruction
}
// sessionName builds the tmux session name for pool slot i, e.g.
// sessionName("ccl-auto-", 3) == "ccl-auto-3".
func sessionName(prefix string, i int) string {
	return prefix + itoa(i)
}

// itoa formats n in base 10, including negative values.
func itoa(n int) string {
	return strconv.Itoa(n)
}