profit ef18567674 Implement real supervisor-driven auto-recovery
Orchestrator changes:
- Force-spawn GAMMA on iteration_limit before abort
- GAMMA.synthesize() creates emergency handoff payload
- loadRecoveryContext() logs "Resuming from {task_id} handoff"
- POST to /api/pipeline/log for resume message visibility

AgentGamma changes:
- Add synthesize() method for emergency abort synthesis
- Merges existing proposals into coherent handoff
- Stores as synthesis_type: "abort_recovery"

Server changes:
- Add POST /api/pipeline/log endpoint for orchestrator logging
- Recovery pipeline properly inherits GAMMA synthesis

Test coverage:
- test_auto_recovery.py: 6 unit tests
- test_e2e_auto_recovery.py: 5 E2E tests
- test_supervisor_recovery.py: 3 supervisor tests
  - Success on attempt 2 (recovery works)
  - Max failures (3 retries then FAILED)
  - Success on attempt 1 (no recovery needed)

Recovery flow:
1. iteration_limit triggers
2. GAMMA force-spawned for emergency synthesis
3. Handoff dumped with GAMMA synthesis
4. Exit code 3 triggers auto-recovery
5. Recovery pipeline loads handoff
6. Logs "Resuming from {prior_pipeline} handoff"
7. Repeat up to 3 times or until success

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 19:47:56 -05:00

Multi-Agent Coordination System

Orchestrator for parallel agent execution and coordination

Overview

The Multi-Agent Coordination System manages parallel execution of multiple agents, providing shared state via a blackboard pattern, message passing, dynamic agent spawning, and comprehensive metrics collection.

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                    Multi-Agent Orchestrator                              │
│                                                                          │
│   ┌──────────────────────────────────────────────────────────────────┐  │
│   │                     Coordination Layer                            │  │
│   │  ┌───────────┐  ┌────────────┐  ┌───────────┐  ┌──────────────┐  │  │
│   │  │Blackboard │  │AgentState  │  │  Spawn    │  │   Metrics    │  │  │
│   │  │ (Shared)  │  │  Manager   │  │Controller │  │  Collector   │  │  │
│   │  └───────────┘  └────────────┘  └───────────┘  └──────────────┘  │  │
│   └──────────────────────────────────────────────────────────────────┘  │
│                              │                                           │
│              ┌───────────────┼───────────────┐                          │
│              ▼               ▼               ▼                          │
│   ┌─────────────────┐ ┌─────────────┐ ┌─────────────┐                  │
│   │   Agent Alpha   │ │ Agent Beta  │ │ Agent Gamma │                  │
│   │   (Planner)     │ │ (Executor)  │ │ (Validator) │                  │
│   │                 │ │             │ │  (Dynamic)  │                  │
│   │   MessageBus    │ │ MessageBus  │ │ MessageBus  │                  │
│   └─────────────────┘ └─────────────┘ └─────────────┘                  │
│              │               │               │                          │
└──────────────┼───────────────┼───────────────┼──────────────────────────┘
               │               │               │
               ▼               ▼               ▼
        ┌───────────────────────────────────────────┐
        │              DragonflyDB                   │
        │  (State, Messages, Locks, Metrics)         │
        └───────────────────────────────────────────┘

Components

Orchestrator (orchestrator.ts - 410 lines)

Main coordination entry point:

  • Task initialization
  • Agent lifecycle management
  • Parallel execution control
  • Spawn condition monitoring
  • Results aggregation

const orchestrator = new MultiAgentOrchestrator("anthropic/claude-sonnet-4");
await orchestrator.initialize();
const results = await orchestrator.execute(taskDefinition);

Agents (agents.ts - 850 lines)

Three agent types with distinct roles:

Agent   Role        Capabilities
-----   ---------   ----------------------------------------
Alpha   Planner     Analyzes tasks, creates execution plans
Beta    Executor    Executes plan steps, reports progress
Gamma   Validator   Validates results, spawned conditionally

Coordination (coordination.ts - 450 lines)

Shared infrastructure classes:

Class               Purpose
-----------------   ----------------------------------
Blackboard          Shared state storage (key-value)
MessageBus          Inter-agent message passing
AgentStateManager   Agent lifecycle and phase tracking
SpawnController     Dynamic agent spawning
MetricsCollector    Performance and compliance metrics

Types (types.ts - 65 lines)

TypeScript type definitions for:

  • TaskDefinition
  • CoordinationMetrics
  • SpawnCondition
  • AgentRole

Quick Start

# Enter directory
cd /opt/agent-governance/agents/multi-agent

# Install dependencies
bun install

# Run orchestrator
bun run orchestrator.ts

# Run with custom model
bun run orchestrator.ts --model "anthropic/claude-sonnet-4"

Coordination Patterns

Blackboard Pattern

Shared state accessible by all agents:

// Write to blackboard
await blackboard.set("plan", planData);

// Read from blackboard
const plan = await blackboard.get("plan");

// Watch for changes
blackboard.watch("results", (key, value) => {
  // Serialize the value so objects are not printed as "[object Object]"
  console.log(`Results updated: ${JSON.stringify(value)}`);
});
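
To make the set/get/watch contract concrete, here is a minimal in-memory sketch. It is not the Blackboard class from coordination.ts: the name InMemoryBlackboard and the Map-based storage are assumptions for illustration, whereas the real class stores its state in DragonflyDB.

```typescript
// Hypothetical in-memory stand-in for the Blackboard API shown above.
// ASSUMPTION: the real class persists to DragonflyDB; this sketch uses a Map.
type Watcher = (key: string, value: unknown) => void;

class InMemoryBlackboard {
  private store = new Map<string, unknown>();
  private watchers = new Map<string, Watcher[]>();

  // Store a value, then notify every watcher registered for that key.
  async set(key: string, value: unknown): Promise<void> {
    this.store.set(key, value);
    for (const notify of this.watchers.get(key) ?? []) {
      notify(key, value);
    }
  }

  // Return the stored value, or undefined if the key was never written.
  async get(key: string): Promise<unknown> {
    return this.store.get(key);
  }

  // Register a callback that fires on every later write to the key.
  watch(key: string, callback: Watcher): void {
    const existing = this.watchers.get(key) ?? [];
    this.watchers.set(key, [...existing, callback]);
  }
}
```

Watchers in this sketch only see writes made after they register, so an agent that starts late would still need a get() to pick up the current value.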

Message Passing

Async communication between agents:

// Send message
await alphaBus.publish({
  from: "ALPHA",
  to: "BETA",
  type: "TASK_READY",
  payload: { stepId: "step-001" }
});

// Receive messages
betaBus.subscribe((message) => {
  if (message.type === "TASK_READY") {
    executeStep(message.payload.stepId);
  }
});
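
The publish/subscribe shape above can be sketched in memory as follows. This assumes each agent owns a bus that routes on the message's "to" field. InMemoryRouter and InMemoryMessageBus are hypothetical names, and the real MessageBus queues messages in DragonflyDB rather than calling handlers directly.

```typescript
// Message shape taken from the example above.
interface AgentMessage {
  from: string;
  to: string;
  type: string;
  payload: Record<string, unknown>;
}

type MessageHandler = (message: AgentMessage) => void;

// Hypothetical shared routing table: agent name -> handlers on that agent's bus.
class InMemoryRouter {
  private handlers = new Map<string, MessageHandler[]>();

  register(agent: string, handler: MessageHandler): void {
    this.handlers.set(agent, [...(this.handlers.get(agent) ?? []), handler]);
  }

  // Invoke every handler subscribed for the recipient; others see nothing.
  deliver(message: AgentMessage): void {
    for (const handle of this.handlers.get(message.to) ?? []) {
      handle(message);
    }
  }
}

// Hypothetical per-agent bus: subscribe() listens for messages addressed to
// this agent, publish() hands a message to the router.
class InMemoryMessageBus {
  private agent: string;
  private router: InMemoryRouter;

  constructor(agent: string, router: InMemoryRouter) {
    this.agent = agent;
    this.router = router;
  }

  async publish(message: AgentMessage): Promise<void> {
    this.router.deliver(message);
  }

  subscribe(handler: MessageHandler): void {
    this.router.register(this.agent, handler);
  }
}
```

Messages addressed to an agent with no subscriber are silently dropped in this sketch; a queue-backed bus would instead hold them until the recipient reads.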

Dynamic Spawning

Agents are spawned when a registered condition is met:

// Define spawn condition
const gammaCondition: SpawnCondition = {
  trigger: "VALIDATION_NEEDED",
  threshold: 0.8,
  agentType: "GAMMA"
};

// Controller monitors and spawns
spawnController.registerCondition(gammaCondition);
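
This README does not spell out how a condition is evaluated. One plausible reading is sketched below, ASSUMING that a condition fires when its trigger name matches and an observed score is at or above the threshold. SpawnControllerSketch and agentsToSpawn are hypothetical names, not the SpawnController API.

```typescript
// Same fields as the SpawnCondition example above.
interface SpawnConditionSketch {
  trigger: string;
  threshold: number;
  agentType: string;
}

class SpawnControllerSketch {
  private conditions: SpawnConditionSketch[] = [];

  registerCondition(condition: SpawnConditionSketch): void {
    this.conditions.push(condition);
  }

  // ASSUMPTION: "threshold" is a minimum score for the named trigger.
  // Returns the agent types whose conditions are satisfied.
  agentsToSpawn(trigger: string, score: number): string[] {
    return this.conditions
      .filter((c) => c.trigger === trigger && score >= c.threshold)
      .map((c) => c.agentType);
  }
}

const sketchController = new SpawnControllerSketch();
sketchController.registerCondition({
  trigger: "VALIDATION_NEEDED",
  threshold: 0.8,
  agentType: "GAMMA",
});
```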

Agent Lifecycle

INIT → READY → PLANNING → EXECUTING → VALIDATING → COMPLETE
                  │                        │
                  └──── FAILED ←──────────┘

Phase Transitions

// Update agent phase
await stateManager.setPhase("ALPHA", AgentPhase.PLANNING);

// Check phase
const phase = await stateManager.getPhase("BETA");
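
The lifecycle diagram can be encoded as a transition table, which makes illegal phase changes easy to reject. The sketch below reads the diagram literally, with FAILED reachable from PLANNING and VALIDATING only. The names SketchPhase, ALLOWED_TRANSITIONS and canTransition are illustrative, and whether AgentStateManager enforces any such check is an assumption.

```typescript
type SketchPhase =
  | "INIT"
  | "READY"
  | "PLANNING"
  | "EXECUTING"
  | "VALIDATING"
  | "COMPLETE"
  | "FAILED";

// Allowed next phases, transcribed from the lifecycle diagram above.
const ALLOWED_TRANSITIONS: Record<SketchPhase, SketchPhase[]> = {
  INIT: ["READY"],
  READY: ["PLANNING"],
  PLANNING: ["EXECUTING", "FAILED"],
  EXECUTING: ["VALIDATING"],
  VALIDATING: ["COMPLETE", "FAILED"],
  COMPLETE: [], // terminal
  FAILED: [], // terminal
};

// True when the diagram contains an arrow from `from` to `to`.
function canTransition(from: SketchPhase, to: SketchPhase): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```

A caller could check canTransition(current, next) before setPhase and treat a false result as a coordination bug.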

Metrics Collection

Metrics tracked for each task:

interface CoordinationMetrics {
  taskId: string;
  startTime: number;
  endTime?: number;
  agentMetrics: {
    [agentId: string]: {
      phases: string[];
      messagesSent: number;
      messagesReceived: number;
      errors: number;
    }
  };
  blackboardWrites: number;
  blackboardReads: number;
  spawnEvents: number;
}
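
As an example of consuming this structure, the sketch below folds one CoordinationMetrics record into a few totals. The interface is repeated so the block stands alone. summarizeMetrics is a hypothetical helper, and it ASSUMES startTime and endTime are epoch milliseconds.

```typescript
interface CoordinationMetrics {
  taskId: string;
  startTime: number;
  endTime?: number;
  agentMetrics: {
    [agentId: string]: {
      phases: string[];
      messagesSent: number;
      messagesReceived: number;
      errors: number;
    };
  };
  blackboardWrites: number;
  blackboardReads: number;
  spawnEvents: number;
}

// Hypothetical helper: aggregate per-agent counters into task-level totals.
// For a task that is still running (no endTime), `now` is used instead.
function summarizeMetrics(metrics: CoordinationMetrics, now: number = Date.now()) {
  const agents = Object.values(metrics.agentMetrics);
  return {
    taskId: metrics.taskId,
    durationMs: (metrics.endTime ?? now) - metrics.startTime,
    agentCount: agents.length,
    totalMessagesSent: agents.reduce((sum, a) => sum + a.messagesSent, 0),
    totalErrors: agents.reduce((sum, a) => sum + a.errors, 0),
  };
}
```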

Example Task Execution

import { MultiAgentOrchestrator } from "./orchestrator";
import type { TaskDefinition } from "./types";

const task: TaskDefinition = {
  id: "deploy-001",
  type: "deployment",
  description: "Deploy web service to sandbox",
  constraints: ["sandbox-only", "no-secrets"],
  timeout: 300000  // 5 minutes
};

const orchestrator = new MultiAgentOrchestrator();
await orchestrator.initialize();

const results = await orchestrator.execute(task);

console.log(`Status: ${results.status}`);
console.log(`Duration: ${results.duration}ms`);
console.log(`Agents used: ${results.agentsUsed.join(", ")}`);

DragonflyDB Keys

Key Pattern                Purpose
-----------------------    --------------------
task:{id}:blackboard:*     Shared state
task:{id}:state:{agent}    Agent state
task:{id}:bus:{agent}      Message queue
task:{id}:metrics          Coordination metrics
task:{id}:locks:*          Distributed locks
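
When inspecting or debugging stored state, a small helper that builds these keys avoids typos in hand-written strings. The sketch below follows the patterns listed above; taskKeys is a hypothetical helper, not part of the codebase.

```typescript
// Hypothetical key builder following the patterns listed above.
function taskKeys(taskId: string) {
  const prefix = `task:${taskId}`;
  return {
    blackboard: (field: string) => `${prefix}:blackboard:${field}`,
    state: (agent: string) => `${prefix}:state:${agent}`,
    bus: (agent: string) => `${prefix}:bus:${agent}`,
    metrics: () => `${prefix}:metrics`,
    lock: (name: string) => `${prefix}:locks:${name}`,
  };
}

const deployKeys = taskKeys("deploy-001");
console.log(deployKeys.state("ALPHA")); // task:deploy-001:state:ALPHA
```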

Error Handling

try {
  await orchestrator.execute(task);
} catch (error) {
  if (error instanceof AgentTimeoutError) {
    // Agent exceeded timeout
  } else if (error instanceof CoordinationError) {
    // Infrastructure failure
  } else if (error instanceof SpawnLimitError) {
    // Too many agents spawned
  }
}
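
If several call sites need the same branching, it can be centralized in one function. The sketch below declares stand-in error classes so that it runs on its own. The real AgentTimeoutError, CoordinationError and SpawnLimitError come from the codebase, and their fields and import path are not shown in this README, so everything here is an assumption.

```typescript
// Stand-in definitions so this sketch is self-contained (ASSUMPTION: the real
// classes also extend Error).
class AgentTimeoutError extends Error {}
class CoordinationError extends Error {}
class SpawnLimitError extends Error {}

type FailureKind = "agent-timeout" | "infrastructure" | "spawn-limit" | "unknown";

// Map a caught value to one of the failure categories described above.
function classifyFailure(error: unknown): FailureKind {
  if (error instanceof AgentTimeoutError) return "agent-timeout";
  if (error instanceof CoordinationError) return "infrastructure";
  if (error instanceof SpawnLimitError) return "spawn-limit";
  return "unknown";
}
```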

Testing

# Type check
bun run tsc --noEmit

# Run coordination tests
bun test

# Run with mock infrastructure
bun run orchestrator.ts --mock

Dependencies

Package      Purpose
----------   ------------------
typescript   Type system
redis        DragonflyDB client
openai       LLM integration

Configuration

const config = {
  maxAgents: 5,           // Maximum concurrent agents
  spawnTimeout: 10000,    // Spawn timeout (ms)
  messageTimeout: 5000,   // Message delivery timeout
  blackboardTTL: 3600,    // Key expiration (seconds)
  metricsInterval: 1000   // Metrics collection interval
};
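
How these values reach the orchestrator is not shown here. As a sketch, partial overrides could be merged onto the defaults above and checked before use. resolveConfig and CoordinationConfigSketch are hypothetical names, and the positive-number rule is an assumption.

```typescript
type CoordinationConfigSketch = {
  maxAgents: number;
  spawnTimeout: number;
  messageTimeout: number;
  blackboardTTL: number;
  metricsInterval: number;
};

// Defaults copied from the configuration block above.
const DEFAULT_CONFIG: CoordinationConfigSketch = {
  maxAgents: 5,
  spawnTimeout: 10000,
  messageTimeout: 5000,
  blackboardTTL: 3600,
  metricsInterval: 1000,
};

// Merge caller overrides onto the defaults and reject non-positive values.
function resolveConfig(
  overrides: Partial<CoordinationConfigSketch> = {},
): CoordinationConfigSketch {
  const merged: CoordinationConfigSketch = { ...DEFAULT_CONFIG, ...overrides };
  for (const [name, value] of Object.entries(merged)) {
    if (!Number.isFinite(value) || value <= 0) {
      throw new RangeError(`${name} must be a positive number, got ${value}`);
    }
  }
  return merged;
}
```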

Architecture Reference

Part of the Agent Governance System.


Last updated: 2026-01-24