Four shipped features and a PRD realignment, all measured end-to-end:
HNSW trial system (Phase 15 horizon item → complete)
- vectord: EmbeddingCache, harness (eval sets + brute-force ground truth),
TrialJournal, parameterized HnswConfig on build_index_with_config
- /vectors/hnsw/trial, /hnsw/trials/{idx}, /hnsw/trials/{idx}/best,
/hnsw/evals/{name}/autogen, /hnsw/cache/stats
- Measured on resumes_100k_v2 (100K × 768d): brute-force 44ms → HNSW 873µs
at 100% recall@10. ec=80 es=30 locked as HnswConfig::default()
- Lower ec values trade recall for build time: 20/30 = 0.96 recall in 8s,
80/30 = 1.00 recall in 230s
Catalog manifest repair
- catalogd: resync_from_parquet reads parquet footers to restore row_count
and columns on drifted manifests
- POST /catalog/datasets/{name}/resync + POST /catalog/resync-missing
- All 7 staffing tables recovered to PRD-matching 2,469,278 rows
Federation foundation (ADR-017)
- shared::secrets: SecretsProvider trait + FileSecretsProvider (reads
/etc/lakehouse/secrets.toml, enforces 0600 perms)
- storaged::registry::BucketRegistry — multi-bucket resolution with
rescue_bucket read fallback and reachability probing
- storaged::error_journal — bucket op failures visible in one HTTP call
- storaged::append_log — write-once batched append pattern (fixes the RMW
anti-pattern llms3.com calls out; errors and trial journals both use it)
- /storage/buckets, /storage/errors, /storage/bucket-health,
/storage/errors/{flush,compact}
- Bucket-aware I/O at /storage/buckets/{bucket}/objects/{*key} with
X-Lakehouse-Rescue-Used observability headers on fallback
Postgres streaming ingest
- ingestd::pg_stream: DSN parser, batched ORDER BY + LIMIT/OFFSET pagination
into ArrowWriter, lineage redacts password
- POST /ingest/db — verified against live knowledge_base.team_runs
(586 rows × 13 cols, 6 batches, 196ms end-to-end)
PRD realignment (2026-04-16)
- Dual use case: staffing analytics + local LLM knowledge substrate
- Removed "multi-tenancy (single-owner system)" from non-goals
- Added invariants 8-11: indexes hot-swappable, per-reader profiles,
trials-as-data, operational failures findable in one HTTP call
- New phases 16 (hot-swap generations), 17 (model profiles + dataset
bindings), 18 (Lance vs Parquet+sidecar evaluation)
- Known ceilings table documents the 5M vector wall and escape hatches
- ADR-017 (federation), ADR-018 (append-log pattern) added
- EXECUTION_PLAN.md sequences phases B-E with success gates and
decision rules
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
309 lines
11 KiB
Rust
309 lines
11 KiB
Rust
/// Streaming PostgreSQL ingest.
///
/// The original `db_ingest::import_postgres_table` loads every row into
/// memory before emitting Parquet — fine for small tables, blows up on
/// 1M+ rows. This module paginates via `ORDER BY <pk> LIMIT N OFFSET M`
/// and streams batches into an `ArrowWriter`, closing once exhausted.
///
/// Pagination is OFFSET-based (not keyset) — simpler, works on any table
/// without needing to know the PK type. OFFSET N scans N rows per fetch,
/// which becomes quadratic for very large tables. Upgrade to keyset when
/// a user actually ingests something that hurts.
use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray};
|
|
use arrow::datatypes::{DataType, Field, Schema};
|
|
use parquet::arrow::ArrowWriter;
|
|
use std::sync::Arc;
|
|
use tokio_postgres::{Client, NoTls, types::Type as PgType};
|
|
|
|
use crate::db_ingest::DbConfig;
|
|
|
|
/// Request shape for streaming ingest — takes a DSN string and optional
/// tuning knobs.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct PgStreamRequest {
    /// postgresql://user:pass@host:port/db — alternative to DbConfig.
    pub dsn: String,
    /// Source table name. Interpolated into generated SQL as a quoted
    /// identifier by the streaming path.
    pub table: String,
    /// Name to register the resulting dataset under.
    // NOTE(review): fallback when omitted (presumably `table`) lives in the
    // HTTP handler, not in this module — confirm against the /ingest/db route.
    #[serde(default)]
    pub dataset_name: Option<String>,
    /// Rows per fetch. Default 10_000.
    #[serde(default)]
    pub batch_size: Option<usize>,
    /// Column to ORDER BY for stable pagination. If omitted, the first
    /// column returned by the schema probe is used.
    #[serde(default)]
    pub order_by: Option<String>,
    /// Hard cap on total rows (for sampling / previews).
    #[serde(default)]
    pub limit: Option<usize>,
}
/// Summary of one completed streaming ingest, returned alongside the
/// in-memory Parquet payload.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PgStreamResult {
    /// Source table name, echoed from the request.
    pub table: String,
    /// Total rows written across all batches.
    pub rows: usize,
    /// Number of record batches fetched and written.
    pub batches: usize,
    /// Column count from the schema probe.
    pub columns: usize,
    /// Human-readable `name:pg_type` pairs, one entry per column.
    pub schema: Vec<String>,
    /// Size of the encoded Parquet payload in bytes.
    pub parquet_bytes: u64,
    /// Wall-clock time for the whole ingest (connect through writer close).
    pub duration_secs: f32,
}
/// Parse a postgresql:// DSN into a DbConfig.
|
|
/// Supports: postgresql://[user[:password]@]host[:port][/db]
|
|
/// Does NOT support: query parameters (sslmode etc) — add if needed.
|
|
pub fn parse_dsn(dsn: &str) -> Result<DbConfig, String> {
|
|
let rest = dsn
|
|
.strip_prefix("postgresql://")
|
|
.or_else(|| dsn.strip_prefix("postgres://"))
|
|
.ok_or_else(|| "DSN must start with postgresql:// or postgres://".to_string())?;
|
|
|
|
// Split off path (database) first.
|
|
let (auth_host, database) = match rest.split_once('/') {
|
|
Some((ah, db)) => (ah, db.split('?').next().unwrap_or(db).to_string()),
|
|
None => (rest, "postgres".to_string()),
|
|
};
|
|
|
|
// Split user[:password] from host[:port].
|
|
let (userpass, hostport) = match auth_host.rsplit_once('@') {
|
|
Some((up, hp)) => (Some(up), hp),
|
|
None => (None, auth_host),
|
|
};
|
|
|
|
let (user, password) = match userpass {
|
|
Some(up) => match up.split_once(':') {
|
|
Some((u, p)) => (u.to_string(), p.to_string()),
|
|
None => (up.to_string(), String::new()),
|
|
},
|
|
None => ("postgres".to_string(), String::new()),
|
|
};
|
|
|
|
let (host, port) = match hostport.rsplit_once(':') {
|
|
Some((h, p)) => (
|
|
h.to_string(),
|
|
p.parse::<u16>().map_err(|_| format!("invalid port in DSN: {p}"))?,
|
|
),
|
|
None => (hostport.to_string(), 5432),
|
|
};
|
|
|
|
if host.is_empty() {
|
|
return Err("DSN has no host".into());
|
|
}
|
|
|
|
Ok(DbConfig { host, port, database, user, password })
|
|
}
|
|
|
|
/// Stream a Postgres table as Parquet bytes.
|
|
///
|
|
/// Returns the full Parquet payload + summary stats. The payload is
|
|
/// written to an in-memory buffer via `ArrowWriter` so each batch gets
|
|
/// columnar compression as it arrives — memory footprint stays roughly
|
|
/// at one batch plus Parquet footer state.
|
|
pub async fn stream_table_to_parquet(
|
|
req: &PgStreamRequest,
|
|
) -> Result<(bytes::Bytes, PgStreamResult), String> {
|
|
let t0 = std::time::Instant::now();
|
|
let config = parse_dsn(&req.dsn)?;
|
|
let batch_size = req.batch_size.unwrap_or(10_000).max(1);
|
|
|
|
let (client, connection) = tokio_postgres::connect(&config.connection_string(), NoTls)
|
|
.await
|
|
.map_err(|e| format!("postgres connect: {e}"))?;
|
|
tokio::spawn(async move {
|
|
if let Err(e) = connection.await {
|
|
tracing::error!("pg connection: {e}");
|
|
}
|
|
});
|
|
|
|
// Probe schema.
|
|
let columns = probe_columns(&client, &req.table).await?;
|
|
if columns.is_empty() {
|
|
return Err(format!("table '{}' not found or has no columns", req.table));
|
|
}
|
|
|
|
let arrow_fields: Vec<Field> = columns
|
|
.iter()
|
|
.map(|(name, pg)| Field::new(name, pg_type_to_arrow(pg), true))
|
|
.collect();
|
|
let schema = Arc::new(Schema::new(arrow_fields));
|
|
let schema_report: Vec<String> = columns
|
|
.iter()
|
|
.map(|(n, t)| format!("{}:{}", n, t))
|
|
.collect();
|
|
|
|
// Pagination key: user-specified or first column.
|
|
let order_col = req.order_by.clone().unwrap_or_else(|| columns[0].0.clone());
|
|
|
|
// Stream batches into ArrowWriter.
|
|
let mut buf: Vec<u8> = Vec::with_capacity(1024 * 1024);
|
|
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)
|
|
.map_err(|e| format!("ArrowWriter init: {e}"))?;
|
|
|
|
let mut total_rows: usize = 0;
|
|
let mut batch_count: usize = 0;
|
|
let row_cap = req.limit.unwrap_or(usize::MAX);
|
|
|
|
loop {
|
|
let remaining = row_cap.saturating_sub(total_rows);
|
|
if remaining == 0 { break; }
|
|
let fetch = remaining.min(batch_size);
|
|
|
|
let sql = format!(
|
|
"SELECT * FROM \"{}\" ORDER BY \"{}\" LIMIT {} OFFSET {}",
|
|
req.table, order_col, fetch, total_rows,
|
|
);
|
|
|
|
let rows = client
|
|
.query(&sql, &[])
|
|
.await
|
|
.map_err(|e| format!("fetch batch at offset {total_rows}: {e}"))?;
|
|
|
|
if rows.is_empty() { break; }
|
|
|
|
let n = rows.len();
|
|
let arrays: Vec<ArrayRef> = columns
|
|
.iter()
|
|
.enumerate()
|
|
.map(|(idx, (_, pg))| rows_to_column(&rows, idx, pg))
|
|
.collect::<Result<_, _>>()?;
|
|
let batch = RecordBatch::try_new(schema.clone(), arrays)
|
|
.map_err(|e| format!("RecordBatch: {e}"))?;
|
|
writer.write(&batch).map_err(|e| format!("ArrowWriter::write: {e}"))?;
|
|
|
|
total_rows += n;
|
|
batch_count += 1;
|
|
tracing::info!(
|
|
"pg stream '{}': fetched batch {} ({} rows, total {})",
|
|
req.table, batch_count, n, total_rows,
|
|
);
|
|
|
|
if n < fetch { break; } // short read = end of table
|
|
}
|
|
|
|
writer.close().map_err(|e| format!("ArrowWriter::close: {e}"))?;
|
|
let parquet_bytes = buf.len() as u64;
|
|
let duration = t0.elapsed().as_secs_f32();
|
|
|
|
let result = PgStreamResult {
|
|
table: req.table.clone(),
|
|
rows: total_rows,
|
|
batches: batch_count,
|
|
columns: columns.len(),
|
|
schema: schema_report,
|
|
parquet_bytes,
|
|
duration_secs: duration,
|
|
};
|
|
|
|
Ok((bytes::Bytes::from(buf), result))
|
|
}
|
|
|
|
async fn probe_columns(client: &Client, table: &str) -> Result<Vec<(String, PgType)>, String> {
|
|
let stmt = client
|
|
.prepare(&format!("SELECT * FROM \"{}\" LIMIT 0", table))
|
|
.await
|
|
.map_err(|e| format!("prepare schema probe: {e}"))?;
|
|
Ok(stmt
|
|
.columns()
|
|
.iter()
|
|
.map(|c| (c.name().to_string(), c.type_().clone()))
|
|
.collect())
|
|
}
|
|
|
|
fn pg_type_to_arrow(pg: &PgType) -> DataType {
|
|
match *pg {
|
|
PgType::BOOL => DataType::Boolean,
|
|
PgType::INT2 | PgType::INT4 => DataType::Int32,
|
|
PgType::INT8 | PgType::OID => DataType::Int64,
|
|
PgType::FLOAT4 | PgType::FLOAT8 | PgType::NUMERIC => DataType::Float64,
|
|
_ => DataType::Utf8,
|
|
}
|
|
}
|
|
|
|
/// Decode column `idx` of every row into one Arrow array of the type
/// `pg_type_to_arrow` declared for `pg`.
///
/// Per-value decode failures are swallowed: `try_get(..).ok()` turns any
/// value tokio-postgres can't decode into NULL (or, in the string
/// fallback, into an empty string) instead of failing the whole batch.
fn rows_to_column(
    rows: &[tokio_postgres::Row],
    idx: usize,
    pg: &PgType,
) -> Result<ArrayRef, String> {
    match *pg {
        PgType::BOOL => {
            let v: Vec<Option<bool>> = rows.iter().map(|r| r.try_get(idx).ok()).collect();
            Ok(Arc::new(BooleanArray::from(v)))
        }
        // INT2 widens to i32 so it shares the Int32 schema slot with INT4.
        PgType::INT2 => {
            let v: Vec<Option<i32>> = rows.iter()
                .map(|r| r.try_get::<_, i16>(idx).ok().map(|x| x as i32)).collect();
            Ok(Arc::new(Int32Array::from(v)))
        }
        PgType::INT4 => {
            let v: Vec<Option<i32>> = rows.iter().map(|r| r.try_get(idx).ok()).collect();
            Ok(Arc::new(Int32Array::from(v)))
        }
        PgType::INT8 | PgType::OID => {
            let v: Vec<Option<i64>> = rows.iter().map(|r| r.try_get(idx).ok()).collect();
            Ok(Arc::new(Int64Array::from(v)))
        }
        // FLOAT4 widens to f64 so it shares the Float64 slot with FLOAT8.
        PgType::FLOAT4 => {
            let v: Vec<Option<f64>> = rows.iter()
                .map(|r| r.try_get::<_, f32>(idx).ok().map(|x| x as f64)).collect();
            Ok(Arc::new(Float64Array::from(v)))
        }
        PgType::FLOAT8 => {
            let v: Vec<Option<f64>> = rows.iter().map(|r| r.try_get(idx).ok()).collect();
            Ok(Arc::new(Float64Array::from(v)))
        }
        _ => {
            // Safe-default per ADR-010: everything else becomes string.
            // Fallback order matters: native text first, then JSON(B)
            // re-serialized, then timestamptz as RFC 3339, then UUID.
            // Anything still undecodable becomes "" — not NULL.
            let v: Vec<Option<String>> = rows.iter().map(|r| {
                r.try_get::<_, String>(idx).ok()
                    .or_else(|| r.try_get::<_, serde_json::Value>(idx).ok().map(|j| j.to_string()))
                    .or_else(|| {
                        // Timestamps + uuid, rendered via Display.
                        r.try_get::<_, chrono::DateTime<chrono::Utc>>(idx).ok()
                            .map(|t| t.to_rfc3339())
                    })
                    .or_else(|| r.try_get::<_, uuid::Uuid>(idx).ok().map(|u| u.to_string()))
                    .or(Some(String::new()))
            }).collect();
            Ok(Arc::new(StringArray::from(v)))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Full form: user, password, host, port, database all explicit.
    #[test]
    fn parse_dsn_full() {
        let c = parse_dsn("postgresql://daisy:secret@db.example.com:6543/my_db").unwrap();
        assert_eq!(c.host, "db.example.com");
        assert_eq!(c.port, 6543);
        assert_eq!(c.user, "daisy");
        assert_eq!(c.password, "secret");
        assert_eq!(c.database, "my_db");
    }

    // Minimal form: defaults for user, password, and port.
    #[test]
    fn parse_dsn_minimal() {
        let c = parse_dsn("postgresql://localhost/knowledge_base").unwrap();
        assert_eq!(c.host, "localhost");
        assert_eq!(c.port, 5432);
        assert_eq!(c.user, "postgres");
        assert_eq!(c.password, "");
        assert_eq!(c.database, "knowledge_base");
    }

    #[test]
    fn parse_dsn_no_password() {
        let c = parse_dsn("postgres://postgres@127.0.0.1:5432/mydb").unwrap();
        assert_eq!(c.user, "postgres");
        assert_eq!(c.password, "");
    }

    // rsplit on '@' means a password containing '@' still parses.
    #[test]
    fn parse_dsn_password_with_at() {
        let c = parse_dsn("postgresql://u:p@ss@h/db").unwrap();
        assert_eq!(c.user, "u");
        assert_eq!(c.password, "p@ss");
        assert_eq!(c.host, "h");
        assert_eq!(c.database, "db");
    }

    // Query parameters are stripped from the database segment.
    #[test]
    fn parse_dsn_strips_query_params() {
        let c = parse_dsn("postgresql://h/db?sslmode=require").unwrap();
        assert_eq!(c.database, "db");
    }

    // No path segment at all falls back to the "postgres" database.
    #[test]
    fn parse_dsn_no_slash_defaults_db() {
        let c = parse_dsn("postgres://h:5433").unwrap();
        assert_eq!(c.database, "postgres");
        assert_eq!(c.port, 5433);
    }

    #[test]
    fn parse_dsn_rejects_non_pg() {
        assert!(parse_dsn("http://host/db").is_err());
    }

    #[test]
    fn parse_dsn_rejects_bad_port() {
        assert!(parse_dsn("postgresql://h:notaport/db").is_err());
    }
}