Fix SSE stream reliability: threaded server, async keepalive, streaming responses

- Enable Flask threaded=True for concurrent request handling
- Refactor generate() to use producer-consumer queue pattern:
  - Runner executes in background thread, pushes events to queue
  - Heartbeat thread sends keepalive every 10s independently
  - Generator reads from queue — stream never goes silent
- Brainstorm mode: stream responses as they arrive (was waiting for all)
- Prevents nginx/browser timeout during long model queries

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-03-26 01:27:42 -05:00
parent 242dec7509
commit c124b01681

View File

@ -3661,17 +3661,58 @@ def run_team():
}
def generate():
import queue
collected = []
run = _active_runs[run_id]
last_heartbeat = time.time()
event_queue = queue.Queue()
stop_heartbeat = threading.Event()
# Heartbeat thread: sends keepalive every 10s to prevent connection timeout
def heartbeat():
while not stop_heartbeat.is_set():
stop_heartbeat.wait(10)
if not stop_heartbeat.is_set():
event_queue.put(": keepalive\n\n")
hb_thread = threading.Thread(target=heartbeat, daemon=True)
hb_thread.start()
# Runner thread: executes the mode runner and pushes events to queue
def run_pipeline():
try:
runner = RUNNERS.get(mode)
if runner:
for event_str in runner(config):
event_queue.put(event_str)
else:
event_queue.put(sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"}))
event_queue.put(sse({"type": "done"}))
except Exception as e:
run["errors"].append({"model": "system", "error": str(e)[:500], "time": time.time()})
event_queue.put(sse({"type": "response", "model": "system", "text": f"Pipeline error: {e}", "role": "error"}))
event_queue.put(sse({"type": "done"}))
finally:
event_queue.put(None) # sentinel
pipeline_thread = threading.Thread(target=run_pipeline, daemon=True)
pipeline_thread.start()
try:
runner = RUNNERS.get(mode)
if runner:
for event_str in runner(config):
yield event_str
while True:
try:
event_str = event_queue.get(timeout=30)
except queue.Empty:
# Safety keepalive if heartbeat thread died
yield ": keepalive\n\n"
continue
if event_str is None:
break
yield event_str
# Track events
if event_str.startswith("data: "):
run["events"] += 1
try:
data = json.loads(event_str.replace("data: ", "", 1).strip())
data = json.loads(event_str[6:].strip())
evt_type = data.get("type")
if evt_type == "response":
text = data.get("text", "")
@ -3687,19 +3728,8 @@ def run_team():
run["substep"] = data.get("message", "")
except Exception:
pass
# SSE keepalive — prevent nginx/browser timeout during gaps
now = time.time()
if now - last_heartbeat > 15:
yield ": keepalive\n\n"
last_heartbeat = now
else:
yield sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"})
yield sse({"type": "done"})
except Exception as e:
run["errors"].append({"model": "system", "error": str(e)[:500], "time": time.time()})
yield sse({"type": "response", "model": "system", "text": f"Pipeline error: {e}", "role": "error"})
yield sse({"type": "done"})
finally:
stop_heartbeat.set()
run["finished"] = time.time()
run["duration"] = round(run["finished"] - run["started"], 1)
run["response_count"] = len(collected)
@ -3721,11 +3751,24 @@ def run_brainstorm(config):
yield sse({"type": "clear"})
yield sse({"type": "progress", "step": 1, "total_steps": total, "substep": f"Querying {len(models)} models in parallel...", "percent": 10})
yield sse({"type": "status", "message": f"Querying {len(models)} models..."})
responses = parallel_query(models, prompt)
for i, (m, r) in enumerate(responses.items()):
pct = 10 + int(((i + 1) / len(responses)) * 50)
yield sse({"type": "progress", "step": 1, "total_steps": total, "substep": f"{i+1}/{len(responses)} models responded", "percent": pct})
yield sse({"type": "response", "model": m, "text": r, "role": "respondent"})
# Stream responses as they arrive instead of waiting for all
responses = {}
completed = 0
max_timeout = max((_get_timeout(m) for m in models), default=300) + 30
with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool:
futures = {pool.submit(safe_query, m, prompt): m for m in models}
for future in as_completed(futures, timeout=max_timeout):
m = futures[future]
try:
r = future.result(timeout=10)
except Exception as e:
r = f"Error: {e}"
responses[m] = r
completed += 1
pct = 10 + int((completed / len(models)) * 50)
yield sse({"type": "progress", "step": 1, "total_steps": total, "substep": f"{completed}/{len(models)} models responded", "percent": pct})
role = "error" if r.startswith("Error:") else "respondent"
yield sse({"type": "response", "model": m, "text": r, "role": role})
if len(responses) > 1:
yield sse({"type": "progress", "step": 2, "total_steps": total, "substep": f"Synthesizing with {synthesizer}...", "percent": 70})
yield sse({"type": "status", "message": f"Synthesizing with {synthesizer}..."})
@ -4691,4 +4734,4 @@ def run_extract(config):
if __name__ == "__main__":
    print("\n LLM Team UI running at http://localhost:5000\n")
    # threaded=True lets Flask's dev server handle requests concurrently, so a
    # long-lived SSE stream from one client doesn't block every other request.
    # (The stripped diff showed both the pre- and post-change app.run lines;
    # only the threaded variant belongs in the final source.)
    app.run(host="127.0.0.1", port=5000, debug=False, threaded=True)