feat(watcher): Phase 2.1 — SessionWatcher goroutine
- internal/watcher: detecte fin de tache via signal file, prompt ❯, idle timeout - state: ForEachWorking, SetStalled, SetActiveAccount, ActiveAccount - config: WatcherConfig, DispatcherConfig, JanitorConfig, NotificationsConfig + defaults - 5 tests unitaires, go test ./... -race OK Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
978b60ccf7
commit
c87145ea0b
7 changed files with 989 additions and 8 deletions
|
|
@ -13,11 +13,15 @@ import (
|
|||
|
||||
// Config is the top-level configuration structure, mapping config.example.yaml.
|
||||
type Config struct {
|
||||
Accounts []AccountConfig `yaml:"accounts"`
|
||||
Pool PoolConfig `yaml:"pool"`
|
||||
Quota QuotaConfig `yaml:"quota"`
|
||||
Checkpoint CheckpointConfig `yaml:"checkpoint"`
|
||||
MCPHTTP MCPHTTPConfig `yaml:"mcp_http"`
|
||||
Accounts []AccountConfig `yaml:"accounts"`
|
||||
Pool PoolConfig `yaml:"pool"`
|
||||
Quota QuotaConfig `yaml:"quota"`
|
||||
Checkpoint CheckpointConfig `yaml:"checkpoint"`
|
||||
MCPHTTP MCPHTTPConfig `yaml:"mcp_http"`
|
||||
Notifications NotificationsConfig `yaml:"notifications"`
|
||||
Dispatcher DispatcherConfig `yaml:"dispatcher"`
|
||||
Watcher WatcherConfig `yaml:"watcher"`
|
||||
Janitor JanitorConfig `yaml:"janitor"`
|
||||
}
|
||||
|
||||
// AccountConfig describes a single Anthropic account available to the daemon.
|
||||
|
|
@ -71,9 +75,38 @@ type CheckpointConfig struct {
|
|||
|
||||
// MCPHTTPConfig defines the HTTP control-plane endpoint.
|
||||
type MCPHTTPConfig struct {
|
||||
Listen string `yaml:"listen"`
|
||||
BearerTokenEnv string `yaml:"bearer_token_env"`
|
||||
EnableTrigger bool `yaml:"enable_trigger"`
|
||||
Listen string `yaml:"listen"`
|
||||
BearerTokenEnv string `yaml:"bearer_token_env"`
|
||||
EnableTrigger bool `yaml:"enable_trigger"`
|
||||
}
|
||||
|
||||
// NotificationsConfig holds environment variable names for alert credentials.
|
||||
// Actual secrets are read from env at runtime — never stored in config files.
|
||||
type NotificationsConfig struct {
|
||||
TelegramTokenEnv string `yaml:"telegram_token_env"`
|
||||
TelegramChatIDEnv string `yaml:"telegram_chat_id_env"`
|
||||
ResendAPIKeyEnv string `yaml:"resend_api_key_env"`
|
||||
NotifyEmailEnv string `yaml:"notify_email_env"`
|
||||
}
|
||||
|
||||
// DispatcherConfig controls the task dispatcher behaviour.
|
||||
type DispatcherConfig struct {
|
||||
ProjectsDir string `yaml:"projects_dir"`
|
||||
IdleTimeout Duration `yaml:"idle_timeout"`
|
||||
PromptTimeout Duration `yaml:"prompt_timeout"`
|
||||
MaxDispatchPerTask int `yaml:"max_dispatch_per_task"`
|
||||
}
|
||||
|
||||
// WatcherConfig controls the session watcher behaviour.
|
||||
type WatcherConfig struct {
|
||||
Interval Duration `yaml:"interval"`
|
||||
DoneSignalDir string `yaml:"done_signal_dir"`
|
||||
IdleTimeout Duration `yaml:"idle_timeout"`
|
||||
}
|
||||
|
||||
// JanitorConfig controls the periodic housekeeping goroutine.
|
||||
type JanitorConfig struct {
|
||||
Interval Duration `yaml:"interval"`
|
||||
}
|
||||
|
||||
// Duration is a time.Duration that unmarshals from YAML strings like "30s", "1h".
|
||||
|
|
@ -138,6 +171,27 @@ func (c *Config) defaults() {
|
|||
if c.Pool.Autonomous.Max == 0 {
|
||||
c.Pool.Autonomous.Max = 10
|
||||
}
|
||||
if c.Watcher.Interval.Duration == 0 {
|
||||
c.Watcher.Interval.Duration = 30 * time.Second
|
||||
}
|
||||
if c.Watcher.IdleTimeout.Duration == 0 {
|
||||
c.Watcher.IdleTimeout.Duration = 60 * time.Minute
|
||||
}
|
||||
if c.Watcher.DoneSignalDir == "" {
|
||||
c.Watcher.DoneSignalDir = "/tmp"
|
||||
}
|
||||
if c.Janitor.Interval.Duration == 0 {
|
||||
c.Janitor.Interval.Duration = 5 * time.Minute
|
||||
}
|
||||
if c.Dispatcher.IdleTimeout.Duration == 0 {
|
||||
c.Dispatcher.IdleTimeout.Duration = 60 * time.Minute
|
||||
}
|
||||
if c.Dispatcher.PromptTimeout.Duration == 0 {
|
||||
c.Dispatcher.PromptTimeout.Duration = 30 * time.Second
|
||||
}
|
||||
if c.Dispatcher.MaxDispatchPerTask == 0 {
|
||||
c.Dispatcher.MaxDispatchPerTask = 3
|
||||
}
|
||||
}
|
||||
|
||||
// Load reads the YAML file at path, expands home paths, and applies defaults.
|
||||
|
|
|
|||
|
|
@ -133,6 +133,51 @@ func (s *State) SetWorking(name, task string) {
|
|||
s.touch()
|
||||
}
|
||||
|
||||
// SetStalled marks the named session as stalled (working but heartbeat too old).
|
||||
func (s *State) SetStalled(name string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
sess, ok := s.Sessions[name]
|
||||
if !ok {
|
||||
sess = &SessionState{}
|
||||
s.Sessions[name] = sess
|
||||
}
|
||||
sess.State = "stalled"
|
||||
s.touch()
|
||||
}
|
||||
|
||||
// ForEachWorking calls f for each session currently in "working" state.
|
||||
// A snapshot is taken under the read lock; f is called without any lock held.
|
||||
func (s *State) ForEachWorking(f func(name string, sess *SessionState)) {
|
||||
s.mu.RLock()
|
||||
working := make(map[string]SessionState, len(s.Sessions))
|
||||
for name, sess := range s.Sessions {
|
||||
if sess.State == "working" {
|
||||
working[name] = *sess
|
||||
}
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
for name, snap := range working {
|
||||
snap := snap
|
||||
f(name, &snap)
|
||||
}
|
||||
}
|
||||
|
||||
// SetActiveAccount updates the active account in the quota state.
|
||||
func (s *State) SetActiveAccount(name string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.Quota.ActiveAccount = name
|
||||
s.touch()
|
||||
}
|
||||
|
||||
// ActiveAccount returns the current active account name.
|
||||
func (s *State) ActiveAccount() string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.Quota.ActiveAccount
|
||||
}
|
||||
|
||||
// SetFailed marks the named session as failed and records the failure timestamp.
|
||||
// The task is preserved for potential requeue by the caller.
|
||||
func (s *State) SetFailed(name string) {
|
||||
|
|
|
|||
139
internal/watcher/session_watcher.go
Normal file
139
internal/watcher/session_watcher.go
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
// Package watcher detects when a Claude Code session has finished its current
|
||||
// task and signals the dispatcher to assign a new one.
|
||||
package watcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
|
||||
)
|
||||
|
||||
// spinnerRe matches Claude Code's "Xs ·" or "Xs ⠋" progress indicator.
|
||||
var spinnerRe = regexp.MustCompile(`\d+s\s+[·⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]`)
|
||||
|
||||
// SessionWatcher monitors active tmux sessions and emits on DoneChan when
|
||||
// a Claude Code session returns to the idle prompt (❯) or exceeds its timeout.
|
||||
type SessionWatcher struct {
|
||||
tmux tmux.Client
|
||||
state *state.State
|
||||
config *config.Config
|
||||
done chan string
|
||||
interval time.Duration
|
||||
idleTimeout time.Duration
|
||||
signalDir string
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// New creates a SessionWatcher with defaults from cfg.
|
||||
func New(tc tmux.Client, s *state.State, cfg *config.Config) *SessionWatcher {
|
||||
interval := cfg.Watcher.Interval.Duration
|
||||
if interval == 0 {
|
||||
interval = 30 * time.Second
|
||||
}
|
||||
idleTimeout := cfg.Watcher.IdleTimeout.Duration
|
||||
if idleTimeout == 0 {
|
||||
idleTimeout = 60 * time.Minute
|
||||
}
|
||||
signalDir := cfg.Watcher.DoneSignalDir
|
||||
if signalDir == "" {
|
||||
signalDir = "/tmp"
|
||||
}
|
||||
return &SessionWatcher{
|
||||
tmux: tc,
|
||||
state: s,
|
||||
config: cfg,
|
||||
done: make(chan string, 32),
|
||||
interval: interval,
|
||||
idleTimeout: idleTimeout,
|
||||
signalDir: signalDir,
|
||||
logger: log.Default(),
|
||||
}
|
||||
}
|
||||
|
||||
// DoneChan returns the channel on which completed session names are sent.
|
||||
func (w *SessionWatcher) DoneChan() <-chan string {
|
||||
return w.done
|
||||
}
|
||||
|
||||
// Run starts the watcher loop until ctx is cancelled.
|
||||
func (w *SessionWatcher) Run(ctx context.Context) {
|
||||
ticker := time.NewTicker(w.interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
w.poll()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// poll inspects all currently-working sessions once.
|
||||
func (w *SessionWatcher) poll() {
|
||||
w.state.ForEachWorking(func(name string, sess *state.SessionState) {
|
||||
w.checkSession(name, sess)
|
||||
})
|
||||
}
|
||||
|
||||
// checkSession evaluates a single working session for completion or timeout.
|
||||
func (w *SessionWatcher) checkSession(name string, sess *state.SessionState) {
|
||||
// 1. Check the done-signal file written by hooks or external scripts.
|
||||
sigFile := filepath.Join(w.signalDir, "agent-done-"+name)
|
||||
if _, err := os.Stat(sigFile); err == nil {
|
||||
w.completeSession(name, sigFile)
|
||||
return
|
||||
}
|
||||
|
||||
// 2. Capture the last 5 pane lines.
|
||||
tail, err := w.tmux.CapturePaneTail(name, 5)
|
||||
if err != nil {
|
||||
// Session may have vanished; lifecycle.Manager handles recreation.
|
||||
return
|
||||
}
|
||||
|
||||
// 3. Idle prompt ❯ without an active spinner → Claude has finished.
|
||||
if hasClaudePrompt(tail) && !hasSpinner(tail) {
|
||||
w.completeSession(name, sigFile)
|
||||
return
|
||||
}
|
||||
|
||||
// 4. Idle-timeout guard.
|
||||
if sess.AssignedAt != nil && time.Since(*sess.AssignedAt) > w.idleTimeout {
|
||||
w.logger.Printf("[watcher] TIMEOUT session=%q elapsed=%v idleTimeout=%v",
|
||||
name, time.Since(*sess.AssignedAt).Round(time.Second), w.idleTimeout)
|
||||
w.completeSession(name, sigFile)
|
||||
}
|
||||
}
|
||||
|
||||
// completeSession sends /exit, marks the session idle, and notifies the dispatcher.
|
||||
func (w *SessionWatcher) completeSession(name, sigFile string) {
|
||||
w.logger.Printf("[watcher] DONE session=%q → /exit", name)
|
||||
_ = w.tmux.SendKeys(name, "/exit")
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
w.state.SetIdle(name)
|
||||
os.Remove(sigFile)
|
||||
select {
|
||||
case w.done <- name:
|
||||
default:
|
||||
w.logger.Printf("[watcher] done channel full, dropping signal for %q", name)
|
||||
}
|
||||
}
|
||||
|
||||
// hasClaudePrompt returns true if the Claude Code interactive prompt is visible.
|
||||
func hasClaudePrompt(output string) bool {
|
||||
return strings.Contains(output, "❯")
|
||||
}
|
||||
|
||||
// hasSpinner returns true if Claude Code's progress spinner is active.
|
||||
func hasSpinner(output string) bool {
|
||||
return spinnerRe.MatchString(output)
|
||||
}
|
||||
183
internal/watcher/session_watcher_test.go
Normal file
183
internal/watcher/session_watcher_test.go
Normal file
|
|
@ -0,0 +1,183 @@
|
|||
package watcher
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||
)
|
||||
|
||||
// mockTmux is a minimal in-memory tmux.Client for tests.
|
||||
type mockTmux struct {
|
||||
sessions map[string]bool
|
||||
paneOutput map[string]string
|
||||
sentKeys []string
|
||||
}
|
||||
|
||||
func newMockTmux() *mockTmux {
|
||||
return &mockTmux{
|
||||
sessions: make(map[string]bool),
|
||||
paneOutput: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockTmux) HasSession(name string) bool { return m.sessions[name] }
|
||||
func (m *mockTmux) CreateSession(name, _ string) error { m.sessions[name] = true; return nil }
|
||||
func (m *mockTmux) KillSession(_ string) error { return nil }
|
||||
func (m *mockTmux) SendKeys(_, keys string) error { m.sentKeys = append(m.sentKeys, keys); return nil }
|
||||
func (m *mockTmux) CapturePaneTail(session string, _ int) (string, error) {
|
||||
return m.paneOutput[session], nil
|
||||
}
|
||||
|
||||
func newTestWatcher(tc *mockTmux, s *state.State, signalDir string) *SessionWatcher {
|
||||
return &SessionWatcher{
|
||||
tmux: tc,
|
||||
state: s,
|
||||
done: make(chan string, 32),
|
||||
interval: time.Second,
|
||||
idleTimeout: 60 * time.Minute,
|
||||
signalDir: signalDir,
|
||||
logger: log.Default(),
|
||||
}
|
||||
}
|
||||
|
||||
// TestSignalFileTriggersDone verifies that agent-done-<session> causes idle + done signal.
|
||||
func TestSignalFileTriggersDone(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
tc := newMockTmux()
|
||||
tc.sessions["sess-a"] = true
|
||||
|
||||
s := state.New("")
|
||||
s.SetWorking("sess-a", "task-1")
|
||||
|
||||
w := newTestWatcher(tc, s, dir)
|
||||
|
||||
sig := filepath.Join(dir, "agent-done-sess-a")
|
||||
if err := os.WriteFile(sig, []byte("done"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w.poll()
|
||||
|
||||
select {
|
||||
case got := <-w.done:
|
||||
if got != "sess-a" {
|
||||
t.Errorf("expected sess-a on done, got %q", got)
|
||||
}
|
||||
default:
|
||||
t.Fatal("expected done signal, got none")
|
||||
}
|
||||
|
||||
if st := s.GetSession("sess-a"); st == nil || st.State != "idle" {
|
||||
t.Errorf("expected sess-a idle, got %v", st)
|
||||
}
|
||||
if _, err := os.Stat(sig); !os.IsNotExist(err) {
|
||||
t.Error("expected signal file to be deleted")
|
||||
}
|
||||
}
|
||||
|
||||
// TestPromptDetectionTriggersDone verifies that ❯ in pane output signals completion.
|
||||
func TestPromptDetectionTriggersDone(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
tc := newMockTmux()
|
||||
tc.sessions["sess-b"] = true
|
||||
tc.paneOutput["sess-b"] = "some output\n❯ "
|
||||
|
||||
s := state.New("")
|
||||
s.SetWorking("sess-b", "task-2")
|
||||
|
||||
w := newTestWatcher(tc, s, dir)
|
||||
w.poll()
|
||||
|
||||
select {
|
||||
case got := <-w.done:
|
||||
if got != "sess-b" {
|
||||
t.Errorf("expected sess-b, got %q", got)
|
||||
}
|
||||
default:
|
||||
t.Fatal("expected done signal from prompt detection")
|
||||
}
|
||||
|
||||
if st := s.GetSession("sess-b"); st == nil || st.State != "idle" {
|
||||
t.Errorf("expected sess-b idle, got %v", st)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSpinnerSuppressesCompletion verifies that an active spinner prevents false completion.
|
||||
func TestSpinnerSuppressesCompletion(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
tc := newMockTmux()
|
||||
tc.sessions["sess-c"] = true
|
||||
tc.paneOutput["sess-c"] = "doing work 5s · \n❯ "
|
||||
|
||||
s := state.New("")
|
||||
s.SetWorking("sess-c", "task-3")
|
||||
|
||||
w := newTestWatcher(tc, s, dir)
|
||||
w.poll()
|
||||
|
||||
select {
|
||||
case name := <-w.done:
|
||||
t.Errorf("unexpected done signal for %q (spinner should suppress)", name)
|
||||
default:
|
||||
// Correct: no signal while spinner is active.
|
||||
}
|
||||
|
||||
if st := s.GetSession("sess-c"); st == nil || st.State != "working" {
|
||||
t.Errorf("expected sess-c still working, got %v", st)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIdleTimeoutTriggersDone verifies that a session exceeding idleTimeout is completed.
|
||||
func TestIdleTimeoutTriggersDone(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
tc := newMockTmux()
|
||||
tc.sessions["sess-d"] = true
|
||||
tc.paneOutput["sess-d"] = "still running..."
|
||||
|
||||
s := state.New("")
|
||||
s.SetWorking("sess-d", "task-4")
|
||||
|
||||
w := &SessionWatcher{
|
||||
tmux: tc,
|
||||
state: s,
|
||||
done: make(chan string, 32),
|
||||
interval: time.Second,
|
||||
idleTimeout: 1 * time.Millisecond,
|
||||
signalDir: dir,
|
||||
logger: log.Default(),
|
||||
}
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
w.poll()
|
||||
|
||||
select {
|
||||
case got := <-w.done:
|
||||
if got != "sess-d" {
|
||||
t.Errorf("expected sess-d, got %q", got)
|
||||
}
|
||||
default:
|
||||
t.Fatal("expected done signal from idle timeout")
|
||||
}
|
||||
}
|
||||
|
||||
// TestHasSpinnerPatterns verifies spinner pattern detection.
|
||||
func TestHasSpinnerPatterns(t *testing.T) {
|
||||
cases := []struct {
|
||||
input string
|
||||
want bool
|
||||
}{
|
||||
{"5s · running", true},
|
||||
{"12s ⠋ working", true},
|
||||
{"❯ prompt only", false},
|
||||
{"no spinner here", false},
|
||||
}
|
||||
for _, c := range cases {
|
||||
if got := hasSpinner(c.input); got != c.want {
|
||||
t.Errorf("hasSpinner(%q) = %v, want %v", c.input, got, c.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue