Phase 8 Production Hardening with complete governance infrastructure: - Vault integration with tiered policies (T0-T4) - DragonflyDB state management - SQLite audit ledger - Pipeline DSL and templates - Promotion/revocation engine - Checkpoint system for session persistence - Health manager and circuit breaker for fault tolerance - GitHub/Slack integrations - Architectural test pipeline with bug watcher, suggestion engine, council review - Multi-agent chaos testing framework Test Results: - Governance tests: 68/68 passing - E2E workflow: 16/16 passing - Phase 2 Vault: 14/14 passing - Integration tests: 27/27 passing Coverage: 57.6% average across 12 phases Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
850 lines
26 KiB
Python
Executable File
850 lines
26 KiB
Python
Executable File
#!/usr/bin/env python3
"""
Directory Status Management System
===================================

Manages README.md and STATUS.md files across the agent-governance project.
Ensures every subdirectory has proper documentation and status tracking.

Usage:
    python status.py sweep [--fix]
    python status.py update <dir> --phase <phase> [--task <task>]
    python status.py init <dir>
    python status.py dashboard
    python status.py template [readme|status]
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
|
|
# Project root
# Absolute base of the agent-governance checkout.  All sweeps, dashboards,
# and relative-path displays are anchored here.
PROJECT_ROOT = Path("/opt/agent-governance")

# Directories to skip (data/generated content)
# Names matched anywhere in the tree by should_skip_dir(); matching subtrees
# are pruned from os.walk entirely and never documented.
SKIP_DIRS = {
    "__pycache__",
    "node_modules",
    ".git",
    "logs",
    "storage",
    "dragonfly-data",
    "credentials",
    "workspace",
    ".claude",
}

# Directories that are data-only (no README needed)
# Unlike SKIP_DIRS, these are still walked through, but the directory itself
# is exempt from the README.md/STATUS.md requirement (see is_data_dir()).
DATA_DIRS = {
    "logs",
    "storage",
    "dragonfly-data",
    "credentials",
    "workspace",
    "packages",  # evidence packages are generated
}
|
|
|
|
|
|
class StatusPhase(str, Enum):
    """Lifecycle phases a directory's STATUS.md can report.

    Subclasses ``str`` so phase values compare and serialize as plain
    strings (``StatusPhase("complete")`` round-trips CLI input, and
    ``StatusPhase.COMPLETE == "complete"`` holds).
    """
    COMPLETE = "complete"
    IN_PROGRESS = "in_progress"
    BLOCKED = "blocked"
    NEEDS_REVIEW = "needs_review"
    NOT_STARTED = "not_started"


# Emoji badge for each phase, used in README/STATUS badges and the dashboard.
# BUG FIX: these values had degraded to empty strings, which left every
# generated badge blank and guaranteed that the badge-replacement regex in
# cmd_update() (which requires one of ✅🚧❗⚠️⬜) could never match.
# Restored to the exact characters the dashboard prints and the regex expects.
STATUS_ICONS = {
    StatusPhase.COMPLETE: "✅",
    StatusPhase.IN_PROGRESS: "🚧",
    StatusPhase.BLOCKED: "❗",
    StatusPhase.NEEDS_REVIEW: "⚠️",
    StatusPhase.NOT_STARTED: "⬜",
}
|
|
|
|
|
|
@dataclass
class DirectoryStatus:
    """Status information for a directory, as parsed from its STATUS.md."""
    # Absolute path of the directory under inspection.
    path: Path
    # True when README.md exists directly inside the directory.
    has_readme: bool = False
    # True when STATUS.md (or the legacy README.status.md) exists.
    has_status: bool = False
    # Phase parsed from STATUS.md; NOT_STARTED when absent/unparseable.
    phase: StatusPhase = StatusPhase.NOT_STARTED
    # Last timestamp found in STATUS.md, if any (naive datetime, assumed UTC).
    last_updated: Optional[datetime] = None
    # Parsed task-table rows: [{"task": str, "done": bool}, ...].
    tasks: List[Dict] = field(default_factory=list)
    # Bullet entries from the "## Dependencies" section.
    dependencies: List[str] = field(default_factory=list)
    # Bullet entries from the "## Issues" / "## Blockers" section.
    issues: List[str] = field(default_factory=list)
|
|
|
|
|
|
# =============================================================================
# Templates
# =============================================================================

# README.md scaffold filled by generate_readme().  The trailing
# "*Last updated: ...*" line is what cmd_update()'s timestamp regex rewrites.
README_TEMPLATE = '''# {title}

> {purpose}

## Overview

{overview}

## Key Files

| File | Description |
|------|-------------|
{files_table}

## Interfaces / APIs

{interfaces}

## Status

{status_badge}

See [STATUS.md](./STATUS.md) for detailed progress tracking.

## Architecture Reference

Part of the [Agent Governance System](/opt/agent-governance/docs/ARCHITECTURE.md).

Parent: [{parent_name}]({parent_path})

---
*Last updated: {timestamp}*
'''

# STATUS.md scaffold filled by generate_status().  The "## Activity Log"
# heading doubles as the insertion anchor cmd_update() uses for new entries.
STATUS_TEMPLATE = '''# Status: {title}

## Current Phase

{phase_badge}

## Tasks

| Status | Task | Updated |
|--------|------|---------|
{tasks_table}

## Dependencies

{dependencies}

## Issues / Blockers

{issues}

## Activity Log

{activity_log}

---
*Last updated: {timestamp}*
'''

# One activity-log entry; prepended under "## Activity Log" on every update.
ACTIVITY_ENTRY_TEMPLATE = '''### {timestamp}
- **Phase**: {phase}
- **Action**: {action}
- **Details**: {details}
'''
|
|
|
|
|
|
def get_readme_template() -> str:
    """Return the raw README.md template string (unfilled placeholders)."""
    return README_TEMPLATE
|
|
|
|
|
|
def get_status_template() -> str:
    """Return the raw STATUS.md template string (unfilled placeholders)."""
    return STATUS_TEMPLATE
|
|
|
|
|
|
# =============================================================================
|
|
# Directory Analysis
|
|
# =============================================================================
|
|
|
|
def should_skip_dir(dir_path: Path) -> bool:
    """Return True when *dir_path* must be excluded from documentation sweeps.

    Excluded: hidden directories (leading dot), anything listed in
    SKIP_DIRS, and generated evidence-package directories (names
    containing "evd-").
    """
    name = dir_path.name
    is_hidden = name.startswith(".")
    # Evidence-package subdirs are generated artifacts, never documented.
    is_generated_evidence = "evd-" in name
    return is_hidden or name in SKIP_DIRS or is_generated_evidence
|
|
|
|
|
|
def is_data_dir(dir_path: Path) -> bool:
    """Return True for data-only directories exempt from README/STATUS."""
    return dir_path.name in DATA_DIRS
|
|
|
|
|
|
def get_all_directories() -> List[Path]:
    """Walk PROJECT_ROOT and collect every directory that needs docs.

    Skippable subtrees are pruned in place so os.walk never descends into
    them; data-only directories are walked through but not collected.
    Returns the directories sorted by path.
    """
    collected: List[Path] = []
    for root, subdirs, _files in os.walk(PROJECT_ROOT):
        current = Path(root)

        # Mutate subdirs in place so os.walk skips excluded subtrees.
        subdirs[:] = [d for d in subdirs if not should_skip_dir(current / d)]

        if is_data_dir(current):
            continue

        collected.append(current)

    return sorted(collected)
|
|
|
|
|
|
def analyze_directory(dir_path: Path) -> DirectoryStatus:
    """Inspect a directory's README/STATUS files and parse STATUS.md.

    Falls back to the legacy README.status.md name when STATUS.md is
    absent.  Parse failures are swallowed: whatever fields were parsed
    before the failure are kept, the rest stay at their defaults.
    """
    result = DirectoryStatus(path=dir_path)

    readme_file = dir_path / "README.md"
    primary_status = dir_path / "STATUS.md"
    legacy_status = dir_path / "README.status.md"  # older naming scheme

    result.has_readme = readme_file.exists()
    result.has_status = primary_status.exists() or legacy_status.exists()

    # Prefer STATUS.md; otherwise try the legacy file name.
    chosen = primary_status if primary_status.exists() else legacy_status
    if chosen.exists():
        try:
            text = chosen.read_text()
            result.phase = parse_phase_from_status(text)
            result.last_updated = parse_timestamp_from_status(text)
            result.tasks = parse_tasks_from_status(text)
            result.dependencies = parse_dependencies_from_status(text)
            result.issues = parse_issues_from_status(text)
        except Exception:
            # Unreadable/unparseable STATUS.md: keep partial results.
            pass

    return result
|
|
|
|
|
|
def parse_phase_from_status(content: str) -> StatusPhase:
    """Infer the StatusPhase advertised by a STATUS.md body.

    Keyword heuristic: "complete" wins only when the document also
    mentions "phase" or "status" and carries no in-progress marker;
    otherwise the first matching keyword below decides.  Defaults to
    NOT_STARTED.
    """
    text = content.lower()
    mentions_progress = "in_progress" in text or "in progress" in text

    if "complete" in text and ("phase" in text or "status" in text):
        if not mentions_progress:
            return StatusPhase.COMPLETE
    if mentions_progress:
        return StatusPhase.IN_PROGRESS
    if "blocked" in text:
        return StatusPhase.BLOCKED
    if "needs_review" in text or "needs review" in text:
        return StatusPhase.NEEDS_REVIEW
    return StatusPhase.NOT_STARTED
|
|
|
|
|
|
def parse_timestamp_from_status(content: str) -> Optional[datetime]:
    """Extract the last ISO-style timestamp found in STATUS.md content.

    Scans for ``YYYY-MM-DD[T ]HH:MM:SS`` occurrences and parses the last
    one (the "*Last updated:*" footer sits at the end of generated files,
    so the last match is typically the freshest).  Returns None when no
    parseable timestamp is present.
    """
    import re
    # Look for ISO format timestamps (date, 'T' or space, time).
    pattern = r'(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2})'
    matches = re.findall(pattern, content)
    if matches:
        try:
            ts = matches[-1].replace(" ", "T")
            return datetime.fromisoformat(ts)
        except ValueError:
            # BUG FIX: was a bare ``except:``, which also swallowed
            # KeyboardInterrupt/SystemExit.  fromisoformat raises only
            # ValueError for malformed input, so catch exactly that.
            pass
    return None
|
|
|
|
|
|
def parse_tasks_from_status(content: str) -> List[Dict]:
    """Extract task-table rows from STATUS.md.

    Returns dicts of the shape ``{"task": str, "done": bool}``.  A row is
    "done" when its status cell holds x/✓ (case-insensitive).
    """
    import re
    # Markdown table row: | <single status char> | <task text> |
    row_pattern = re.compile(r'\|\s*([x\- ✓✗])\s*\|\s*([^|]+)\|', re.IGNORECASE)

    parsed: List[Dict] = []
    for row in row_pattern.finditer(content):
        marker = row.group(1).strip()
        description = row.group(2).strip()
        if not description or description == "Task":  # skip header row
            continue
        parsed.append({
            "task": description,
            "done": marker.lower() in ['x', '✓', 'done', 'complete'],
        })
    return parsed
|
|
|
|
|
|
def parse_dependencies_from_status(content: str) -> List[str]:
    """Extract the bullet list under "## Dependencies" from STATUS.md.

    Collects ``- item`` lines until the next ``##`` heading; returns the
    items with the bullet and surrounding whitespace stripped.
    """
    found: List[str] = []
    collecting = False
    for raw in content.split('\n'):
        if not collecting:
            if '## Dependencies' in raw or '## Depends' in raw:
                collecting = True
            continue
        if raw.startswith('##'):  # next section terminates the list
            break
        stripped = raw.strip()
        if stripped.startswith('-'):
            found.append(stripped[1:].strip())
    return found
|
|
|
|
|
|
def parse_issues_from_status(content: str) -> List[str]:
    """Extract the bullet list under "## Issues" / "## Blockers".

    Collects ``- item`` lines until the next ``##`` heading; returns the
    items with the bullet and surrounding whitespace stripped.
    """
    found: List[str] = []
    collecting = False
    for raw in content.split('\n'):
        if not collecting:
            if '## Issues' in raw or '## Blockers' in raw:
                collecting = True
            continue
        if raw.startswith('##'):  # next section terminates the list
            break
        stripped = raw.strip()
        if stripped.startswith('-'):
            found.append(stripped[1:].strip())
    return found
|
|
|
|
|
|
# =============================================================================
|
|
# File Generation
|
|
# =============================================================================
|
|
|
|
def get_directory_purpose(dir_path: Path) -> str:
    """Return a human-readable purpose string for *dir_path*.

    Lookup order: exact directory-name match, then parent-name match
    (rendered as "<parent purpose> - <name> submodule"), then a generic
    fallback description.
    """
    purposes = {
        "agents": "Agent implementations and configurations",
        "analytics": "Learning analytics and pattern detection",
        "bin": "CLI tools and executable scripts",
        "checkpoint": "Context checkpoint and session management",
        "docs": "System documentation and architecture specs",
        "evidence": "Audit evidence and execution artifacts",
        "integrations": "External service integrations (GitHub, Slack)",
        "inventory": "Infrastructure inventory management",
        "ledger": "SQLite audit ledger and transaction logs",
        "lib": "Shared library code and utilities",
        "orchestrator": "Multi-agent orchestration system",
        "pipeline": "Pipeline DSL, schemas, and templates",
        "preflight": "Pre-execution validation and checks",
        "runtime": "Runtime governance and agent lifecycle",
        "sandbox": "Sandboxed execution environments",
        # BUG FIX: "schemas" was listed twice in this literal ("JSON schemas
        # for validation" here, "JSON schema definitions" further down); the
        # later entry silently overwrote this one at runtime, so the winning
        # value is kept and the duplicate removed.
        "schemas": "JSON schema definitions",
        "teams": "Hierarchical team framework",
        "testing": "Test utilities and helpers",
        "tests": "Test suites and test infrastructure",
        "ui": "User interface components",
        "wrappers": "Tool wrappers for sandboxed execution",
        "tier0-agent": "Observer-tier agent (read-only)",
        "tier1-agent": "Executor-tier agent (infrastructure changes)",
        "multi-agent": "Multi-agent coordination demos",
        "llm-planner": "LLM-based planning agent (Python)",
        "llm-planner-ts": "LLM-based planning agent (TypeScript)",
        "chaos": "Chaos testing framework",
        "multi-agent-chaos": "Multi-agent chaos test suite",
        "github": "GitHub integration module",
        "slack": "Slack integration module",
        "common": "Common integration utilities",
        "framework": "Team framework core implementation",
        "templates": "Configuration and pipeline templates",
        "examples": "Example configurations and demos",
        "mocks": "Mock implementations for testing",
        "unit": "Unit tests",
        "integration": "Integration tests",
        "scenarios": "Test scenarios",
        "governance": "Governance system tests",
        "terraform": "Terraform sandbox and examples",
        "ansible": "Ansible sandbox and examples",
    }

    name = dir_path.name
    if name in purposes:
        return purposes[name]

    # Fall back to the parent directory's purpose for unknown submodules.
    parent = dir_path.parent.name
    if parent in purposes:
        return f"{purposes[parent]} - {name} submodule"

    # Was an f-string with no placeholders; plain literal is equivalent.
    return "Component of the Agent Governance System"
|
|
|
|
|
|
def get_key_files(dir_path: Path) -> List[Tuple[str, str]]:
    """List up to ten notable files in *dir_path* with generic descriptions.

    Files are gathered by extension in priority order (code first, docs
    last).  Hidden files and the documentation files this tool itself
    manages are excluded; duplicates across patterns are reported once.
    """
    # (glob pattern, description) in reporting priority order.
    priority_patterns = [
        ("*.py", "Python module"),
        ("*.ts", "TypeScript module"),
        ("*.js", "JavaScript module"),
        ("*.yaml", "Configuration"),
        ("*.yml", "Configuration"),
        ("*.json", "Data/Schema"),
        ("*.sh", "Shell script"),
        ("*.sql", "Database schema"),
        ("*.md", "Documentation"),
    ]
    managed = ('README.md', 'STATUS.md', 'README.status.md')

    found: List[Tuple[str, str]] = []
    seen = set()
    for pattern, description in priority_patterns:
        for candidate in dir_path.glob(pattern):
            fname = candidate.name
            if fname in seen or fname.startswith('.') or fname in managed:
                continue
            found.append((fname, description))
            seen.add(fname)

    return found[:10]  # cap the table at the top 10
|
|
|
|
|
|
def generate_readme(dir_path: Path, phase: StatusPhase = StatusPhase.NOT_STARTED) -> str:
    """Render README.md content for *dir_path* from README_TEMPLATE."""
    dir_name = dir_path.name
    title = dir_name.replace("-", " ").replace("_", " ").title()
    purpose = get_directory_purpose(dir_path)

    # Key-files table (placeholder row when the directory is empty).
    key_files = get_key_files(dir_path)
    if key_files:
        files_table = "\n".join(f"| `{f}` | {d} |" for f, d in key_files)
    else:
        files_table = "| *No files yet* | |"

    # Link back to the parent directory.
    parent = dir_path.parent
    if parent == PROJECT_ROOT:
        parent_name = "Project Root"
        parent_path = "/opt/agent-governance"
    else:
        parent_name = parent.name.replace("-", " ").title()
        parent_path = ".."

    # Status badge from the phase icon + title-cased label.
    icon = STATUS_ICONS.get(phase, "")
    phase_label = phase.value.replace("_", " ").title()

    return README_TEMPLATE.format(
        title=title,
        purpose=purpose,
        overview=f"This directory contains {purpose.lower()}.",
        files_table=files_table,
        interfaces="*Document any APIs, CLI commands, or interfaces here.*",
        status_badge=f"**{icon} {phase_label}**",
        parent_name=parent_name,
        parent_path=parent_path,
        timestamp=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC"),
    )
|
|
|
|
|
|
def generate_status(dir_path: Path, phase: StatusPhase = StatusPhase.NOT_STARTED,
                    tasks: List[Dict] = None, dependencies: List[str] = None,
                    issues: List[str] = None, action: str = "Initialized") -> str:
    """Render STATUS.md content for *dir_path* from STATUS_TEMPLATE.

    ``tasks`` items are dicts with "task", optional "done" and "updated"
    keys.  Missing collections render as placeholder text.
    """
    title = dir_path.name.replace("-", " ").replace("_", " ").title()

    # Phase badge (upper-cased label).
    icon = STATUS_ICONS.get(phase, "")
    phase_label = phase.value.replace("_", " ").upper()
    phase_badge = f"**{icon} {phase_label}**"

    # Tasks table (placeholder row when no tasks are supplied).
    rows = [
        f"| {'✓' if t.get('done') else '☐'} | {t['task']} | {t.get('updated', 'N/A')} |"
        for t in (tasks or [])
    ]
    tasks_table = "\n".join(rows) if rows else "| ☐ | *No tasks defined* | - |"

    # Dependencies bullet list.
    if dependencies:
        deps_text = "\n".join(f"- {d}" for d in dependencies)
    else:
        deps_text = "*No external dependencies.*"

    # Issues bullet list.
    if issues:
        issues_text = "\n".join(f"- {i}" for i in issues)
    else:
        issues_text = "*No current issues or blockers.*"

    # Seed the activity log with a single entry for this action.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    activity_log = ACTIVITY_ENTRY_TEMPLATE.format(
        timestamp=timestamp,
        phase=phase_label,
        action=action,
        details="Status tracking initialized for this directory.",
    )

    return STATUS_TEMPLATE.format(
        title=title,
        phase_badge=phase_badge,
        tasks_table=tasks_table,
        dependencies=deps_text,
        issues=issues_text,
        activity_log=activity_log,
        timestamp=timestamp,
    )
|
|
|
|
|
|
# =============================================================================
|
|
# Commands
|
|
# =============================================================================
|
|
|
|
def cmd_sweep(args):
    """Check all directories for README/STATUS files.

    Reports missing README.md/STATUS.md and stale (>7 days) status files;
    with --fix, generates the missing files in place.  Returns 0.
    """
    dirs = get_all_directories()

    missing_readme = []
    missing_status = []
    outdated = []

    print("=" * 60)
    print("DIRECTORY STATUS SWEEP")
    print("=" * 60)
    print(f"Scanning {len(dirs)} directories...\n")

    for dir_path in dirs:
        status = analyze_directory(dir_path)
        rel_path = dir_path.relative_to(PROJECT_ROOT)

        issues = []
        if not status.has_readme:
            issues.append("missing README.md")
            missing_readme.append(dir_path)
        if not status.has_status:
            issues.append("missing STATUS.md")
            missing_status.append(dir_path)

        # Stored timestamps are naive; treated as UTC for age calculation.
        if status.last_updated:
            age = datetime.now(timezone.utc) - status.last_updated.replace(tzinfo=timezone.utc)
            if age.days > 7:
                issues.append(f"outdated ({age.days} days)")
                outdated.append(dir_path)

        if issues:
            print(f" {rel_path}: {', '.join(issues)}")

    print()
    print("-" * 60)
    print(f"Missing README.md: {len(missing_readme)}")
    print(f"Missing STATUS.md: {len(missing_status)}")
    print(f"Outdated (>7 days): {len(outdated)}")
    print("-" * 60)

    if args.fix and (missing_readme or missing_status):
        print("\nFixing missing files...")
        # De-duplicate: a directory may be missing both files.
        for dir_path in set(missing_readme + missing_status):
            rel_path = dir_path.relative_to(PROJECT_ROOT)

            readme_path = dir_path / "README.md"
            status_path = dir_path / "STATUS.md"

            if not readme_path.exists():
                readme_path.write_text(generate_readme(dir_path))
                print(f" Created: {rel_path}/README.md")

            if not status_path.exists():
                status_path.write_text(generate_status(dir_path))
                print(f" Created: {rel_path}/STATUS.md")

        print("\nDone!")
    elif missing_readme or missing_status:
        print("\nRun with --fix to create missing files.")

    return 0
|
|
|
|
|
|
def cmd_update(args):
    """Update STATUS.md for a directory.

    Rewrites the phase badge, prepends a new activity-log entry, refreshes
    the "Last updated" footer, and (unless --no-checkpoint) records a
    checkpoint delta.  Creates STATUS.md from the template when missing.
    Returns 0 on success, 1 on invalid input.
    """
    dir_path = Path(args.directory).resolve()

    if not dir_path.exists():
        print(f"Error: Directory not found: {dir_path}")
        return 1

    status_path = dir_path / "STATUS.md"

    # Parse phase; accepts "in progress" as well as "in_progress".
    phase = StatusPhase.IN_PROGRESS
    if args.phase:
        try:
            phase = StatusPhase(args.phase.lower().replace(" ", "_"))
        except ValueError:
            print(f"Error: Invalid phase '{args.phase}'")
            print(f"Valid phases: {', '.join(p.value for p in StatusPhase)}")
            return 1

    # Read existing or create new
    if status_path.exists():
        content = status_path.read_text()
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

        # New activity log entry (newest entries sit directly under the heading).
        new_entry = ACTIVITY_ENTRY_TEMPLATE.format(
            timestamp=timestamp,
            phase=phase.value.upper(),
            action=args.task or args.note or "Status update",
            details=args.note or f"Phase updated to {phase.value}",
        )

        # Build the replacement phase badge.
        icon = STATUS_ICONS.get(phase, "")
        phase_label = phase.value.replace("_", " ").upper()
        new_badge = f"**{icon} {phase_label}**"

        import re
        # BUG FIX: the previous pattern required `[A-Z_]+` after the icon,
        # which can never match multi-word labels ("IN PROGRESS",
        # "NEEDS REVIEW", "NOT STARTED" contain spaces), so those badges
        # were never replaced.  Spaces are now allowed in the label, and
        # the icon/space prefix is optional so badges written while
        # STATUS_ICONS held empty strings still get rewritten.
        content = re.sub(r'\*\*[✅🚧❗⚠️⬜]? ?[A-Z][A-Z_ ]*\*\*', new_badge, content)

        # Insert the new activity entry right under the section heading.
        if "## Activity Log" in content:
            content = content.replace(
                "## Activity Log\n",
                f"## Activity Log\n\n{new_entry}"
            )

        # Refresh the "Last updated" footer.
        content = re.sub(
            r'\*Last updated: [^*]+\*',
            f'*Last updated: {timestamp}*',
            content
        )

        status_path.write_text(content)
    else:
        # No STATUS.md yet: create one from the template.
        content = generate_status(
            dir_path,
            phase=phase,
            action=args.task or "Initialized",
        )
        status_path.write_text(content)

    rel_path = dir_path.relative_to(PROJECT_ROOT) if str(dir_path).startswith(str(PROJECT_ROOT)) else dir_path
    print(f"Updated: {rel_path}/STATUS.md")
    print(f" Phase: {phase.value}")
    if args.task:
        print(f" Task: {args.task}")
    if args.note:
        print(f" Note: {args.note}")

    # Create lightweight checkpoint delta unless disabled
    if not getattr(args, 'no_checkpoint', False):
        try:
            create_checkpoint_delta(str(rel_path), phase.value, args.task or args.note)
        except Exception as e:
            # Don't fail the status update if checkpoint fails
            print(f" (Checkpoint delta skipped: {e})")

    return 0
|
|
|
|
|
|
def create_checkpoint_delta(dir_path: str, phase: str, action: str = None):
    """
    Create a lightweight checkpoint recording the status change.
    Integrates with the checkpoint system for state tracking.
    """
    import subprocess

    # Compose the checkpoint note from the directory, phase, and action.
    suffix = f": {action}" if action else ""
    notes = f"Status update - {dir_path} -> {phase}{suffix}"

    # Invoke the checkpoint CLI; bounded so a hung call can't stall us.
    completed = subprocess.run(
        ["python3", "/opt/agent-governance/checkpoint/checkpoint.py",
         "now", "--notes", notes],
        capture_output=True,
        text=True,
        timeout=10,
    )

    if completed.returncode != 0:
        return

    # Surface the checkpoint ID the tool reports on stdout, if present.
    for line in completed.stdout.split('\n'):
        if line.startswith("ID:"):
            ckpt_id = line.split(":")[1].strip()
            print(f" Checkpoint: {ckpt_id}")
            break
|
|
|
|
|
|
def cmd_init(args):
    """Create README.md/STATUS.md in a directory (overwrite with --force)."""
    target = Path(args.directory).resolve()

    if not target.exists():
        print(f"Error: Directory not found: {target}")
        return 1

    readme_file = target / "README.md"
    status_file = target / "STATUS.md"

    # Show a project-relative path when the target lives under the root.
    if str(target).startswith(str(PROJECT_ROOT)):
        display = target.relative_to(PROJECT_ROOT)
    else:
        display = target

    written = []
    if args.force or not readme_file.exists():
        readme_file.write_text(generate_readme(target))
        written.append("README.md")

    if args.force or not status_file.exists():
        status_file.write_text(generate_status(target))
        written.append("STATUS.md")

    if not written:
        print(f"Files already exist in {display}. Use --force to overwrite.")
        return 0

    print(f"Initialized {display}:")
    for filename in written:
        print(f" Created: {filename}")
    return 0
|
|
|
|
|
|
def cmd_dashboard(args):
    """Show status overview of all directories.

    Prints a completion progress bar, per-phase counts, and the list of
    directories that are blocked, in progress, or awaiting review.
    Always returns 0.
    """
    dirs = get_all_directories()

    # Bucket every directory by its parsed phase.
    by_phase = {phase: [] for phase in StatusPhase}

    for dir_path in dirs:
        status = analyze_directory(dir_path)
        by_phase[status.phase].append(status)

    print("=" * 70)
    print("PROJECT STATUS DASHBOARD")
    print("=" * 70)
    print()

    total = len(dirs)
    complete = len(by_phase[StatusPhase.COMPLETE])
    in_progress = len(by_phase[StatusPhase.IN_PROGRESS])
    blocked = len(by_phase[StatusPhase.BLOCKED])
    needs_review = len(by_phase[StatusPhase.NEEDS_REVIEW])
    not_started = len(by_phase[StatusPhase.NOT_STARTED])

    # Progress bar (guards against division by zero on an empty project).
    pct_complete = (complete / total * 100) if total > 0 else 0
    bar_width = 40
    filled = int(bar_width * complete / total) if total > 0 else 0
    bar = "█" * filled + "░" * (bar_width - filled)
    print(f"Progress: [{bar}] {pct_complete:.1f}%")
    print()

    # Summary counts
    print(f" ✅ Complete: {complete:3d}")
    print(f" 🚧 In Progress: {in_progress:3d}")
    print(f" ❗ Blocked: {blocked:3d}")
    print(f" ⚠️ Needs Review: {needs_review:3d}")
    print(f" ⬜ Not Started: {not_started:3d}")
    print(f" ─────────────────────")
    print(f" Total: {total:3d}")
    print()

    # Show non-complete directories, most urgent phase first.
    if in_progress or blocked or needs_review:
        print("-" * 70)
        print("ACTIVE DIRECTORIES:")
        print("-" * 70)

        for phase in [StatusPhase.BLOCKED, StatusPhase.IN_PROGRESS, StatusPhase.NEEDS_REVIEW]:
            if by_phase[phase]:
                icon = STATUS_ICONS[phase]
                print(f"\n{icon} {phase.value.replace('_', ' ').upper()}:")
                for status in by_phase[phase]:
                    rel_path = status.path.relative_to(PROJECT_ROOT)
                    age_str = ""
                    if status.last_updated:
                        # Stored timestamps are naive; treated as UTC here.
                        age = datetime.now(timezone.utc) - status.last_updated.replace(tzinfo=timezone.utc)
                        age_str = f" (updated {age.days}d ago)" if age.days > 0 else " (updated today)"
                    print(f" {rel_path}{age_str}")

    print()
    return 0
|
|
|
|
|
|
def cmd_template(args):
    """Print the requested template, or a 500-char preview of both."""
    if args.type == "readme":
        print(README_TEMPLATE)
        return 0
    if args.type == "status":
        print(STATUS_TEMPLATE)
        return 0

    # No type given: show truncated previews of both templates.
    print("README.md Template:")
    print("-" * 40)
    print(README_TEMPLATE[:500] + "...")
    print()
    print("STATUS.md Template:")
    print("-" * 40)
    print(STATUS_TEMPLATE[:500] + "...")
    return 0
|
|
|
|
|
|
# =============================================================================
|
|
# Main
|
|
# =============================================================================
|
|
|
|
def main():
    """Build the CLI parser and dispatch to the selected subcommand."""
    parser = argparse.ArgumentParser(description="Directory Status Management")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # sweep: audit the whole tree, optionally creating missing files
    p = subparsers.add_parser("sweep", help="Check all directories")
    p.add_argument("--fix", action="store_true", help="Create missing files")

    # update: rewrite one directory's STATUS.md
    p = subparsers.add_parser("update", help="Update directory status")
    p.add_argument("directory", help="Directory to update")
    p.add_argument("--phase", help="Set phase")
    p.add_argument("--task", help="Add task description")
    p.add_argument("--note", help="Add note")
    p.add_argument("--deps", help="Set dependencies (comma-separated)")
    p.add_argument("--no-checkpoint", action="store_true",
                   help="Skip creating checkpoint delta")

    # init: seed README/STATUS for one directory
    p = subparsers.add_parser("init", help="Initialize directory")
    p.add_argument("directory", help="Directory to initialize")
    p.add_argument("--force", action="store_true", help="Overwrite existing")

    # dashboard: read-only overview
    subparsers.add_parser("dashboard", help="Show status overview")

    # template: dump the raw templates
    p = subparsers.add_parser("template", help="Show templates")
    p.add_argument("type", nargs="?", choices=["readme", "status"],
                   help="Template type")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return 1

    handlers = {
        "sweep": cmd_sweep,
        "update": cmd_update,
        "init": cmd_init,
        "dashboard": cmd_dashboard,
        "template": cmd_template,
    }
    return handlers[args.command](args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|