profit 5b1fcf6d27 Phase 28-36 body of work
Accumulated since a6f12e2 (Phase 21 Rust port + Phase 27 versioning):

- Phase 36: embed_semaphore on VectorState (permits=1) serializes
  seed embed calls — prevents sidecar socket collisions under
  concurrent /seed stress load (a minimal sketch follows this list)
- Phase 31+: run_stress.ts 6-task diverse stress scaffolding;
  run_e2e_rated.ts + orchestrator.ts tightening
- Catalog dedupe cleanup: 16 duplicate manifests removed; canonical
  candidates.parquet (10.5MB -> 76KB) + placements.parquet (1.2MB ->
  11KB) regenerated post-dedupe; fresh manifests for active datasets
- vectord: harness EvalSet refinements (+181), agent portfolio
  rotation + ingest triggers (+158), autotune + rag adjustments
- catalogd/storaged/ingestd/mcp-server: misc tightening
- docs: Phase 28-36 PRD entries + DECISIONS ADR additions;
  control-plane pivot banner added to top of docs/PRD.md (pointing
  at docs/CONTROL_PLANE_PRD.md which lands in next commit)
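
A minimal sketch of the Phase 36 permits=1 pattern, for illustration only;
the real field lives on VectorState, and `embed_via_sidecar` below is a
made-up stand-in for the actual sidecar call:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    struct VectorStateSketch {
        // One permit total: at most one caller holds it, so concurrent
        // /seed requests queue instead of colliding on the sidecar socket.
        embed_semaphore: Arc<Semaphore>,
    }

    impl VectorStateSketch {
        fn new() -> Self {
            Self { embed_semaphore: Arc::new(Semaphore::new(1)) }
        }

        async fn embed(&self, text: &str) -> Vec<f32> {
            // Held across the whole sidecar round-trip; released on drop.
            let _permit = self.embed_semaphore.acquire().await.expect("semaphore closed");
            embed_via_sidecar(text).await
        }
    }

    // Stand-in for the real sidecar client.
    async fn embed_via_sidecar(_text: &str) -> Vec<f32> {
        Vec::new()
    }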

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 02:41:15 -05:00


use axum::{
    Json, Router,
    extract::{Multipart, Path, Query, State},
    http::{HeaderMap, StatusCode},
    response::IntoResponse,
    routing::{get, post},
};
use bytes::Bytes;
use object_store::ObjectStore;
use serde::Deserialize;
use std::sync::Arc;
use catalogd::registry::Registry;
use crate::{db_ingest, my_stream, pg_stream, pipeline, schedule};
use shared::arrow_helpers::record_batch_to_parquet;
use shared::types::{ObjectRef, SchemaFingerprint};
use storaged::ops;
use storaged::registry::BucketRegistry;

#[derive(Clone)]
pub struct IngestState {
    pub store: Arc<dyn ObjectStore>,
    pub registry: Registry,
    /// Federation layer 2: look up the target bucket from request headers.
    pub buckets: Arc<BucketRegistry>,
    /// Phase 16.5: when ingest marks a dataset's embeddings stale, push
    /// a `DatasetAppended` trigger so the autotune agent can schedule
    /// re-trials. Holds the agent handle (no-op when the agent is disabled).
    pub agent_handle: vectord::agent::AgentHandle,
    /// Used to look up which HNSW indexes are attached to the
    /// just-ingested dataset. Each matching index gets one trigger.
    pub index_registry: vectord::index_registry::IndexRegistry,
    /// Scheduled-ingest registry. The scheduler task runs against this
    /// store; HTTP CRUD endpoints write through it.
    pub schedules: schedule::ScheduleStore,
}

/// Push `DatasetAppended` triggers for every HNSW index bound to this
/// dataset. Called after a successful ingest. Logs failures but never
/// fails the ingest — the agent is best-effort, not load-bearing.
async fn notify_agent_on_append(state: &IngestState, dataset_name: &str) {
    let indexes = state.index_registry.list(Some(dataset_name), None).await;
    if indexes.is_empty() {
        return;
    }
    for meta in indexes {
        let event = vectord::agent::TriggerEvent::dataset_appended(
            meta.index_name.clone(),
            dataset_name,
        );
        if let Err(e) = state.agent_handle.enqueue(event).await {
            tracing::warn!("agent enqueue failed for '{}': {}", meta.index_name, e);
        } else {
            tracing::info!(
                "agent: enqueued DatasetAppended({}) for index '{}'",
                dataset_name,
                meta.index_name,
            );
        }
    }
}

/// Resolve the target bucket from `X-Lakehouse-Bucket` header.
/// Returns `(bucket_name, store_for_writes)`. Falls back to "primary"
/// when the header is absent. Returns Err with the canonical bucket
/// list when the header names an unknown bucket.
fn resolve_bucket(
    headers: &HeaderMap,
    buckets: &BucketRegistry,
) -> Result<(String, Arc<dyn ObjectStore>), (StatusCode, String)> {
    let target = headers
        .get("x-lakehouse-bucket")
        .and_then(|h| h.to_str().ok())
        .map(|s| s.to_string())
        .unwrap_or_else(|| buckets.default_name().to_string());
    match buckets.get(&target) {
        Ok(store) => Ok((target, store)),
        Err(_) => Err((
            StatusCode::NOT_FOUND,
            format!("unknown bucket '{}' — use GET /storage/buckets to list", target),
        )),
    }
}
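
// Illustrative call (host, port, and bucket name are made up): an upload
// aimed at a non-default bucket looks like
//
//   curl -H 'X-Lakehouse-Bucket: analytics' \
//        -F 'file=@events.parquet' \
//        'http://localhost:8080/file?name=events'
//
// Omitting the header writes to the registry's default bucket instead.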

pub fn router(state: IngestState) -> Router {
    Router::new()
        .route("/health", get(health))
        .route("/file", post(ingest_file))
        .route("/postgres/tables", post(list_pg_tables))
        .route("/postgres/import", post(import_pg_table))
        .route("/db", post(ingest_db_stream))
        .route("/mysql", post(ingest_mysql_stream))
        // Phase E: scheduled ingest
        .route("/schedules", get(list_schedules).post(create_schedule))
        .route(
            "/schedules/{id}",
            get(get_schedule).patch(patch_schedule).delete(delete_schedule),
        )
        .route("/schedules/{id}/run-now", post(run_schedule_now))
        .with_state(state)
}

async fn health() -> &'static str {
    "ingestd ok"
}

/// Query params for `POST /file`: optional override for the dataset name.
#[derive(Deserialize)]
struct IngestQuery {
    name: Option<String>,
}

/// Multipart file upload. Reads the first field as the file body, resolves
/// the target bucket from headers, and runs the shared ingest pipeline.
/// Returns 201 Created for new data, 200 OK when the pipeline deduplicated
/// the upload.
async fn ingest_file(
    State(state): State<IngestState>,
    Query(query): Query<IngestQuery>,
    headers: HeaderMap,
    mut multipart: Multipart,
) -> impl IntoResponse {
    let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
    let field = match multipart.next_field().await {
        Ok(Some(f)) => f,
        Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())),
        Err(e) => return Err((StatusCode::BAD_REQUEST, format!("multipart error: {e}"))),
    };
    let filename = field.file_name().unwrap_or("unknown").to_string();
    let content = field
        .bytes()
        .await
        .map_err(|e| (StatusCode::BAD_REQUEST, format!("read error: {e}")))?;
    tracing::info!(
        "ingest '{}' ({} bytes) -> bucket={}",
        filename,
        content.len(),
        bucket,
    );
    let dataset_name = query.name.as_deref();
    match pipeline::ingest_file_to_bucket(
        &filename, &content, dataset_name, &bucket, &store, &state.registry,
    )
    .await
    {
        Ok(result) => {
            if result.deduplicated {
                // Duplicate upload: nothing new was written, so don't ping
                // the agent, and report 200 rather than 201.
                Ok((StatusCode::OK, Json(result)))
            } else {
                notify_agent_on_append(&state, &result.dataset_name).await;
                Ok((StatusCode::CREATED, Json(result)))
            }
        }
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

// --- PostgreSQL Import ---

/// List tables in a PostgreSQL database.
async fn list_pg_tables(Json(config): Json<db_ingest::DbConfig>) -> impl IntoResponse {
    match db_ingest::list_postgres_tables(&config).await {
        Ok(tables) => Ok(Json(tables)),
        Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
    }
}

#[derive(Deserialize)]
struct PgImportRequest {
    #[serde(flatten)]
    config: db_ingest::DbConfig,
    table: String,
    /// Override dataset name (defaults to table name)
    dataset_name: Option<String>,
    /// Max rows to import (None = all)
    limit: Option<usize>,
}
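
// Illustrative body for `POST /postgres/import` (connection fields come
// from `DbConfig`, flattened into the top level; values here are made up
// and the "..." stands for DbConfig fields not shown in this file):
//
//   {"host": "db.internal", "port": 5432, "database": "prod", ...,
//    "table": "users", "dataset_name": "users_raw", "limit": 10000}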

/// Import a PostgreSQL table into the lakehouse.
async fn import_pg_table(
    State(state): State<IngestState>,
    Json(req): Json<PgImportRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> {
    tracing::info!(
        "importing postgres table '{}' from {}:{}/{}",
        req.table, req.config.host, req.config.port, req.config.database,
    );
    // Import from Postgres.
    let (schema, batches, db_result) =
        db_ingest::import_postgres_table(&req.config, &req.table, req.limit)
            .await
            .map_err(|e| (StatusCode::BAD_GATEWAY, e))?;
    if batches.is_empty() || db_result.rows == 0 {
        return Ok((StatusCode::OK, Json(serde_json::json!({
            "table": req.table,
            "rows": 0,
            "message": "table is empty",
        }))));
    }
    // Convert to Parquet.
    let mut all_parquet = Vec::new();
    for batch in &batches {
        let pq = record_batch_to_parquet(batch)
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
        all_parquet.extend_from_slice(&pq);
    }
    let dataset_name = req.dataset_name.unwrap_or_else(|| req.table.clone());
    let storage_key = format!("datasets/{}.parquet", dataset_name);
    let parquet_size = all_parquet.len() as u64;
    // Store.
    ops::put(&state.store, &storage_key, Bytes::from(all_parquet))
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    // Register.
    let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
    let now = chrono::Utc::now();
    state.registry.register(
        dataset_name.clone(),
        SchemaFingerprint(schema_fp.0),
        vec![ObjectRef {
            bucket: "data".to_string(),
            key: storage_key.clone(),
            size_bytes: parquet_size,
            created_at: now,
        }],
    ).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    // Auto-populate metadata (PII detection, lineage).
    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
    let columns: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
        let sens = shared::pii::detect_sensitivity(f.name());
        shared::types::ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
        }
    }).collect();
    let lineage = shared::types::Lineage {
        source_system: "postgresql".to_string(),
        source_file: format!(
            "{}:{}/{}.{}",
            req.config.host, req.config.port, req.config.database, req.table,
        ),
        ingest_job: format!("pg-import-{}", now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };
    let _ = state.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
        sensitivity,
        columns: Some(columns),
        lineage: Some(lineage),
        row_count: Some(db_result.rows as u64),
        ..Default::default()
    }).await;
    tracing::info!(
        "imported '{}' from postgres: {} rows → {}",
        dataset_name, db_result.rows, storage_key,
    );
    Ok((StatusCode::CREATED, Json(serde_json::json!({
        "dataset_name": dataset_name,
        "table": req.table,
        "rows": db_result.rows,
        "columns": db_result.columns,
        "schema": db_result.schema_detected,
        "storage_key": storage_key,
        "size_bytes": parquet_size,
    }))))
}

/// Streaming DSN-based ingest. Paginates via ORDER BY + LIMIT/OFFSET so
/// large tables don't blow up memory. This is the path the task spec calls
/// `POST /ingest/db` with `{dsn, table, dataset_name}`.
async fn ingest_db_stream(
    State(state): State<IngestState>,
    headers: HeaderMap,
    Json(req): Json<pg_stream::PgStreamRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> {
    let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
    tracing::info!(
        "pg stream ingest: table='{}' dataset='{:?}' bucket='{}' batch_size={:?}",
        req.table, req.dataset_name, bucket, req.batch_size,
    );
    // Stream from Postgres into Parquet bytes.
    let (parquet, stream_result) = pg_stream::stream_table_to_parquet(&req)
        .await
        .map_err(|e| (StatusCode::BAD_GATEWAY, e))?;
    if stream_result.rows == 0 {
        return Ok((StatusCode::OK, Json(serde_json::json!({
            "table": req.table,
            "rows": 0,
            "message": "table is empty",
        }))));
    }
    // Recover schema from the parquet footer — keeps a single source of truth
    // for the schema fingerprint + column metadata.
    let (schema, _) = shared::arrow_helpers::parquet_to_record_batches(&parquet)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("reparse parquet: {e}")))?;
    let dataset_name = req.dataset_name.clone().unwrap_or_else(|| req.table.clone());
    let storage_key = format!("datasets/{}.parquet", dataset_name);
    let size_bytes = parquet.len() as u64;
    ops::put(&store, &storage_key, parquet)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
    let now = chrono::Utc::now();
    state.registry.register(
        dataset_name.clone(),
        SchemaFingerprint(schema_fp.0),
        vec![ObjectRef {
            bucket: bucket.clone(),
            key: storage_key.clone(),
            size_bytes,
            created_at: now,
        }],
    ).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    // Rich metadata: PII auto-detect, lineage, row_count, columns.
    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
    let columns: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
        let sens = shared::pii::detect_sensitivity(f.name());
        shared::types::ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
        }
    }).collect();
    let lineage = shared::types::Lineage {
        source_system: "postgresql".to_string(),
        source_file: format!("dsn: {}", redact_dsn(&req.dsn)),
        ingest_job: format!("pg-stream-{}", now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };
    let _ = state.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
        sensitivity,
        columns: Some(columns),
        lineage: Some(lineage),
        row_count: Some(stream_result.rows as u64),
        ..Default::default()
    }).await;
    // Phase C: mark embeddings stale if the dataset already had a vector
    // index. No-op for newly-created datasets.
    let _ = state.registry.mark_embeddings_stale(&dataset_name).await;
    // Phase 16.5: notify autotune agent. No-op if no indexes attached.
    notify_agent_on_append(&state, &dataset_name).await;
    Ok((StatusCode::CREATED, Json(serde_json::json!({
        "dataset_name": dataset_name,
        "table": stream_result.table,
        "rows": stream_result.rows,
        "batches": stream_result.batches,
        "columns": stream_result.columns,
        "schema": stream_result.schema,
        "storage_key": storage_key,
        "size_bytes": size_bytes,
        "duration_secs": stream_result.duration_secs,
    }))))
}
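
// Illustrative request for the streaming path above, mounted as `/db` on
// this router (the task spec's `POST /ingest/db`). Values are made up;
// `dataset_name` and `batch_size` are optional:
//
//   POST /ingest/db
//   {"dsn": "postgres://app:pass@db.internal:5432/prod",
//    "table": "events", "dataset_name": "events_raw", "batch_size": 5000}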

/// Streaming MySQL ingest. Mirrors `ingest_db_stream` — same pagination +
/// Parquet-footer schema recovery + PII/lineage metadata. Different driver,
/// otherwise identical.
async fn ingest_mysql_stream(
    State(state): State<IngestState>,
    headers: HeaderMap,
    Json(req): Json<my_stream::MyStreamRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> {
    let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
    tracing::info!(
        "mysql stream ingest: table='{}' dataset='{:?}' bucket='{}' batch_size={:?}",
        req.table, req.dataset_name, bucket, req.batch_size,
    );
    let (parquet, stream_result) = my_stream::stream_table_to_parquet(&req)
        .await
        .map_err(|e| (StatusCode::BAD_GATEWAY, e))?;
    if stream_result.rows == 0 {
        return Ok((StatusCode::OK, Json(serde_json::json!({
            "table": req.table,
            "rows": 0,
            "message": "table is empty",
        }))));
    }
    let (schema, _) = shared::arrow_helpers::parquet_to_record_batches(&parquet)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("reparse parquet: {e}")))?;
    let dataset_name = req.dataset_name.clone().unwrap_or_else(|| req.table.clone());
    let storage_key = format!("datasets/{}.parquet", dataset_name);
    let size_bytes = parquet.len() as u64;
    ops::put(&store, &storage_key, parquet)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
    let now = chrono::Utc::now();
    state.registry.register(
        dataset_name.clone(),
        SchemaFingerprint(schema_fp.0),
        vec![ObjectRef {
            bucket: bucket.clone(),
            key: storage_key.clone(),
            size_bytes,
            created_at: now,
        }],
    ).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
    let columns: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
        let sens = shared::pii::detect_sensitivity(f.name());
        shared::types::ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
        }
    }).collect();
    let lineage = shared::types::Lineage {
        source_system: "mysql".to_string(),
        source_file: format!("dsn: {}", redact_dsn(&req.dsn)),
        ingest_job: format!("mysql-stream-{}", now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };
    let _ = state.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
        sensitivity,
        columns: Some(columns),
        lineage: Some(lineage),
        row_count: Some(stream_result.rows as u64),
        ..Default::default()
    }).await;
    // Phase C: mark stale + Phase 16.5: notify agent.
    let _ = state.registry.mark_embeddings_stale(&dataset_name).await;
    notify_agent_on_append(&state, &dataset_name).await;
    Ok((StatusCode::CREATED, Json(serde_json::json!({
        "dataset_name": dataset_name,
        "table": stream_result.table,
        "rows": stream_result.rows,
        "batches": stream_result.batches,
        "columns": stream_result.columns,
        "schema": stream_result.schema,
        "storage_key": storage_key,
        "size_bytes": size_bytes,
        "duration_secs": stream_result.duration_secs,
    }))))
}

/// Redact the password in any `scheme://user:pass@host/...` DSN. Works
/// for postgres, mysql, and any other scheme using the same shape.
fn redact_dsn(dsn: &str) -> String {
    let scheme_end = match dsn.find("://") {
        Some(i) => i + 3,
        None => return dsn.to_string(),
    };
    // `rfind` so passwords containing '@' still resolve to the real
    // user-info/host boundary.
    let at_idx = match dsn.rfind('@') {
        Some(i) if i > scheme_end => i,
        _ => return dsn.to_string(),
    };
    // Find the password separator (first `:` after scheme, before `@`).
    let userpass = &dsn[scheme_end..at_idx];
    let colon_offset = match userpass.find(':') {
        Some(i) => i,
        None => return dsn.to_string(), // no password in DSN
    };
    let colon_idx = scheme_end + colon_offset;
    format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..])
}
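
// A couple of sanity checks on the redaction shape; the DSNs are made-up
// examples, not credentials used anywhere in the system.
#[cfg(test)]
mod redact_dsn_tests {
    use super::redact_dsn;

    #[test]
    fn masks_password_but_keeps_user_and_host() {
        assert_eq!(
            redact_dsn("postgres://app:s3cret@db.internal:5432/prod"),
            "postgres://app:***@db.internal:5432/prod",
        );
    }

    #[test]
    fn leaves_passwordless_dsn_untouched() {
        assert_eq!(redact_dsn("mysql://app@db/prod"), "mysql://app@db/prod");
    }
}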

// --- Phase E: Scheduled ingest CRUD endpoints ---

#[derive(Deserialize)]
struct CreateScheduleRequest {
    /// Stable id used in URLs and on disk. Required so callers can
    /// PATCH/DELETE deterministically. Sanitized on storage but the
    /// caller's id is what gets persisted as the canonical key.
    id: String,
    kind: schedule::ScheduleKind,
    trigger: schedule::ScheduleTrigger,
    #[serde(default = "default_enabled")]
    enabled: bool,
    #[serde(default)]
    created_by: String,
}

fn default_enabled() -> bool { true }

async fn create_schedule(
    State(state): State<IngestState>,
    Json(req): Json<CreateScheduleRequest>,
) -> impl IntoResponse {
    if state.schedules.get(&req.id).await.is_some() {
        return Err((StatusCode::CONFLICT, format!("schedule '{}' already exists", req.id)));
    }
    if let Err(e) = schedule::validate_trigger(&req.trigger) {
        return Err((StatusCode::BAD_REQUEST, e));
    }
    let now = chrono::Utc::now();
    let def = schedule::ScheduleDef {
        id: req.id,
        kind: req.kind,
        trigger: req.trigger,
        enabled: req.enabled,
        created_at: now,
        created_by: req.created_by,
        // Fire on the next tick — operators usually want to verify
        // their schedule works without waiting for the full interval.
        next_run_at: now,
        last_run_at: None,
        last_outcome: None,
        run_count: 0,
        failure_count: 0,
    };
    match state.schedules.put(def).await {
        Ok(d) => Ok((StatusCode::CREATED, Json(d))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
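
// Illustrative lifecycle against these endpoints (id made up; the shapes of
// `kind` and `trigger` live in the `schedule` module and are elided here):
//
//   POST   /schedules                        {"id": "nightly-events", "kind": ..., "trigger": ...}
//   GET    /schedules/nightly-events
//   PATCH  /schedules/nightly-events         {"enabled": false}
//   POST   /schedules/nightly-events/run-now
//   DELETE /schedules/nightly-events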

async fn list_schedules(State(state): State<IngestState>) -> impl IntoResponse {
    Json(state.schedules.list().await)
}

async fn get_schedule(
    State(state): State<IngestState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    match state.schedules.get(&id).await {
        Some(d) => Ok(Json(d)),
        None => Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
    }
}

#[derive(Deserialize)]
struct PatchScheduleRequest {
    /// Toggle on/off without changing anything else.
    #[serde(default)]
    enabled: Option<bool>,
    /// Replace the trigger spec — useful for bumping intervals.
    #[serde(default)]
    trigger: Option<schedule::ScheduleTrigger>,
}

async fn patch_schedule(
    State(state): State<IngestState>,
    Path(id): Path<String>,
    Json(req): Json<PatchScheduleRequest>,
) -> impl IntoResponse {
    let Some(mut def) = state.schedules.get(&id).await else {
        return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
    };
    // Validate before mutating so a bad trigger can't half-apply.
    if let Some(t) = &req.trigger {
        if let Err(e) = schedule::validate_trigger(t) {
            return Err((StatusCode::BAD_REQUEST, e));
        }
    }
    if let Some(e) = req.enabled {
        def.enabled = e;
    }
    if let Some(t) = req.trigger {
        def.trigger = t;
    }
    match state.schedules.put(def).await {
        Ok(d) => Ok(Json(d)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

async fn delete_schedule(
    State(state): State<IngestState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    match state.schedules.delete(&id).await {
        Ok(true) => Ok(Json(serde_json::json!({ "deleted": id }))),
        Ok(false) => Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

/// Force a schedule to fire right now, regardless of its `next_run_at`.
/// Records the outcome and reschedules. Useful for "test my new schedule
/// works without waiting an hour."
async fn run_schedule_now(
    State(state): State<IngestState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Some(def) = state.schedules.get(&id).await else {
        return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
    };
    let deps = schedule::SchedulerIngestDeps {
        store: state.store.clone(),
        registry: state.registry.clone(),
        buckets: state.buckets.clone(),
        agent_handle: Some(state.agent_handle.clone()),
        index_registry: state.index_registry.clone(),
    };
    let t0 = std::time::Instant::now();
    let result = schedule::run_schedule(&def, &deps).await;
    let elapsed = t0.elapsed().as_secs_f32();
    let outcome = schedule::RunOutcome {
        at: chrono::Utc::now(),
        success: result.is_ok(),
        message: match &result {
            Ok(s) => s.clone(),
            Err(e) => e.clone(),
        },
        // Best-effort row count: scan the success message for a `rows=N`
        // token rather than threading a structured count through.
        rows: result.as_ref().ok().and_then(|m: &String| {
            m.split_whitespace()
                .find_map(|tok| tok.strip_prefix("rows=").and_then(|n| n.parse().ok()))
        }),
        duration_secs: elapsed,
    };
    if let Err(e) = state.schedules.record_run(&id, outcome.clone()).await {
        tracing::warn!("schedule {id}: record_run failed: {e}");
    }
    Ok(Json(outcome))
}