// Package janitor provides a periodic cleanup goroutine for the claude-failover
// daemon. It removes orphaned dispatch-meta files, rewrites agent-queue
// status.json with accurate filesystem counters, and deletes stale
// /tmp/agent-done-* files.
package janitor
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"encoding/json"
|
||
|
|
"log"
|
||
|
|
"os"
|
||
|
|
"path/filepath"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"forge.secuaas.ovh/olivier/claude-failover/internal/state"
|
||
|
|
)
|
||
|
|
|
||
|
|
// defaultInterval is the delay between two cleanup passes that New configures.
const defaultInterval time.Duration = 5 * time.Minute
|
||
|
|
|
||
|
|
// agentQueueStatus is the JSON document stored in .agent-queue/status.json.
// The janitor decodes the existing file into this struct, overwrites the
// counters and the heartbeat, and writes the result back; state and
// current_task are carried over from whatever the file already contained.
type agentQueueStatus struct {
	// Fields carried over from the existing file.
	State       string  `json:"state"`
	CurrentTask *string `json:"current_task"` // nil is encoded as JSON null

	// RFC 3339 timestamp, refreshed on every rewrite.
	LastHB string `json:"last_heartbeat"`

	// Counters recomputed from the queue directories on every rewrite.
	InboxCount  int `json:"inbox_count"`
	DoneCount   int `json:"done_count"`
	FailedCount int `json:"failed_count"`
}
|
||
|
|
|
||
|
|
// Janitor runs periodic filesystem cleanup tasks.
|
||
|
|
type Janitor struct {
|
||
|
|
state *state.State
|
||
|
|
projectsDir string
|
||
|
|
interval time.Duration
|
||
|
|
logger *log.Logger
|
||
|
|
}
|
||
|
|
|
||
|
|
// New returns a Janitor with the default 5-minute interval.
|
||
|
|
func New(s *state.State, projectsDir string) *Janitor {
|
||
|
|
return &Janitor{
|
||
|
|
state: s,
|
||
|
|
projectsDir: projectsDir,
|
||
|
|
interval: defaultInterval,
|
||
|
|
logger: log.New(os.Stderr, "[janitor] ", log.LstdFlags),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Run starts the cleanup loop. It blocks until ctx is cancelled.
|
||
|
|
func (j *Janitor) Run(ctx context.Context) {
|
||
|
|
ticker := time.NewTicker(j.interval)
|
||
|
|
defer ticker.Stop()
|
||
|
|
for {
|
||
|
|
select {
|
||
|
|
case <-ctx.Done():
|
||
|
|
return
|
||
|
|
case <-ticker.C:
|
||
|
|
j.cleanup()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// cleanup performs a single cleanup pass.
|
||
|
|
func (j *Janitor) cleanup() {
|
||
|
|
j.cleanupProjects()
|
||
|
|
j.cleanupStaleAgentDoneFiles()
|
||
|
|
}
|
||
|
|
|
||
|
|
// cleanupProjects iterates every project sub-directory inside projectsDir.
|
||
|
|
func (j *Janitor) cleanupProjects() {
|
||
|
|
entries, err := os.ReadDir(j.projectsDir)
|
||
|
|
if err != nil {
|
||
|
|
if !os.IsNotExist(err) {
|
||
|
|
j.logger.Printf("readdir %s: %v", j.projectsDir, err)
|
||
|
|
}
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Build the set of session names currently working, keyed by task ID.
|
||
|
|
workingByTask := make(map[string]struct{})
|
||
|
|
j.state.ForEachWorking(func(_ string, sess *state.SessionState) {
|
||
|
|
if sess.Task != nil {
|
||
|
|
workingByTask[*sess.Task] = struct{}{}
|
||
|
|
}
|
||
|
|
})
|
||
|
|
|
||
|
|
for _, e := range entries {
|
||
|
|
if !e.IsDir() {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
projectDir := filepath.Join(j.projectsDir, e.Name())
|
||
|
|
inboxDir := filepath.Join(projectDir, ".agent-queue", "inbox")
|
||
|
|
doneDir := filepath.Join(projectDir, ".agent-queue", "done")
|
||
|
|
failedDir := filepath.Join(projectDir, ".agent-queue", "failed")
|
||
|
|
|
||
|
|
j.cleanupOrphanDispatchMeta(inboxDir, workingByTask)
|
||
|
|
j.rewriteStatusJSON(projectDir, inboxDir, doneDir, failedDir)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// cleanupOrphanDispatchMeta removes *.md.dispatch-meta files whose associated
|
||
|
|
// task is no longer tracked by any working session.
|
||
|
|
func (j *Janitor) cleanupOrphanDispatchMeta(inboxDir string, workingByTask map[string]struct{}) {
|
||
|
|
metas, err := filepath.Glob(filepath.Join(inboxDir, "*.md.dispatch-meta"))
|
||
|
|
if err != nil || len(metas) == 0 {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
for _, metaPath := range metas {
|
||
|
|
// Derive the task name from the meta filename: strip the .dispatch-meta suffix.
|
||
|
|
taskFile := metaPath[:len(metaPath)-len(".dispatch-meta")]
|
||
|
|
taskName := filepath.Base(taskFile)
|
||
|
|
// Strip .md suffix to get bare task ID.
|
||
|
|
if len(taskName) > 3 {
|
||
|
|
taskName = taskName[:len(taskName)-3] // remove ".md"
|
||
|
|
}
|
||
|
|
if _, active := workingByTask[taskName]; !active {
|
||
|
|
j.logger.Printf("removing orphaned dispatch-meta: %s", metaPath)
|
||
|
|
if err := os.Remove(metaPath); err != nil && !os.IsNotExist(err) {
|
||
|
|
j.logger.Printf("remove %s: %v", metaPath, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// rewriteStatusJSON recalculates real filesystem counters and rewrites
|
||
|
|
// .agent-queue/status.json, preserving state/current_task when agent is working.
|
||
|
|
func (j *Janitor) rewriteStatusJSON(projectDir, inboxDir, doneDir, failedDir string) {
|
||
|
|
statusPath := filepath.Join(projectDir, ".agent-queue", "status.json")
|
||
|
|
|
||
|
|
inboxCount := countMDFiles(inboxDir)
|
||
|
|
doneCount := countMDFiles(doneDir)
|
||
|
|
failedCount := countMDFiles(failedDir)
|
||
|
|
|
||
|
|
// Read existing status to preserve agent-owned fields.
|
||
|
|
existing := agentQueueStatus{
|
||
|
|
State: "idle",
|
||
|
|
CurrentTask: nil,
|
||
|
|
}
|
||
|
|
if data, err := os.ReadFile(statusPath); err == nil {
|
||
|
|
_ = json.Unmarshal(data, &existing)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Only update counters; do NOT touch state/current_task if agent is working.
|
||
|
|
existing.InboxCount = inboxCount
|
||
|
|
existing.DoneCount = doneCount
|
||
|
|
existing.FailedCount = failedCount
|
||
|
|
existing.LastHB = time.Now().UTC().Format(time.RFC3339)
|
||
|
|
|
||
|
|
data, err := json.Marshal(existing)
|
||
|
|
if err != nil {
|
||
|
|
j.logger.Printf("marshal status.json for %s: %v", projectDir, err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
tmp := statusPath + ".tmp"
|
||
|
|
if err := os.WriteFile(tmp, data, 0644); err != nil {
|
||
|
|
j.logger.Printf("write tmp status.json for %s: %v", projectDir, err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if err := os.Rename(tmp, statusPath); err != nil {
|
||
|
|
j.logger.Printf("rename status.json for %s: %v", projectDir, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// countMDFiles returns how many non-directory entries of dir have the
// extension ".md". Names such as "task.md.dispatched" or
// "task.md.dispatch-meta" are not counted because their final extension is not
// ".md", and a sub-directory named "*.md" is not a task file and is skipped.
// A directory that cannot be read, including one that does not exist, yields 0.
func countMDFiles(dir string) int {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return 0
	}
	count := 0
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		if filepath.Ext(e.Name()) == ".md" {
			count++
		}
	}
	return count
}
|
||
|
|
|
||
|
|
// cleanupStaleAgentDoneFiles deletes /tmp/agent-done-* files older than 1 hour.
|
||
|
|
func (j *Janitor) cleanupStaleAgentDoneFiles() {
|
||
|
|
matches, err := filepath.Glob("/tmp/agent-done-*")
|
||
|
|
if err != nil || len(matches) == 0 {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
cutoff := time.Now().Add(-1 * time.Hour)
|
||
|
|
for _, path := range matches {
|
||
|
|
info, err := os.Stat(path)
|
||
|
|
if err != nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
if info.ModTime().Before(cutoff) {
|
||
|
|
j.logger.Printf("removing stale agent-done file: %s", path)
|
||
|
|
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
|
||
|
|
j.logger.Printf("remove %s: %v", path, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|