//! Streaming PostgreSQL ingest.
//!
//! The original `db_ingest::import_postgres_table` loads every row into
//! memory before emitting Parquet — fine for small tables, blows up on
//! 1M+ rows. This module paginates via `ORDER BY <col> LIMIT n OFFSET m`
//! and streams batches into an `ArrowWriter`, closing once exhausted.
//!
//! Pagination is OFFSET-based (not keyset) — simpler, and it works on any
//! table without needing to know the PK type. The trade-off: `OFFSET m`
//! scans and discards m rows on every fetch, so total work grows
//! quadratically with table size. Upgrade to keyset pagination when a user
//! actually ingests something that hurts.

use arrow::array::{
    ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
};
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
use tokio_postgres::{types::Type as PgType, Client, NoTls};

use crate::db_ingest::DbConfig;

/// Request shape for streaming ingest — takes a DSN string and optional
/// tuning knobs.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct PgStreamRequest {
    /// postgresql://user:pass@host:port/db — alternative to `DbConfig`.
    pub dsn: String,
    pub table: String,
    #[serde(default)]
    pub dataset_name: Option<String>,
    /// Rows per fetch. Default 10_000.
    #[serde(default)]
    pub batch_size: Option<usize>,
    /// Column to ORDER BY for stable pagination. If omitted, the first
    /// column returned by the schema probe is used.
    #[serde(default)]
    pub order_by: Option<String>,
    /// Hard cap on total rows (for sampling / previews).
    #[serde(default)]
    pub limit: Option<usize>,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct PgStreamResult {
    pub table: String,
    pub rows: usize,
    pub batches: usize,
    pub columns: usize,
    /// One `name:type` entry per column.
    pub schema: Vec<String>,
    pub parquet_bytes: u64,
    pub duration_secs: f32,
}

/// Parse a postgresql:// DSN into a `DbConfig`.
/// Supports: postgresql://[user[:password]@]host[:port][/db]
/// Does NOT support: query parameters (sslmode etc.) — add if needed.
pub fn parse_dsn(dsn: &str) -> Result<DbConfig, String> {
    let rest = dsn
        .strip_prefix("postgresql://")
        .or_else(|| dsn.strip_prefix("postgres://"))
        .ok_or_else(|| "DSN must start with postgresql:// or postgres://".to_string())?;

    // Split off the path (database) first; drop any query string.
    let (auth_host, database) = match rest.split_once('/') {
        Some((ah, db)) => (ah, db.split('?').next().unwrap_or(db).to_string()),
        None => (rest, "postgres".to_string()),
    };

    // Split user[:password] from host[:port].
    let (userpass, hostport) = match auth_host.rsplit_once('@') {
        Some((up, hp)) => (Some(up), hp),
        None => (None, auth_host),
    };
    let (user, password) = match userpass {
        Some(up) => match up.split_once(':') {
            Some((u, p)) => (u.to_string(), p.to_string()),
            None => (up.to_string(), String::new()),
        },
        None => ("postgres".to_string(), String::new()),
    };
    let (host, port) = match hostport.rsplit_once(':') {
        Some((h, p)) => (
            h.to_string(),
            p.parse::<u16>()
                .map_err(|_| format!("invalid port in DSN: {p}"))?,
        ),
        None => (hostport.to_string(), 5432),
    };
    if host.is_empty() {
        return Err("DSN has no host".into());
    }

    Ok(DbConfig { host, port, database, user, password })
}
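// Sketch of the keyset upgrade path mentioned in the module docs: seek past
// the last-seen key instead of OFFSET-scanning, so each fetch is an index
// range scan rather than a re-scan from row zero. Not wired in —
// `keyset_fetch_sql` is a hypothetical helper, and it assumes the key
// column is unique (ties can skip rows otherwise). The caller would bind
// the previous batch's final key value as $1.
#[allow(dead_code)]
fn keyset_fetch_sql(table: &str, key_col: &str, first_page: bool, fetch: usize) -> String {
    if first_page {
        // First page: no predicate, just a stable order and a limit.
        format!("SELECT * FROM \"{table}\" ORDER BY \"{key_col}\" LIMIT {fetch}")
    } else {
        // Later pages: WHERE key > $1 replaces OFFSET, keeping each fetch O(batch).
        format!(
            "SELECT * FROM \"{table}\" WHERE \"{key_col}\" > $1 ORDER BY \"{key_col}\" LIMIT {fetch}"
        )
    }
}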
/// Stream a Postgres table as Parquet bytes.
///
/// Returns the full Parquet payload plus summary stats. The payload is
/// written to an in-memory buffer via `ArrowWriter`, so each batch gets
/// columnar compression as it arrives — memory footprint stays roughly
/// at one decoded batch plus Parquet footer state.
pub async fn stream_table_to_parquet(
    req: &PgStreamRequest,
) -> Result<(bytes::Bytes, PgStreamResult), String> {
    let t0 = std::time::Instant::now();
    let config = parse_dsn(&req.dsn)?;
    let batch_size = req.batch_size.unwrap_or(10_000).max(1);

    let (client, connection) = tokio_postgres::connect(&config.connection_string(), NoTls)
        .await
        .map_err(|e| format!("postgres connect: {e}"))?;
    // The connection future drives protocol I/O; queries stall unless it runs.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            tracing::error!("pg connection: {e}");
        }
    });

    // Probe schema.
    let columns = probe_columns(&client, &req.table).await?;
    if columns.is_empty() {
        return Err(format!("table '{}' not found or has no columns", req.table));
    }
    let arrow_fields: Vec<Field> = columns
        .iter()
        .map(|(name, pg)| Field::new(name.as_str(), pg_type_to_arrow(pg), true))
        .collect();
    let schema = Arc::new(Schema::new(arrow_fields));
    let schema_report: Vec<String> = columns
        .iter()
        .map(|(n, t)| format!("{}:{}", n, t))
        .collect();

    // Pagination key: user-specified or first column. Identifiers are quoted
    // but not escaped, so names containing '"' would break the query.
    let order_col = req.order_by.clone().unwrap_or_else(|| columns[0].0.clone());

    // Stream batches into the ArrowWriter.
    let mut buf: Vec<u8> = Vec::with_capacity(1024 * 1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)
        .map_err(|e| format!("ArrowWriter init: {e}"))?;

    let mut total_rows: usize = 0;
    let mut batch_count: usize = 0;
    let row_cap = req.limit.unwrap_or(usize::MAX);

    loop {
        let remaining = row_cap.saturating_sub(total_rows);
        if remaining == 0 {
            break;
        }
        let fetch = remaining.min(batch_size);
        let sql = format!(
            "SELECT * FROM \"{}\" ORDER BY \"{}\" LIMIT {} OFFSET {}",
            req.table, order_col, fetch, total_rows,
        );
        let rows = client
            .query(&sql, &[])
            .await
            .map_err(|e| format!("fetch batch at offset {total_rows}: {e}"))?;
        if rows.is_empty() {
            break;
        }

        let n = rows.len();
        let arrays: Vec<ArrayRef> = columns
            .iter()
            .enumerate()
            .map(|(idx, (_, pg))| rows_to_column(&rows, idx, pg))
            .collect::<Result<_, _>>()?;
        let batch = RecordBatch::try_new(schema.clone(), arrays)
            .map_err(|e| format!("RecordBatch: {e}"))?;
        writer
            .write(&batch)
            .map_err(|e| format!("ArrowWriter::write: {e}"))?;

        total_rows += n;
        batch_count += 1;
        tracing::info!(
            "pg stream '{}': fetched batch {} ({} rows, total {})",
            req.table, batch_count, n, total_rows,
        );
        if n < fetch {
            break; // short read = end of table
        }
    }

    writer.close().map_err(|e| format!("ArrowWriter::close: {e}"))?;
    let parquet_bytes = buf.len() as u64;
    let duration = t0.elapsed().as_secs_f32();

    let result = PgStreamResult {
        table: req.table.clone(),
        rows: total_rows,
        batches: batch_count,
        columns: columns.len(),
        schema: schema_report,
        parquet_bytes,
        duration_secs: duration,
    };
    Ok((bytes::Bytes::from(buf), result))
}
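// Hedged usage sketch for the function above: persist the payload to disk.
// The output path scheme and `tokio::fs` (needs tokio's "fs" feature) are
// illustrative assumptions, not part of this module's API.
#[allow(dead_code)]
async fn example_dump_to_file(req: &PgStreamRequest) -> Result<(), String> {
    let (payload, stats) = stream_table_to_parquet(req).await?;
    // stats.table comes straight from the request; real callers should
    // sanitize it before using it as a filename.
    tokio::fs::write(format!("{}.parquet", stats.table), payload)
        .await
        .map_err(|e| format!("write parquet: {e}"))?;
    tracing::info!("wrote {} rows / {} bytes", stats.rows, stats.parquet_bytes);
    Ok(())
}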
/// Fetch (name, pg_type) pairs for a table via a zero-row prepared probe.
async fn probe_columns(client: &Client, table: &str) -> Result<Vec<(String, PgType)>, String> {
    let stmt = client
        .prepare(&format!("SELECT * FROM \"{}\" LIMIT 0", table))
        .await
        .map_err(|e| format!("prepare schema probe: {e}"))?;
    Ok(stmt
        .columns()
        .iter()
        .map(|c| (c.name().to_string(), c.type_().clone()))
        .collect())
}

fn pg_type_to_arrow(pg: &PgType) -> DataType {
    match *pg {
        PgType::BOOL => DataType::Boolean,
        PgType::INT2 | PgType::INT4 => DataType::Int32,
        PgType::INT8 | PgType::OID => DataType::Int64,
        PgType::FLOAT4 | PgType::FLOAT8 => DataType::Float64,
        // NUMERIC deliberately falls through to Utf8: rows_to_column has no
        // decimal arm, and declaring Float64 here would make the schema
        // disagree with the StringArray the catch-all builds.
        _ => DataType::Utf8,
    }
}

fn rows_to_column(
    rows: &[tokio_postgres::Row],
    idx: usize,
    pg: &PgType,
) -> Result<ArrayRef, String> {
    match *pg {
        PgType::BOOL => {
            let v: Vec<Option<bool>> = rows.iter().map(|r| r.try_get(idx).ok()).collect();
            Ok(Arc::new(BooleanArray::from(v)))
        }
        PgType::INT2 => {
            // No 16-bit column in the target schema; widen smallint to i32.
            let v: Vec<Option<i32>> = rows
                .iter()
                .map(|r| r.try_get::<_, i16>(idx).ok().map(|x| x as i32))
                .collect();
            Ok(Arc::new(Int32Array::from(v)))
        }
        PgType::INT4 => {
            let v: Vec<Option<i32>> = rows.iter().map(|r| r.try_get(idx).ok()).collect();
            Ok(Arc::new(Int32Array::from(v)))
        }
        PgType::INT8 => {
            let v: Vec<Option<i64>> = rows.iter().map(|r| r.try_get(idx).ok()).collect();
            Ok(Arc::new(Int64Array::from(v)))
        }
        PgType::OID => {
            // tokio_postgres decodes OID as u32; widen to i64 to match the
            // schema (decoding straight into i64 would fail on every value).
            let v: Vec<Option<i64>> = rows
                .iter()
                .map(|r| r.try_get::<_, u32>(idx).ok().map(|x| x as i64))
                .collect();
            Ok(Arc::new(Int64Array::from(v)))
        }
        PgType::FLOAT4 => {
            let v: Vec<Option<f64>> = rows
                .iter()
                .map(|r| r.try_get::<_, f32>(idx).ok().map(|x| x as f64))
                .collect();
            Ok(Arc::new(Float64Array::from(v)))
        }
        PgType::FLOAT8 => {
            let v: Vec<Option<f64>> = rows.iter().map(|r| r.try_get(idx).ok()).collect();
            Ok(Arc::new(Float64Array::from(v)))
        }
        _ => {
            // Safe-default per ADR-010: everything else becomes a string.
            // NULLs also land here as "": try_get into a non-Option type
            // errors on NULL, so every fallback below fails in turn.
            let v: Vec<Option<String>> = rows
                .iter()
                .map(|r| {
                    r.try_get::<_, String>(idx)
                        .ok()
                        .or_else(|| {
                            r.try_get::<_, serde_json::Value>(idx).ok().map(|j| j.to_string())
                        })
                        .or_else(|| {
                            // Timestamps, rendered as RFC 3339.
                            r.try_get::<_, chrono::DateTime<chrono::Utc>>(idx)
                                .ok()
                                .map(|t| t.to_rfc3339())
                        })
                        // UUIDs, rendered via Display.
                        .or_else(|| r.try_get::<_, uuid::Uuid>(idx).ok().map(|u| u.to_string()))
                        .or(Some(String::new()))
                })
                .collect();
            Ok(Arc::new(StringArray::from(v)))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_dsn_full() {
        let c = parse_dsn("postgresql://daisy:secret@db.example.com:6543/my_db").unwrap();
        assert_eq!(c.host, "db.example.com");
        assert_eq!(c.port, 6543);
        assert_eq!(c.user, "daisy");
        assert_eq!(c.password, "secret");
        assert_eq!(c.database, "my_db");
    }

    #[test]
    fn parse_dsn_minimal() {
        let c = parse_dsn("postgresql://localhost/knowledge_base").unwrap();
        assert_eq!(c.host, "localhost");
        assert_eq!(c.port, 5432);
        assert_eq!(c.user, "postgres");
        assert_eq!(c.password, "");
        assert_eq!(c.database, "knowledge_base");
    }

    #[test]
    fn parse_dsn_no_password() {
        let c = parse_dsn("postgres://postgres@127.0.0.1:5432/mydb").unwrap();
        assert_eq!(c.user, "postgres");
        assert_eq!(c.password, "");
    }

    #[test]
    fn parse_dsn_rejects_non_pg() {
        assert!(parse_dsn("http://host/db").is_err());
    }
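    // Pins the pg→arrow mapping: NUMERIC must fall through to Utf8 so the
    // declared schema always agrees with the StringArray built by
    // rows_to_column's catch-all arm.
    #[test]
    fn pg_type_mapping_matches_builders() {
        assert_eq!(pg_type_to_arrow(&PgType::BOOL), DataType::Boolean);
        assert_eq!(pg_type_to_arrow(&PgType::INT2), DataType::Int32);
        assert_eq!(pg_type_to_arrow(&PgType::INT8), DataType::Int64);
        assert_eq!(pg_type_to_arrow(&PgType::FLOAT4), DataType::Float64);
        assert_eq!(pg_type_to_arrow(&PgType::NUMERIC), DataType::Utf8);
        assert_eq!(pg_type_to_arrow(&PgType::TIMESTAMPTZ), DataType::Utf8);
    }
}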