mod access; mod access_service; mod auth; mod execution_loop; mod observability; mod tools; mod v1; use axum::{Router, extract::DefaultBodyLimit, routing::get}; use proto::lakehouse::catalog_service_server::CatalogServiceServer; use shared::config::Config; use tower_http::cors::{Any, CorsLayer}; use tower_http::trace::TraceLayer; #[tokio::main] async fn main() { // Load config let config = Config::load_or_default(); // Initialize tracing + observability observability::init_tracing( &config.observability.service_name, &config.observability.exporter, ); tracing::info!("config loaded: gateway={}:{}, storage={}", config.gateway.host, config.gateway.port, config.storage.root); // Secrets provider let secrets = std::sync::Arc::new(shared::secrets::FileSecretsProvider::from_env()); if let Err(e) = secrets.load().await { tracing::warn!("secrets load: {e} — bucket credentials may be missing"); } // Federation: bucket registry (primary + any [[storage.buckets]] entries + rescue) let bucket_registry = std::sync::Arc::new( storaged::registry::BucketRegistry::from_config(&config.storage, secrets.clone()) .await .expect("bucket registry init failed") ); let buckets = bucket_registry.list().await; tracing::info!( "bucket registry: {} buckets [{}], rescue={:?}", buckets.len(), buckets.iter().map(|b| format!("{}:{}", b.name, b.backend)).collect::>().join(", "), bucket_registry.rescue_name(), ); // Back-compat: existing components that expect a single Arc // get the primary bucket's store. As we migrate call sites to be bucket- // aware, they'll call bucket_registry.get(name) directly. let store = bucket_registry.default_store(); // Catalog let registry = catalogd::registry::Registry::new(store.clone()); if let Err(e) = registry.rebuild().await { tracing::warn!("catalog rebuild failed (empty store?): {e}"); } // Query engine with 16GB memory cache let cache = queryd::cache::MemCache::new(16 * 1024 * 1024 * 1024); let engine = queryd::context::QueryEngine::new(registry.clone(), bucket_registry.clone(), cache); // Event journal — append-only mutation log (flush every 100 events) let journal = journald::journal::Journal::new(store.clone(), 100); // Access control let access = access::AccessControl::new(config.auth.enabled); access.register_defaults().await; // Phase 42 — file-backed truth rules. Probes the `truth/` directory // at repo root (or $LAKEHOUSE_TRUTH_DIR override) and logs how many // rules load. Current request paths still build their own stores // via truth::default_truth_store() / truth::sql_query_guard_store(); // the composed-at-boot store gets plumbed through V1State in a // follow-up. This boot probe catches parse errors + duplicate-ID // collisions early rather than at first request. { let truth_dir = std::env::var("LAKEHOUSE_TRUTH_DIR") .unwrap_or_else(|_| "/home/profit/lakehouse/truth".to_string()); if std::path::Path::new(&truth_dir).exists() { let mut probe_store = truth::default_truth_store(); match truth::loader::load_from_dir(&mut probe_store, &truth_dir) { Ok(n) => tracing::info!("truth: loaded {n} file-backed rule(s) from {truth_dir}"), Err(e) => tracing::warn!("truth: failed to load rules from {truth_dir}: {e}"), } } else { tracing::debug!("truth: no rule dir at {truth_dir}, skipping file-backed load"); } } // Workspace manager for agent-specific overlays let workspace_mgr = queryd::workspace::WorkspaceManager::new(store.clone()); if let Err(e) = workspace_mgr.rebuild().await { tracing::warn!("workspace rebuild: {e}"); } // AI sidecar clients — Phase 44 part 3 (2026-04-27). // // Two flavors of the same client: // - `ai_client_direct` posts directly to ${sidecar}/generate. Used // inside the gateway by V1State + the legacy /ai proxy. These // call sites are themselves the implementation of /v1/chat // (or its sidecar shim), so routing them through /v1/chat // would self-loop. // - `ai_client_observable` posts via ${gateway}/v1/chat with // provider="ollama". Used by vectord modules (autotune agent, // /vectors service) so their LLM calls land in /v1/usage and // Langfuse traces. Adds one localhost HTTP hop per call (~ms); // accepted for the observability gain. // // The gateway can call its own /v1/chat over localhost during // boot's transient period because we don't fire any LLM calls // until the listener is up — the observable client is just // configured here, not exercised. let ai_client_direct = aibridge::client::AiClient::new(&config.sidecar.url); let gateway_self_url = format!("http://{}:{}", config.gateway.host, config.gateway.port); let ai_client_observable = aibridge::client::AiClient::new_with_gateway( &config.sidecar.url, &gateway_self_url, ); // Backwards-compat alias for the (many) existing references in this file. // Defaults to direct so the existing wiring (V1State, /ai proxy) // keeps its non-self-loop transport. New vectord wiring below // explicitly uses ai_client_observable. let ai_client = ai_client_direct.clone(); // Vector service components — built before the router because both the // /vectors service AND ingestd need the agent handle to enqueue triggers. let index_reg = vectord::index_registry::IndexRegistry::new(store.clone()); let _ = index_reg.rebuild().await; let hnsw = vectord::hnsw::HnswStore::new(); let emb_cache = vectord::embedding_cache::EmbeddingCache::new(store.clone()); // Phase B (federation layer 2): trial journals + promotion files now // live in each index's recorded bucket (IndexMeta.bucket), not always // primary. Journal / registry resolve per-call via the bucket registry. let tj = vectord::trial::TrialJournal::new(bucket_registry.clone(), index_reg.clone()); let pr = vectord::promotion::PromotionRegistry::new(bucket_registry.clone(), index_reg.clone()); let hs = vectord::harness::HarnessStore::new(bucket_registry.clone(), index_reg.clone()); // Phase 19: playbook memory. Load cached state; empty-on-miss is fine, // operators call POST /vectors/playbook_memory/rebuild to populate. let pbm = vectord::playbook_memory::PlaybookMemory::new(store.clone()); let _ = pbm.load_from_storage().await; // Pathway memory — consensus-designed sidecar for full-context // backtracking + hot-swap of successful review pathways. Same // load-on-boot pattern as playbook_memory: empty state is fine, // operators start populating via scrum_master_pipeline.ts. let pwm = vectord::pathway_memory::PathwayMemory::new(store.clone()); let _ = pwm.load_from_storage().await; // Phase 16.2: spawn the autotune agent. When config.agent.enabled=false // this returns a handle that drops triggers silently — no surprise load. let agent_cfg = vectord::agent::AgentConfig { enabled: config.agent.enabled, cycle_interval_secs: config.agent.cycle_interval_secs, cooldown_between_trials_secs: config.agent.cooldown_between_trials_secs, min_recall: config.agent.min_recall, max_trials_per_hour: config.agent.max_trials_per_hour, }; let agent_handle = vectord::agent::spawn( agent_cfg, vectord::agent::AgentDeps { store: store.clone(), // Observable: autotune agent's LLM calls go through // /v1/chat for /v1/usage + Langfuse visibility. ai_client: ai_client_observable.clone(), catalog: registry.clone(), index_registry: index_reg.clone(), hnsw_store: hnsw.clone(), embedding_cache: emb_cache.clone(), trial_journal: tj.clone(), promotion_registry: pr.clone(), harness_store: hs.clone(), }, ); // HTTP router let mut app = Router::new() .route("/health", get(health)) .nest("/storage", storaged::service::router(store.clone()) .merge(storaged::federation_service::router(bucket_registry.clone()))) .nest("/catalog", catalogd::service::router(registry.clone())) .nest("/query", queryd::service::router(engine.clone())) .nest("/ai", aibridge::service::router(ai_client.clone())) ; // Phase E: scheduled ingest let sched_store = ingestd::schedule::ScheduleStore::new(store.clone()); if let Ok(n) = sched_store.rebuild().await { if n > 0 { tracing::info!("rebuilt {n} persisted schedule(s)"); } } let scheduler = ingestd::schedule::Scheduler { store: sched_store.clone(), ingest: ingestd::schedule::SchedulerIngestDeps { store: store.clone(), registry: registry.clone(), buckets: bucket_registry.clone(), agent_handle: Some(agent_handle.clone()), index_registry: index_reg.clone(), }, tick_secs: 10, }; scheduler.spawn(); app = app .nest("/ingest", ingestd::service::router(ingestd::service::IngestState { store: store.clone(), registry: registry.clone(), buckets: bucket_registry.clone(), agent_handle: agent_handle.clone(), index_registry: index_reg.clone(), schedules: sched_store, // P9-001 fix 2026-04-23: journal reference flows into ingest so // successful uploads emit a record_ingest event. Journal is Clone // (Arc inside) so the /journal nest below still sees the // same buffer + persistence. journal: Some(journal.clone()), })) .nest("/vectors", vectord::service::router(vectord::service::VectorState { store: store.clone(), // Observable: /vectors service's LLM calls (RAG, summary, // playbook synthesis, etc.) flow through /v1/chat. ai_client: ai_client_observable.clone(), job_tracker: vectord::jobs::JobTracker::new(), index_registry: index_reg.clone(), hnsw_store: hnsw, embedding_cache: emb_cache, trial_journal: tj, harness_store: hs, catalog: registry.clone(), promotion_registry: pr, agent_handle, bucket_registry: bucket_registry.clone(), active_profile: std::sync::Arc::new(tokio::sync::RwLock::new(None)), lance: vectord::lance_backend::LanceRegistry::new( bucket_registry.clone(), index_reg.clone(), ), playbook_memory: pbm, pathway_memory: pwm, embed_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(1)), })) .nest("/workspaces", queryd::workspace_service::router(workspace_mgr)) .nest("/journal", journald::service::router(journal)) .nest("/access", access_service::router(access)) .nest("/tools", tools::service::router({ let tool_reg = tools::registry::ToolRegistry::new(); tool_reg.register_defaults().await; tools::ToolState { registry: tool_reg, query_fn: tools::QueryExecutor::new(engine.clone()), truth: std::sync::Arc::new(truth::sql_query_guard_store()), } })) // Phase 38 — Universal API skeleton. Thin OpenAI-compatible // surface over the existing aibridge → Ollama path. Future // phases add provider adapters (39), routing engine (40), // session state (41), etc. All without changing this mount. .nest("/v1", v1::router(v1::V1State { ai_client: ai_client.clone(), usage: std::sync::Arc::new(tokio::sync::RwLock::new(v1::Usage::default())), // Phase 39 first slice — Ollama Cloud adapter. Key resolved // from OLLAMA_CLOUD_KEY env, then /root/llm_team_config.json, // then OLLAMA_CLOUD_API_KEY env. None = cloud routes 503. ollama_cloud_key: { let k = v1::ollama_cloud::resolve_cloud_key(); if k.is_some() { tracing::info!("v1: Ollama Cloud key loaded — /v1/chat provider=ollama_cloud enabled"); } else { tracing::warn!("v1: no Ollama Cloud key in env or /root/llm_team_config.json — cloud routes will 503"); } k }, openrouter_key: { // 2026-04-24 free-tier rescue rung for iter 5+. Shares // the LLM Team UI's OPENROUTER_API_KEY so both systems // draw from one quota. let k = v1::openrouter::resolve_openrouter_key(); if k.is_some() { tracing::info!("v1: OpenRouter key loaded — /v1/chat provider=openrouter enabled"); } else { tracing::warn!("v1: no OpenRouter key — openrouter rescue rung will 503"); } k }, gemini_key: { // Phase 40 provider. GEMINI_API_KEY in env or .env. let k = v1::gemini::resolve_gemini_key(); if k.is_some() { tracing::info!("v1: Gemini key loaded — /v1/chat provider=gemini enabled"); } else { tracing::debug!("v1: no Gemini key — provider=gemini will 503"); } k }, claude_key: { // Phase 40 provider. ANTHROPIC_API_KEY in env or .env. let k = v1::claude::resolve_claude_key(); if k.is_some() { tracing::info!("v1: Claude key loaded — /v1/chat provider=claude enabled"); } else { tracing::debug!("v1: no Claude key — provider=claude will 503"); } k }, kimi_key: { // Direct Kimi For Coding (api.kimi.com) — bypasses the // broken-upstream kimi-k2:1t and OpenRouter rate caps. // Key from /etc/lakehouse/kimi.env (KIMI_API_KEY=sk-kimi-…). let k = v1::kimi::resolve_kimi_key(); if k.is_some() { tracing::info!("v1: Kimi key loaded — /v1/chat provider=kimi enabled (model=kimi-for-coding)"); } else { tracing::debug!("v1: no Kimi key — provider=kimi will 503"); } k }, opencode_key: { // OpenCode GO multi-vendor gateway — Claude Opus 4.7, // GPT-5.5-pro, Gemini 3.1-pro, Kimi K2.6, DeepSeek, GLM, // Qwen + free-tier. Key from /etc/lakehouse/opencode.env. let k = v1::opencode::resolve_opencode_key(); if k.is_some() { tracing::info!("v1: OpenCode key loaded — /v1/chat provider=opencode enabled (40 models)"); } else { tracing::debug!("v1: no OpenCode key — provider=opencode will 503"); } k }, validate_workers: { // Load workers_500k.parquet snapshot for /v1/validate. // Path overridable via LH_WORKERS_PARQUET env. Missing // file is non-fatal — validators run schema/PII checks // unaffected; only worker-existence checks fail clean. let path_str = std::env::var("LH_WORKERS_PARQUET") .unwrap_or_else(|_| "/home/profit/lakehouse/data/datasets/workers_500k.parquet".into()); let path = std::path::Path::new(&path_str); if path.exists() { match validator::staffing::parquet_lookup::load_workers_parquet(path) { Ok(lookup) => { tracing::info!("v1: workers parquet loaded from {} — /v1/validate worker-existence checks enabled", path_str); lookup } Err(e) => { tracing::warn!("v1: workers parquet at {} unreadable ({e}) — /v1/validate worker-existence checks will fail Consistency", path_str); std::sync::Arc::new(validator::InMemoryWorkerLookup::new()) } } } else { tracing::warn!("v1: workers parquet at {} not found — /v1/validate worker-existence checks will fail Consistency", path_str); std::sync::Arc::new(validator::InMemoryWorkerLookup::new()) } }, // Phase 40 early deliverable — Langfuse trace emitter. // Defaults match mcp-server/tracing.ts conventions so // gateway traces land in the same staffing project. langfuse: { let c = v1::langfuse_trace::LangfuseClient::from_env_or_defaults(); if c.is_some() { tracing::info!("v1: Langfuse tracing enabled — /v1/chat calls will appear at localhost:3001"); } else { tracing::warn!("v1: Langfuse keys missing — /v1/chat calls will not be traced"); } c }, // Coordinator session JSONL — one row per /v1/iterate // session for offline DuckDB analysis. Cross-runtime // parity with Go-side validatord (commit 1a3a82a). session_log: { let path = &config.gateway.session_log_path; let s = v1::session_log::SessionLogger::from_path(path); if s.is_some() { tracing::info!( "v1: session log enabled — coordinator sessions written to {}", path ); } else { tracing::info!("v1: session log disabled (set [gateway].session_log_path to enable)"); } s }, })); // Auth middleware (if enabled) — P5-001 fix 2026-04-23: // previously only inserted the ApiKey as an extension and never layered // the middleware, so auth.enabled=true enforced nothing. Now wraps the // router with from_fn_with_state, which calls api_key_auth on every // request. /health is exempted inside the middleware (LB probes). if config.auth.enabled { if let Some(ref key) = config.auth.api_key { tracing::info!("API key auth enabled — enforcing on all routes except /health"); let api_key = auth::ApiKey(key.clone()); app = app.layer(axum::middleware::from_fn_with_state( api_key, auth::api_key_auth, )); } else { tracing::warn!("auth enabled but no api_key set — all requests allowed"); } } app = app .layer(DefaultBodyLimit::max(512 * 1024 * 1024)) // 512MB — supports 500K worker CSV .layer(CorsLayer::new() .allow_origin(Any) .allow_methods(Any) .allow_headers(Any)) .layer(TraceLayer::new_for_http()); // File watcher — auto-ingest from ./inbox let _watcher = ingestd::watcher::spawn_watcher( ingestd::watcher::WatchConfig::default(), store.clone(), registry.clone(), ); // Start gRPC server on port+1 let grpc_port = config.gateway.port + 1; let catalog_grpc = catalogd::grpc::CatalogGrpc::new(registry); let grpc_addr = format!("{}:{}", config.gateway.host, grpc_port).parse().unwrap(); tokio::spawn(async move { tracing::info!("gRPC server listening on {grpc_addr}"); tonic::transport::Server::builder() .add_service(CatalogServiceServer::new(catalog_grpc)) .serve(grpc_addr) .await .expect("gRPC server failed"); }); // Start HTTP server let http_addr = format!("{}:{}", config.gateway.host, config.gateway.port); tracing::info!("HTTP gateway listening on {http_addr}"); let listener = tokio::net::TcpListener::bind(&http_addr).await.unwrap(); axum::serve(listener, app).await.unwrap(); } async fn health() -> &'static str { "lakehouse ok" }