Fix lost runs on disconnect + separate public/admin runs in history

Server-side save (survives page refresh/close):
- Moved save_run() from generator (client-dependent) into pipeline thread
- Pipeline thread collects responses server-side independently
- save_run() executes in pipeline thread's finally block — ALWAYS runs
- Even if user closes browser mid-run, the run completes and saves to DB

Public user tracking:
- Runs from demo/showcase users tagged with config.owner = "public"
- Admin runs tagged with actual username
- History list shows orange "PUB" badge on public user runs
- owner column added to history list query for fast filtering

Architecture change:
- _pipeline_collected[] built by pipeline thread (not generator loop)
- _run_config stored before generator starts, accessible by pipeline thread
- run_saved SSE event emitted from pipeline thread after save
- Generator's collected[] still tracks for display, but save is independent

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-03-29 08:57:10 -05:00
parent 39c421806d
commit ac54743f54

View File

@ -6425,7 +6425,7 @@ def get_runs():
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
_cols = "id, mode, prompt, models_used, created_at, archived, quality_score, score_method, tags, notes, config->>'source' as source, config->>'parent_run' as parent_run"
_cols = "id, mode, prompt, models_used, created_at, archived, quality_score, score_method, tags, notes, config->>'source' as source, config->>'parent_run' as parent_run, config->>'owner' as owner"
if show == "archived":
cur.execute(f"SELECT {_cols} FROM team_runs WHERE archived = true ORDER BY created_at DESC LIMIT 200")
elif show == "all":
@ -7041,7 +7041,16 @@ function renderTable(runs) {
// ID
var idEl = document.createElement('span'); idEl.className = 'run-id'; idEl.textContent = '#'+r.id; row.appendChild(idEl);
// Mode
var modeEl = document.createElement('span'); modeEl.className = 'run-mode'; modeEl.textContent = r.mode; row.appendChild(modeEl);
var modeEl = document.createElement('span'); modeEl.className = 'run-mode';
modeEl.textContent = r.mode;
if (r.owner === 'public') {
var pubPill = document.createElement('span');
pubPill.style.cssText = 'font-size:7px;background:rgba(245,158,11,0.1);color:var(--orange);padding:1px 4px;border-radius:2px;border:1px solid rgba(245,158,11,0.2);margin-left:3px;vertical-align:middle';
pubPill.textContent = 'PUB';
pubPill.title = 'Public/demo user';
modeEl.appendChild(pubPill);
}
row.appendChild(modeEl);
// Prompt
var promptEl = document.createElement('span'); promptEl.className = 'run-prompt'; promptEl.textContent = (r.prompt||'').substring(0,100); promptEl.title = r.prompt||''; row.appendChild(promptEl);
// Models
@ -8416,12 +8425,16 @@ def run_team():
run_id = str(_uuid.uuid4())[:8]
username = session.get("username", "unknown")
is_public = not session.get("user_id")
run_owner = "public" if is_public else username
_active_runs[run_id] = {
"mode": mode, "user": username, "prompt": prompt[:100],
"mode": mode, "user": run_owner, "prompt": prompt[:100],
"started": time.time(), "step": 0, "total_steps": 0,
"substep": "", "events": 0, "errors": [],
"responses_size": 0
}
# Store config for server-side save (survives client disconnect)
_run_config = {"mode": mode, "prompt": config.get("prompt", ""), "config": config, "owner": run_owner, "is_public": is_public}
def generate():
import queue
@ -8440,13 +8453,23 @@ def run_team():
hb_thread = threading.Thread(target=heartbeat, daemon=True)
hb_thread.start()
# Runner thread: executes the mode runner and pushes events to queue
# Runner thread: executes the mode runner, saves to DB even if client disconnects
_pipeline_collected = [] # shared with generator for run_saved event
def run_pipeline():
try:
runner = RUNNERS.get(mode)
if runner:
for event_str in runner(config):
event_queue.put(event_str)
# Also collect responses server-side
if event_str.startswith("data: "):
try:
ev = json.loads(event_str[6:].strip())
if ev.get("type") == "response":
_pipeline_collected.append({"model": ev.get("model", ""), "text": ev.get("text", ""), "role": ev.get("role", "")})
except Exception:
pass
else:
event_queue.put(sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"}))
event_queue.put(sse({"type": "done"}))
@ -8455,6 +8478,14 @@ def run_team():
event_queue.put(sse({"type": "response", "model": "system", "text": f"Pipeline error: {e}", "role": "error"}))
event_queue.put(sse({"type": "done"}))
finally:
# ALWAYS save — even if client disconnected
if _pipeline_collected:
cfg = dict(_run_config.get("config", {}))
if _run_config.get("is_public"):
cfg["owner"] = "public"
rid = save_run(_run_config["mode"], _run_config["prompt"], cfg, _pipeline_collected)
if rid:
event_queue.put(sse({"type": "run_saved", "run_id": rid}))
event_queue.put(None) # sentinel
pipeline_thread = threading.Thread(target=run_pipeline, daemon=True)
@ -8498,10 +8529,6 @@ def run_team():
run["response_count"] = len(collected)
_log_run(dict(run, run_id=run_id))
_active_runs.pop(run_id, None)
if collected:
rid = save_run(mode, config.get("prompt", ""), config, collected)
if rid:
yield sse({"type": "run_saved", "run_id": rid})
return Response(generate(), mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Connection": "keep-alive"})