claude-failover/internal/quota/monitor.go
Ubuntu eb6b74c547 feat(pool): add start_index so manual and auto pools can coexist
Production had two disjoint tmux pools with similar names but different
purposes:
  ccl-0..ccl-9           — manual/interactive sessions (operator)
  ccl-auto-11..ccl-auto-20 — autonomous dispatcher pool

Until now the daemon's loops iterated over prefix + [0, Max), so with the
deployed config ("prefix: ccl-auto", min=2, max=10) the dispatcher
looked for sessions "ccl-auto0..ccl-auto9" that never existed, while
the real auto pool ccl-auto-11..20 was invisible. Net effect: no task
was ever dispatched, and killAllPoolSessions fabricated phantom
"ccl-auto0/1" sessions on each swap.

- AutonomousConfig gains StartIndex (yaml start_index, default 0).
  Behaviour is unchanged when StartIndex is 0.
- Monitor, switcher (kill + recreate), dispatcher (findFreeSession),
  and lifecycle (EnsureAll + reconcile) all iterate
  [StartIndex, StartIndex+Max) so the daemon only touches its own
  range and leaves ccl-0..ccl-9 alone.
- Production config updated to prefix: "ccl-auto-", start_index: 11,
  min: 10, max: 10 — covering the 10 real ccl-auto-11..20 sessions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 20:39:57 +00:00

296 lines
8.9 KiB
Go

// Package quota monitors Claude Code sessions for quota exhaustion and triggers
// account switches when thresholds are crossed.
package quota
import (
"context"
"log"
"regexp"
"strings"
"time"
"forge.secuaas.ovh/olivier/claude-failover/internal/config"
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
"forge.secuaas.ovh/olivier/claude-failover/internal/tmux"
)
// SwitchRequest asks the account switcher to move away from the currently
// active account after the monitor has detected quota exhaustion.
type SwitchRequest struct {
	// From is the account that was active when the hit was observed.
	From string
	// To is the desired target account; empty means auto-select.
	To string
	// ResetTime is the human-readable reset time scraped from the pane
	// (for example "8pm" or "in 45 minutes"); empty when none was found.
	ResetTime string
}
// quotaPatterns are substrings that indicate quota exhaustion in a pane.
// Must be specific enough to avoid matching Anthropic server errors (500/503)
// that happen to include generic words like "rate limit" in their payload.
var quotaPatterns = []string{
"you've hit your limit", // Claude Code TUI friendly message
"rate_limit_error", // Anthropic typed error (real HTTP 429)
"quota exceeded", // generic, but specific enough
"usage limit reached", // Claude Pro phrasing
"claude pro usage", // Claude Pro dashboard phrasing
"too many requests", // HTTP 429 status text
"5-hour limit", // Claude Code 5h window phrasing
}
// serverErrorPatterns indicate a transient upstream server error
// (Anthropic 500/503), NOT a quota hit. When any of these is present in
// the pane, we treat the "hit" as a false positive and do NOT swap.
var serverErrorPatterns = []string{
"api_error", // Anthropic typed error (HTTP 500)
"overloaded_error", // Anthropic typed error (HTTP 503)
"internal server error", // HTTP 500 status text
"api error: 5", // Claude TUI rendering of 5xx
}
// resetTimeRe extracts reset times like "resets 8pm", "resets at 11:30pm",
// "resets in 45 minutes".
var resetTimeRe = regexp.MustCompile(
`(?i)resets?\s+(?:at\s+)?([0-9]+(?::[0-9]+)?\s*[ap]m|in\s+[0-9]+\s+(?:minute|hour)s?)`,
)
// Monitor polls tmux panes for quota exhaustion messages.
type Monitor struct {
tmux tmux.Client
state *state.State
config *config.Config
switchCh chan SwitchRequest
interval time.Duration
logger *log.Logger
// suspectedHitAt tracks the first poll that detected a quota pattern
// without a parseable reset time. A second consecutive hit is required
// before emitting SwapRequested; a single-poll hit is ignored as likely
// transient (e.g. Anthropic 500s containing "rate limit" in payload).
// Mutated only from poll(), which runs on a single goroutine — no lock.
suspectedHitAt time.Time
}
// New builds a Monitor around the given tmux client, shared state and config.
// cfg must be non-nil.
//
// The poll interval is taken from cfg.Quota.PollInterval. A zero or negative
// value falls back to 30s. Negative values must be rejected here because Run
// passes the interval to time.NewTicker, which panics on a non-positive
// duration.
//
// The switch channel is created with capacity 1, so at most one request can
// be pending at a time.
func New(tc tmux.Client, s *state.State, cfg *config.Config) *Monitor {
	interval := cfg.Quota.PollInterval.Duration
	if interval <= 0 {
		interval = 30 * time.Second
	}
	return &Monitor{
		tmux:     tc,
		state:    s,
		config:   cfg,
		switchCh: make(chan SwitchRequest, 1),
		interval: interval,
		logger:   log.Default(),
	}
}
// SwitchChan hands out the receive-only end of the monitor's request
// channel. The account switcher consumes SwitchRequests from it.
func (m *Monitor) SwitchChan() <-chan SwitchRequest {
	var out <-chan SwitchRequest = m.switchCh
	return out
}
// Run blocks and calls poll once every m.interval until ctx is cancelled.
// The first poll happens one full interval after Run starts.
func (m *Monitor) Run(ctx context.Context) {
	tick := time.NewTicker(m.interval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			m.poll()
		case <-done:
			return
		}
	}
}
// poll performs one detection pass over the autonomous pool sessions
// ([StartIndex, StartIndex+Max) under the configured prefix) and the
// dedicated sessions. When the evidence is strong enough it emits a
// SwitchRequest on m.switchCh without blocking.
//
// Detection fires when at least 2 pool sessions or at least 1 dedicated
// session show quota exhaustion (see isQuotaExhausted). A hit without a
// parseable reset time must be seen on two consecutive polls before a swap
// is requested.
//
// Some polls skip detection: when a swap is already pending, or while the
// post-swap cooldown is active. Such a poll breaks the "consecutive" chain,
// so any pending suspicion is discarded there too. Otherwise a stale
// suspicion could be "confirmed" by a single hit long afterwards.
//
// NOTE: because pool detection needs two blocked sessions, a pool with
// Max < 2 can only trigger via dedicated sessions.
func (m *Monitor) poll() {
	// clearSuspicion drops an unconfirmed no-reset-time hit, logging only if
	// one was actually pending.
	clearSuspicion := func(reason string) {
		if !m.suspectedHitAt.IsZero() {
			m.logger.Printf("[quota] suspected hit cleared (%s)", reason)
			m.suspectedHitAt = time.Time{}
		}
	}

	if m.state.ActiveAccount() != "" && m.isQuotaPaused() {
		clearSuspicion("swap pending")
		return
	}

	// Cooldown guard: after a recent swap, refuse to trigger another one until
	// the cooldown elapses. Prevents ping-pong when the freshly-activated
	// account surfaces transient errors whose text happens to match
	// quotaPatterns (e.g. Anthropic 500s rendered as "rate limit" in the TUI).
	cooldown := m.config.Quota.ReactivateCooldown.Duration
	if cooldown == 0 {
		cooldown = 10 * time.Minute
	}
	if lastAt, _, lastTo := m.state.LastSwapInfo(); !lastAt.IsZero() {
		if since := time.Since(lastAt); since < cooldown {
			m.logger.Printf("[quota] swap cooldown active (since=%v < cooldown=%v, last_to=%q) — skipping detection",
				since.Round(time.Second), cooldown, lastTo)
			clearSuspicion("swap cooldown active")
			return
		}
	}

	blockedPool := 0
	blockedInteractive := 0
	var resetTime string
	var firstPattern, firstSession, firstSnippet string

	// checkSession reports whether the named session's pane tail shows quota
	// exhaustion. On a hit it records diagnostics (first hit of this poll
	// only) and keeps the last parseable reset time seen. Missing sessions
	// and capture errors count as "no hit".
	checkSession := func(name string) bool {
		if !m.tmux.HasSession(name) {
			return false
		}
		// Only capture 3 lines — avoids false positives on stale history.
		tail, err := m.tmux.CapturePaneTail(name, 3)
		if err != nil {
			return false
		}
		if !isQuotaExhausted(tail) {
			return false
		}
		if firstPattern == "" {
			firstPattern = firstMatchingPattern(tail)
			firstSession = name
			firstSnippet = snippet(tail)
		}
		if rt := extractResetTime(tail); rt != "" {
			resetTime = rt
		}
		return true
	}

	prefix := m.config.Pool.Autonomous.Prefix
	if prefix == "" {
		prefix = "ccl-auto-"
	}
	start := m.config.Pool.Autonomous.StartIndex
	for i := start; i < start+m.config.Pool.Autonomous.Max; i++ {
		if checkSession(sessionName(prefix, i)) {
			blockedPool++
		}
	}
	for _, ds := range m.config.Pool.Dedicated {
		if checkSession(ds.Name) {
			blockedInteractive++
		}
	}

	if blockedPool < 2 && blockedInteractive < 1 {
		// No detection: future transient blips must re-confirm from scratch.
		clearSuspicion("no detection this poll")
		return
	}

	// When no reset time can be parsed, the "hit" might be a transient
	// Anthropic 500 that happens to contain "rate limit" in its error
	// payload. Require two consecutive polls detecting a hit before
	// swapping, so a single-poll false positive is absorbed.
	if resetTime == "" {
		if m.suspectedHitAt.IsZero() {
			m.suspectedHitAt = time.Now()
			m.logger.Printf("[quota] suspected hit (no reset time): session=%q pattern=%q snippet=%q — awaiting confirmation next poll",
				firstSession, firstPattern, firstSnippet)
			return
		}
		// Second consecutive hit — proceed.
		m.logger.Printf("[quota] hit confirmed across %v — proceeding with swap",
			time.Since(m.suspectedHitAt).Round(time.Second))
	}
	m.suspectedHitAt = time.Time{}

	req := SwitchRequest{
		From:      m.state.ActiveAccount(),
		ResetTime: resetTime,
	}
	select {
	case m.switchCh <- req:
		m.logger.Printf("[quota] SwapRequested: from=%s pool=%d interactive=%d reset=%q trigger_session=%q pattern=%q snippet=%q",
			req.From, blockedPool, blockedInteractive, resetTime,
			firstSession, firstPattern, firstSnippet)
	default:
		// Swap already pending — do not queue another.
	}
}
// isQuotaPaused reports whether a SwitchRequest is still sitting in
// m.switchCh, i.e. a swap has been requested but not yet picked up. This is
// a cheap stand-in for "a swap is in progress".
func (m *Monitor) isQuotaPaused() bool {
	pending := len(m.switchCh)
	return pending != 0
}
// isQuotaExhausted reports whether paneContent contains a quota-exhaustion
// pattern and no transient server-error marker. A server error alongside a
// quota match means the match is treated as a false positive.
func isQuotaExhausted(paneContent string) bool {
	matched := firstMatchingPattern(paneContent) != ""
	return matched && !hasServerError(paneContent)
}
// firstMatchingPattern returns the earliest entry of quotaPatterns (in slice
// order) found in paneContent, compared case-insensitively by lowercasing
// the pane. It returns "" when nothing matches. Besides driving
// isQuotaExhausted, the returned pattern is used in diagnostic log lines.
func firstMatchingPattern(paneContent string) string {
	haystack := strings.ToLower(paneContent)
	for i := 0; i < len(quotaPatterns); i++ {
		if needle := quotaPatterns[i]; strings.Contains(haystack, needle) {
			return needle
		}
	}
	return ""
}
// hasServerError reports whether the lowercased paneContent contains any
// serverErrorPatterns entry, i.e. an Anthropic 5xx / transient upstream
// failure. It is used to veto a quota match, since a 500 is not a quota hit.
func hasServerError(paneContent string) bool {
	haystack := strings.ToLower(paneContent)
	found := false
	for _, needle := range serverErrorPatterns {
		if found = strings.Contains(haystack, needle); found {
			break
		}
	}
	return found
}
// snippet flattens pane content to a single trimmed line for logging, capped
// at 120 characters (runes).
//
// Newlines become spaces and carriage returns are removed. The cap counts
// runes rather than bytes, and the cut always falls on a rune boundary. TUI
// panes routinely contain multi-byte glyphs, and slicing at a fixed byte
// offset could split one and log invalid UTF-8.
func snippet(s string) string {
	const maxRunes = 120

	s = strings.ReplaceAll(s, "\n", " ")
	s = strings.ReplaceAll(s, "\r", "")
	s = strings.TrimSpace(s)

	// Ranging over a string yields the byte offset of each rune start, so
	// s[:i] below never cuts through a multi-byte sequence.
	count := 0
	for i := range s {
		if count == maxRunes {
			return s[:i]
		}
		count++
	}
	return s
}
// extractResetTime returns the reset time captured by resetTimeRe from
// content, with surrounding whitespace trimmed (e.g. "8pm" or
// "in 45 minutes"). Only the first match is considered. It returns "" when
// content has no match.
func extractResetTime(content string) string {
	groups := resetTimeRe.FindStringSubmatch(content)
	if len(groups) < 2 {
		return ""
	}
	return strings.TrimSpace(groups[1])
}
// sessionName builds a tmux session name by appending the decimal form of i
// to prefix, e.g. sessionName("ccl-auto-", 11) yields "ccl-auto-11".
func sessionName(prefix string, i int) string {
	suffix := itoa(i)
	return prefix + suffix
}
// itoa returns the base-10 representation of n, including a leading '-' for
// negative values. It matches strconv.Itoa for every int, including the most
// negative one.
//
// Digits are written right-to-left into a fixed buffer instead of being
// prepended to a growing slice, which avoids quadratic copying.
func itoa(n int) string {
	if n == 0 {
		return "0"
	}
	// 20 bytes hold the widest 64-bit int: 19 digits plus a sign.
	var buf [20]byte
	pos := len(buf)

	negative := n < 0
	mag := uint64(n)
	if negative {
		// Negate in unsigned arithmetic so the most negative int does not
		// overflow: its magnitude is representable as a uint64.
		mag = -mag
	}
	for mag > 0 {
		pos--
		buf[pos] = byte('0' + mag%10)
		mag /= 10
	}
	if negative {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}