//! Database connector: pull tables from PostgreSQL into Arrow record batches for Parquet export.
//!
//! Connects, reads the table's column types, fetches the rows with a single query and converts
//! them into Arrow `RecordBatch`es. No ORM, no schema definition — the schema is inferred from
//! the database.
use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
use tokio_postgres::{Client, NoTls, types::Type as PgType};

/// Connection config for a database.
///
/// `Debug` is implemented by hand so the password is never written to logs.
#[derive(Clone, serde::Deserialize)]
pub struct DbConfig {
    pub host: String,
    pub port: u16,
    pub database: String,
    pub user: String,
    /// Omitted from the connection string when empty.
    pub password: String,
}

impl std::fmt::Debug for DbConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DbConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("database", &self.database)
            .field("user", &self.user)
            .field("password", &"<redacted>")
            .finish()
    }
}

impl DbConfig {
    /// Build a libpq-style `key=value` connection string.
    ///
    /// Values that are empty or contain whitespace, single quotes or backslashes are wrapped in
    /// single quotes with `'` and `\` backslash-escaped, so they are not split or mis-parsed.
    /// The `password` key is left out entirely when the password is empty.
    pub fn connection_string(&self) -> String {
        let mut conn = format!(
            "host={} port={} dbname={} user={}",
            Self::quote_conninfo_value(&self.host),
            self.port,
            Self::quote_conninfo_value(&self.database),
            Self::quote_conninfo_value(&self.user),
        );
        if !self.password.is_empty() {
            conn.push_str(" password=");
            conn.push_str(&Self::quote_conninfo_value(&self.password));
        }
        conn
    }

    /// Return `value` as-is when it can appear bare in a connection string; otherwise return it
    /// single-quoted with embedded `'` and `\` escaped by a backslash.
    fn quote_conninfo_value(value: &str) -> String {
        let needs_quotes = value.is_empty()
            || value.chars().any(|c| c.is_whitespace() || c == '\'' || c == '\\');
        if !needs_quotes {
            return value.to_string();
        }
        let mut quoted = String::with_capacity(value.len() + 2);
        quoted.push('\'');
        for c in value.chars() {
            if matches!(c, '\'' | '\\') {
                quoted.push('\\');
            }
            quoted.push(c);
        }
        quoted.push('\'');
        quoted
    }
}

/// Result of a database table import.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DbIngestResult {
    /// Table name exactly as requested by the caller.
    pub table_name: String,
    /// Number of rows fetched.
    pub rows: usize,
    /// Number of columns in the table.
    pub columns: usize,
    /// One `"column_name: postgres_type"` entry per column, in table order.
    pub schema_detected: Vec<String>,
}

/// Connect to PostgreSQL and import a table as Arrow RecordBatches.
pub async fn import_postgres_table( config: &DbConfig, table_name: &str, limit: Option, ) -> Result<(Arc, Vec, DbIngestResult), String> { // Connect let (client, connection) = tokio_postgres::connect(&config.connection_string(), NoTls) .await .map_err(|e| format!("postgres connect error: {e}"))?; // Spawn connection handler tokio::spawn(async move { if let Err(e) = connection.await { tracing::error!("postgres connection error: {e}"); } }); tracing::info!("connected to postgres {}:{}/{}", config.host, config.port, config.database); // Get column info let columns = get_table_columns(&client, table_name).await?; if columns.is_empty() { return Err(format!("table '{}' not found or has no columns", table_name)); } let schema_detected: Vec = columns.iter() .map(|(name, pg_type)| format!("{}: {}", name, pg_type_to_string(pg_type))) .collect(); tracing::info!("table '{}': {} columns", table_name, columns.len()); // Build Arrow schema let arrow_fields: Vec = columns.iter() .map(|(name, pg_type)| Field::new(name, pg_type_to_arrow(pg_type), true)) .collect(); let schema = Arc::new(Schema::new(arrow_fields)); // Query rows let sql = match limit { Some(n) => format!("SELECT * FROM \"{}\" LIMIT {}", table_name, n), None => format!("SELECT * FROM \"{}\"", table_name), }; let rows = client.query(&sql, &[]) .await .map_err(|e| format!("query error: {e}"))?; let row_count = rows.len(); tracing::info!("fetched {} rows from '{}'", row_count, table_name); if row_count == 0 { return Ok((schema.clone(), vec![], DbIngestResult { table_name: table_name.to_string(), rows: 0, columns: columns.len(), schema_detected, })); } // Convert to Arrow arrays let mut arrays: Vec = Vec::new(); for (col_idx, (_, pg_type)) in columns.iter().enumerate() { let array = rows_to_arrow_column(&rows, col_idx, pg_type)?; arrays.push(array); } let batch = RecordBatch::try_new(schema.clone(), arrays) .map_err(|e| format!("RecordBatch error: {e}"))?; let result = DbIngestResult { table_name: table_name.to_string(), 
rows: row_count, columns: columns.len(), schema_detected, }; Ok((schema, vec![batch], result)) } /// List all tables in the database. pub async fn list_postgres_tables(config: &DbConfig) -> Result, String> { let (client, connection) = tokio_postgres::connect(&config.connection_string(), NoTls) .await .map_err(|e| format!("connect error: {e}"))?; tokio::spawn(async move { let _ = connection.await; }); let rows = client.query( "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name", &[], ).await.map_err(|e| format!("query error: {e}"))?; Ok(rows.iter().map(|r| r.get::<_, String>(0)).collect()) } /// Get column names and types for a table. async fn get_table_columns(client: &Client, table_name: &str) -> Result, String> { // Use a dummy query to get column types let stmt = client.prepare(&format!("SELECT * FROM \"{}\" LIMIT 0", table_name)) .await .map_err(|e| format!("prepare error for '{}': {e}", table_name))?; Ok(stmt.columns().iter() .map(|col| (col.name().to_string(), col.type_().clone())) .collect()) } /// Convert a Postgres type to Arrow DataType. fn pg_type_to_arrow(pg_type: &PgType) -> DataType { match *pg_type { PgType::BOOL => DataType::Boolean, PgType::INT2 | PgType::INT4 => DataType::Int32, PgType::INT8 | PgType::OID => DataType::Int64, PgType::FLOAT4 | PgType::FLOAT8 | PgType::NUMERIC => DataType::Float64, _ => DataType::Utf8, // everything else → string (safe default per ADR-010) } } fn pg_type_to_string(pg_type: &PgType) -> String { format!("{}", pg_type) } /// Convert a column of Postgres rows to an Arrow array. 
fn rows_to_arrow_column( rows: &[tokio_postgres::Row], col_idx: usize, pg_type: &PgType, ) -> Result { match *pg_type { PgType::BOOL => { let vals: Vec> = rows.iter() .map(|r| r.try_get(col_idx).ok()) .collect(); Ok(Arc::new(BooleanArray::from(vals))) } PgType::INT2 => { let vals: Vec> = rows.iter() .map(|r| r.try_get::<_, i16>(col_idx).ok().map(|v| v as i32)) .collect(); Ok(Arc::new(Int32Array::from(vals))) } PgType::INT4 => { let vals: Vec> = rows.iter() .map(|r| r.try_get(col_idx).ok()) .collect(); Ok(Arc::new(Int32Array::from(vals))) } PgType::INT8 | PgType::OID => { let vals: Vec> = rows.iter() .map(|r| r.try_get(col_idx).ok()) .collect(); Ok(Arc::new(Int64Array::from(vals))) } PgType::FLOAT4 => { let vals: Vec> = rows.iter() .map(|r| r.try_get::<_, f32>(col_idx).ok().map(|v| v as f64)) .collect(); Ok(Arc::new(Float64Array::from(vals))) } PgType::FLOAT8 => { let vals: Vec> = rows.iter() .map(|r| r.try_get(col_idx).ok()) .collect(); Ok(Arc::new(Float64Array::from(vals))) } _ => { // Default: try to get as string let vals: Vec> = rows.iter() .map(|r| { r.try_get::<_, String>(col_idx).ok() .or_else(|| r.try_get::<_, serde_json::Value>(col_idx).ok().map(|v| v.to_string())) .or_else(|| Some("".to_string())) }) .collect(); Ok(Arc::new(StringArray::from(vals))) } } }