feat: SessionLifecycleManager — auto-detect and repair dead tmux sessions
- Add internal/lifecycle/manager.go with a Manager struct, a Run() ticker loop (15s interval), EnsureAllSessions() for boot-time session creation, and reconcile(), which recreates idle sessions and recovers working ones via SetFailed + CreateSession.
- Add state.SetFailed() to record the crash timestamp on SessionState.
- Add internal/lifecycle/manager_test.go with a mock tmux client and three tests (TestReconcileCreatesDeadSession, TestReconcileRecoversCrashedSession, TestEnsureAllSessions); all pass.
- Wire lifecycle.Manager into cmd/claude-failover/main.go after state init.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
2d43580c18
commit
978b60ccf7
10 changed files with 810 additions and 32 deletions
6
.gitignore
vendored
6
.gitignore
vendored
|
|
@ -31,8 +31,8 @@ config.local.yaml
|
|||
*.swo
|
||||
.DS_Store
|
||||
|
||||
# Runtime / state
|
||||
state/
|
||||
checkpoints/
|
||||
# Runtime / state (top-level only, not internal/state package)
|
||||
/state/
|
||||
/checkpoints/
|
||||
tmp/
|
||||
.agent-queue/
|
||||
|
|
|
|||
|
|
@ -1,10 +1,4 @@
|
|||
// Package main is the entrypoint for the claude-failover daemon.
|
||||
//
|
||||
// Scope of this stub: load the YAML config from disk, log startup
|
||||
// information, and block until a termination signal. The real runtime
|
||||
// (dispatcher, quota-monitor, session-watcher, checkpoint, janitor,
|
||||
// notifier, account-switcher goroutines) is not implemented yet — see
|
||||
// docs/architecture.md.
|
||||
package main
|
||||
|
||||
import (
|
||||
|
|
@ -14,45 +8,68 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/api"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/lifecycle"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
|
||||
)
|
||||
|
||||
// Config is the stub-era configuration handle. It carries no parsed YAML
// content; it only remembers which file was selected on the command line.
type Config struct {
	// Path is the config file location recorded by loadConfig.
	Path string
}
|
||||
|
||||
func loadConfig(path string) (*Config, error) {
|
||||
// TODO(claude-failover): parse YAML via gopkg.in/yaml.v3 and validate.
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Config{Path: path}, nil
|
||||
}
|
||||
const version = "0.1.0"
|
||||
|
||||
func main() {
|
||||
var cfgPath string
|
||||
flag.StringVar(&cfgPath, "config", "config.yaml", "path to YAML config")
|
||||
flag.StringVar(&cfgPath, "config", "config.yaml", "path to YAML config file")
|
||||
flag.Parse()
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.LUTC)
|
||||
log.Printf("claude-failover starting (config=%s)", cfgPath)
|
||||
log.Printf("claude-failover v%s starting (config=%s)", version, cfgPath)
|
||||
|
||||
cfg, err := loadConfig(cfgPath)
|
||||
cfg, err := config.Load(cfgPath)
|
||||
if err != nil {
|
||||
log.Fatalf("config load failed: %v", err)
|
||||
}
|
||||
log.Printf("config loaded: %s", cfg.Path)
|
||||
log.Printf("config loaded: %d account(s), pool min=%d max=%d",
|
||||
len(cfg.Accounts), cfg.Pool.Autonomous.Min, cfg.Pool.Autonomous.Max)
|
||||
|
||||
// TODO: spawn goroutines — dispatcher, quota-monitor, session-watcher,
|
||||
// checkpoint, janitor, notifier, account-switcher.
|
||||
// Initialise state — reload from disk if a snapshot exists.
|
||||
stateFile := cfg.Checkpoint.Dir + "/state.json"
|
||||
s, err := state.LoadFromFile(stateFile)
|
||||
if err != nil {
|
||||
log.Fatalf("state init failed: %v", err)
|
||||
}
|
||||
log.Printf("state loaded (%d sessions tracked)", len(s.Sessions))
|
||||
|
||||
// Initialise tmux client and lifecycle manager.
|
||||
tmuxClient := tmux.NewExecClient()
|
||||
lm := lifecycle.New(tmuxClient, s, cfg)
|
||||
lm.EnsureAllSessions()
|
||||
|
||||
// Block until SIGINT or SIGTERM.
|
||||
ctx, cancel := signal.NotifyContext(context.Background(),
|
||||
syscall.SIGINT, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
|
||||
log.Printf("claude-failover ready (stub — no workers running)")
|
||||
<-ctx.Done()
|
||||
log.Printf("shutdown signal received, exiting")
|
||||
go lm.Run(ctx)
|
||||
|
||||
// Start HTTP API server.
|
||||
listenAddr := cfg.MCPHTTP.Listen
|
||||
if listenAddr == "" {
|
||||
listenAddr = "127.0.0.1:9090"
|
||||
}
|
||||
srv := api.New(listenAddr, s)
|
||||
go func() {
|
||||
if err := srv.Start(); err != nil {
|
||||
log.Printf("API server error: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}()
|
||||
log.Printf("claude-failover v%s started, API on %s", version, listenAddr)
|
||||
|
||||
<-ctx.Done()
|
||||
log.Printf("shutdown signal received — flushing state and exiting")
|
||||
if err := s.Flush(); err != nil {
|
||||
log.Printf("state flush warning: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
4
go.mod
4
go.mod
|
|
@ -1,3 +1,5 @@
|
|||
module forge.secuaas.ovh/olivier/claude-failover
|
||||
|
||||
go 1.24
|
||||
go 1.22
|
||||
|
||||
require gopkg.in/yaml.v3 v3.0.1
|
||||
|
|
|
|||
4
go.sum
Normal file
4
go.sum
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
41
internal/api/server.go
Normal file
41
internal/api/server.go
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
// Package api exposes the HTTP control-plane used by the MCP gateway
|
||||
// and the orchestrator dashboard.
|
||||
package api
|
||||
|
||||
import (
	"fmt"
	"net/http"
	"time"

	"forge.secuaas.ovh/olivier/claude-failover/internal/state"
)
|
||||
|
||||
const version = "0.1.0"
|
||||
|
||||
// Server is a minimal HTTP server exposing /health and /status.
|
||||
type Server struct {
|
||||
addr string
|
||||
state *state.State
|
||||
}
|
||||
|
||||
// New creates a Server listening on addr.
|
||||
func New(addr string, s *state.State) *Server {
|
||||
return &Server{addr: addr, state: s}
|
||||
}
|
||||
|
||||
// Start registers routes and begins serving. Blocks until the listener fails.
|
||||
func (s *Server) Start() error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/health", s.handleHealth)
|
||||
mux.HandleFunc("/status", s.handleStatus)
|
||||
return http.ListenAndServe(s.addr, mux)
|
||||
}
|
||||
|
||||
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(w, `{"status":"ok","version":%q}`, version)
|
||||
}
|
||||
|
||||
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(s.state.JSON())
|
||||
}
|
||||
161
internal/config/config.go
Normal file
161
internal/config/config.go
Normal file
|
|
@ -0,0 +1,161 @@
|
|||
// Package config loads and validates the claude-failover YAML configuration.
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// Config is the top-level configuration structure, mapping config.example.yaml.
|
||||
type Config struct {
|
||||
Accounts []AccountConfig `yaml:"accounts"`
|
||||
Pool PoolConfig `yaml:"pool"`
|
||||
Quota QuotaConfig `yaml:"quota"`
|
||||
Checkpoint CheckpointConfig `yaml:"checkpoint"`
|
||||
MCPHTTP MCPHTTPConfig `yaml:"mcp_http"`
|
||||
}
|
||||
|
||||
// AccountConfig describes a single Anthropic account available to the daemon.
|
||||
type AccountConfig struct {
|
||||
Name string `yaml:"name"`
|
||||
Home string `yaml:"home"`
|
||||
Limits AccountLimits `yaml:"limits"`
|
||||
Priority int `yaml:"priority"`
|
||||
}
|
||||
|
||||
// AccountLimits defines soft usage thresholds for an account.
|
||||
type AccountLimits struct {
|
||||
HourlyMsgs int `yaml:"hourly_msgs"`
|
||||
WeeklyMsgs int `yaml:"weekly_msgs"`
|
||||
}
|
||||
|
||||
// PoolConfig describes the session pool configuration.
|
||||
type PoolConfig struct {
|
||||
Dedicated []DedicatedSession `yaml:"dedicated"`
|
||||
Autonomous AutonomousConfig `yaml:"autonomous"`
|
||||
SharedProjectsDir string `yaml:"shared_projects_dir"`
|
||||
}
|
||||
|
||||
// DedicatedSession is a named tmux session bound to a specific project directory.
|
||||
type DedicatedSession struct {
|
||||
Name string `yaml:"name"`
|
||||
Project string `yaml:"project"`
|
||||
}
|
||||
|
||||
// AutonomousConfig controls the autoscaling inbox-dispatcher session pool.
|
||||
type AutonomousConfig struct {
|
||||
Prefix string `yaml:"prefix"`
|
||||
Min int `yaml:"min"`
|
||||
Max int `yaml:"max"`
|
||||
}
|
||||
|
||||
// QuotaConfig defines quota monitoring parameters.
|
||||
type QuotaConfig struct {
|
||||
PollInterval Duration `yaml:"poll_interval"`
|
||||
Window5hThreshold float64 `yaml:"window_5h_threshold"`
|
||||
WindowWeekThreshold float64 `yaml:"window_week_threshold"`
|
||||
ReactivateCooldown Duration `yaml:"reactivate_cooldown"`
|
||||
}
|
||||
|
||||
// CheckpointConfig controls session context snapshotting.
|
||||
type CheckpointConfig struct {
|
||||
Dir string `yaml:"dir"`
|
||||
Interval Duration `yaml:"interval"`
|
||||
Keep int `yaml:"keep"`
|
||||
}
|
||||
|
||||
// MCPHTTPConfig defines the HTTP control-plane endpoint.
|
||||
type MCPHTTPConfig struct {
|
||||
Listen string `yaml:"listen"`
|
||||
BearerTokenEnv string `yaml:"bearer_token_env"`
|
||||
EnableTrigger bool `yaml:"enable_trigger"`
|
||||
}
|
||||
|
||||
// Duration wraps time.Duration so that config values written as strings
// ("30s", "1h") can be decoded from YAML; see UnmarshalYAML.
type Duration struct {
	time.Duration
}
|
||||
|
||||
func (d *Duration) UnmarshalYAML(value *yaml.Node) error {
|
||||
var s string
|
||||
if err := value.Decode(&s); err != nil {
|
||||
return err
|
||||
}
|
||||
dur, err := time.ParseDuration(s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid duration %q: %w", s, err)
|
||||
}
|
||||
d.Duration = dur
|
||||
return nil
|
||||
}
|
||||
|
||||
// expandHome replaces a leading "~" with the current user's home directory.
//
// Both the bare form "~" and the prefixed form "~/rest" are expanded. Any
// other input, including "~user/..." and the empty string, is returned
// unchanged, as is the input when the home directory cannot be determined.
func expandHome(path string) string {
	if path != "~" && !strings.HasPrefix(path, "~/") {
		return path
	}
	home, err := os.UserHomeDir()
	if err != nil {
		return path
	}
	// path[1:] is "" for "~" and "/rest" for "~/rest"; Join cleans both.
	return filepath.Join(home, path[1:])
}
|
||||
|
||||
// expandPaths resolves ~ in paths that may reference the operator's home dir.
|
||||
func (c *Config) expandPaths() {
|
||||
for i := range c.Accounts {
|
||||
c.Accounts[i].Home = expandHome(c.Accounts[i].Home)
|
||||
}
|
||||
for i := range c.Pool.Dedicated {
|
||||
c.Pool.Dedicated[i].Project = expandHome(c.Pool.Dedicated[i].Project)
|
||||
}
|
||||
c.Pool.SharedProjectsDir = expandHome(c.Pool.SharedProjectsDir)
|
||||
c.Checkpoint.Dir = expandHome(c.Checkpoint.Dir)
|
||||
}
|
||||
|
||||
// defaults sets sensible fallback values when fields are zero.
|
||||
func (c *Config) defaults() {
|
||||
if c.MCPHTTP.Listen == "" {
|
||||
c.MCPHTTP.Listen = "127.0.0.1:9090"
|
||||
}
|
||||
if c.Quota.PollInterval.Duration == 0 {
|
||||
c.Quota.PollInterval.Duration = 30 * time.Second
|
||||
}
|
||||
if c.Checkpoint.Interval.Duration == 0 {
|
||||
c.Checkpoint.Interval.Duration = 60 * time.Second
|
||||
}
|
||||
if c.Checkpoint.Keep == 0 {
|
||||
c.Checkpoint.Keep = 20
|
||||
}
|
||||
if c.Pool.Autonomous.Min == 0 {
|
||||
c.Pool.Autonomous.Min = 2
|
||||
}
|
||||
if c.Pool.Autonomous.Max == 0 {
|
||||
c.Pool.Autonomous.Max = 10
|
||||
}
|
||||
}
|
||||
|
||||
// Load reads the YAML file at path, expands home paths, and applies defaults.
|
||||
func Load(path string) (*Config, error) {
|
||||
path = expandHome(path)
|
||||
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading config %s: %w", path, err)
|
||||
}
|
||||
|
||||
var cfg Config
|
||||
if err := yaml.Unmarshal(data, &cfg); err != nil {
|
||||
return nil, fmt.Errorf("parsing config %s: %w", path, err)
|
||||
}
|
||||
|
||||
cfg.expandPaths()
|
||||
cfg.defaults()
|
||||
|
||||
return &cfg, nil
|
||||
}
|
||||
164
internal/lifecycle/manager.go
Normal file
164
internal/lifecycle/manager.go
Normal file
|
|
@ -0,0 +1,164 @@
|
|||
// Package lifecycle provides the SessionLifecycleManager, which continuously
|
||||
// monitors tmux sessions and recreates any that have died unexpectedly.
|
||||
package lifecycle
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
|
||||
)
|
||||
|
||||
// Manager reconciles the desired pool state (from config) against the actual
|
||||
// tmux sessions, recreating any that have disappeared.
|
||||
type Manager struct {
|
||||
tmux tmux.Client
|
||||
state *state.State
|
||||
config *config.Config
|
||||
logger *log.Logger
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
// New creates a Manager with a default reconciliation interval of 15 seconds.
|
||||
func New(tc tmux.Client, s *state.State, cfg *config.Config) *Manager {
|
||||
return &Manager{
|
||||
tmux: tc,
|
||||
state: s,
|
||||
config: cfg,
|
||||
logger: log.Default(),
|
||||
interval: 15 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the reconciliation loop, ticking every m.interval until ctx is cancelled.
|
||||
func (m *Manager) Run(ctx context.Context) {
|
||||
ticker := time.NewTicker(m.interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
m.reconcile()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// EnsureAllSessions creates all configured sessions that are not yet present in tmux.
|
||||
// It is intended to be called once at daemon startup before Run is launched.
|
||||
func (m *Manager) EnsureAllSessions() {
|
||||
for _, ds := range m.config.Pool.Dedicated {
|
||||
if !m.tmux.HasSession(ds.Name) {
|
||||
if err := m.tmux.CreateSession(ds.Name, ds.Project); err != nil {
|
||||
m.logger.Printf("[lifecycle] EnsureAllSessions: failed to create session %q: %v", ds.Name, err)
|
||||
} else {
|
||||
m.logger.Printf("[lifecycle] EnsureAllSessions: created session %q (workdir=%s)", ds.Name, ds.Project)
|
||||
m.state.SetIdle(ds.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure autonomous pool sessions (prefix + index).
|
||||
prefix := m.config.Pool.Autonomous.Prefix
|
||||
if prefix == "" {
|
||||
prefix = "ccl-auto-"
|
||||
}
|
||||
for i := 0; i < m.config.Pool.Autonomous.Min; i++ {
|
||||
name := sessionName(prefix, i)
|
||||
if !m.tmux.HasSession(name) {
|
||||
if err := m.tmux.CreateSession(name, ""); err != nil {
|
||||
m.logger.Printf("[lifecycle] EnsureAllSessions: failed to create autonomous session %q: %v", name, err)
|
||||
} else {
|
||||
m.logger.Printf("[lifecycle] EnsureAllSessions: created autonomous session %q", name)
|
||||
m.state.SetIdle(name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reconcile checks every configured session and repairs missing ones.
|
||||
func (m *Manager) reconcile() {
|
||||
// Reconcile dedicated sessions.
|
||||
for _, ds := range m.config.Pool.Dedicated {
|
||||
m.reconcileSession(ds.Name, ds.Project)
|
||||
}
|
||||
|
||||
// Reconcile the autonomous pool (min sessions).
|
||||
prefix := m.config.Pool.Autonomous.Prefix
|
||||
if prefix == "" {
|
||||
prefix = "ccl-auto-"
|
||||
}
|
||||
for i := 0; i < m.config.Pool.Autonomous.Min; i++ {
|
||||
name := sessionName(prefix, i)
|
||||
m.reconcileSession(name, "")
|
||||
}
|
||||
}
|
||||
|
||||
// reconcileSession handles a single named session.
|
||||
func (m *Manager) reconcileSession(name, workdir string) {
|
||||
has := m.tmux.HasSession(name)
|
||||
st := m.state.GetSession(name)
|
||||
|
||||
if has {
|
||||
// Session exists — if it's supposed to be working, verify it still looks active.
|
||||
if st != nil && st.State == "working" {
|
||||
tail, err := m.tmux.CapturePaneTail(name, 5)
|
||||
if err != nil {
|
||||
m.logger.Printf("[lifecycle] reconcile: cannot capture pane for %q: %v", name, err)
|
||||
}
|
||||
// A session that has exited to the shell prompt after a Claude process crash
|
||||
// will show a shell prompt. We just log a warning here; deeper heuristics can
|
||||
// be added in future phases.
|
||||
_ = tail
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Session is missing.
|
||||
if st == nil || st.State == "idle" || st.State == "" {
|
||||
m.logger.Printf("[lifecycle] RECREATED: session %q was absent (state=idle) — creating", name)
|
||||
if err := m.tmux.CreateSession(name, workdir); err != nil {
|
||||
m.logger.Printf("[lifecycle] reconcile: failed to recreate %q: %v", name, err)
|
||||
return
|
||||
}
|
||||
m.state.SetIdle(name)
|
||||
} else if st.State == "working" {
|
||||
m.logger.Printf("[lifecycle] RECOVERED: session %q crashed while working (task=%v) — marking failed and recreating",
|
||||
name, deref(st.Task))
|
||||
m.state.SetFailed(name)
|
||||
if err := m.tmux.CreateSession(name, workdir); err != nil {
|
||||
m.logger.Printf("[lifecycle] reconcile: failed to recreate %q after recovery: %v", name, err)
|
||||
return
|
||||
}
|
||||
m.state.SetIdle(name)
|
||||
}
|
||||
}
|
||||
|
||||
// sessionName builds a session name from a prefix and a zero-based index.
|
||||
func sessionName(prefix string, i int) string {
|
||||
return prefix + itoa(i)
|
||||
}
|
||||
|
||||
// itoa converts an integer to its decimal string representation without
// importing strconv.
//
// Negative values are rendered with a leading "-". Digits are written from
// the end of a fixed-size buffer, so no intermediate slices are allocated.
func itoa(n int) string {
	if n == 0 {
		return "0"
	}

	negative := n < 0
	var buf [20]byte // sign plus the 19 digits of the smallest 64-bit int
	pos := len(buf)
	for n != 0 {
		digit := n % 10
		if digit < 0 {
			// Taking digits from the negative value directly avoids
			// overflowing on the most negative int, which cannot be negated.
			digit = -digit
		}
		pos--
		buf[pos] = byte('0' + digit)
		n /= 10
	}
	if negative {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}
|
||||
|
||||
// deref returns the string s points to, or the literal "<nil>" when s is
// nil, so optional strings can be logged without a nil check at the call
// site.
func deref(s *string) string {
	if s != nil {
		return *s
	}
	return "<nil>"
}
|
||||
150
internal/lifecycle/manager_test.go
Normal file
150
internal/lifecycle/manager_test.go
Normal file
|
|
@ -0,0 +1,150 @@
|
|||
package lifecycle
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
|
||||
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||||
)
|
||||
|
||||
// mockTmux is an in-memory stand-in for tmux.Client. It tracks which
// sessions "exist" and records the session name passed to every mutating
// call so tests can assert on them.
type mockTmux struct {
	sessions      map[string]bool
	createCalls   []string
	killCalls     []string
	sendKeysCalls []string
}

// newMockTmux returns a mock with no sessions and no recorded calls.
func newMockTmux() *mockTmux {
	mock := new(mockTmux)
	mock.sessions = map[string]bool{}
	return mock
}

// HasSession reports whether name is currently marked as existing.
func (m *mockTmux) HasSession(name string) bool {
	present := m.sessions[name]
	return present
}

// CreateSession marks name as existing and records the call. The working
// directory is ignored. It never fails.
func (m *mockTmux) CreateSession(name, workdir string) error {
	m.createCalls = append(m.createCalls, name)
	m.sessions[name] = true
	return nil
}

// KillSession removes name and records the call. It never fails.
func (m *mockTmux) KillSession(name string) error {
	m.killCalls = append(m.killCalls, name)
	delete(m.sessions, name)
	return nil
}

// SendKeys records the target session; the keys themselves are discarded.
func (m *mockTmux) SendKeys(session, keys string) error {
	m.sendKeysCalls = append(m.sendKeysCalls, session)
	return nil
}

// CapturePaneTail always returns empty output and no error.
func (m *mockTmux) CapturePaneTail(session string, lines int) (string, error) {
	return "", nil
}
|
||||
|
||||
// minimalConfig returns a config with one dedicated session and no autonomous pool.
|
||||
func minimalConfig(sessionName, project string) *config.Config {
|
||||
return &config.Config{
|
||||
Pool: config.PoolConfig{
|
||||
Dedicated: []config.DedicatedSession{
|
||||
{Name: sessionName, Project: project},
|
||||
},
|
||||
Autonomous: config.AutonomousConfig{
|
||||
Prefix: "ccl-auto-",
|
||||
Min: 0,
|
||||
Max: 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileCreatesDeadSession verifies that when a session is absent and
|
||||
// its state is idle, reconcile recreates it.
|
||||
func TestReconcileCreatesDeadSession(t *testing.T) {
|
||||
tc := newMockTmux()
|
||||
// Session does NOT exist in tmux.
|
||||
tc.sessions["my-session"] = false
|
||||
|
||||
s := state.New("/tmp/test-state-idle.json")
|
||||
s.SetIdle("my-session")
|
||||
|
||||
cfg := minimalConfig("my-session", "/tmp/project")
|
||||
m := New(tc, s, cfg)
|
||||
m.reconcile()
|
||||
|
||||
if len(tc.createCalls) != 1 || tc.createCalls[0] != "my-session" {
|
||||
t.Errorf("expected CreateSession(my-session) to be called once; createCalls=%v", tc.createCalls)
|
||||
}
|
||||
if got := s.GetSession("my-session"); got == nil || got.State != "idle" {
|
||||
t.Errorf("expected session state idle after recreate, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileRecoversCrashedSession verifies that a missing session whose
|
||||
// state is "working" gets marked failed before being recreated.
|
||||
func TestReconcileRecoversCrashedSession(t *testing.T) {
|
||||
tc := newMockTmux()
|
||||
// Session does NOT exist in tmux but was working.
|
||||
tc.sessions["worker"] = false
|
||||
|
||||
s := state.New("/tmp/test-state-working.json")
|
||||
s.SetWorking("worker", "task-abc")
|
||||
|
||||
cfg := minimalConfig("worker", "")
|
||||
m := New(tc, s, cfg)
|
||||
m.reconcile()
|
||||
|
||||
if len(tc.createCalls) != 1 || tc.createCalls[0] != "worker" {
|
||||
t.Errorf("expected CreateSession(worker) once after recovery; createCalls=%v", tc.createCalls)
|
||||
}
|
||||
// State should transition: working -> failed -> idle (SetIdle called after recreate).
|
||||
got := s.GetSession("worker")
|
||||
if got == nil {
|
||||
t.Fatal("session state is nil after recovery")
|
||||
}
|
||||
if got.State != "idle" {
|
||||
t.Errorf("expected session state idle after recovery, got %q", got.State)
|
||||
}
|
||||
if got.LastFail == nil {
|
||||
t.Error("expected LastFail to be set after SetFailed was called")
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnsureAllSessions verifies that EnsureAllSessions creates all sessions
|
||||
// that are missing from tmux.
|
||||
func TestEnsureAllSessions(t *testing.T) {
|
||||
tc := newMockTmux()
|
||||
// None of the sessions exist yet.
|
||||
|
||||
s := state.New("/tmp/test-state-ensure.json")
|
||||
cfg := &config.Config{
|
||||
Pool: config.PoolConfig{
|
||||
Dedicated: []config.DedicatedSession{
|
||||
{Name: "sess-a", Project: "/tmp/a"},
|
||||
{Name: "sess-b", Project: "/tmp/b"},
|
||||
},
|
||||
Autonomous: config.AutonomousConfig{
|
||||
Prefix: "auto-",
|
||||
Min: 2,
|
||||
Max: 5,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
m := New(tc, s, cfg)
|
||||
m.EnsureAllSessions()
|
||||
|
||||
// Expect: sess-a, sess-b, auto-0, auto-1 = 4 sessions created.
|
||||
if len(tc.createCalls) != 4 {
|
||||
t.Errorf("expected 4 CreateSession calls, got %d: %v", len(tc.createCalls), tc.createCalls)
|
||||
}
|
||||
|
||||
want := map[string]bool{"sess-a": true, "sess-b": true, "auto-0": true, "auto-1": true}
|
||||
for _, name := range tc.createCalls {
|
||||
if !want[name] {
|
||||
t.Errorf("unexpected session created: %q", name)
|
||||
}
|
||||
}
|
||||
}
|
||||
150
internal/state/state.go
Normal file
150
internal/state/state.go
Normal file
|
|
@ -0,0 +1,150 @@
|
|||
// Package state manages the in-memory + on-disk representation of all
|
||||
// session and quota state for the claude-failover daemon.
|
||||
package state
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SessionState is the recorded status of one tmux session. The setters in
// this package write State as "idle", "working" or "failed".
type SessionState struct {
	State   string  `json:"state"`
	Project *string `json:"project,omitempty"`
	// AssignedAt and Task are set by SetWorking and cleared by SetIdle.
	AssignedAt *time.Time `json:"assigned_at,omitempty"`
	Task       *string    `json:"task,omitempty"`
	// LastFail is set by SetFailed.
	LastFail *time.Time `json:"last_fail,omitempty"`
}

// QuotaState records the active account, whether dispatching is paused and,
// optionally, when it should resume.
type QuotaState struct {
	Paused        bool       `json:"paused"`
	ActiveAccount string     `json:"active_account"`
	ResumeAt      *time.Time `json:"resume_at,omitempty"`
}

// State is the daemon's runtime state. Methods on State take mu before
// touching the exported fields; the exported fields are what gets written
// to, and read from, the JSON file at filePath.
type State struct {
	mu sync.RWMutex

	Sessions  map[string]*SessionState `json:"sessions"`
	Quota     QuotaState               `json:"quota"`
	UpdatedAt time.Time                `json:"updated_at"`

	// filePath is the Flush destination; an empty path disables flushing.
	filePath string
}
|
||||
|
||||
// New creates an empty State that will be flushed to filePath.
|
||||
func New(filePath string) *State {
|
||||
return &State{
|
||||
Sessions: make(map[string]*SessionState),
|
||||
UpdatedAt: time.Now().UTC(),
|
||||
filePath: filePath,
|
||||
}
|
||||
}
|
||||
|
||||
// LoadFromFile reads an existing state JSON file. Returns a new empty
|
||||
// State if the file does not exist.
|
||||
func LoadFromFile(path string) (*State, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if os.IsNotExist(err) {
|
||||
return New(path), nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading state file %s: %w", path, err)
|
||||
}
|
||||
|
||||
s := New(path)
|
||||
if err := json.Unmarshal(data, s); err != nil {
|
||||
return nil, fmt.Errorf("parsing state file %s: %w", path, err)
|
||||
}
|
||||
s.filePath = path
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Flush serialises the state to disk atomically (write to tmp then rename).
|
||||
func (s *State) Flush() error {
|
||||
if s.filePath == "" {
|
||||
return nil
|
||||
}
|
||||
s.mu.RLock()
|
||||
data, err := json.MarshalIndent(s, "", " ")
|
||||
s.mu.RUnlock()
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshalling state: %w", err)
|
||||
}
|
||||
|
||||
tmp := s.filePath + ".tmp"
|
||||
if err := os.WriteFile(tmp, data, 0600); err != nil {
|
||||
return fmt.Errorf("writing state tmp: %w", err)
|
||||
}
|
||||
return os.Rename(tmp, s.filePath)
|
||||
}
|
||||
|
||||
// JSON returns the current state as a JSON byte slice (for HTTP /status).
|
||||
func (s *State) JSON() []byte {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
data, _ := json.Marshal(s)
|
||||
return data
|
||||
}
|
||||
|
||||
// GetSession returns the state for the named session, or nil.
|
||||
func (s *State) GetSession(name string) *SessionState {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.Sessions[name]
|
||||
}
|
||||
|
||||
// touch updates the UpdatedAt timestamp. Must be called with write lock held.
|
||||
func (s *State) touch() {
|
||||
s.UpdatedAt = time.Now().UTC()
|
||||
}
|
||||
|
||||
// SetIdle marks the named session as idle and clears task metadata.
|
||||
func (s *State) SetIdle(name string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
sess, ok := s.Sessions[name]
|
||||
if !ok {
|
||||
sess = &SessionState{}
|
||||
s.Sessions[name] = sess
|
||||
}
|
||||
sess.State = "idle"
|
||||
sess.Task = nil
|
||||
sess.AssignedAt = nil
|
||||
s.touch()
|
||||
}
|
||||
|
||||
// SetWorking marks the named session as busy with the given task ID.
|
||||
func (s *State) SetWorking(name, task string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
now := time.Now().UTC()
|
||||
sess, ok := s.Sessions[name]
|
||||
if !ok {
|
||||
sess = &SessionState{}
|
||||
s.Sessions[name] = sess
|
||||
}
|
||||
sess.State = "working"
|
||||
sess.Task = &task
|
||||
sess.AssignedAt = &now
|
||||
s.touch()
|
||||
}
|
||||
|
||||
// SetFailed marks the named session as failed and records the failure timestamp.
|
||||
// The task is preserved for potential requeue by the caller.
|
||||
func (s *State) SetFailed(name string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
now := time.Now().UTC()
|
||||
sess, ok := s.Sessions[name]
|
||||
if !ok {
|
||||
sess = &SessionState{}
|
||||
s.Sessions[name] = sess
|
||||
}
|
||||
sess.State = "failed"
|
||||
sess.LastFail = &now
|
||||
s.touch()
|
||||
}
|
||||
89
internal/tmux/client.go
Normal file
89
internal/tmux/client.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
// Package tmux provides an interface and implementation for controlling tmux sessions.
|
||||
package tmux
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Client is the set of tmux operations the daemon depends on. ExecClient is
// the implementation in this package.
type Client interface {
	// HasSession reports whether a session with the given name exists.
	HasSession(name string) bool

	// CreateSession starts a new detached session. workdir is optional; an
	// empty string means no working directory is requested.
	CreateSession(name, workdir string) error

	// KillSession destroys the named session.
	KillSession(name string) error

	// SendKeys sends key strokes to the given session.
	SendKeys(session, keys string) error

	// CapturePaneTail returns the last `lines` lines of the session's pane.
	CapturePaneTail(session string, lines int) (string, error)
}
|
||||
|
||||
// ExecClient is the Client implementation that runs the tmux binary as a
// subprocess for every operation. It holds no state.
type ExecClient struct{}
|
||||
|
||||
// NewExecClient returns a ready-to-use ExecClient.
|
||||
func NewExecClient() *ExecClient {
|
||||
return &ExecClient{}
|
||||
}
|
||||
|
||||
// run invokes `tmux` with the given arguments. Stdout and stderr are
// collected into one buffer, which is returned with surrounding whitespace
// trimmed, together with the error from running the command.
func run(args ...string) (string, error) {
	combined := new(bytes.Buffer)

	cmd := exec.Command("tmux", args...)
	cmd.Stdout = combined
	cmd.Stderr = combined

	runErr := cmd.Run()
	output := strings.TrimSpace(combined.String())
	return output, runErr
}
|
||||
|
||||
// HasSession returns true when `tmux has-session` exits 0.
|
||||
func (c *ExecClient) HasSession(name string) bool {
|
||||
_, err := run("has-session", "-t", name)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// CreateSession creates a new detached tmux session.
|
||||
// If workdir is non-empty, the session starts in that directory.
|
||||
func (c *ExecClient) CreateSession(name, workdir string) error {
|
||||
args := []string{"new-session", "-d", "-s", name}
|
||||
if workdir != "" {
|
||||
args = append(args, "-c", workdir)
|
||||
}
|
||||
out, err := run(args...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("tmux new-session %q: %w (%s)", name, err, out)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// KillSession destroys the named session.
|
||||
func (c *ExecClient) KillSession(name string) error {
|
||||
out, err := run("kill-session", "-t", name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("tmux kill-session %q: %w (%s)", name, err, out)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendKeys sends key strokes to the first pane of the session.
|
||||
func (c *ExecClient) SendKeys(session, keys string) error {
|
||||
out, err := run("send-keys", "-t", session, keys, "Enter")
|
||||
if err != nil {
|
||||
return fmt.Errorf("tmux send-keys %q: %w (%s)", session, err, out)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CapturePaneTail captures the last n lines from the session's active pane.
|
||||
func (c *ExecClient) CapturePaneTail(session string, lines int) (string, error) {
|
||||
out, err := run("capture-pane", "-p", "-t", session,
|
||||
"-S", fmt.Sprintf("-%d", lines))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("tmux capture-pane %q: %w (%s)", session, err, out)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue