diff --git a/agents/multi-agent/orchestrator.ts b/agents/multi-agent/orchestrator.ts index 43df043..ed66fa6 100644 --- a/agents/multi-agent/orchestrator.ts +++ b/agents/multi-agent/orchestrator.ts @@ -448,6 +448,27 @@ The solution should consider fault tolerance, data consistency, and cost optimiz analyzePerformance(metrics); + // Output special marker for server to parse consensus status + // Format: ORCHESTRATION_RESULT:{"consensus":true/false,"metrics":{...}} + console.log("\nORCHESTRATION_RESULT:" + JSON.stringify({ + consensus: metrics.final_consensus, + task_id: metrics.task_id, + metrics: metrics + })); + + // Exit with code 2 for consensus failure (distinct from error=1, success=0) + if (!metrics.final_consensus) { + console.log("\n[ORCHESTRATOR] Consensus NOT achieved - exiting with code 2"); + exitCode = 2; + + // Report consensus failure to observability + const pipelineId = process.env.PIPELINE_ID; + if (pipelineId) { + await reportErrorToObservability(pipelineId, "consensus_failure", "high", + `Agents failed to reach consensus. Conflicts: ${metrics.conflicts_detected}, Resolved: ${metrics.conflicts_resolved}`); + } + } + } catch (e: any) { console.error("Orchestrator error:", e.message); exitCode = 1; diff --git a/docs/MULTI_AGENT_PIPELINE_ARCHITECTURE.md b/docs/MULTI_AGENT_PIPELINE_ARCHITECTURE.md index 121a89e..8613227 100644 --- a/docs/MULTI_AGENT_PIPELINE_ARCHITECTURE.md +++ b/docs/MULTI_AGENT_PIPELINE_ARCHITECTURE.md @@ -410,6 +410,13 @@ Each agent displays: - [x] Token status/revoke/renew APIs - [x] Handoff report generation - [x] Diagnostic pipeline spawning +- [x] Consensus failure detection (exit code 2) +- [x] Consensus failure context recording +- [x] Fallback options (rerun, escalate, accept, download) +- [x] Failure report download +- [x] UI consensus failure alert with action buttons +- [x] Failure details modal +- [x] WebSocket notifications for consensus events --- diff --git a/ui/server.ts b/ui/server.ts index ae37c45..2656b83 100644 --- a/ui/server.ts +++ b/ui/server.ts @@ -1451,6 +1451,363 @@ function determineAgentLifecycle(pipelineStatus: string, agentState: any): strin } } +// ============================================================================= +// Consensus Failure Handling +// ============================================================================= + +interface ConsensusFailureContext { + pipeline_id: string; + task_id: string; + objective: string; + failure_time: string; + metrics: any; + proposals: any[]; + agent_states: any[]; + conflict_history: any[]; + blackboard_snapshot: any; + run_number: number; +} + +interface FallbackOption { + id: string; + label: string; + description: string; + action: "rerun" | "escalate" | "accept" | "download"; + tier_change?: number; + auto_available: boolean; +} + +const FALLBACK_OPTIONS: FallbackOption[] = [ + { + id: "rerun_same", + label: "Rerun with Same Agents", + description: "Spawn new ALPHA/BETA agents with the failed context for a fresh attempt", + action: "rerun", + auto_available: true + }, + { + id: "rerun_gamma", + label: "Rerun with GAMMA Mediator", + description: "Force-spawn GAMMA agent to mediate between conflicting proposals", + action: "rerun", + auto_available: true + }, + { + id: "escalate_tier", + label: "Escalate to Higher Tier", + description: "Increase agent tier permissions and retry with more capabilities", + action: "escalate", + tier_change: 1, + auto_available: false + }, + { + id: "accept_partial", + label: "Accept Partial Output", + description: "Mark pipeline complete with best available proposal (no consensus)", + action: "accept", + auto_available: true + }, + { + id: "download_log", + label: "Download Failure Log", + description: "Export full context for manual review or external processing", + action: "download", + auto_available: true + } +]; + +async function recordConsensusFailure( + pipelineId: string, + taskId: string, + metrics: any +): Promise { + const pipelineKey = `pipeline:${pipelineId}`; + const pipelineData = await redis.hGetAll(pipelineKey); + + // Get run number (increment if retrying) + const prevRunNumber = parseInt(pipelineData.run_number || "0"); + const runNumber = prevRunNumber + 1; + + // Collect all context for the failed run + const context: ConsensusFailureContext = { + pipeline_id: pipelineId, + task_id: taskId, + objective: pipelineData.objective || "", + failure_time: new Date().toISOString(), + metrics: metrics, + proposals: [], + agent_states: [], + conflict_history: [], + blackboard_snapshot: {}, + run_number: runNumber + }; + + // Collect proposals from blackboard (if available in Redis) + try { + const proposalKeys = await redis.keys(`blackboard:${taskId}:solutions:*`); + for (const key of proposalKeys) { + const proposal = await redis.get(key); + if (proposal) { + try { + context.proposals.push(JSON.parse(proposal)); + } catch { + context.proposals.push({ raw: proposal }); + } + } + } + + // Get agent states + const agentStates = await redis.hGetAll(`agents:${taskId}`); + for (const [role, state] of Object.entries(agentStates)) { + try { + context.agent_states.push({ role, ...JSON.parse(state as string) }); + } catch { + context.agent_states.push({ role, raw: state }); + } + } + + // Get message history for conflict analysis + const msgLog = await redis.lRange(`msg:${taskId}:log`, 0, -1); + context.conflict_history = msgLog.map(m => { + try { return JSON.parse(m); } catch { return { raw: m }; } + }).filter(m => m.type === "CONFLICT" || m.type === "PROPOSAL" || m.type === "VOTE"); + + // Get blackboard snapshot + const blackboardKeys = await redis.keys(`blackboard:${taskId}:*`); + for (const key of blackboardKeys) { + const section = key.split(":").pop() || ""; + const keyType = await redis.type(key); + if (keyType === "hash") { + context.blackboard_snapshot[section] = await redis.hGetAll(key); + } else if (keyType === "string") { + const val = await redis.get(key); + context.blackboard_snapshot[section] = val ? JSON.parse(val) : null; + } + } + } catch (e: any) { + console.error(`[CONSENSUS] Error collecting context: ${e.message}`); + } + + // Store the failure context in Dragonfly + const failureKey = `consensus_failure:${pipelineId}:run_${runNumber}`; + await redis.set(failureKey, JSON.stringify(context)); + + // Add to failure history list + await redis.rPush(`consensus_failures:${pipelineId}`, failureKey); + + // Update pipeline with failure info + await redis.hSet(pipelineKey, { + run_number: String(runNumber), + last_consensus_failure: new Date().toISOString(), + consensus_failure_count: String(runNumber) + }); + + await appendPipelineLog(pipelineId, "CONSENSUS", + `Consensus failure recorded (run #${runNumber}). ${context.proposals.length} proposals collected.`, "WARN"); + + broadcastUpdate("consensus_failure", { + pipeline_id: pipelineId, + run_number: runNumber, + proposals_count: context.proposals.length, + fallback_options: FALLBACK_OPTIONS + }); + + return context; +} + +async function getConsensusFailureContext(pipelineId: string, runNumber?: number): Promise { + if (runNumber) { + const data = await redis.get(`consensus_failure:${pipelineId}:run_${runNumber}`); + return data ? JSON.parse(data) : null; + } + + // Get latest failure + const failures = await redis.lRange(`consensus_failures:${pipelineId}`, -1, -1); + if (failures.length === 0) return null; + + const data = await redis.get(failures[0]); + return data ? JSON.parse(data) : null; +} + +async function getFailureHistory(pipelineId: string): Promise { + const failureKeys = await redis.lRange(`consensus_failures:${pipelineId}`, 0, -1); + const history: ConsensusFailureContext[] = []; + + for (const key of failureKeys) { + const data = await redis.get(key); + if (data) { + history.push(JSON.parse(data)); + } + } + + return history; +} + +async function handleFallbackAction( + pipelineId: string, + action: FallbackOption["action"], + optionId: string +): Promise<{ success: boolean; message: string; new_pipeline_id?: string }> { + const pipelineKey = `pipeline:${pipelineId}`; + const pipelineData = await redis.hGetAll(pipelineKey); + + if (!pipelineData.task_id) { + return { success: false, message: "Pipeline not found" }; + } + + await appendPipelineLog(pipelineId, "FALLBACK", `User selected fallback: ${optionId}`, "INFO"); + + switch (action) { + case "rerun": { + // Get the failure context to pass to new agents + const failureContext = await getConsensusFailureContext(pipelineId); + + // Create a new pipeline inheriting from this one + const newPipelineId = `pipeline-retry-${Date.now().toString(36)}`; + const forceGamma = optionId === "rerun_gamma"; + + await redis.hSet(`pipeline:${newPipelineId}`, { + task_id: pipelineData.task_id, + objective: pipelineData.objective, + status: "STARTING", + created_at: new Date().toISOString(), + agents: JSON.stringify([]), + parent_pipeline: pipelineId, + retry_of: pipelineId, + force_gamma: forceGamma ? "true" : "false", + prior_context: JSON.stringify(failureContext), + model: pipelineData.model || "anthropic/claude-sonnet-4", + timeout: pipelineData.timeout || "120", + auto_continue: "true" + }); + + // Update original pipeline status + await redis.hSet(pipelineKey, "status", "RETRYING"); + await redis.hSet(pipelineKey, "retry_pipeline", newPipelineId); + + await appendPipelineLog(pipelineId, "FALLBACK", + `Spawning retry pipeline ${newPipelineId}${forceGamma ? " with forced GAMMA" : ""}`, "INFO"); + + // Trigger the new orchestration + triggerOrchestration( + newPipelineId, + pipelineData.task_id, + pipelineData.objective + (forceGamma ? " [GAMMA MEDIATION REQUIRED]" : " [RETRY WITH PRIOR CONTEXT]"), + pipelineData.model || "anthropic/claude-sonnet-4", + parseInt(pipelineData.timeout || "120") + ); + + return { success: true, message: "Retry pipeline spawned", new_pipeline_id: newPipelineId }; + } + + case "escalate": { + const currentTier = parseInt(pipelineData.agent_tier || "1"); + const newTier = Math.min(currentTier + 1, 4); // Max tier is 4 + + if (newTier === currentTier) { + return { success: false, message: "Already at maximum tier level" }; + } + + // Create escalated pipeline + const newPipelineId = `pipeline-escalated-${Date.now().toString(36)}`; + const failureContext = await getConsensusFailureContext(pipelineId); + + await redis.hSet(`pipeline:${newPipelineId}`, { + task_id: pipelineData.task_id, + objective: pipelineData.objective, + status: "STARTING", + created_at: new Date().toISOString(), + agents: JSON.stringify([]), + parent_pipeline: pipelineId, + escalated_from: pipelineId, + agent_tier: String(newTier), + prior_context: JSON.stringify(failureContext), + model: pipelineData.model || "anthropic/claude-sonnet-4", + timeout: pipelineData.timeout || "120", + auto_continue: "true" + }); + + await redis.hSet(pipelineKey, "status", "ESCALATED"); + await redis.hSet(pipelineKey, "escalated_to", newPipelineId); + + await appendPipelineLog(pipelineId, "FALLBACK", + `Escalating to Tier ${newTier} with pipeline ${newPipelineId}`, "WARN"); + + triggerOrchestration( + newPipelineId, + pipelineData.task_id, + pipelineData.objective + ` [ESCALATED TO TIER ${newTier}]`, + pipelineData.model || "anthropic/claude-sonnet-4", + parseInt(pipelineData.timeout || "120") + ); + + return { success: true, message: `Escalated to Tier ${newTier}`, new_pipeline_id: newPipelineId }; + } + + case "accept": { + // Mark as complete with best available output + const failureContext = await getConsensusFailureContext(pipelineId); + const bestProposal = failureContext?.proposals?.[0] || null; + + await redis.hSet(pipelineKey, { + status: "COMPLETED_NO_CONSENSUS", + completed_at: new Date().toISOString(), + accepted_proposal: bestProposal ? JSON.stringify(bestProposal) : "", + user_accepted_fallback: "true" + }); + + await appendPipelineLog(pipelineId, "FALLBACK", + "User accepted partial output without consensus", "SUCCESS"); + + broadcastUpdate("pipeline_completed", { + pipeline_id: pipelineId, + status: "COMPLETED_NO_CONSENSUS", + had_consensus: false + }); + + return { success: true, message: "Pipeline marked complete with partial output" }; + } + + case "download": { + // Generate downloadable failure report - just return success, actual download via separate endpoint + return { success: true, message: "Failure log ready for download" }; + } + + default: + return { success: false, message: "Unknown action" }; + } +} + +async function generateFailureReport(pipelineId: string): Promise { + const failureContext = await getConsensusFailureContext(pipelineId); + const failureHistory = await getFailureHistory(pipelineId); + const pipelineData = await redis.hGetAll(`pipeline:${pipelineId}`); + const logs = await getPipelineLogs(pipelineId, 500); + + return { + report_type: "consensus_failure_report", + generated_at: new Date().toISOString(), + pipeline: { + id: pipelineId, + task_id: pipelineData.task_id, + objective: pipelineData.objective, + status: pipelineData.status, + created_at: pipelineData.created_at, + model: pipelineData.model + }, + current_failure: failureContext, + failure_history: failureHistory, + total_runs: failureHistory.length, + logs: logs, + recommendations: [ + "Review agent proposals for common ground", + "Consider simplifying the objective", + "Check for ambiguous requirements", + "Review conflict patterns in message history" + ] + }; +} + // Token renewal loop (runs every 30 minutes for active pipelines) async function runTokenRenewalLoop(): Promise { setInterval(async () => { @@ -1822,6 +2179,7 @@ async function triggerOrchestration( const reader = proc.stdout.getReader(); const decoder = new TextDecoder(); let buffer = ""; + let orchestrationResult: any = null; while (true) { const { done, value } = await reader.read(); @@ -1835,7 +2193,16 @@ async function triggerOrchestration( for (const line of lines) { if (line.trim()) { - await appendPipelineLog(pipelineId, "ORCHESTRATOR", line.trim()); + // Check for the special ORCHESTRATION_RESULT marker + if (line.startsWith("ORCHESTRATION_RESULT:")) { + try { + orchestrationResult = JSON.parse(line.substring("ORCHESTRATION_RESULT:".length)); + } catch (e) { + console.error("[ORCHESTRATOR] Failed to parse result:", e); + } + } else { + await appendPipelineLog(pipelineId, "ORCHESTRATOR", line.trim()); + } // Detect agent spawns and consensus events if (line.includes("[ALPHA]") || line.includes("[BETA]") || line.includes("[GAMMA]")) { @@ -1857,15 +2224,54 @@ async function triggerOrchestration( // Check exit code const exitCode = await proc.exited; + // Exit codes: + // 0 = Success (consensus achieved) + // 2 = Consensus failure (agents completed but no agreement) + // 1 = Error (crash or exception) + if (exitCode === 0) { + // Success - consensus achieved await redis.hSet(pipelineKey, "status", "COMPLETED"); await redis.hSet(pipelineKey, "completed_at", new Date().toISOString()); - await appendPipelineLog(pipelineId, "ORCHESTRATOR", "Orchestration completed successfully", "SUCCESS"); + await redis.hSet(pipelineKey, "final_consensus", "true"); + if (orchestrationResult?.metrics) { + await redis.hSet(pipelineKey, "final_metrics", JSON.stringify(orchestrationResult.metrics)); + } + await appendPipelineLog(pipelineId, "ORCHESTRATOR", "Orchestration completed with consensus", "SUCCESS"); broadcastUpdate("orchestration_complete", { pipeline_id: pipelineId, - status: "COMPLETED" + status: "COMPLETED", + consensus: true, + metrics: orchestrationResult?.metrics }); + + } else if (exitCode === 2) { + // Consensus failure - agents completed but no agreement + await redis.hSet(pipelineKey, "status", "CONSENSUS_FAILED"); + await redis.hSet(pipelineKey, "final_consensus", "false"); + if (orchestrationResult?.metrics) { + await redis.hSet(pipelineKey, "final_metrics", JSON.stringify(orchestrationResult.metrics)); + } + + await appendPipelineLog(pipelineId, "ORCHESTRATOR", + "Orchestration completed but agents failed to reach consensus", "WARN"); + + // Record the failure context for retry/escalation + await recordConsensusFailure(pipelineId, taskId, orchestrationResult?.metrics || {}); + + broadcastUpdate("orchestration_complete", { + pipeline_id: pipelineId, + status: "CONSENSUS_FAILED", + consensus: false, + metrics: orchestrationResult?.metrics, + fallback_options: FALLBACK_OPTIONS, + awaiting_user_action: true + }); + + // Do NOT mark completed_at - pipeline awaits user decision + } else { + // Error - crash or exception await redis.hSet(pipelineKey, "status", "ORCHESTRATION_FAILED"); await redis.hSet(pipelineKey, "completed_at", new Date().toISOString()); await appendPipelineLog(pipelineId, "ORCHESTRATOR", `Orchestration failed with exit code ${exitCode}`, "ERROR"); @@ -1877,7 +2283,9 @@ async function triggerOrchestration( } // Create checkpoint with final state - await createCheckpointNow(`Pipeline ${pipelineId} orchestration ${exitCode === 0 ? "completed" : "failed"}`); + const checkpointNote = exitCode === 0 ? "completed with consensus" : + exitCode === 2 ? "consensus failed - awaiting user action" : "failed"; + await createCheckpointNow(`Pipeline ${pipelineId} orchestration ${checkpointNote}`); } catch (e: any) { await redis.hSet(pipelineKey, "status", "ORCHESTRATION_ERROR"); @@ -3504,6 +3912,190 @@ function renderDashboard(): string { .status-badge.starting { background: rgba(210, 153, 34, 0.2); color: var(--accent-yellow); } .status-badge.completed { background: rgba(63, 185, 80, 0.2); color: var(--accent-green); } .status-badge.failed { background: rgba(248, 81, 73, 0.2); color: var(--accent-red); } + .status-badge.consensus_failed { background: rgba(210, 153, 34, 0.3); color: #f0a020; border: 1px solid #f0a020; } + .status-badge.orchestrating { background: rgba(139, 92, 246, 0.2); color: var(--accent-purple); } + .status-badge.retrying { background: rgba(88, 166, 255, 0.2); color: var(--accent-blue); } + .status-badge.escalated { background: rgba(210, 153, 34, 0.2); color: var(--accent-yellow); } + .status-badge.completed_no_consensus { background: rgba(63, 185, 80, 0.15); color: #8bc34a; } + + /* Consensus Failure Alert */ + .consensus-failure-alert { + background: rgba(210, 153, 34, 0.15); + border: 1px solid #f0a020; + border-radius: 6px; + padding: 12px; + margin: 8px 0; + } + + .consensus-failure-alert .alert-title { + color: #f0a020; + font-weight: 600; + font-size: 12px; + margin-bottom: 8px; + display: flex; + align-items: center; + gap: 6px; + } + + .consensus-failure-alert .alert-desc { + font-size: 11px; + color: var(--text-secondary); + margin-bottom: 12px; + } + + .fallback-options { + display: flex; + flex-direction: column; + gap: 6px; + } + + .fallback-option { + display: flex; + align-items: center; + justify-content: space-between; + padding: 8px 10px; + background: var(--bg-secondary); + border: 1px solid var(--border-color); + border-radius: 4px; + cursor: pointer; + transition: all 0.15s ease; + } + + .fallback-option:hover { + background: var(--bg-hover); + border-color: var(--accent-blue); + } + + .fallback-option .option-label { + font-size: 11px; + font-weight: 500; + color: var(--text-primary); + } + + .fallback-option .option-desc { + font-size: 10px; + color: var(--text-muted); + } + + .fallback-option button { + padding: 4px 10px; + font-size: 10px; + } + + /* Fallback Modal */ + .modal-overlay { + position: fixed; + top: 0; + left: 0; + right: 0; + bottom: 0; + background: rgba(0, 0, 0, 0.7); + display: flex; + align-items: center; + justify-content: center; + z-index: 1000; + } + + .modal-content { + background: var(--bg-primary); + border: 1px solid var(--border-color); + border-radius: 8px; + padding: 20px; + max-width: 500px; + width: 90%; + max-height: 80vh; + overflow-y: auto; + } + + .modal-header { + display: flex; + align-items: center; + justify-content: space-between; + margin-bottom: 16px; + } + + .modal-header h3 { + font-size: 14px; + color: var(--text-primary); + margin: 0; + } + + .modal-close { + background: none; + border: none; + color: var(--text-muted); + cursor: pointer; + font-size: 18px; + } + + .modal-body { + margin-bottom: 16px; + } + + .modal-section { + margin-bottom: 16px; + } + + .modal-section h4 { + font-size: 11px; + color: var(--text-secondary); + text-transform: uppercase; + margin-bottom: 8px; + } + + /* Notification Toast */ + .notification { + background: var(--bg-secondary); + border: 1px solid var(--border-color); + border-radius: 6px; + padding: 12px 16px; + min-width: 280px; + max-width: 400px; + box-shadow: 0 4px 12px rgba(0, 0, 0, 0.3); + animation: slideIn 0.3s ease; + } + + .notification.warn { + border-color: #f0a020; + background: rgba(210, 153, 34, 0.15); + } + + .notification.error { + border-color: var(--accent-red); + background: rgba(248, 81, 73, 0.15); + } + + .notification.success { + border-color: var(--accent-green); + background: rgba(63, 185, 80, 0.15); + } + + .notification-title { + font-size: 12px; + font-weight: 600; + color: var(--text-primary); + margin-bottom: 4px; + } + + .notification-message { + font-size: 11px; + color: var(--text-secondary); + } + + @keyframes slideIn { + from { transform: translateX(100%); opacity: 0; } + to { transform: translateX(0); opacity: 1; } + } + + @keyframes slideOut { + from { transform: translateX(0); opacity: 1; } + to { transform: translateX(100%); opacity: 0; } + } + + /* Consensus failed pipeline card highlight */ + .pipeline-card.consensus-failed { + border-color: #f0a020; + } .pipeline-objective { font-size: 11px; @@ -5035,6 +5627,21 @@ function renderDashboard(): string { loadOrchestration(); } + // Consensus failure events + if (msg.type === 'consensus_failure') { + loadPipelines(); + if (selectedPipelineId === msg.data.pipeline_id) { + loadLogs(selectedPipelineId); + } + // Show notification + showNotification('Consensus Failed', \`Pipeline \${msg.data.pipeline_id} failed to reach consensus. Action required.\`, 'warn'); + } + + if (msg.type === 'orchestration_complete' && msg.data.consensus === false) { + loadPipelines(); + showNotification('Consensus Failed', 'Agents completed but could not agree. Choose a fallback action.', 'warn'); + } + // New tab events if (msg.type === 'checkpoint_created') { if (currentTab === 'checkpoint') { @@ -5139,6 +5746,7 @@ function renderDashboard(): string { container.innerHTML = pipelinesData.map(p => { const isActive = p.pipeline_id === selectedPipelineId; const agents = p.agents || []; + const isConsensusFailed = p.status === 'CONSENSUS_FAILED'; const agentPills = agents.map(a => { const type = (a.type || 'UNKNOWN').toLowerCase(); @@ -5149,14 +5757,52 @@ function renderDashboard(): string { \`; }).join(''); + const consensusAlert = isConsensusFailed ? \` +
+
Consensus Failed
+
Agents completed but failed to reach agreement. Choose an action:
+
+
+
+
Rerun
+
Retry with fresh agents
+
+ +
+
+
+
Mediate
+
Force GAMMA mediator
+
+ +
+
+
+
Accept
+
Use best available output
+
+ +
+
+
+
Details
+
View failure report
+
+ +
+
+
+ \` : ''; + return \` -
+
\${p.pipeline_id} - \${p.status || 'UNKNOWN'} + \${p.status || 'UNKNOWN'}
\${p.objective || 'No objective'}
\${agentPills || 'No agents'}
+ \${consensusAlert}
\`; }).join(''); @@ -5170,6 +5816,161 @@ function renderDashboard(): string { await loadPlans(); } + // ========== Consensus Failure Handling ========== + + async function handleFallback(pipelineId, optionId) { + event.stopPropagation(); + + const confirmMessages = { + 'rerun_same': 'This will spawn new agents to retry the task. Continue?', + 'rerun_gamma': 'This will spawn GAMMA mediator to resolve conflicts. Continue?', + 'accept_partial': 'This will mark the pipeline complete without consensus. Continue?', + 'escalate_tier': 'This will escalate to a higher permission tier. Continue?' + }; + + if (confirmMessages[optionId] && !confirm(confirmMessages[optionId])) { + return; + } + + try { + const res = await fetch('/api/pipeline/consensus/fallback', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ pipeline_id: pipelineId, option_id: optionId }) + }); + + const result = await res.json(); + + if (result.success) { + alert(result.message); + await loadPipelines(); + + if (result.new_pipeline_id) { + selectedPipelineId = result.new_pipeline_id; + await loadLogs(result.new_pipeline_id); + } + } else { + alert('Error: ' + result.message); + } + } catch (e) { + console.error('Fallback error:', e); + alert('Failed to process fallback action'); + } + } + + async function showFailureDetails(pipelineId) { + event.stopPropagation(); + + try { + const report = await fetchJSON(\`/api/pipeline/consensus/report?pipeline_id=\${pipelineId}\`); + showFailureModal(pipelineId, report); + } catch (e) { + console.error('Error loading failure details:', e); + alert('Failed to load failure details'); + } + } + + function showFailureModal(pipelineId, report) { + const modal = document.createElement('div'); + modal.className = 'modal-overlay'; + modal.onclick = (e) => { if (e.target === modal) modal.remove(); }; + + const proposals = report.current_failure?.proposals || []; + const proposalsList = proposals.length > 0 + ? proposals.map((p, i) => \` +
+
Proposal \${i + 1}
+
+ \${JSON.stringify(p).substring(0, 200)}... +
+
+ \`).join('') + : '
No proposals collected
'; + + const recommendations = (report.recommendations || []) + .map(r => \`
  • \${r}
  • \`) + .join(''); + + modal.innerHTML = \` + + \`; + + document.body.appendChild(modal); + } + + function downloadFailureReport(pipelineId) { + window.open(\`/api/pipeline/consensus/download?pipeline_id=\${pipelineId}\`, '_blank'); + } + + // Show notification toast + function showNotification(title, message, type = 'info') { + const container = document.getElementById('notification-container') || createNotificationContainer(); + + const notification = document.createElement('div'); + notification.className = \`notification \${type}\`; + notification.innerHTML = \` +
    \${title}
    +
    \${message}
    + \`; + + container.appendChild(notification); + + // Auto-remove after 5 seconds + setTimeout(() => { + notification.style.animation = 'slideOut 0.3s ease forwards'; + setTimeout(() => notification.remove(), 300); + }, 5000); + } + + function createNotificationContainer() { + const container = document.createElement('div'); + container.id = 'notification-container'; + container.style.cssText = 'position: fixed; top: 16px; right: 16px; z-index: 2000; display: flex; flex-direction: column; gap: 8px;'; + document.body.appendChild(container); + return container; + } + // Load Logs async function loadLogs(pipelineId) { document.getElementById('log-pipeline').textContent = pipelineId; @@ -6910,6 +7711,94 @@ const server = Bun.serve({ return new Response(JSON.stringify({ pipeline_id: pipelineId, agents: enrichedAgents }), { headers }); } + // Consensus Failure Handling APIs + if (path === "/api/pipeline/consensus/status") { + const pipelineId = url.searchParams.get("pipeline_id"); + if (!pipelineId) { + return new Response(JSON.stringify({ error: "pipeline_id required" }), { status: 400, headers }); + } + const pipelineData = await redis.hGetAll(`pipeline:${pipelineId}`); + const failureContext = await getConsensusFailureContext(pipelineId); + const failureHistory = await getFailureHistory(pipelineId); + + return new Response(JSON.stringify({ + pipeline_id: pipelineId, + status: pipelineData.status, + final_consensus: pipelineData.final_consensus === "true", + consensus_failure_count: parseInt(pipelineData.consensus_failure_count || "0"), + awaiting_user_action: pipelineData.status === "CONSENSUS_FAILED", + fallback_options: pipelineData.status === "CONSENSUS_FAILED" ? FALLBACK_OPTIONS : [], + current_failure: failureContext, + failure_history_count: failureHistory.length + }), { headers }); + } + + if (path === "/api/pipeline/consensus/failure") { + const pipelineId = url.searchParams.get("pipeline_id"); + const runNumber = url.searchParams.get("run"); + if (!pipelineId) { + return new Response(JSON.stringify({ error: "pipeline_id required" }), { status: 400, headers }); + } + const context = await getConsensusFailureContext(pipelineId, runNumber ? parseInt(runNumber) : undefined); + if (!context) { + return new Response(JSON.stringify({ error: "No failure context found" }), { status: 404, headers }); + } + return new Response(JSON.stringify(context), { headers }); + } + + if (path === "/api/pipeline/consensus/history") { + const pipelineId = url.searchParams.get("pipeline_id"); + if (!pipelineId) { + return new Response(JSON.stringify({ error: "pipeline_id required" }), { status: 400, headers }); + } + const history = await getFailureHistory(pipelineId); + return new Response(JSON.stringify({ pipeline_id: pipelineId, failures: history }), { headers }); + } + + if (path === "/api/pipeline/consensus/fallback" && req.method === "POST") { + const body = await req.json() as { + pipeline_id: string; + option_id: string; + }; + if (!body.pipeline_id || !body.option_id) { + return new Response(JSON.stringify({ error: "pipeline_id and option_id required" }), { status: 400, headers }); + } + + const option = FALLBACK_OPTIONS.find(o => o.id === body.option_id); + if (!option) { + return new Response(JSON.stringify({ error: "Invalid fallback option" }), { status: 400, headers }); + } + + const result = await handleFallbackAction(body.pipeline_id, option.action, body.option_id); + return new Response(JSON.stringify(result), { headers }); + } + + if (path === "/api/pipeline/consensus/report") { + const pipelineId = url.searchParams.get("pipeline_id"); + if (!pipelineId) { + return new Response(JSON.stringify({ error: "pipeline_id required" }), { status: 400, headers }); + } + const report = await generateFailureReport(pipelineId); + return new Response(JSON.stringify(report), { headers }); + } + + if (path === "/api/pipeline/consensus/download") { + const pipelineId = url.searchParams.get("pipeline_id"); + if (!pipelineId) { + return new Response(JSON.stringify({ error: "pipeline_id required" }), { status: 400, headers }); + } + const report = await generateFailureReport(pipelineId); + const filename = `consensus-failure-${pipelineId}-${Date.now()}.json`; + + return new Response(JSON.stringify(report, null, 2), { + headers: { + "Content-Type": "application/json", + "Content-Disposition": `attachment; filename="${filename}"`, + "Access-Control-Allow-Origin": "*" + } + }); + } + // Plan Execution APIs if (path === "/api/plans") { const pipelineId = url.searchParams.get("pipeline_id");