commit 77655c298c Initial commit: Agent Governance System Phase 8
Phase 8 Production Hardening with complete governance infrastructure:

- Vault integration with tiered policies (T0-T4)
- DragonflyDB state management
- SQLite audit ledger
- Pipeline DSL and templates
- Promotion/revocation engine
- Checkpoint system for session persistence
- Health manager and circuit breaker for fault tolerance
- GitHub/Slack integrations
- Architectural test pipeline with bug watcher, suggestion engine, council review
- Multi-agent chaos testing framework

Test Results:
- Governance tests: 68/68 passing
- E2E workflow: 16/16 passing
- Phase 2 Vault: 14/14 passing
- Integration tests: 27/27 passing

Coverage: 57.6% average across 12 phases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 22:07:06 -05:00

398 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Governed LLM Agent
==================
An LLM-powered agent that operates under the governance runtime.
Combines planning capabilities with full governance compliance.
"""
import json
import sys
from datetime import datetime, timezone
from typing import Optional
from openai import OpenAI
from pydantic import BaseModel
from governance import (
GovernanceManager,
WorkerRuntime,
InstructionPacket,
ErrorBudget,
AgentPhase,
AgentStatus,
)
# =============================================================================
# Configuration
# =============================================================================
def get_openrouter_client() -> OpenAI:
    """
    Build an OpenAI client pointed at OpenRouter.

    The API key is fetched from a local Vault instance: the root token is
    read from /opt/vault/init-keys.json, then used to query the KV-v2
    secret at secret/data/api-keys/openrouter.

    Returns:
        OpenAI: client configured with the OpenRouter base URL.

    Raises:
        RuntimeError: if the Vault request fails or the response does not
            contain an API key.
    """
    import subprocess

    with open("/opt/vault/init-keys.json", encoding="utf-8") as f:
        token = json.load(f)["root_token"]

    # NOTE(review): shells out to curl with -k (TLS verification disabled);
    # presumably the local Vault listener uses a self-signed cert -- confirm.
    result = subprocess.run([
        "curl", "-sk",
        "-H", f"X-Vault-Token: {token}",
        "https://127.0.0.1:8200/v1/secret/data/api-keys/openrouter"
    ], capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Vault request failed: {result.stderr.strip()}")
    try:
        api_key = json.loads(result.stdout)["data"]["data"]["api_key"]
    except (ValueError, KeyError) as err:
        raise RuntimeError("Vault response did not contain an API key") from err

    return OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=api_key
    )
# =============================================================================
# Governed LLM Agent
# =============================================================================
class GovernedLLMAgent:
    """
    An LLM agent that operates under governance control.

    Lifecycle:
        1. Receive instruction packet
        2. Bootstrap with governance runtime
        3. Execute phases: PREFLIGHT -> PLAN -> EXECUTE -> VERIFY -> PACKAGE -> REPORT
        4. Handle errors within budget
        5. Create handoff if revoked
    """

    def __init__(self, agent_id: str, model: str = "anthropic/claude-sonnet-4"):
        self.agent_id = agent_id
        self.model = model
        self.gov = GovernanceManager()
        # Populated during start(): runtime after bootstrap, llm after the
        # Vault-backed client lookup.
        self.runtime: Optional[WorkerRuntime] = None
        self.llm: Optional[OpenAI] = None

    def _now(self) -> str:
        """Current UTC time as an ISO-8601 string (used in artifact IDs and reports)."""
        return datetime.now(timezone.utc).isoformat()

    def create_task(self, task_id: str, objective: str,
                    constraints: Optional[dict] = None) -> bool:
        """
        Create and register an instruction packet for this agent.

        Args:
            task_id: Unique identifier for the task.
            objective: Human-readable goal the agent must achieve.
            constraints: Optional constraint dict; when omitted a sandbox-only
                default (scope / forbidden / required_steps) is used.

        Returns:
            The result of GovernanceManager.create_instruction_packet
            (truthy on success).
        """
        packet = InstructionPacket(
            agent_id=self.agent_id,
            task_id=task_id,
            created_for="Governed LLM Task",
            objective=objective,
            deliverables=["implementation plan", "execution logs", "artifacts"],
            constraints=constraints or {
                "scope": ["sandbox only"],
                "forbidden": ["no prod access", "no unrecorded changes"],
                "required_steps": ["plan before execute", "verify after execute"]
            },
            success_criteria=["plan generated", "artifacts registered"],
            error_budget=ErrorBudget(
                max_total_errors=10,
                max_same_error_repeats=3,
                max_procedure_violations=1
            ),
            escalation_rules=[
                "If confidence < 0.7 -> escalate",
                "If blocked > 10m -> escalate"
            ]
        )
        return self.gov.create_instruction_packet(packet)

    def start(self) -> tuple[bool, str]:
        """
        Bootstrap the governed agent.

        Initializes the worker runtime, runs its bootstrap sequence
        (revocation check, packet load, lock acquisition) and, on success,
        constructs the LLM client.

        Returns:
            (ok, message) -- ok is False when bootstrap was refused.
        """
        print(f"\n{'='*60}")
        print(f"GOVERNED LLM AGENT: {self.agent_id}")
        print(f"Model: {self.model}")
        print(f"{'='*60}\n")

        # Initialize runtime
        self.runtime = WorkerRuntime(self.agent_id)

        # Bootstrap (reads revocations, loads packet, acquires lock)
        ok, msg = self.runtime.bootstrap()
        if not ok:
            print(f"[FATAL] Bootstrap failed: {msg}")
            return False, msg

        # Initialize LLM client
        self.llm = get_openrouter_client()

        print(f"[READY] Agent bootstrapped successfully")
        print(f"[TASK] {self.runtime.packet.objective}")
        print(f"[CONSTRAINTS] {self.runtime.packet.constraints}")
        return True, "READY"

    def run_preflight(self) -> bool:
        """
        PREFLIGHT phase: scope and dependency checks.

        Logs the packet's scope/forbidden/required_steps constraints and marks
        the phase complete. Returns False if the runtime is missing or the
        phase transition is refused.
        """
        if not self.runtime:
            return False
        if not self.runtime.transition(AgentPhase.PREFLIGHT, "scope_check"):
            return False
        packet = self.runtime.packet

        # Check scope constraints
        scope = packet.constraints.get("scope", [])
        print(f"[PREFLIGHT] Scope constraints: {scope}")

        # Check forbidden actions
        forbidden = packet.constraints.get("forbidden", [])
        print(f"[PREFLIGHT] Forbidden actions: {forbidden}")

        # Check required steps
        required = packet.constraints.get("required_steps", [])
        print(f"[PREFLIGHT] Required steps: {required}")

        self.runtime.transition(AgentPhase.PREFLIGHT, "preflight_complete",
                                "All preflight checks passed")
        return True

    def run_plan(self) -> Optional[dict]:
        """
        PLAN phase: generate an implementation plan using the LLM.

        Returns:
            The parsed plan dict on success; a fallback
            {"raw_response": ..., "confidence": 0.5} when the reply is not
            valid JSON; None on transition failure or LLM error (which is
            reported against the error budget).
        """
        if not self.runtime or not self.llm:
            return None
        if not self.runtime.transition(AgentPhase.PLAN, "generating_plan"):
            return None
        packet = self.runtime.packet

        # Build prompt
        system_prompt = f"""You are a governed infrastructure agent operating under strict constraints.
Your task: {packet.objective}
Constraints you MUST follow:
- Scope: {packet.constraints.get('scope', [])}
- Forbidden: {packet.constraints.get('forbidden', [])}
- Required steps: {packet.constraints.get('required_steps', [])}
You are in the PLAN phase. Generate a detailed plan but DO NOT execute anything.
Output your plan as JSON:
{{
"title": "Plan title",
"confidence": 0.0-1.0,
"steps": [
{{"step": 1, "action": "description", "phase": "PLAN|EXECUTE|VERIFY", "reversible": true}}
],
"assumptions": [],
"risks": [],
"estimated_duration": "X minutes"
}}"""
        try:
            response = self.llm.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"Create an implementation plan for: {packet.objective}"}
                ],
                max_tokens=2000,
                temperature=0.3
            )
            llm_response = response.choices[0].message.content

            # Parse plan: take the outermost {...} span and decode it.
            # (Was a bare `except:`; narrowed to the parse failures actually
            # expected here -- invalid JSON or a non-string response.)
            try:
                json_match = llm_response[llm_response.find("{"):llm_response.rfind("}")+1]
                plan = json.loads(json_match)
            except (AttributeError, ValueError):
                plan = {"raw_response": llm_response, "confidence": 0.5}

            # Register plan artifact
            self.runtime.register_artifact("plan", f"plan_{self.agent_id}_{self._now()}")

            # Check confidence
            confidence = plan.get("confidence", 0.5)
            if confidence < 0.7:
                print(f"[PLAN] Low confidence ({confidence}), would escalate in production")

            self.runtime.transition(AgentPhase.PLAN, "plan_complete",
                                    f"Plan generated with confidence {confidence}")
            return plan
        except Exception as e:
            # Any LLM/transport failure is charged to the error budget.
            self.runtime.report_error("LLM_ERROR", str(e))
            return None

    def run_execute(self, plan: dict) -> bool:
        """
        EXECUTE phase: simulate execution (in real system, would apply changes).

        Refuses to execute (and reports a procedure violation) if no "plan"
        artifact was registered for this task. Each plan step is registered
        as an executed artifact.
        """
        if not self.runtime:
            return False

        # Verify we have a plan artifact (compliance requirement)
        if not self.gov.has_required_artifact(self.runtime.packet.task_id, "plan"):
            self.runtime.report_violation("EXECUTE_WITHOUT_PLAN")
            return False
        if not self.runtime.transition(AgentPhase.EXECUTE, "executing"):
            return False

        steps = plan.get("steps", [])
        print(f"[EXECUTE] Simulating execution of {len(steps)} steps...")
        for step in steps:
            step_num = step.get("step", "?")
            action = step.get("action", "unknown")
            print(f" Step {step_num}: {action[:60]}...")
            # In real implementation, would execute the action here
            # For now, just register it as done
            self.runtime.register_artifact(
                f"step_{step_num}",
                f"executed_{step_num}_{self._now()}"
            )

        self.runtime.transition(AgentPhase.EXECUTE, "execute_complete")
        return True

    def run_verify(self) -> bool:
        """
        VERIFY phase: post-execution checks.

        Counts the artifacts registered for this task; a real system would
        run actual verification here.
        """
        if not self.runtime:
            return False
        if not self.runtime.transition(AgentPhase.VERIFY, "verifying"):
            return False

        # Check all artifacts were created
        artifacts = self.gov.get_artifacts(self.runtime.packet.task_id)
        print(f"[VERIFY] Registered artifacts: {len(artifacts)}")

        # In real system, would run actual verification
        self.runtime.transition(AgentPhase.VERIFY, "verify_complete",
                                f"Verified {len(artifacts)} artifacts")
        return True

    def run_package(self) -> dict:
        """
        PACKAGE phase: collect all outputs.

        Returns:
            A summary dict (agent/task IDs, objective, artifacts, error
            counts, completion time); empty dict on failure.
        """
        if not self.runtime:
            return {}
        if not self.runtime.transition(AgentPhase.PACKAGE, "packaging"):
            return {}

        artifacts = self.gov.get_artifacts(self.runtime.packet.task_id)
        errors = self.gov.get_error_counts(self.agent_id)
        package = {
            "agent_id": self.agent_id,
            "task_id": self.runtime.packet.task_id,
            "objective": self.runtime.packet.objective,
            "artifacts": artifacts,
            "error_counts": errors,
            "completed_at": self._now()
        }
        self.runtime.register_artifact("package", f"package_{self._now()}")
        self.runtime.transition(AgentPhase.PACKAGE, "package_complete")
        return package

    def run_report(self, package: dict) -> dict:
        """
        REPORT phase: generate the final report from the package summary.

        Returns:
            Report dict with status "COMPLETED"; empty dict on failure.
        """
        if not self.runtime:
            return {}
        if not self.runtime.transition(AgentPhase.REPORT, "reporting"):
            return {}

        report = {
            "agent_id": self.agent_id,
            "task_id": package.get("task_id"),
            "status": "COMPLETED",
            "summary": f"Completed objective: {package.get('objective')}",
            "artifacts_count": len(package.get("artifacts", [])),
            "errors_encountered": package.get("error_counts", {}).get("total_errors", 0),
            "timestamp": self._now()
        }
        self.runtime.transition(AgentPhase.REPORT, "report_complete")
        return report

    def finish(self, report: dict) -> bool:
        """Complete the agent's work (releases the runtime with a summary)."""
        if not self.runtime:
            return False
        return self.runtime.complete(f"Task completed: {report.get('summary', 'done')}")

    def run_full_lifecycle(self) -> dict:
        """
        Run the complete agent lifecycle.

        Drives start -> preflight -> plan -> execute -> verify -> package ->
        report -> finish, stopping at the first failed phase.

        Returns:
            The final report dict, or {"status": "FAILED", "reason": ...}.
        """
        # Start
        ok, msg = self.start()
        if not ok:
            return {"status": "FAILED", "reason": msg}

        # Preflight
        if not self.run_preflight():
            return {"status": "FAILED", "reason": "PREFLIGHT_FAILED"}

        # Plan
        plan = self.run_plan()
        if not plan:
            return {"status": "FAILED", "reason": "PLAN_FAILED"}
        print(f"\n[PLAN GENERATED]")
        print(json.dumps(plan, indent=2))

        # Execute
        if not self.run_execute(plan):
            return {"status": "FAILED", "reason": "EXECUTE_FAILED"}

        # Verify
        if not self.run_verify():
            return {"status": "FAILED", "reason": "VERIFY_FAILED"}

        # Package
        package = self.run_package()

        # Report
        report = self.run_report(package)

        # Finish
        self.finish(report)

        print(f"\n{'='*60}")
        print("FINAL REPORT")
        print(f"{'='*60}")
        print(json.dumps(report, indent=2))
        return report
# =============================================================================
# CLI
# =============================================================================
if __name__ == "__main__":
    # A valid invocation needs three positional args after the script name,
    # so argv must have at least 4 entries (the old `< 3` guard let a
    # two-argument call fall through and crash on sys.argv[3]).
    if len(sys.argv) < 4:
        print("Usage: governed_agent.py <agent_id> <task_id> <objective>")
        print("       governed_agent.py <agent_id> <task_id> <objective> --model <model>")
        sys.exit(1)

    agent_id = sys.argv[1]
    task_id = sys.argv[2]
    objective = sys.argv[3]

    model = "anthropic/claude-sonnet-4"
    if "--model" in sys.argv:
        model_idx = sys.argv.index("--model") + 1
        # Guard against `--model` being the last argument.
        if model_idx >= len(sys.argv):
            print("Usage: governed_agent.py <agent_id> <task_id> <objective> --model <model>")
            sys.exit(1)
        model = sys.argv[model_idx]

    # Create agent
    agent = GovernedLLMAgent(agent_id, model=model)

    # Create task
    agent.create_task(task_id, objective)

    # Run full lifecycle
    result = agent.run_full_lifecycle()
    sys.exit(0 if result.get("status") == "COMPLETED" else 1)