pathway_memory: consensus-designed sidecar + hot-swap learning loop
A 10-probe, N=3 consensus run (kimi-k2:1t / gpt-oss:120b / qwen3.5:latest /
deepseek-v3.1:671b / qwen3-coder:480b / mistral-large-3:675b /
qwen3.5:397b, plus 2 stability re-probes; 2 openrouter probes 429'd)
locked the design across three rounds. Full JSON responses are in
data/_kb/consensus_reducer_design_{mocq3akn,mocq6pi1,mocqatik}.json.

What it does

Preserves the FULL backtrack context for each reviewed file (ladder
attempts + latencies + reject reasons, KB chunks with provenance +
cosine + rank, observer signals, context7 bridge hits, sub-pipeline
calls, audit consensus) and indexes it by a narrow fingerprint so
proven review pathways can be hot-swapped in.
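
The trace shape described above can be sketched as follows. The type
names come from the Files section of this commit; the individual field
names and the flattened String payloads are illustrative guesses, not
the crate's actual definitions:

```rust
struct LadderAttempt {
    model: String,
    latency_ms: u64,
    accepted: bool,
    reject_reason: Option<String>,
}

struct KbChunkRef {
    chunk_id: String, // provenance
    cosine: f32,
    rank: usize,
}

struct PathwayTrace {
    fingerprint: String,             // task_class + file_prefix + signal_class
    ladder: Vec<LadderAttempt>,
    kb_chunks: Vec<KbChunkRef>,
    observer_signals: Vec<String>,   // ObserverSignal, flattened for the sketch
    bridge_hits: Vec<String>,        // context7 BridgeHit, flattened
    sub_pipeline_calls: Vec<String>, // SubPipelineCall, flattened
    audit_consensus: Option<bool>,   // None while the auditor wire is pending
}
```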

When scrum reviews a file:
  1. narrow fingerprint = task_class + file_prefix + signal_class
  2. query_hot_swap checks pathway memory for a match that passes
     probation (≥3 replays @ ≥80% success) + audit gate + similarity
     (≥0.90 cosine on normalized-metadata-token embedding)
  3. if hot-swap eligible, recommended model tried first in the ladder
  4. replay outcome reported back, updating the pathway's success_rate
  5. pathways below 0.80 after ≥3 replays retire permanently (sticky)
  6. full PathwayTrace always inserted at end of review — hot-swap
     grows with use, it doesn't bootstrap from nothing
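
The step-2 gates reduce to a single predicate. A minimal sketch —
the Pathway fields and function name here are illustrative, not the
crate's actual API:

```rust
struct Pathway {
    replays: u32,
    success_rate: f32,
    retired: bool,
    audit_failed: bool, // explicit audit FAIL; unknown consensus stays false
}

/// Hot-swap eligibility: probation (>=3 replays at >=80% success),
/// audit gate (an explicit FAIL blocks; a missing consensus is allowed
/// during bootstrap), and cosine similarity >= 0.90 between the stored
/// and incoming fingerprint embeddings.
fn hot_swap_eligible(p: &Pathway, cosine: f32) -> bool {
    !p.retired
        && p.replays >= 3
        && p.success_rate >= 0.80
        && !p.audit_failed
        && cosine >= 0.90
}
```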

Gate design is load-bearing:
  - narrow fingerprint (6 of 8 consensus models converged on the same
    3-field composition; locked) — enables generalization within a crate
  - probation ≥3 replays — the binomial tail at 80% is ~5%; with fewer
    replays, success can't be distinguished from noise
  - success rate ≥0.80 — mistral + qwen3-coder independently proposed
    this exact threshold across two rounds
  - similarity ≥0.90 — middle of the 0.85/0.95 consensus spread
  - bootstrap: null audit_consensus ALLOWED (auditor → pathway update
    not wired yet; probation + success_rate gates alone enforce safety
    during bootstrap; explicit audit FAIL still blocks)
  - retirement is sticky — prevents oscillation on noise
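
The replay-update and sticky-retirement rules above amount to something
like the following sketch; the struct and method names are illustrative:

```rust
struct PathwayStats {
    replays: u32,
    successes: u32,
    retired: bool, // sticky: once set, never cleared
}

impl PathwayStats {
    /// Record one replay outcome: update the running success rate and
    /// retire the pathway permanently once the rate sits below 0.80
    /// after the 3-replay probation window.
    fn record_replay(&mut self, succeeded: bool) {
        self.replays += 1;
        if succeeded {
            self.successes += 1;
        }
        let rate = self.successes as f32 / self.replays as f32;
        if self.replays >= 3 && rate < 0.80 {
            self.retired = true; // later successes don't revive it
        }
    }
}
```

Retirement being one-way is what prevents a pathway from oscillating in
and out of eligibility on noisy replays.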

Files

  + crates/vectord/src/pathway_memory.rs  (new, 600 lines + 18 tests)
    PathwayTrace, LadderAttempt, KbChunkRef, ObserverSignal, BridgeHit,
    SubPipelineCall, AuditConsensus, HotSwapCandidate, PathwayMemory,
    PathwayMemoryStats. 18/18 tests green.
    Cosine + 32-bucket L2-normalized embedding; mirror of TS impl.
  M crates/vectord/src/lib.rs
    pub mod pathway_memory;
  M crates/vectord/src/service.rs
    VectorState grows pathway_memory field;
    4 HTTP handlers (/pathway/insert, /pathway/query,
    /pathway/record_replay, /pathway/stats).
  M crates/gateway/src/main.rs
    Construct PathwayMemory + load from storage on boot,
    wire into VectorState.
  M tests/real-world/scrum_master_pipeline.ts
    Byte-matching TS bucket-hash (verified same bucket indices as
    Rust); pre-ladder hot-swap query; ladder reorder on hit;
    per-attempt latency capture; post-accept trace insert
    (fire-and-forget); replay outcome recording;
    observer /event emits pathway_hot_swap_hit, pathway_similarity,
    rungs_saved per review for the VCP UI.
  M ui/server.ts
    /data/pathway_stats aggregates /vectors/pathway/stats +
    scrum_reviews.jsonl window for the value metric.
  M ui/ui.js
    Three new metric cards:
      · pathway reuse rate (activity: is it firing?)
      · avg rungs saved (value: is it earning its keep?)
      · pathways tracked (stability: retirement = learning)
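
The cosine + 32-bucket L2-normalized embedding mentioned under
pathway_memory.rs can be sketched roughly like this. The token hash
and bucket weighting here are assumptions for illustration, not the
byte-matched implementation shared with the TS side:

```rust
/// Hash each normalized metadata token into one of 32 buckets, count
/// hits, then L2-normalize so cosine similarity is just a dot product.
fn embed(tokens: &[&str]) -> [f32; 32] {
    let mut v = [0f32; 32];
    for t in tokens {
        // FNV-1a style token hash — illustrative, not the shipped hash
        let mut h: u32 = 2166136261;
        for b in t.bytes() {
            h ^= b as u32;
            h = h.wrapping_mul(16777619);
        }
        v[(h % 32) as usize] += 1.0;
    }
    let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm > 0.0 {
        for x in v.iter_mut() {
            *x /= norm;
        }
    }
    v
}

fn cosine(a: &[f32; 32], b: &[f32; 32]) -> f32 {
    // both inputs are unit-length, so the dot product is the cosine
    a.iter().zip(b).map(|(x, y)| x * y).sum()
}
```

Because bucket counts are non-negative, cosine here always lands in
[0, 1], which is what the ≥0.90 similarity gate compares against.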

What's not in this commit (queued)

  - auditor → pathway audit_consensus update wire (explicit audit-fail
    block activates when this lands)
  - bridge_hits + sub_pipeline_calls population from context7 / LLM
    Team extract results (fields wired, callers not yet)
  - replay log (PathwayReplayOutcome {matched_id, succeeded, ts}) as
    a separate jsonl for forensic audit of why specific replays failed

Why > summarization

Summaries discard the causal chain. With full pathway traces, the
auditor can verify citation provenance, the applier can distinguish
lucky paths from learned ones, and the matrix indexing stores
end-to-end pathways instead of just RAG chunks — which is what J meant
by "why aren't we using it for everything."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 05:15:32 -05:00


mod access;
mod access_service;
mod auth;
mod execution_loop;
mod observability;
mod tools;
mod v1;
use axum::{Router, extract::DefaultBodyLimit, routing::get};
use proto::lakehouse::catalog_service_server::CatalogServiceServer;
use shared::config::Config;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
#[tokio::main]
async fn main() {
    // Load config
    let config = Config::load_or_default();

    // Initialize tracing + observability
    observability::init_tracing(
        &config.observability.service_name,
        &config.observability.exporter,
    );
    tracing::info!("config loaded: gateway={}:{}, storage={}",
        config.gateway.host, config.gateway.port, config.storage.root);

    // Secrets provider
    let secrets = std::sync::Arc::new(shared::secrets::FileSecretsProvider::from_env());
    if let Err(e) = secrets.load().await {
        tracing::warn!("secrets load: {e} — bucket credentials may be missing");
    }

    // Federation: bucket registry (primary + any [[storage.buckets]] entries + rescue)
    let bucket_registry = std::sync::Arc::new(
        storaged::registry::BucketRegistry::from_config(&config.storage, secrets.clone())
            .await
            .expect("bucket registry init failed")
    );
    let buckets = bucket_registry.list().await;
    tracing::info!(
        "bucket registry: {} buckets [{}], rescue={:?}",
        buckets.len(),
        buckets.iter().map(|b| format!("{}:{}", b.name, b.backend)).collect::<Vec<_>>().join(", "),
        bucket_registry.rescue_name(),
    );

    // Back-compat: existing components that expect a single Arc<ObjectStore>
    // get the primary bucket's store. As we migrate call sites to be bucket-
    // aware, they'll call bucket_registry.get(name) directly.
    let store = bucket_registry.default_store();

    // Catalog
    let registry = catalogd::registry::Registry::new(store.clone());
    if let Err(e) = registry.rebuild().await {
        tracing::warn!("catalog rebuild failed (empty store?): {e}");
    }
    // Query engine with 16GB memory cache
    let cache = queryd::cache::MemCache::new(16 * 1024 * 1024 * 1024);
    let engine = queryd::context::QueryEngine::new(registry.clone(), bucket_registry.clone(), cache);

    // Event journal — append-only mutation log (flush every 100 events)
    let journal = journald::journal::Journal::new(store.clone(), 100);

    // Access control
    let access = access::AccessControl::new(config.auth.enabled);
    access.register_defaults().await;

    // Workspace manager for agent-specific overlays
    let workspace_mgr = queryd::workspace::WorkspaceManager::new(store.clone());
    if let Err(e) = workspace_mgr.rebuild().await {
        tracing::warn!("workspace rebuild: {e}");
    }

    // AI sidecar client
    let ai_client = aibridge::client::AiClient::new(&config.sidecar.url);

    // Vector service components — built before the router because both the
    // /vectors service AND ingestd need the agent handle to enqueue triggers.
    let index_reg = vectord::index_registry::IndexRegistry::new(store.clone());
    let _ = index_reg.rebuild().await;
    let hnsw = vectord::hnsw::HnswStore::new();
    let emb_cache = vectord::embedding_cache::EmbeddingCache::new(store.clone());

    // Phase B (federation layer 2): trial journals + promotion files now
    // live in each index's recorded bucket (IndexMeta.bucket), not always
    // primary. Journal / registry resolve per-call via the bucket registry.
    let tj = vectord::trial::TrialJournal::new(bucket_registry.clone(), index_reg.clone());
    let pr = vectord::promotion::PromotionRegistry::new(bucket_registry.clone(), index_reg.clone());
    let hs = vectord::harness::HarnessStore::new(bucket_registry.clone(), index_reg.clone());

    // Phase 19: playbook memory. Load cached state; empty-on-miss is fine,
    // operators call POST /vectors/playbook_memory/rebuild to populate.
    let pbm = vectord::playbook_memory::PlaybookMemory::new(store.clone());
    let _ = pbm.load_from_storage().await;

    // Pathway memory — consensus-designed sidecar for full-context
    // backtracking + hot-swap of successful review pathways. Same
    // load-on-boot pattern as playbook_memory: empty state is fine,
    // operators start populating via scrum_master_pipeline.ts.
    let pwm = vectord::pathway_memory::PathwayMemory::new(store.clone());
    let _ = pwm.load_from_storage().await;
    // Phase 16.2: spawn the autotune agent. When config.agent.enabled=false
    // this returns a handle that drops triggers silently — no surprise load.
    let agent_cfg = vectord::agent::AgentConfig {
        enabled: config.agent.enabled,
        cycle_interval_secs: config.agent.cycle_interval_secs,
        cooldown_between_trials_secs: config.agent.cooldown_between_trials_secs,
        min_recall: config.agent.min_recall,
        max_trials_per_hour: config.agent.max_trials_per_hour,
    };
    let agent_handle = vectord::agent::spawn(
        agent_cfg,
        vectord::agent::AgentDeps {
            store: store.clone(),
            ai_client: ai_client.clone(),
            catalog: registry.clone(),
            index_registry: index_reg.clone(),
            hnsw_store: hnsw.clone(),
            embedding_cache: emb_cache.clone(),
            trial_journal: tj.clone(),
            promotion_registry: pr.clone(),
            harness_store: hs.clone(),
        },
    );
    // HTTP router
    let mut app = Router::new()
        .route("/health", get(health))
        .nest("/storage", storaged::service::router(store.clone())
            .merge(storaged::federation_service::router(bucket_registry.clone())))
        .nest("/catalog", catalogd::service::router(registry.clone()))
        .nest("/query", queryd::service::router(engine.clone()))
        .nest("/ai", aibridge::service::router(ai_client.clone()));

    // Phase E: scheduled ingest
    let sched_store = ingestd::schedule::ScheduleStore::new(store.clone());
    if let Ok(n) = sched_store.rebuild().await {
        if n > 0 { tracing::info!("rebuilt {n} persisted schedule(s)"); }
    }
    let scheduler = ingestd::schedule::Scheduler {
        store: sched_store.clone(),
        ingest: ingestd::schedule::SchedulerIngestDeps {
            store: store.clone(),
            registry: registry.clone(),
            buckets: bucket_registry.clone(),
            agent_handle: Some(agent_handle.clone()),
            index_registry: index_reg.clone(),
        },
        tick_secs: 10,
    };
    scheduler.spawn();
    app = app
        .nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
            store: store.clone(),
            registry: registry.clone(),
            buckets: bucket_registry.clone(),
            agent_handle: agent_handle.clone(),
            index_registry: index_reg.clone(),
            schedules: sched_store,
            // P9-001 fix 2026-04-23: journal reference flows into ingest so
            // successful uploads emit a record_ingest event. Journal is Clone
            // (Arc<RwLock> inside) so the /journal nest below still sees the
            // same buffer + persistence.
            journal: Some(journal.clone()),
        }))
        .nest("/vectors", vectord::service::router(vectord::service::VectorState {
            store: store.clone(),
            ai_client: ai_client.clone(),
            job_tracker: vectord::jobs::JobTracker::new(),
            index_registry: index_reg.clone(),
            hnsw_store: hnsw,
            embedding_cache: emb_cache,
            trial_journal: tj,
            harness_store: hs,
            catalog: registry.clone(),
            promotion_registry: pr,
            agent_handle,
            bucket_registry: bucket_registry.clone(),
            active_profile: std::sync::Arc::new(tokio::sync::RwLock::new(None)),
            lance: vectord::lance_backend::LanceRegistry::new(
                bucket_registry.clone(), index_reg.clone(),
            ),
            playbook_memory: pbm,
            pathway_memory: pwm,
            embed_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(1)),
        }))
        .nest("/workspaces", queryd::workspace_service::router(workspace_mgr))
        .nest("/journal", journald::service::router(journal))
        .nest("/access", access_service::router(access))
        .nest("/tools", tools::service::router({
            let tool_reg = tools::registry::ToolRegistry::new_with_defaults();
            tool_reg.register_defaults().await;
            tools::ToolState {
                registry: tool_reg,
                query_fn: tools::QueryExecutor::new(engine.clone()),
            }
        }))
        // Phase 38 — Universal API skeleton. Thin OpenAI-compatible
        // surface over the existing aibridge → Ollama path. Future
        // phases add provider adapters (39), routing engine (40),
        // session state (41), etc. All without changing this mount.
        .nest("/v1", v1::router(v1::V1State {
            ai_client: ai_client.clone(),
            usage: std::sync::Arc::new(tokio::sync::RwLock::new(v1::Usage::default())),
            // Phase 39 first slice — Ollama Cloud adapter. Key resolved
            // from OLLAMA_CLOUD_KEY env, then /root/llm_team_config.json,
            // then OLLAMA_CLOUD_API_KEY env. None = cloud routes 503.
            ollama_cloud_key: {
                let k = v1::ollama_cloud::resolve_cloud_key();
                if k.is_some() {
                    tracing::info!("v1: Ollama Cloud key loaded — /v1/chat provider=ollama_cloud enabled");
                } else {
                    tracing::warn!("v1: no Ollama Cloud key in env or /root/llm_team_config.json — cloud routes will 503");
                }
                k
            },
            openrouter_key: {
                // 2026-04-24 free-tier rescue rung for iter 5+. Shares
                // the LLM Team UI's OPENROUTER_API_KEY so both systems
                // draw from one quota.
                let k = v1::openrouter::resolve_openrouter_key();
                if k.is_some() {
                    tracing::info!("v1: OpenRouter key loaded — /v1/chat provider=openrouter enabled");
                } else {
                    tracing::warn!("v1: no OpenRouter key — openrouter rescue rung will 503");
                }
                k
            },
            // Phase 40 early deliverable — Langfuse trace emitter.
            // Defaults match mcp-server/tracing.ts conventions so
            // gateway traces land in the same staffing project.
            langfuse: {
                let c = v1::langfuse_trace::LangfuseClient::from_env_or_defaults();
                if c.is_some() {
                    tracing::info!("v1: Langfuse tracing enabled — /v1/chat calls will appear at localhost:3001");
                } else {
                    tracing::warn!("v1: Langfuse keys missing — /v1/chat calls will not be traced");
                }
                c
            },
        }));
    // Auth middleware (if enabled) — P5-001 fix 2026-04-23:
    // previously only inserted the ApiKey as an extension and never layered
    // the middleware, so auth.enabled=true enforced nothing. Now wraps the
    // router with from_fn_with_state, which calls api_key_auth on every
    // request. /health is exempted inside the middleware (LB probes).
    if config.auth.enabled {
        if let Some(ref key) = config.auth.api_key {
            tracing::info!("API key auth enabled — enforcing on all routes except /health");
            let api_key = auth::ApiKey(key.clone());
            app = app.layer(axum::middleware::from_fn_with_state(
                api_key,
                auth::api_key_auth,
            ));
        } else {
            tracing::warn!("auth enabled but no api_key set — all requests allowed");
        }
    }

    app = app
        .layer(DefaultBodyLimit::max(512 * 1024 * 1024)) // 512MB — supports 500K worker CSV
        .layer(CorsLayer::new()
            .allow_origin(Any)
            .allow_methods(Any)
            .allow_headers(Any))
        .layer(TraceLayer::new_for_http());
    // File watcher — auto-ingest from ./inbox
    let _watcher = ingestd::watcher::spawn_watcher(
        ingestd::watcher::WatchConfig::default(),
        store.clone(),
        registry.clone(),
    );

    // Start gRPC server on port+1
    let grpc_port = config.gateway.port + 1;
    let catalog_grpc = catalogd::grpc::CatalogGrpc::new(registry);
    let grpc_addr = format!("{}:{}", config.gateway.host, grpc_port).parse().unwrap();
    tokio::spawn(async move {
        tracing::info!("gRPC server listening on {grpc_addr}");
        tonic::transport::Server::builder()
            .add_service(CatalogServiceServer::new(catalog_grpc))
            .serve(grpc_addr)
            .await
            .expect("gRPC server failed");
    });

    // Start HTTP server
    let http_addr = format!("{}:{}", config.gateway.host, config.gateway.port);
    tracing::info!("HTTP gateway listening on {http_addr}");
    let listener = tokio::net::TcpListener::bind(&http_addr).await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

async fn health() -> &'static str {
    "lakehouse ok"
}