mod access;
mod access_service;
mod auth;
mod observability;
mod tools;
mod v1;

use axum::{Router, extract::DefaultBodyLimit, routing::get};
use proto::lakehouse::catalog_service_server::CatalogServiceServer;
use shared::config::Config;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;

#[tokio::main]
async fn main() {
    // Load config
    let config = Config::load_or_default();

    // Initialize tracing + observability
    observability::init_tracing(
        &config.observability.service_name,
        &config.observability.exporter,
    );
    tracing::info!(
        "config loaded: gateway={}:{}, storage={}",
        config.gateway.host,
        config.gateway.port,
        config.storage.root
    );

    // Secrets provider
    let secrets = std::sync::Arc::new(shared::secrets::FileSecretsProvider::from_env());
    if let Err(e) = secrets.load().await {
        tracing::warn!("secrets load: {e} — bucket credentials may be missing");
    }

    // Federation: bucket registry (primary + any [[storage.buckets]] entries + rescue)
    let bucket_registry = std::sync::Arc::new(
        storaged::registry::BucketRegistry::from_config(&config.storage, secrets.clone())
            .await
            .expect("bucket registry init failed"),
    );
    let buckets = bucket_registry.list().await;
    tracing::info!(
        "bucket registry: {} buckets [{}], rescue={:?}",
        buckets.len(),
        buckets
            .iter()
            .map(|b| format!("{}:{}", b.name, b.backend))
            .collect::<Vec<_>>()
            .join(", "),
        bucket_registry.rescue_name(),
    );

    // Back-compat: existing components that expect a single Arc get the
    // primary bucket's store. As we migrate call sites to be bucket-aware,
    // they'll call bucket_registry.get(name) directly.
    let store = bucket_registry.default_store();

    // Catalog
    let registry = catalogd::registry::Registry::new(store.clone());
    if let Err(e) = registry.rebuild().await {
        tracing::warn!("catalog rebuild failed (empty store?): {e}");
    }

    // Query engine with 16GB memory cache
    let cache = queryd::cache::MemCache::new(16 * 1024 * 1024 * 1024);
    let engine = queryd::context::QueryEngine::new(registry.clone(), bucket_registry.clone(), cache);

    // Event journal — append-only mutation log (flush every 100 events)
    let journal = journald::journal::Journal::new(store.clone(), 100);

    // Access control
    let access = access::AccessControl::new(config.auth.enabled);
    access.register_defaults().await;

    // Workspace manager for agent-specific overlays
    let workspace_mgr = queryd::workspace::WorkspaceManager::new(store.clone());
    if let Err(e) = workspace_mgr.rebuild().await {
        tracing::warn!("workspace rebuild: {e}");
    }

    // AI sidecar client
    let ai_client = aibridge::client::AiClient::new(&config.sidecar.url);

    // Vector service components — built before the router because both the
    // /vectors service AND ingestd need the agent handle to enqueue triggers.
    let index_reg = vectord::index_registry::IndexRegistry::new(store.clone());
    let _ = index_reg.rebuild().await;
    let hnsw = vectord::hnsw::HnswStore::new();
    let emb_cache = vectord::embedding_cache::EmbeddingCache::new(store.clone());

    // Phase B (federation layer 2): trial journals + promotion files now
    // live in each index's recorded bucket (IndexMeta.bucket), not always
    // primary. Journal / registry resolve per-call via the bucket registry.
    let tj = vectord::trial::TrialJournal::new(bucket_registry.clone(), index_reg.clone());
    let pr = vectord::promotion::PromotionRegistry::new(bucket_registry.clone(), index_reg.clone());
    let hs = vectord::harness::HarnessStore::new(bucket_registry.clone(), index_reg.clone());
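
    // For reference, a hypothetical shape for the federation config consumed
    // above. The field names (name, backend) are assumptions inferred from
    // b.name / b.backend and config.storage.root, not a confirmed schema:
    //
    //   [storage]
    //   root = "./data"
    //
    //   [[storage.buckets]]
    //   name = "cold"
    //   backend = "s3"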
    // Phase 19: playbook memory. Load cached state; empty-on-miss is fine,
    // operators call POST /vectors/playbook_memory/rebuild to populate.
    let pbm = vectord::playbook_memory::PlaybookMemory::new(store.clone());
    let _ = pbm.load_from_storage().await;

    // Phase 16.2: spawn the autotune agent. When config.agent.enabled=false
    // this returns a handle that drops triggers silently — no surprise load.
    let agent_cfg = vectord::agent::AgentConfig {
        enabled: config.agent.enabled,
        cycle_interval_secs: config.agent.cycle_interval_secs,
        cooldown_between_trials_secs: config.agent.cooldown_between_trials_secs,
        min_recall: config.agent.min_recall,
        max_trials_per_hour: config.agent.max_trials_per_hour,
    };
    let agent_handle = vectord::agent::spawn(
        agent_cfg,
        vectord::agent::AgentDeps {
            store: store.clone(),
            ai_client: ai_client.clone(),
            catalog: registry.clone(),
            index_registry: index_reg.clone(),
            hnsw_store: hnsw.clone(),
            embedding_cache: emb_cache.clone(),
            trial_journal: tj.clone(),
            promotion_registry: pr.clone(),
            harness_store: hs.clone(),
        },
    );

    // HTTP router
    let mut app = Router::new()
        .route("/health", get(health))
        .nest(
            "/storage",
            storaged::service::router(store.clone())
                .merge(storaged::federation_service::router(bucket_registry.clone())),
        )
        .nest("/catalog", catalogd::service::router(registry.clone()))
        .nest("/query", queryd::service::router(engine.clone()))
        .nest("/ai", aibridge::service::router(ai_client.clone()));

    // Phase E: scheduled ingest
    let sched_store = ingestd::schedule::ScheduleStore::new(store.clone());
    if let Ok(n) = sched_store.rebuild().await {
        if n > 0 {
            tracing::info!("rebuilt {n} persisted schedule(s)");
        }
    }
    let scheduler = ingestd::schedule::Scheduler {
        store: sched_store.clone(),
        ingest: ingestd::schedule::SchedulerIngestDeps {
            store: store.clone(),
            registry: registry.clone(),
            buckets: bucket_registry.clone(),
            agent_handle: Some(agent_handle.clone()),
            index_registry: index_reg.clone(),
        },
        tick_secs: 10,
    };
    scheduler.spawn();

    app = app
        .nest(
            "/ingest",
            ingestd::service::router(ingestd::service::IngestState {
                store: store.clone(),
                registry: registry.clone(),
                buckets: bucket_registry.clone(),
                agent_handle: agent_handle.clone(),
                index_registry: index_reg.clone(),
                schedules: sched_store,
            }),
        )
        .nest(
            "/vectors",
            vectord::service::router(vectord::service::VectorState {
                store: store.clone(),
                ai_client: ai_client.clone(),
                job_tracker: vectord::jobs::JobTracker::new(),
                index_registry: index_reg.clone(),
                hnsw_store: hnsw,
                embedding_cache: emb_cache,
                trial_journal: tj,
                harness_store: hs,
                catalog: registry.clone(),
                promotion_registry: pr,
                agent_handle,
                bucket_registry: bucket_registry.clone(),
                active_profile: std::sync::Arc::new(tokio::sync::RwLock::new(None)),
                lance: vectord::lance_backend::LanceRegistry::new(
                    bucket_registry.clone(),
                    index_reg.clone(),
                ),
                playbook_memory: pbm,
                embed_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(1)),
            }),
        )
        .nest("/workspaces", queryd::workspace_service::router(workspace_mgr))
        .nest("/journal", journald::service::router(journal))
        .nest("/access", access_service::router(access))
        .nest("/tools", tools::service::router({
            let tool_reg = tools::registry::ToolRegistry::new_with_defaults();
            tool_reg.register_defaults().await;
            tools::ToolState {
                registry: tool_reg,
                query_fn: tools::QueryExecutor::new(engine.clone()),
            }
        }))
        // Phase 38 — Universal API skeleton. Thin OpenAI-compatible
        // surface over the existing aibridge → Ollama path. Future
        // phases add provider adapters (39), routing engine (40),
        // session state (41), etc. All without changing this mount.
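        //
        // A hedged sketch of the intended call shape (an assumption: the
        // concrete /v1 routes live in the v1 module, and the path and model
        // name below follow OpenAI convention rather than anything confirmed
        // in this file):
        //
        //   POST /v1/chat/completions
        //   {"model": "some-model", "messages": [{"role": "user", "content": "hi"}]}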
        .nest(
            "/v1",
            v1::router(v1::V1State {
                ai_client: ai_client.clone(),
                usage: std::sync::Arc::new(tokio::sync::RwLock::new(v1::Usage::default())),
                // Phase 39 first slice — Ollama Cloud adapter. Key resolved
                // from OLLAMA_CLOUD_KEY env, then /root/llm_team_config.json,
                // then OLLAMA_CLOUD_API_KEY env. None = cloud routes 503.
                ollama_cloud_key: {
                    let k = v1::ollama_cloud::resolve_cloud_key();
                    if k.is_some() {
                        tracing::info!(
                            "v1: Ollama Cloud key loaded — /v1/chat provider=ollama_cloud enabled"
                        );
                    } else {
                        tracing::warn!(
                            "v1: no Ollama Cloud key in env or /root/llm_team_config.json — cloud routes will 503"
                        );
                    }
                    k
                },
                // Phase 40 early deliverable — Langfuse trace emitter.
                // Defaults match mcp-server/tracing.ts conventions so
                // gateway traces land in the same staffing project.
                langfuse: {
                    let c = v1::langfuse_trace::LangfuseClient::from_env_or_defaults();
                    if c.is_some() {
                        tracing::info!(
                            "v1: Langfuse tracing enabled — /v1/chat calls will appear at localhost:3001"
                        );
                    } else {
                        tracing::warn!("v1: Langfuse keys missing — /v1/chat calls will not be traced");
                    }
                    c
                },
            }),
        );

    // Auth middleware (if enabled)
    if config.auth.enabled {
        if let Some(ref key) = config.auth.api_key {
            tracing::info!("API key auth enabled");
            let api_key = auth::ApiKey(key.clone());
            app = app.layer(axum::Extension(api_key));
            // Note: auth middleware is applied per-route in production.
            // For now, the ApiKey extension is available for handlers to check.
        } else {
            tracing::warn!("auth enabled but no api_key set — all requests allowed");
        }
    }

    app = app
        .layer(DefaultBodyLimit::max(512 * 1024 * 1024)) // 512MB — supports a 500K-worker CSV
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    // File watcher — auto-ingest from ./inbox
    let _watcher = ingestd::watcher::spawn_watcher(
        ingestd::watcher::WatchConfig::default(),
        store.clone(),
        registry.clone(),
    );

    // Start gRPC server on port+1
    let grpc_port = config.gateway.port + 1;
    let catalog_grpc = catalogd::grpc::CatalogGrpc::new(registry);
    let grpc_addr: std::net::SocketAddr = format!("{}:{}", config.gateway.host, grpc_port)
        .parse()
        .unwrap();
    tokio::spawn(async move {
        tracing::info!("gRPC server listening on {grpc_addr}");
        tonic::transport::Server::builder()
            .add_service(CatalogServiceServer::new(catalog_grpc))
            .serve(grpc_addr)
            .await
            .expect("gRPC server failed");
    });

    // Start HTTP server
    let http_addr = format!("{}:{}", config.gateway.host, config.gateway.port);
    tracing::info!("HTTP gateway listening on {http_addr}");
    let listener = tokio::net::TcpListener::bind(&http_addr).await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

async fn health() -> &'static str {
    "lakehouse ok"
}
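
// A minimal smoke-test sketch, added for illustration (an assumption: the
// crate's real test layout isn't shown here). It exercises only the health
// handler defined above; main()'s HTTP/gRPC wiring is config-driven and is
// not covered.
#[cfg(test)]
mod tests {
    // The handler returns a canned &'static str, so the assertion is exact.
    #[tokio::test]
    async fn health_returns_ok() {
        assert_eq!(super::health().await, "lakehouse ok");
    }
}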