claude-failover/internal/state/state.go

224 lines
5.9 KiB
Go
Raw Permalink Normal View History

// Package state manages the in-memory + on-disk representation of all
// session and quota state for the claude-failover daemon.
package state
import (
"encoding/json"
"fmt"
"os"
"sync"
"time"
)
// SessionState is the runtime record kept for one tmux session.
//
// State holds the status string last written by a State setter in this
// package ("idle", "working", "stalled" or "failed"). Task and AssignedAt
// describe the task most recently handed to the session, LastFail is the
// time of its most recent failure, and Project presumably names the project
// the session is bound to (nothing in this file sets it — confirm with
// callers). Pointer fields are nil when unset and are then omitted from JSON.
type SessionState struct {
	State      string     `json:"state"`
	Project    *string    `json:"project,omitempty"`
	AssignedAt *time.Time `json:"assigned_at,omitempty"`
	Task       *string    `json:"task,omitempty"`
	LastFail   *time.Time `json:"last_fail,omitempty"`
}
// QuotaState is the account-level part of the persisted state.
//
// ActiveAccount names the account currently in use. Paused reports whether
// dispatching is paused; ResumeAt presumably marks when a pause should end
// (neither is written anywhere in this file — confirm with callers). The
// LastSwap* fields describe the most recent account swap and are maintained
// by RecordSwap and read back by LastSwapInfo. Optional fields are omitted
// from JSON when empty.
type QuotaState struct {
	Paused        bool       `json:"paused"`
	ActiveAccount string     `json:"active_account"`
	ResumeAt      *time.Time `json:"resume_at,omitempty"`
	LastSwapAt    *time.Time `json:"last_swap_at,omitempty"`
	LastSwapFrom  string     `json:"last_swap_from,omitempty"`
	LastSwapTo    string     `json:"last_swap_to,omitempty"`
}
// State is the daemon's runtime state, written to a JSON file by Flush.
//
// Every method on State takes mu around its reads and writes of the exported
// fields. Those fields are presumably exported only so encoding/json can
// (de)serialise them; touching them directly from outside this package
// bypasses the lock, so go through the methods instead. Build a State with
// New or LoadFromFile — the zero value has a nil Sessions map.
type State struct {
	mu sync.RWMutex // guards the exported fields below

	Sessions  map[string]*SessionState `json:"sessions"`
	Quota     QuotaState               `json:"quota"`
	UpdatedAt time.Time                `json:"updated_at"`

	filePath string // destination for Flush; "" disables persistence
}
// New returns an empty State whose UpdatedAt is the current UTC time and
// whose Flush writes to filePath. Passing "" yields a State whose Flush
// does nothing.
func New(filePath string) *State {
	st := new(State)
	st.filePath = filePath
	st.Sessions = map[string]*SessionState{}
	st.UpdatedAt = time.Now().UTC()
	return st
}
// LoadFromFile reads the state JSON stored at path and returns it bound to
// path, so a later Flush writes back to the same file. A missing file is not
// an error: a fresh empty State (as from New) is returned instead.
//
// The decoded state is normalised before it is returned: a "sessions" value
// of null leaves an empty, writable map, and null session entries are
// dropped. The setters and ForEachWorking rely on both, since they write to
// the map and dereference its entries without nil checks.
//
// Errors from reading or parsing the file are wrapped with the file path.
func LoadFromFile(path string) (*State, error) {
	data, err := os.ReadFile(path)
	if os.IsNotExist(err) {
		return New(path), nil
	}
	if err != nil {
		return nil, fmt.Errorf("reading state file %s: %w", path, err)
	}
	s := New(path) // New already records path as the flush target
	if err := json.Unmarshal(data, s); err != nil {
		return nil, fmt.Errorf("parsing state file %s: %w", path, err)
	}
	// json.Unmarshal replaces the map allocated by New with nil when the file
	// says "sessions": null; any later write to a nil map would panic.
	if s.Sessions == nil {
		s.Sessions = make(map[string]*SessionState)
	}
	// A null entry ("name": null) would make every accessor dereference nil.
	for name, sess := range s.Sessions {
		if sess == nil {
			delete(s.Sessions, name)
		}
	}
	return s, nil
}
// flushMu serialises Flush across every State in the process. Each Flush
// stages its output in the fixed path "<file>.tmp". Without this lock, two
// concurrent Flushes can interleave their writes into that one file and
// rename a corrupt document into place, or finish out of order and leave an
// older snapshot on disk. A per-State mutex would be finer-grained but would
// need a new field on State.
var flushMu sync.Mutex

// Flush serialises the state to disk as indented JSON and replaces the
// previous file. The document is written to "<file>.tmp", fsynced, and then
// renamed over the target. The rename is atomic on POSIX filesystems, so
// readers see either the old contents or the new. Flush is a no-op when the
// State has no file path.
//
// Calls are serialised, so the file ends up holding the snapshot taken by the
// most recent Flush. On failure the tmp file is removed on a best-effort
// basis and the error is wrapped with what was being attempted.
func (s *State) Flush() error {
	if s.filePath == "" {
		return nil
	}
	flushMu.Lock()
	defer flushMu.Unlock()

	// Hold the state lock only for the snapshot; the disk I/O below must not
	// block the setters.
	s.mu.RLock()
	data, err := json.MarshalIndent(s, "", " ")
	s.mu.RUnlock()
	if err != nil {
		return fmt.Errorf("marshalling state: %w", err)
	}

	tmp := s.filePath + ".tmp"
	if err := writeFileSynced(tmp, data, 0600); err != nil {
		_ = os.Remove(tmp) // best effort; the write error is what matters
		return fmt.Errorf("writing state tmp: %w", err)
	}
	if err := os.Rename(tmp, s.filePath); err != nil {
		_ = os.Remove(tmp) // best effort: don't leave a stray tmp file behind
		return fmt.Errorf("replacing state file %s: %w", s.filePath, err)
	}
	return nil
}

// writeFileSynced behaves like os.WriteFile but fsyncs the file before
// closing it. This puts the data on stable storage before Flush renames the
// file into place.
func writeFileSynced(name string, data []byte, perm os.FileMode) error {
	f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		_ = f.Close() // the write error takes precedence
		return err
	}
	if err := f.Sync(); err != nil {
		_ = f.Close() // the sync error takes precedence
		return err
	}
	return f.Close()
}
// JSON renders the current state as compact JSON, e.g. for the HTTP /status
// endpoint. It returns nil if marshalling fails.
func (s *State) JSON() []byte {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out, err := json.Marshal(s)
	if err != nil {
		// The signature has no error result. json.Marshal pairs any error
		// with a nil slice, so nil is the only sensible thing to return.
		return nil
	}
	return out
}
// GetSession returns a snapshot of the named session's state, or nil if no
// such session is recorded.
//
// The result is a copy taken under the read lock, matching ForEachWorking.
// Returning the stored pointer would let callers read its fields after the
// lock is released while the Set* methods write them, which is a data race.
// Changes made through the returned value do not affect the stored state;
// use the Set* methods for that.
//
// The copy is shallow. Sharing its pointer fields is safe because the
// setters in this file always assign fresh pointers (or nil) rather than
// writing through existing ones.
func (s *State) GetSession(name string) *SessionState {
	s.mu.RLock()
	defer s.mu.RUnlock()
	sess := s.Sessions[name]
	if sess == nil {
		return nil
	}
	snap := *sess
	return &snap
}
// touch sets UpdatedAt to the current time in UTC. The caller must already
// hold the write lock.
func (s *State) touch() {
	now := time.Now()
	s.UpdatedAt = now.UTC()
}
// SetIdle puts the named session into the "idle" state, creating its record
// if needed. It clears the task ID and assignment time; Project and LastFail
// are left as they were. UpdatedAt is refreshed.
func (s *State) SetIdle(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if entry, exists := s.Sessions[name]; exists {
		entry.State = "idle"
		entry.Task = nil
		entry.AssignedAt = nil
	} else {
		s.Sessions[name] = &SessionState{State: "idle"}
	}
	s.touch()
}
// SetWorking puts the named session into the "working" state for the given
// task ID, creating its record if needed. AssignedAt is set to the current
// UTC time, and UpdatedAt is refreshed.
func (s *State) SetWorking(name, task string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	assigned := time.Now().UTC()
	if entry, exists := s.Sessions[name]; exists {
		entry.State = "working"
		entry.Task = &task
		entry.AssignedAt = &assigned
	} else {
		s.Sessions[name] = &SessionState{State: "working", Task: &task, AssignedAt: &assigned}
	}
	s.touch()
}
// SetStalled puts the named session into the "stalled" state, creating its
// record if needed. It is meant for a session that is working but whose
// heartbeat is too old; this method checks neither condition itself. Task
// metadata is kept, and UpdatedAt is refreshed.
func (s *State) SetStalled(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if entry, exists := s.Sessions[name]; exists {
		entry.State = "stalled"
	} else {
		s.Sessions[name] = &SessionState{State: "stalled"}
	}
	s.touch()
}
// ForEachWorking invokes f once for every session whose State is "working".
//
// The matching sessions are copied while the read lock is held, and f runs
// only after the lock has been released, so f may call other State methods.
// Each call receives a pointer to its own copy; changes made through it do
// not reach the stored state. The visiting order is unspecified.
func (s *State) ForEachWorking(f func(name string, sess *SessionState)) {
	type snapshot struct {
		name string
		sess SessionState
	}

	s.mu.RLock()
	var batch []snapshot
	for name, sess := range s.Sessions {
		if sess.State != "working" {
			continue
		}
		batch = append(batch, snapshot{name: name, sess: *sess})
	}
	s.mu.RUnlock()

	for i := range batch {
		f(batch[i].name, &batch[i].sess)
	}
}
// SetActiveAccount records name as the active account in the quota state
// and refreshes UpdatedAt.
func (s *State) SetActiveAccount(name string) {
	s.mu.Lock()
	s.Quota.ActiveAccount = name
	s.touch()
	s.mu.Unlock()
}
// ActiveAccount reports the name of the account currently marked active
// (empty if none has been set).
func (s *State) ActiveAccount() string {
	s.mu.RLock()
	name := s.Quota.ActiveAccount
	s.mu.RUnlock()
	return name
}
// RecordSwap notes that the active account just moved from `from` to `to`.
// It stores the current UTC time alongside both names and refreshes
// UpdatedAt.
//
// The quota monitor uses this record to enforce a cooldown between swaps.
// The cooldown avoids ping-pong loops when transient upstream errors (e.g.
// Anthropic 500s mirrored as "rate limit" text in the TUI) are misread as
// quota hits.
func (s *State) RecordSwap(from, to string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	swappedAt := time.Now().UTC()
	q := &s.Quota
	q.LastSwapAt, q.LastSwapFrom, q.LastSwapTo = &swappedAt, from, to
	s.touch()
}
// LastSwapInfo reports when the most recent account swap happened and which
// accounts it moved between. If no swap time is recorded, it returns the
// zero time and two empty strings.
func (s *State) LastSwapInfo() (at time.Time, from, to string) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	q := &s.Quota
	if q.LastSwapAt != nil {
		at, from, to = *q.LastSwapAt, q.LastSwapFrom, q.LastSwapTo
	}
	return at, from, to
}
// SetFailed puts the named session into the "failed" state, creating its
// record if needed, and sets LastFail to the current UTC time. Task and
// AssignedAt are deliberately left intact so the caller can requeue the
// task. UpdatedAt is refreshed.
func (s *State) SetFailed(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	failedAt := time.Now().UTC()
	if entry, exists := s.Sessions[name]; exists {
		entry.State = "failed"
		entry.LastFail = &failedAt
	} else {
		s.Sessions[name] = &SessionState{State: "failed", LastFail: &failedAt}
	}
	s.touch()
}