profit 8c6e7831e9 Add Phase 10-12 implementation: multi-tenant, marketplace, observability
Major additions:
- marketplace/: Agent template registry with FTS5 search, ratings, versioning
- observability/: Prometheus metrics, distributed tracing, structured logging
- ledger/migrations/: Database migration scripts for multi-tenant support
- tests/governance/: 15 new test files for phases 6-12 (295 total tests)
- bin/validate-phases: Full 12-phase validation script

New features:
- Multi-tenant support with tenant isolation and quota enforcement
- Agent marketplace with semantic versioning and search
- Observability with metrics, tracing, and log correlation
- Tier-1 agent bootstrap scripts

Updated components:
- ledger/api.py: Extended API for tenants, marketplace, observability
- ledger/schema.sql: Added tenant, project, marketplace tables
- testing/framework.ts: Enhanced test framework
- checkpoint/checkpoint.py: Improved checkpoint management

Archived:
- External integrations (Slack/GitHub/PagerDuty) moved to .archive/
- Old checkpoint files cleaned up

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 18:39:47 -05:00

401 lines
12 KiB
TypeScript

/**
* Multi-Agent Governance Integration
* ===================================
* Provides governance ledger integration for the multi-agent coordination system.
* Logs all agent actions to the SQLite governance ledger for audit and compliance.
*/
import { Database } from "bun:sqlite";
import type { AgentRole, AgentMessage, CoordinationMetrics } from "./types";
// =============================================================================
// Configuration
// =============================================================================
const LEDGER_PATH = "/opt/agent-governance/ledger/governance.db";
// =============================================================================
// Governance Logger
// =============================================================================
export class GovernanceLedger {
private db: Database;
private taskId: string;
constructor(taskId: string) {
this.taskId = taskId;
this.db = new Database(LEDGER_PATH);
this.ensureTable();
}
private ensureTable(): void {
// Ensure orchestration_log table exists
this.db.run(`
CREATE TABLE IF NOT EXISTS orchestration_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
task_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
agent_role TEXT NOT NULL,
action TEXT NOT NULL,
decision TEXT NOT NULL,
confidence REAL DEFAULT 0,
details TEXT,
success INTEGER DEFAULT 1,
error_message TEXT
)
`);
// Ensure multi_agent_metrics table exists
this.db.run(`
CREATE TABLE IF NOT EXISTS multi_agent_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
task_id TEXT NOT NULL,
total_messages INTEGER DEFAULT 0,
blackboard_writes INTEGER DEFAULT 0,
blackboard_reads INTEGER DEFAULT 0,
conflicts_detected INTEGER DEFAULT 0,
conflicts_resolved INTEGER DEFAULT 0,
gamma_spawned INTEGER DEFAULT 0,
gamma_spawn_reason TEXT,
consensus_achieved INTEGER DEFAULT 0,
performance_score REAL DEFAULT 0,
duration_seconds REAL DEFAULT 0
)
`);
}
/**
* Log an agent action to the governance ledger
*/
logAction(
agentId: string,
agentRole: AgentRole,
action: string,
decision: "EXECUTE" | "SKIP" | "BLOCKED" | "ERROR",
options: {
confidence?: number;
details?: any;
success?: boolean;
errorMessage?: string;
} = {}
): void {
const timestamp = new Date().toISOString();
const { confidence = 0, details, success = true, errorMessage } = options;
this.db.run(
`INSERT INTO orchestration_log
(timestamp, task_id, agent_id, agent_role, action, decision, confidence, details, success, error_message)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
[
timestamp,
this.taskId,
agentId,
agentRole,
action,
decision,
confidence,
details ? JSON.stringify(details) : null,
success ? 1 : 0,
errorMessage || null,
]
);
}
/**
* Log agent initialization
*/
logAgentInit(agentId: string, agentRole: AgentRole, model: string): void {
this.logAction(agentId, agentRole, "INITIALIZE", "EXECUTE", {
confidence: 1.0,
details: { model, timestamp: new Date().toISOString() },
});
}
/**
* Log message sent
*/
logMessage(from: AgentRole, to: AgentRole | "ALL", messageType: string, payload: any): void {
this.logAction(`${from}-${this.taskId}`, from, `MESSAGE_${messageType}`, "EXECUTE", {
details: { to, messageType, payloadSize: JSON.stringify(payload).length },
});
}
/**
* Log blackboard operation
*/
logBlackboardOp(agentRole: AgentRole, operation: "READ" | "WRITE", section: string, key: string): void {
this.logAction(`${agentRole}-${this.taskId}`, agentRole, `BLACKBOARD_${operation}`, "EXECUTE", {
details: { section, key },
});
}
/**
* Log spawn event
*/
logSpawn(agentRole: AgentRole, reason: string): void {
this.logAction(`${agentRole}-${this.taskId}`, agentRole, "SPAWN", "EXECUTE", {
confidence: 1.0,
details: { reason, timestamp: new Date().toISOString() },
});
}
/**
* Log consensus vote
*/
logVote(agentRole: AgentRole, proposalId: string, vote: string, reasoning: string): void {
this.logAction(`${agentRole}-${this.taskId}`, agentRole, "CONSENSUS_VOTE", "EXECUTE", {
details: { proposalId, vote, reasoning },
});
}
/**
* Log final metrics
*/
logMetrics(metrics: CoordinationMetrics): void {
const timestamp = new Date().toISOString();
const duration = metrics.end_time
? (new Date(metrics.end_time).getTime() - new Date(metrics.start_time).getTime()) / 1000
: 0;
this.db.run(
`INSERT INTO multi_agent_metrics
(timestamp, task_id, total_messages, blackboard_writes, blackboard_reads,
conflicts_detected, conflicts_resolved, gamma_spawned, gamma_spawn_reason,
consensus_achieved, performance_score, duration_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
[
timestamp,
this.taskId,
metrics.total_messages,
metrics.blackboard_writes,
metrics.blackboard_reads,
metrics.conflicts_detected,
metrics.conflicts_resolved,
metrics.gamma_spawned ? 1 : 0,
metrics.gamma_spawn_reason || null,
metrics.final_consensus ? 1 : 0,
metrics.performance_score,
duration,
]
);
}
/**
* Get action history for this task
*/
getTaskHistory(): any[] {
return this.db
.query(
`SELECT * FROM orchestration_log WHERE task_id = ? ORDER BY timestamp ASC`
)
.all(this.taskId);
}
/**
* Get agent statistics
*/
getAgentStats(agentRole: AgentRole): { totalActions: number; successRate: number } {
const result = this.db
.query(
`SELECT COUNT(*) as total, SUM(success) as successes
FROM orchestration_log
WHERE task_id = ? AND agent_role = ?`
)
.get(this.taskId, agentRole) as { total: number; successes: number };
return {
totalActions: result.total,
successRate: result.total > 0 ? result.successes / result.total : 0,
};
}
close(): void {
this.db.close();
}
}
// =============================================================================
// Diagnostics
// =============================================================================
export interface DiagnosticReport {
timestamp: string;
taskId: string;
status: "HEALTHY" | "DEGRADED" | "UNHEALTHY";
agents: {
role: AgentRole;
status: string;
actionsLogged: number;
lastActivity: string | null;
}[];
infrastructure: {
ledgerConnected: boolean;
dragonflyConnected: boolean;
vaultAccessible: boolean;
};
warnings: string[];
errors: string[];
}
export async function runDiagnostics(taskId: string): Promise<DiagnosticReport> {
const warnings: string[] = [];
const errors: string[] = [];
const timestamp = new Date().toISOString();
// Check ledger
let ledgerConnected = false;
let agentStats: { role: AgentRole; status: string; actionsLogged: number; lastActivity: string | null }[] = [];
try {
const ledger = new GovernanceLedger(taskId);
ledgerConnected = true;
// Get agent stats from ledger
for (const role of ["ALPHA", "BETA", "GAMMA"] as AgentRole[]) {
const result = ledger.db
.query(
`SELECT COUNT(*) as count, MAX(timestamp) as last
FROM orchestration_log
WHERE task_id = ? AND agent_role = ?`
)
.get(taskId, role) as { count: number; last: string | null };
agentStats.push({
role,
status: result.count > 0 ? "ACTIVE" : "INACTIVE",
actionsLogged: result.count,
lastActivity: result.last,
});
}
ledger.close();
} catch (e: any) {
errors.push(`Ledger error: ${e.message}`);
}
// Check DragonflyDB
let dragonflyConnected = false;
try {
const { $ } = await import("bun");
const initKeys = await Bun.file("/opt/vault/init-keys.json").json();
const token = initKeys.root_token;
const result = await $`curl -sk -H "X-Vault-Token: ${token}" https://127.0.0.1:8200/v1/secret/data/services/dragonfly`.json();
const creds = result.data.data;
// Quick ping test
const { createClient } = await import("redis");
const client = createClient({
url: `redis://${creds.host}:${creds.port}`,
password: creds.password,
});
await client.connect();
await client.ping();
await client.quit();
dragonflyConnected = true;
} catch (e: any) {
warnings.push(`DragonflyDB: ${e.message}`);
}
// Check Vault
let vaultAccessible = false;
try {
const initKeys = await Bun.file("/opt/vault/init-keys.json").json();
vaultAccessible = !!initKeys.root_token;
} catch (e: any) {
warnings.push(`Vault: ${e.message}`);
}
// Determine overall status
let status: DiagnosticReport["status"] = "HEALTHY";
if (errors.length > 0) {
status = "UNHEALTHY";
} else if (warnings.length > 0) {
status = "DEGRADED";
}
return {
timestamp,
taskId,
status,
agents: agentStats,
infrastructure: {
ledgerConnected,
dragonflyConnected,
vaultAccessible,
},
warnings,
errors,
};
}
// =============================================================================
// Health Check Endpoint
// =============================================================================
export interface HealthStatus {
status: "ok" | "degraded" | "error";
timestamp: string;
checks: {
name: string;
status: "pass" | "fail";
message?: string;
}[];
}
export async function healthCheck(): Promise<HealthStatus> {
const checks: HealthStatus["checks"] = [];
const timestamp = new Date().toISOString();
// Check 1: Ledger database
try {
const db = new Database(LEDGER_PATH);
db.query("SELECT 1").get();
db.close();
checks.push({ name: "ledger_db", status: "pass" });
} catch (e: any) {
checks.push({ name: "ledger_db", status: "fail", message: e.message });
}
// Check 2: Vault access
try {
const initKeys = await Bun.file("/opt/vault/init-keys.json").json();
if (initKeys.root_token) {
checks.push({ name: "vault_access", status: "pass" });
} else {
checks.push({ name: "vault_access", status: "fail", message: "No root token" });
}
} catch (e: any) {
checks.push({ name: "vault_access", status: "fail", message: e.message });
}
// Check 3: DragonflyDB
try {
const { $ } = await import("bun");
const initKeys = await Bun.file("/opt/vault/init-keys.json").json();
const result = await $`curl -sk -H "X-Vault-Token: ${initKeys.root_token}" https://127.0.0.1:8200/v1/secret/data/services/dragonfly`.json();
const creds = result.data.data;
const { createClient } = await import("redis");
const client = createClient({
url: `redis://${creds.host}:${creds.port}`,
password: creds.password,
});
await client.connect();
await client.ping();
await client.quit();
checks.push({ name: "dragonfly", status: "pass" });
} catch (e: any) {
checks.push({ name: "dragonfly", status: "fail", message: e.message });
}
// Determine overall status
const failedChecks = checks.filter((c) => c.status === "fail");
let status: HealthStatus["status"] = "ok";
if (failedChecks.length > 0) {
status = failedChecks.length === checks.length ? "error" : "degraded";
}
return { status, timestamp, checks };
}