From c124b016814540456b6c78c49b70b8c357cd12fd Mon Sep 17 00:00:00 2001 From: root Date: Thu, 26 Mar 2026 01:27:42 -0500 Subject: [PATCH] Fix SSE stream reliability: threaded server, async keepalive, streaming responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Enable Flask threaded=True for concurrent request handling - Refactor generate() to use producer-consumer queue pattern: - Runner executes in background thread, pushes events to queue - Heartbeat thread sends keepalive every 10s independently - Generator reads from queue — stream never goes silent - Brainstorm mode: stream responses as they arrive (was waiting for all) - Prevents nginx/browser timeout during long model queries Co-Authored-By: Claude Opus 4.6 (1M context) --- llm_team_ui.py | 91 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 67 insertions(+), 24 deletions(-) diff --git a/llm_team_ui.py b/llm_team_ui.py index efdd8b4..8dc5036 100644 --- a/llm_team_ui.py +++ b/llm_team_ui.py @@ -3661,17 +3661,58 @@ def run_team(): } def generate(): + import queue collected = [] run = _active_runs[run_id] - last_heartbeat = time.time() + event_queue = queue.Queue() + stop_heartbeat = threading.Event() + + # Heartbeat thread: sends keepalive every 10s to prevent connection timeout + def heartbeat(): + while not stop_heartbeat.is_set(): + stop_heartbeat.wait(10) + if not stop_heartbeat.is_set(): + event_queue.put(": keepalive\n\n") + + hb_thread = threading.Thread(target=heartbeat, daemon=True) + hb_thread.start() + + # Runner thread: executes the mode runner and pushes events to queue + def run_pipeline(): + try: + runner = RUNNERS.get(mode) + if runner: + for event_str in runner(config): + event_queue.put(event_str) + else: + event_queue.put(sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"})) + event_queue.put(sse({"type": "done"})) + except Exception as e: + run["errors"].append({"model": "system", "error": str(e)[:500], "time": time.time()}) + event_queue.put(sse({"type": "response", "model": "system", "text": f"Pipeline error: {e}", "role": "error"})) + event_queue.put(sse({"type": "done"})) + finally: + event_queue.put(None) # sentinel + + pipeline_thread = threading.Thread(target=run_pipeline, daemon=True) + pipeline_thread.start() + try: - runner = RUNNERS.get(mode) - if runner: - for event_str in runner(config): - yield event_str + while True: + try: + event_str = event_queue.get(timeout=30) + except queue.Empty: + # Safety keepalive if heartbeat thread died + yield ": keepalive\n\n" + continue + if event_str is None: + break + yield event_str + # Track events + if event_str.startswith("data: "): run["events"] += 1 try: - data = json.loads(event_str.replace("data: ", "", 1).strip()) + data = json.loads(event_str[6:].strip()) evt_type = data.get("type") if evt_type == "response": text = data.get("text", "") @@ -3687,19 +3728,8 @@ def run_team(): run["substep"] = data.get("message", "") except Exception: pass - # SSE keepalive — prevent nginx/browser timeout during gaps - now = time.time() - if now - last_heartbeat > 15: - yield ": keepalive\n\n" - last_heartbeat = now - else: - yield sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"}) - yield sse({"type": "done"}) - except Exception as e: - run["errors"].append({"model": "system", "error": str(e)[:500], "time": time.time()}) - yield sse({"type": "response", "model": "system", "text": f"Pipeline error: {e}", "role": "error"}) - yield sse({"type": "done"}) finally: + stop_heartbeat.set() run["finished"] = time.time() run["duration"] = round(run["finished"] - run["started"], 1) run["response_count"] = len(collected) @@ -3721,11 +3751,24 @@ def run_brainstorm(config): yield sse({"type": "clear"}) yield sse({"type": "progress", "step": 1, "total_steps": total, "substep": f"Querying {len(models)} models in parallel...", "percent": 10}) yield sse({"type": "status", "message": f"Querying {len(models)} models..."}) - responses = parallel_query(models, prompt) - for i, (m, r) in enumerate(responses.items()): - pct = 10 + int(((i + 1) / len(responses)) * 50) - yield sse({"type": "progress", "step": 1, "total_steps": total, "substep": f"{i+1}/{len(responses)} models responded", "percent": pct}) - yield sse({"type": "response", "model": m, "text": r, "role": "respondent"}) + # Stream responses as they arrive instead of waiting for all + responses = {} + completed = 0 + max_timeout = max((_get_timeout(m) for m in models), default=300) + 30 + with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool: + futures = {pool.submit(safe_query, m, prompt): m for m in models} + for future in as_completed(futures, timeout=max_timeout): + m = futures[future] + try: + r = future.result(timeout=10) + except Exception as e: + r = f"Error: {e}" + responses[m] = r + completed += 1 + pct = 10 + int((completed / len(models)) * 50) + yield sse({"type": "progress", "step": 1, "total_steps": total, "substep": f"{completed}/{len(models)} models responded", "percent": pct}) + role = "error" if r.startswith("Error:") else "respondent" + yield sse({"type": "response", "model": m, "text": r, "role": role}) if len(responses) > 1: yield sse({"type": "progress", "step": 2, "total_steps": total, "substep": f"Synthesizing with {synthesizer}...", "percent": 70}) yield sse({"type": "status", "message": f"Synthesizing with {synthesizer}..."}) @@ -4691,4 +4734,4 @@ def run_extract(config): if __name__ == "__main__": print("\n LLM Team UI running at http://localhost:5000\n") - app.run(host="127.0.0.1", port=5000, debug=False) + app.run(host="127.0.0.1", port=5000, debug=False, threaded=True)