#!/usr/bin/env python3
"""
Directory Status Management System
===================================
Manages README.md and STATUS.md files across the agent-governance project.
Ensures every subdirectory has proper documentation and status tracking.

Usage:
    python status.py sweep [--fix]
    python status.py update DIR --phase PHASE [--task TEXT] [--note TEXT] [--deps A,B]
    python status.py init DIR [--force]
    python status.py dashboard
    python status.py template [readme|status]
"""

import os
import sys
import json
import argparse
import re
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum

# Project root
PROJECT_ROOT = Path("/opt/agent-governance")

# Directories to skip (data/generated content)
SKIP_DIRS = {
    "__pycache__",
    "node_modules",
    ".git",
    "logs",
    "storage",
    "dragonfly-data",
    "credentials",
    "workspace",
    ".claude",
}

# Directories that are data-only (no README needed)
DATA_DIRS = {
    "logs",
    "storage",
    "dragonfly-data",
    "credentials",
    "workspace",
    "packages",  # evidence packages are generated
}


class StatusPhase(str, Enum):
    """Status phases for directories."""

    COMPLETE = "complete"
    IN_PROGRESS = "in_progress"
    BLOCKED = "blocked"
    NEEDS_REVIEW = "needs_review"
    NOT_STARTED = "not_started"


# Icons shown in phase badges. These are the same five symbols the dashboard
# prints and the badge-rewriting pattern in `cmd_update` looks for.
STATUS_ICONS = {
    StatusPhase.COMPLETE: "✅",
    StatusPhase.IN_PROGRESS: "🚧",
    StatusPhase.BLOCKED: "❗",
    StatusPhase.NEEDS_REVIEW: "⚠️",
    StatusPhase.NOT_STARTED: "⬜",
}


@dataclass
class DirectoryStatus:
    """Status information for a directory."""

    path: Path
    has_readme: bool = False
    has_status: bool = False
    phase: StatusPhase = StatusPhase.NOT_STARTED
    last_updated: Optional[datetime] = None
    tasks: List[Dict] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)
    issues: List[str] = field(default_factory=list)


# =============================================================================
# Templates
# =============================================================================

README_TEMPLATE = '''# {title}

> {purpose}

## Overview

{overview}

## Key Files

| File | Description |
|------|-------------|
{files_table}

## Interfaces / APIs

{interfaces}

## Status

{status_badge}

See [STATUS.md](./STATUS.md) for detailed progress tracking.

## Architecture Reference

Part of the [Agent Governance System](/opt/agent-governance/docs/ARCHITECTURE.md).

Parent: [{parent_name}]({parent_path})

---
*Last updated: {timestamp}*
'''

STATUS_TEMPLATE = '''# Status: {title}

## Current Phase

{phase_badge}

## Tasks

| Status | Task | Updated |
|--------|------|---------|
{tasks_table}

## Dependencies

{dependencies}

## Issues / Blockers

{issues}

## Activity Log

{activity_log}

---
*Last updated: {timestamp}*
'''

ACTIVITY_ENTRY_TEMPLATE = '''### {timestamp}
- **Phase**: {phase}
- **Action**: {action}
- **Details**: {details}
'''

# Placeholder texts written by `generate_status` when a section is empty.
_NO_TASKS_TEXT = "*No tasks defined*"
_NO_DEPENDENCIES_TEXT = "*No external dependencies.*"
_NO_ISSUES_TEXT = "*No current issues or blockers.*"

# Timestamp format used in generated files (always UTC).
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S UTC"

# ISO-like timestamps ("YYYY-MM-DD HH:MM:SS" or with a "T" separator).
_TIMESTAMP_RE = re.compile(r'(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2})')

# The bold badge that directly follows the "## Current Phase" heading.
_CURRENT_PHASE_BADGE_RE = re.compile(
    r'(## Current Phase[^\n]*\n\s*)\*\*[^*\n]+\*\*'
)

# Fallback for files whose badge is not under a "## Current Phase" heading:
# any "**<icon> LABEL**" where LABEL may contain spaces or underscores.
_LEGACY_BADGE_RE = re.compile(
    r'\*\*(?:'
    + '|'.join(re.escape(icon) for icon in STATUS_ICONS.values())
    + r') [A-Z_ ]+\*\*'
)

_LAST_UPDATED_RE = re.compile(r'\*Last updated: [^*]+\*')

# Phases ordered so that multi-word labels are tried before single words.
_PHASE_WORD_PATTERNS = [
    (phase, re.compile(r'\b' + re.escape(phase.value.replace("_", " ")) + r'\b'))
    for phase in (
        StatusPhase.IN_PROGRESS,
        StatusPhase.NEEDS_REVIEW,
        StatusPhase.NOT_STARTED,
        StatusPhase.BLOCKED,
        StatusPhase.COMPLETE,
    )
]

# First-column markers recognised in the task table.
_DONE_MARKERS = {"x", "✓"}
_OPEN_MARKERS = {"", "-", "✗", "☐"}


def get_readme_template() -> str:
    """Return the README.md template text."""
    return README_TEMPLATE


def get_status_template() -> str:
    """Return the STATUS.md template text."""
    return STATUS_TEMPLATE


# =============================================================================
# Small shared helpers
# =============================================================================

def _utc_timestamp() -> str:
    """Return the current UTC time formatted for generated files."""
    return datetime.now(timezone.utc).strftime(_TIMESTAMP_FORMAT)


def _phase_label(phase: StatusPhase) -> str:
    """Return the upper-case, space-separated label for a phase."""
    return phase.value.replace("_", " ").upper()


def _phase_badge(phase: StatusPhase) -> str:
    """Return the bold STATUS.md badge for a phase."""
    return f"**{STATUS_ICONS.get(phase, '')} {_phase_label(phase)}**"


def _format_bullets(items: List[str], empty_text: str) -> str:
    """Render items as a Markdown bullet list, or the placeholder if empty."""
    if not items:
        return empty_text
    return "\n".join(f"- {item}" for item in items)


def _relative_to_project(dir_path: Path) -> Path:
    """Return dir_path relative to PROJECT_ROOT, or unchanged if outside it."""
    # A string-prefix test would wrongly accept siblings such as
    # "/opt/agent-governance-old"; let pathlib decide instead.
    try:
        return dir_path.relative_to(PROJECT_ROOT)
    except ValueError:
        return dir_path


def _section_body(content: str, *headings: str) -> List[str]:
    """Return the lines that follow a heading, up to the next '##' heading.

    A line counts as the section start when it contains any of the given
    heading strings. Returns an empty list when no heading is found.
    """
    body = []
    inside = False
    for line in content.split('\n'):
        if any(heading in line for heading in headings):
            inside = True
            continue
        if inside:
            if line.startswith('##'):
                break
            body.append(line)
    return body


def _section_bullets(content: str, *headings: str) -> List[str]:
    """Return the text of each '-' bullet inside the named section."""
    return [
        line.strip()[1:].strip()
        for line in _section_body(content, *headings)
        if line.strip().startswith('-')
    ]


def _replace_section_body(content: str, heading: str, body: str) -> str:
    """Replace the body of the section starting at `heading`.

    If the section does not exist it is appended to the end of the document.
    """
    lines = content.split('\n')
    start = next(
        (i for i, line in enumerate(lines) if line.startswith(heading)), None
    )
    if start is None:
        return f"{content.rstrip()}\n\n{heading}\n\n{body}\n"
    end = next(
        (i for i in range(start + 1, len(lines)) if lines[i].startswith('##')),
        len(lines),
    )
    return '\n'.join(lines[:start + 1] + ['', body, ''] + lines[end:])


# =============================================================================
# Directory Analysis
# =============================================================================

def should_skip_dir(dir_path: Path) -> bool:
    """Check if directory should be skipped."""
    name = dir_path.name
    if name.startswith("."):
        return True
    if name in SKIP_DIRS:
        return True
    # Skip evidence package subdirs (generated)
    return "evd-" in name


def is_data_dir(dir_path: Path) -> bool:
    """Check if directory is data-only."""
    return dir_path.name in DATA_DIRS


def get_all_directories(root: Optional[Path] = None) -> List[Path]:
    """Get all directories that should have README/STATUS.

    Args:
        root: Directory tree to scan. Defaults to PROJECT_ROOT.

    Returns:
        Sorted list of directories, excluding skipped and data-only ones.
    """
    base = PROJECT_ROOT if root is None else Path(root)
    dirs = []
    for current, subdirs, _files in os.walk(base):
        current_path = Path(current)
        # Filter subdirs in-place so os.walk does not descend into them
        subdirs[:] = [d for d in subdirs if not should_skip_dir(current_path / d)]
        # Data directories need no docs themselves (children are still walked)
        if is_data_dir(current_path):
            continue
        dirs.append(current_path)
    return sorted(dirs)


def analyze_directory(dir_path: Path) -> DirectoryStatus:
    """Analyze a directory's status files.

    Looks for README.md and for STATUS.md (or README.status.md as an
    alternative). When a status file exists, its phase, timestamp, tasks,
    dependencies and issues are parsed. An unreadable status file is reported
    on stderr and leaves the parsed fields at their defaults.
    """
    status = DirectoryStatus(path=dir_path)
    status.has_readme = (dir_path / "README.md").exists()

    status_file = None
    for candidate in (dir_path / "STATUS.md", dir_path / "README.status.md"):
        if candidate.exists():
            status_file = candidate
            break
    status.has_status = status_file is not None
    if status_file is None:
        return status

    try:
        content = status_file.read_text(encoding="utf-8")
    except (OSError, UnicodeError) as exc:
        # Best effort: keep scanning other directories.
        print(f"Warning: could not read {status_file}: {exc}", file=sys.stderr)
        return status

    status.phase = parse_phase_from_status(content)
    status.last_updated = parse_timestamp_from_status(content)
    status.tasks = parse_tasks_from_status(content)
    status.dependencies = parse_dependencies_from_status(content)
    status.issues = parse_issues_from_status(content)
    return status


def _guess_phase_from_keywords(content: str) -> StatusPhase:
    """Guess the phase from keywords anywhere in the document (legacy files)."""
    text = content.lower()
    mentions_progress = "in_progress" in text or "in progress" in text
    if "complete" in text and ("phase" in text or "status" in text):
        if not mentions_progress:
            return StatusPhase.COMPLETE
    if mentions_progress:
        return StatusPhase.IN_PROGRESS
    if "blocked" in text:
        return StatusPhase.BLOCKED
    if "needs_review" in text or "needs review" in text:
        return StatusPhase.NEEDS_REVIEW
    return StatusPhase.NOT_STARTED


def parse_phase_from_status(content: str) -> StatusPhase:
    """Extract phase from STATUS.md content.

    The "## Current Phase" section is authoritative: older activity-log
    entries mention earlier phases and must not override the current one.
    Files without that section fall back to a whole-document keyword guess.
    """
    section = " ".join(_section_body(content, "## Current Phase"))
    section = section.lower().replace("_", " ")
    for phase, pattern in _PHASE_WORD_PATTERNS:
        if pattern.search(section):
            return phase
    return _guess_phase_from_keywords(content)


def parse_timestamp_from_status(content: str) -> Optional[datetime]:
    """Extract last updated timestamp from STATUS.md.

    Returns the last ISO-like timestamp in the document as a naive datetime,
    or None when there is none or it is not a valid date/time.
    """
    matches = _TIMESTAMP_RE.findall(content)
    if not matches:
        return None
    try:
        return datetime.fromisoformat(matches[-1].replace(" ", "T"))
    except ValueError:
        return None


def parse_tasks_from_status(content: str) -> List[Dict]:
    """Extract tasks from STATUS.md.

    A task row is a table line whose first cell is a status marker
    (x, ✓, ✗, ☐, - or empty) and whose second cell is the task text.
    Header rows, separator rows and the "no tasks" placeholder are ignored.

    Returns:
        List of {"task": str, "done": bool} in document order.
    """
    tasks = []
    for line in content.split('\n'):
        row = line.strip()
        if not row.startswith('|'):
            continue
        # Drop the outer pipes, then split into cells.
        inner = row[1:-1] if len(row) > 1 and row.endswith('|') else row[1:]
        cells = [cell.strip() for cell in inner.split('|')]
        if len(cells) < 2:
            continue
        marker, text = cells[0].lower(), cells[1]
        if marker not in _DONE_MARKERS and marker not in _OPEN_MARKERS:
            continue
        if not text or text in ("Task", _NO_TASKS_TEXT):
            continue
        tasks.append({"task": text, "done": marker in _DONE_MARKERS})
    return tasks


def parse_dependencies_from_status(content: str) -> List[str]:
    """Extract dependencies from STATUS.md."""
    return _section_bullets(content, '## Dependencies', '## Depends')


def parse_issues_from_status(content: str) -> List[str]:
    """Extract issues/blockers from STATUS.md."""
    return _section_bullets(content, '## Issues', '## Blockers')


# =============================================================================
# File Generation
# =============================================================================

# Purpose text keyed by directory name.
_DIRECTORY_PURPOSES = {
    "agents": "Agent implementations and configurations",
    "analytics": "Learning analytics and pattern detection",
    "bin": "CLI tools and executable scripts",
    "checkpoint": "Context checkpoint and session management",
    "docs": "System documentation and architecture specs",
    "evidence": "Audit evidence and execution artifacts",
    "integrations": "External service integrations (GitHub, Slack)",
    "inventory": "Infrastructure inventory management",
    "ledger": "SQLite audit ledger and transaction logs",
    "lib": "Shared library code and utilities",
    "orchestrator": "Multi-agent orchestration system",
    "pipeline": "Pipeline DSL, schemas, and templates",
    "preflight": "Pre-execution validation and checks",
    "runtime": "Runtime governance and agent lifecycle",
    "sandbox": "Sandboxed execution environments",
    "teams": "Hierarchical team framework",
    "testing": "Test utilities and helpers",
    "tests": "Test suites and test infrastructure",
    "ui": "User interface components",
    "wrappers": "Tool wrappers for sandboxed execution",
    "tier0-agent": "Observer-tier agent (read-only)",
    "tier1-agent": "Executor-tier agent (infrastructure changes)",
    "multi-agent": "Multi-agent coordination demos",
    "llm-planner": "LLM-based planning agent (Python)",
    "llm-planner-ts": "LLM-based planning agent (TypeScript)",
    "chaos": "Chaos testing framework",
    "multi-agent-chaos": "Multi-agent chaos test suite",
    "github": "GitHub integration module",
    "slack": "Slack integration module",
    "common": "Common integration utilities",
    "framework": "Team framework core implementation",
    "templates": "Configuration and pipeline templates",
    "examples": "Example configurations and demos",
    "schemas": "JSON schema definitions",
    "mocks": "Mock implementations for testing",
    "unit": "Unit tests",
    "integration": "Integration tests",
    "scenarios": "Test scenarios",
    "governance": "Governance system tests",
    "terraform": "Terraform sandbox and examples",
    "ansible": "Ansible sandbox and examples",
}

# File patterns listed in generated READMEs, in priority order.
_KEY_FILE_PATTERNS = [
    ("*.py", "Python module"),
    ("*.ts", "TypeScript module"),
    ("*.js", "JavaScript module"),
    ("*.yaml", "Configuration"),
    ("*.yml", "Configuration"),
    ("*.json", "Data/Schema"),
    ("*.sh", "Shell script"),
    ("*.sql", "Database schema"),
    ("*.md", "Documentation"),
]

_STATUS_DOC_NAMES = ('README.md', 'STATUS.md', 'README.status.md')


def get_directory_purpose(dir_path: Path) -> str:
    """Get a purpose description for a directory based on its name/location.

    Uses the directory's own name first, then its parent's name (described as
    a submodule), then a generic description.
    """
    name = dir_path.name
    if name in _DIRECTORY_PURPOSES:
        return _DIRECTORY_PURPOSES[name]

    # Check parent context
    parent = dir_path.parent.name
    if parent in _DIRECTORY_PURPOSES:
        return f"{_DIRECTORY_PURPOSES[parent]} - {name} submodule"

    return "Component of the Agent Governance System"


def get_key_files(dir_path: Path) -> List[Tuple[str, str]]:
    """Get key files in a directory with descriptions.

    Returns:
        Up to 10 (file name, description) pairs, grouped by file type in
        priority order and sorted by name within each type. Hidden files and
        the README/STATUS documents themselves are excluded.
    """
    files = []
    seen = set()
    for pattern, desc in _KEY_FILE_PATTERNS:
        # Sort so the generated README does not depend on filesystem order.
        for entry in sorted(dir_path.glob(pattern)):
            name = entry.name
            if name in seen or name.startswith('.') or name in _STATUS_DOC_NAMES:
                continue
            if not entry.is_file():
                continue
            files.append((name, desc))
            seen.add(name)
    return files[:10]  # Limit to top 10


def generate_readme(dir_path: Path, phase: StatusPhase = StatusPhase.NOT_STARTED) -> str:
    """Generate README.md content for a directory.

    Args:
        dir_path: Directory the README describes (its files are listed).
        phase: Phase shown in the status badge.

    Returns:
        The rendered README text.
    """
    name = dir_path.name
    title = name.replace("-", " ").replace("_", " ").title()
    purpose = get_directory_purpose(dir_path)

    key_files = get_key_files(dir_path)
    if key_files:
        files_table = "\n".join(f"| `{f}` | {d} |" for f, d in key_files)
    else:
        files_table = "| *No files yet* | |"

    parent = dir_path.parent
    if parent == PROJECT_ROOT:
        parent_name = "Project Root"
        parent_path = str(PROJECT_ROOT)
    else:
        parent_name = parent.name.replace("-", " ").title()
        parent_path = ".."

    icon = STATUS_ICONS.get(phase, "")
    phase_label = phase.value.replace("_", " ").title()

    return README_TEMPLATE.format(
        title=title,
        purpose=purpose,
        overview=f"This directory contains {purpose.lower()}.",
        files_table=files_table,
        interfaces="*Document any APIs, CLI commands, or interfaces here.*",
        status_badge=f"**{icon} {phase_label}**",
        parent_name=parent_name,
        parent_path=parent_path,
        timestamp=_utc_timestamp(),
    )


def generate_status(dir_path: Path,
                    phase: StatusPhase = StatusPhase.NOT_STARTED,
                    tasks: Optional[List[Dict]] = None,
                    dependencies: Optional[List[str]] = None,
                    issues: Optional[List[str]] = None,
                    action: str = "Initialized",
                    details: Optional[str] = None) -> str:
    """Generate STATUS.md content for a directory.

    Args:
        dir_path: Directory the status file belongs to (only its name is used).
        phase: Current phase shown in the badge and first log entry.
        tasks: Dicts with "task" and optional "done" / "updated" keys.
        dependencies: Dependency names listed as bullets.
        issues: Issues/blockers listed as bullets.
        action: Action text for the first activity-log entry.
        details: Details text for the first activity-log entry; defaults to a
            generic "initialized" message.

    Returns:
        The rendered STATUS.md text.
    """
    name = dir_path.name
    title = name.replace("-", " ").replace("_", " ").title()

    tasks = tasks or []
    if tasks:
        tasks_table = "\n".join(
            f"| {'✓' if t.get('done') else '☐'} | {t['task']} | {t.get('updated', 'N/A')} |"
            for t in tasks
        )
    else:
        tasks_table = f"| ☐ | {_NO_TASKS_TEXT} | - |"

    timestamp = _utc_timestamp()
    activity_log = ACTIVITY_ENTRY_TEMPLATE.format(
        timestamp=timestamp,
        phase=_phase_label(phase),
        action=action,
        details=details or "Status tracking initialized for this directory.",
    )

    return STATUS_TEMPLATE.format(
        title=title,
        phase_badge=_phase_badge(phase),
        tasks_table=tasks_table,
        dependencies=_format_bullets(dependencies or [], _NO_DEPENDENCIES_TEXT),
        issues=_format_bullets(issues or [], _NO_ISSUES_TEXT),
        activity_log=activity_log,
        timestamp=timestamp,
    )


def _apply_status_update(content: str,
                         phase: StatusPhase,
                         action: str,
                         details: str,
                         timestamp: str,
                         dependencies: Optional[List[str]] = None) -> str:
    """Return existing STATUS.md text updated with a new phase and log entry.

    Rewrites the phase badge, optionally the dependencies section, prepends
    an activity-log entry and refreshes the "Last updated" footer.
    """
    badge = _phase_badge(phase)
    # Callable replacements keep the badge text from being read as a regex
    # replacement template.
    content, swapped = _CURRENT_PHASE_BADGE_RE.subn(
        lambda m: m.group(1) + badge, content, count=1
    )
    if not swapped:
        content = _LEGACY_BADGE_RE.sub(lambda m: badge, content)

    if dependencies is not None:
        content = _replace_section_body(
            content,
            "## Dependencies",
            _format_bullets(dependencies, _NO_DEPENDENCIES_TEXT),
        )

    entry = ACTIVITY_ENTRY_TEMPLATE.format(
        timestamp=timestamp,
        phase=_phase_label(phase),
        action=action,
        details=details,
    )
    if not content.endswith("\n"):
        content += "\n"
    heading = "## Activity Log\n"
    if heading in content:
        # Newest entry goes first, directly under the heading.
        content = content.replace(heading, f"{heading}\n{entry}", 1)
    else:
        # Never drop the entry just because the section is missing.
        content = f"{content.rstrip()}\n\n{heading}\n{entry}"

    return _LAST_UPDATED_RE.sub(
        lambda m: f"*Last updated: {timestamp}*", content
    )


# =============================================================================
# Commands
# =============================================================================

def cmd_sweep(args):
    """Check all directories for README/STATUS files.

    Prints a per-directory report and totals. With --fix, creates whichever
    of README.md / STATUS.md the scan found missing. Returns 0.
    """
    dirs = get_all_directories()
    missing_readme = []
    missing_status = []
    outdated = []

    print("=" * 60)
    print("DIRECTORY STATUS SWEEP")
    print("=" * 60)
    print(f"Scanning {len(dirs)} directories...\n")

    now = datetime.now(timezone.utc)
    for dir_path in dirs:
        status = analyze_directory(dir_path)
        rel_path = dir_path.relative_to(PROJECT_ROOT)
        issues = []

        if not status.has_readme:
            issues.append("missing README.md")
            missing_readme.append(dir_path)
        if not status.has_status:
            issues.append("missing STATUS.md")
            missing_status.append(dir_path)
        if status.last_updated:
            # Parsed timestamps are naive; generated files are written in UTC.
            age = now - status.last_updated.replace(tzinfo=timezone.utc)
            if age.days > 7:
                issues.append(f"outdated ({age.days} days)")
                outdated.append(dir_path)

        if issues:
            print(f"  {rel_path}: {', '.join(issues)}")

    print()
    print("-" * 60)
    print(f"Missing README.md:  {len(missing_readme)}")
    print(f"Missing STATUS.md:  {len(missing_status)}")
    print(f"Outdated (>7 days): {len(outdated)}")
    print("-" * 60)

    if args.fix and (missing_readme or missing_status):
        print("\nFixing missing files...")
        need_readme = set(missing_readme)
        # Directories with README.status.md are not in this set, so no
        # duplicate STATUS.md is created next to an existing status file.
        need_status = set(missing_status)
        for dir_path in sorted(need_readme | need_status):
            rel_path = dir_path.relative_to(PROJECT_ROOT)
            if dir_path in need_readme:
                (dir_path / "README.md").write_text(
                    generate_readme(dir_path), encoding="utf-8"
                )
                print(f"  Created: {rel_path}/README.md")
            if dir_path in need_status:
                (dir_path / "STATUS.md").write_text(
                    generate_status(dir_path), encoding="utf-8"
                )
                print(f"  Created: {rel_path}/STATUS.md")
        print("\nDone!")
    elif missing_readme or missing_status:
        print("\nRun with --fix to create missing files.")

    return 0


def cmd_update(args):
    """Update status for a directory.

    Updates (or creates) STATUS.md in args.directory with the requested
    phase, task/note and dependencies, then records a checkpoint delta unless
    --no-checkpoint was given. Returns 0 on success, 1 on invalid input.
    """
    dir_path = Path(args.directory).resolve()
    if not dir_path.is_dir():
        print(f"Error: Directory not found: {dir_path}")
        return 1

    status_path = dir_path / "STATUS.md"

    phase = StatusPhase.IN_PROGRESS
    if args.phase:
        normalized = args.phase.strip().lower().replace(" ", "_").replace("-", "_")
        try:
            phase = StatusPhase(normalized)
        except ValueError:
            print(f"Error: Invalid phase '{args.phase}'")
            print(f"Valid phases: {', '.join(p.value for p in StatusPhase)}")
            return 1

    # None means "leave dependencies alone"; an empty value clears them.
    raw_deps = getattr(args, "deps", None)
    dependencies = None
    if raw_deps is not None:
        dependencies = [d.strip() for d in raw_deps.split(",") if d.strip()]

    if status_path.exists():
        content = _apply_status_update(
            status_path.read_text(encoding="utf-8"),
            phase,
            action=args.task or args.note or "Status update",
            details=args.note or f"Phase updated to {phase.value}",
            timestamp=_utc_timestamp(),
            dependencies=dependencies,
        )
    else:
        content = generate_status(
            dir_path,
            phase=phase,
            dependencies=dependencies,
            action=args.task or "Initialized",
            details=args.note,
        )
    status_path.write_text(content, encoding="utf-8")

    rel_path = _relative_to_project(dir_path)
    print(f"Updated: {rel_path}/STATUS.md")
    print(f"  Phase: {phase.value}")
    if args.task:
        print(f"  Task: {args.task}")
    if args.note:
        print(f"  Note: {args.note}")

    # Create lightweight checkpoint delta unless disabled
    if not getattr(args, 'no_checkpoint', False):
        try:
            create_checkpoint_delta(str(rel_path), phase.value, args.task or args.note)
        except Exception as e:
            # Don't fail the status update if checkpoint fails
            print(f"  (Checkpoint delta skipped: {e})")

    return 0


def create_checkpoint_delta(dir_path: str, phase: str, action: Optional[str] = None):
    """
    Create a lightweight checkpoint recording the status change.

    Runs the project's checkpoint script with a note describing the change
    and prints the checkpoint ID if the script reports one. A non-zero exit
    status is ignored; a timeout or launch failure propagates to the caller.
    """
    action_str = f": {action}" if action else ""
    notes = f"Status update - {dir_path} -> {phase}{action_str}"

    result = subprocess.run(
        ["python3", str(PROJECT_ROOT / "checkpoint" / "checkpoint.py"),
         "now", "--notes", notes],
        capture_output=True,
        text=True,
        timeout=10,
    )
    if result.returncode != 0:
        return

    for line in result.stdout.split('\n'):
        if line.startswith("ID:"):
            # Split once so IDs that themselves contain ':' stay intact.
            ckpt_id = line.split(":", 1)[1].strip()
            print(f"  Checkpoint: {ckpt_id}")
            break


def cmd_init(args):
    """Initialize README/STATUS in a directory.

    Creates README.md and STATUS.md when missing (or always with --force).
    Returns 0 on success, 1 when the directory does not exist.
    """
    dir_path = Path(args.directory).resolve()
    if not dir_path.is_dir():
        print(f"Error: Directory not found: {dir_path}")
        return 1

    rel_path = _relative_to_project(dir_path)
    generators = (
        ("README.md", generate_readme),
        ("STATUS.md", generate_status),
    )

    created = []
    for filename, generate in generators:
        target = dir_path / filename
        if args.force or not target.exists():
            target.write_text(generate(dir_path), encoding="utf-8")
            created.append(filename)

    if created:
        print(f"Initialized {rel_path}:")
        for f in created:
            print(f"  Created: {f}")
    else:
        print(f"Files already exist in {rel_path}. Use --force to overwrite.")

    return 0


def cmd_dashboard(args):
    """Show status overview of all directories.

    Prints a progress bar, per-phase counts and the list of blocked,
    in-progress and needs-review directories. Returns 0.
    """
    dirs = get_all_directories()
    by_phase = {phase: [] for phase in StatusPhase}
    for dir_path in dirs:
        status = analyze_directory(dir_path)
        by_phase[status.phase].append(status)

    print("=" * 70)
    print("PROJECT STATUS DASHBOARD")
    print("=" * 70)
    print()

    total = len(dirs)
    complete = len(by_phase[StatusPhase.COMPLETE])

    # Progress bar
    pct_complete = (complete / total * 100) if total > 0 else 0
    bar_width = 40
    filled = int(bar_width * complete / total) if total > 0 else 0
    bar = "█" * filled + "░" * (bar_width - filled)
    print(f"Progress: [{bar}] {pct_complete:.1f}%")
    print()

    # Summary counts
    summary_rows = (
        (StatusPhase.COMPLETE, "Complete:"),
        (StatusPhase.IN_PROGRESS, "In Progress:"),
        (StatusPhase.BLOCKED, "Blocked:"),
        (StatusPhase.NEEDS_REVIEW, "Needs Review:"),
        (StatusPhase.NOT_STARTED, "Not Started:"),
    )
    for phase, label in summary_rows:
        print(f"  {STATUS_ICONS[phase]} {label:<14}{len(by_phase[phase]):3d}")
    print("  ─────────────────────")
    print(f"     {'Total:':<14}{total:3d}")
    print()

    # Show non-complete directories, most urgent first
    active_phases = [
        StatusPhase.BLOCKED,
        StatusPhase.IN_PROGRESS,
        StatusPhase.NEEDS_REVIEW,
    ]
    if any(by_phase[phase] for phase in active_phases):
        print("-" * 70)
        print("ACTIVE DIRECTORIES:")
        print("-" * 70)
        now = datetime.now(timezone.utc)
        for phase in active_phases:
            if not by_phase[phase]:
                continue
            icon = STATUS_ICONS[phase]
            print(f"\n{icon} {phase.value.replace('_', ' ').upper()}:")
            for status in by_phase[phase]:
                rel_path = status.path.relative_to(PROJECT_ROOT)
                age_str = ""
                if status.last_updated:
                    age = now - status.last_updated.replace(tzinfo=timezone.utc)
                    age_str = (
                        f" (updated {age.days}d ago)" if age.days > 0
                        else " (updated today)"
                    )
                print(f"  {rel_path}{age_str}")
        print()

    return 0


def cmd_template(args):
    """Show file templates.

    Prints the requested template in full, or a 500-character preview of
    both when no type is given. Returns 0.
    """
    if args.type == "readme":
        print(README_TEMPLATE)
    elif args.type == "status":
        print(STATUS_TEMPLATE)
    else:
        print("README.md Template:")
        print("-" * 40)
        print(README_TEMPLATE[:500] + "...")
        print()
        print("STATUS.md Template:")
        print("-" * 40)
        print(STATUS_TEMPLATE[:500] + "...")
    return 0


# =============================================================================
# Main
# =============================================================================

def main(argv: Optional[List[str]] = None):
    """Parse command-line arguments and run the selected command.

    Args:
        argv: Argument list to parse; defaults to sys.argv[1:].

    Returns:
        The command's exit status (1 when no command was given).
    """
    parser = argparse.ArgumentParser(description="Directory Status Management")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # sweep
    sweep_parser = subparsers.add_parser("sweep", help="Check all directories")
    sweep_parser.add_argument("--fix", action="store_true", help="Create missing files")

    # update
    update_parser = subparsers.add_parser("update", help="Update directory status")
    update_parser.add_argument("directory", help="Directory to update")
    update_parser.add_argument("--phase", help="Set phase")
    update_parser.add_argument("--task", help="Add task description")
    update_parser.add_argument("--note", help="Add note")
    update_parser.add_argument("--deps", help="Set dependencies (comma-separated)")
    update_parser.add_argument("--no-checkpoint", action="store_true",
                               help="Skip creating checkpoint delta")

    # init
    init_parser = subparsers.add_parser("init", help="Initialize directory")
    init_parser.add_argument("directory", help="Directory to initialize")
    init_parser.add_argument("--force", action="store_true", help="Overwrite existing")

    # dashboard
    subparsers.add_parser("dashboard", help="Show status overview")

    # template
    template_parser = subparsers.add_parser("template", help="Show templates")
    template_parser.add_argument("type", nargs="?", choices=["readme", "status"],
                                 help="Template type")

    args = parser.parse_args(argv)

    if not args.command:
        parser.print_help()
        return 1

    commands = {
        "sweep": cmd_sweep,
        "update": cmd_update,
        "init": cmd_init,
        "dashboard": cmd_dashboard,
        "template": cmd_template,
    }
    return commands[args.command](args)


if __name__ == "__main__":
    sys.exit(main())