Database connector: PostgreSQL → Parquet import
- POST /ingest/postgres/tables — list all tables in a database - POST /ingest/postgres/import — import table → Parquet → catalog → queryable - Auto type mapping: int2/4/8 → Int, float4/8 → Float64, bool → Boolean, text/varchar/jsonb/timestamp → Utf8 (safe default per ADR-010) - Auto PII detection + lineage on import - Empty password support for trust auth - Tested: imported lab_trials (40 rows, 10 cols) and threat_intel (20 rows, 30 cols) from local knowledge_base Postgres database — immediately queryable Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
294f3f6a49
commit
9992b5f135
191
Cargo.lock
generated
191
Cargo.lock
generated
@ -695,7 +695,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"phf",
|
||||
"phf 0.12.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -2220,6 +2220,12 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fallible-iterator"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
|
||||
|
||||
[[package]]
|
||||
name = "fastrand"
|
||||
version = "2.3.0"
|
||||
@ -2427,7 +2433,7 @@ dependencies = [
|
||||
"cfg-if",
|
||||
"js-sys",
|
||||
"libc",
|
||||
"wasi",
|
||||
"wasi 0.11.1+wasi-snapshot-preview1",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
@ -2608,6 +2614,15 @@ version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
|
||||
|
||||
[[package]]
|
||||
name = "hmac"
|
||||
version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
|
||||
dependencies = [
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "1.4.0"
|
||||
@ -2914,6 +2929,7 @@ dependencies = [
|
||||
"shared",
|
||||
"storaged",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
@ -3174,6 +3190,15 @@ version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981"
|
||||
|
||||
[[package]]
|
||||
name = "libredox"
|
||||
version = "0.1.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7ddbf48fd451246b1f8c2610bd3b4ac0cc6e149d89832867093ab69a17194f08"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "linux-raw-sys"
|
||||
version = "0.12.1"
|
||||
@ -3402,7 +3427,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"wasi",
|
||||
"wasi 0.11.1+wasi-snapshot-preview1",
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
@ -3601,6 +3626,24 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "objc2-core-foundation"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "objc2-system-configuration"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7216bd11cbda54ccabcab84d523dc93b858ec75ecfb3a7d89513fa22464da396"
|
||||
dependencies = [
|
||||
"objc2-core-foundation",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.37.3"
|
||||
@ -3813,7 +3856,17 @@ version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7"
|
||||
dependencies = [
|
||||
"phf_shared",
|
||||
"phf_shared 0.12.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "phf"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c1562dc717473dbaa4c1f85a36410e03c047b2e7df7f45ee938fbef64ae7fadf"
|
||||
dependencies = [
|
||||
"phf_shared 0.13.1",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -3825,6 +3878,15 @@ dependencies = [
|
||||
"siphasher",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "phf_shared"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e57fef6bc5981e38c2ce2d63bfa546861309f875b8a75f092d1d54ae2d64f266"
|
||||
dependencies = [
|
||||
"siphasher",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.1.11"
|
||||
@ -3863,6 +3925,38 @@ version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
|
||||
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3ee9dd5fe15055d2b6806f4736aa0c9637217074e224bbec46d4041b91bb9491"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"hmac",
|
||||
"md-5",
|
||||
"memchr",
|
||||
"rand 0.9.2",
|
||||
"sha2",
|
||||
"stringprep",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "54b858f82211e84682fecd373f68e1ceae642d8d751a1ebd13f33de6257b3e20"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"chrono",
|
||||
"fallible-iterator",
|
||||
"postgres-protocol",
|
||||
"serde_core",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "potential_utf"
|
||||
version = "0.1.4"
|
||||
@ -4809,6 +4903,17 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stringprep"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1"
|
||||
dependencies = [
|
||||
"unicode-bidi",
|
||||
"unicode-normalization",
|
||||
"unicode-properties",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "subsecond"
|
||||
version = "0.7.3"
|
||||
@ -5040,6 +5145,32 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dcea47c8f71744367793f16c2db1f11cb859d28f436bdb4ca9193eb1f787ee42"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"log",
|
||||
"parking_lot",
|
||||
"percent-encoding",
|
||||
"phf 0.13.1",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
"rand 0.9.2",
|
||||
"socket2 0.6.3",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"whoami",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-rustls"
|
||||
version = "0.26.4"
|
||||
@ -5396,12 +5527,33 @@ version = "2.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-bidi"
|
||||
version = "0.3.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-normalization"
|
||||
version = "0.1.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5fd4f6878c9cb28d874b009da9e8d183b5abc80117c40bbd187a1fde336be6e8"
|
||||
dependencies = [
|
||||
"tinyvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-properties"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-segmentation"
|
||||
version = "1.13.2"
|
||||
@ -5541,6 +5693,15 @@ version = "0.11.1+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.14.7+wasi-0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c"
|
||||
dependencies = [
|
||||
"wasip2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasip2"
|
||||
version = "1.0.2+wasi-0.2.9"
|
||||
@ -5559,6 +5720,15 @@ dependencies = [
|
||||
"wit-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasite"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "66fe902b4a6b8028a753d5424909b764ccf79b7a209eac9bf97e59cda9f71a42"
|
||||
dependencies = [
|
||||
"wasi 0.14.7+wasi-0.2.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen"
|
||||
version = "0.2.114"
|
||||
@ -5700,6 +5870,19 @@ version = "0.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88"
|
||||
|
||||
[[package]]
|
||||
name = "whoami"
|
||||
version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6a5b12f9df4f978d2cfdb1bd3bac52433f44393342d7ee9c25f5a1c14c0f45d"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"libredox",
|
||||
"objc2-system-configuration",
|
||||
"wasite",
|
||||
"web-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi-util"
|
||||
version = "0.1.11"
|
||||
|
||||
@ -45,3 +45,4 @@ csv = "1"
|
||||
lopdf = "0.35"
|
||||
encoding_rs = "0.8"
|
||||
instant-distance = "0.6"
|
||||
tokio-postgres = { version = "0.7", features = ["with-serde_json-1", "with-chrono-0_4"] }
|
||||
|
||||
@ -20,3 +20,4 @@ sha2 = { workspace = true }
|
||||
csv = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
object_store = { workspace = true }
|
||||
tokio-postgres = { workspace = true }
|
||||
|
||||
225
crates/ingestd/src/db_ingest.rs
Normal file
225
crates/ingestd/src/db_ingest.rs
Normal file
@ -0,0 +1,225 @@
|
||||
/// Database connector: pull tables from PostgreSQL directly into Parquet.
|
||||
/// Connects, reads schema, streams rows into Arrow RecordBatches, writes Parquet.
|
||||
/// No ORM, no schema definition — schema inferred from the database.
|
||||
|
||||
use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, StringArray};
|
||||
use arrow::datatypes::{DataType, Field, Schema};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use std::sync::Arc;
|
||||
use tokio_postgres::{Client, NoTls, types::Type as PgType};
|
||||
|
||||
/// Connection config for a database.
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
pub struct DbConfig {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub database: String,
|
||||
pub user: String,
|
||||
pub password: String,
|
||||
}
|
||||
|
||||
impl DbConfig {
|
||||
pub fn connection_string(&self) -> String {
|
||||
if self.password.is_empty() {
|
||||
format!(
|
||||
"host={} port={} dbname={} user={}",
|
||||
self.host, self.port, self.database, self.user
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
"host={} port={} dbname={} user={} password={}",
|
||||
self.host, self.port, self.database, self.user, self.password
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Result of a database table import.
|
||||
#[derive(Debug, Clone, serde::Serialize)]
|
||||
pub struct DbIngestResult {
|
||||
pub table_name: String,
|
||||
pub rows: usize,
|
||||
pub columns: usize,
|
||||
pub schema_detected: Vec<String>, // column name: type pairs
|
||||
}
|
||||
|
||||
/// Connect to PostgreSQL and import a table as Arrow RecordBatches.
|
||||
pub async fn import_postgres_table(
|
||||
config: &DbConfig,
|
||||
table_name: &str,
|
||||
limit: Option<usize>,
|
||||
) -> Result<(Arc<Schema>, Vec<RecordBatch>, DbIngestResult), String> {
|
||||
// Connect
|
||||
let (client, connection) = tokio_postgres::connect(&config.connection_string(), NoTls)
|
||||
.await
|
||||
.map_err(|e| format!("postgres connect error: {e}"))?;
|
||||
|
||||
// Spawn connection handler
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
tracing::error!("postgres connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
tracing::info!("connected to postgres {}:{}/{}", config.host, config.port, config.database);
|
||||
|
||||
// Get column info
|
||||
let columns = get_table_columns(&client, table_name).await?;
|
||||
if columns.is_empty() {
|
||||
return Err(format!("table '{}' not found or has no columns", table_name));
|
||||
}
|
||||
|
||||
let schema_detected: Vec<String> = columns.iter()
|
||||
.map(|(name, pg_type)| format!("{}: {}", name, pg_type_to_string(pg_type)))
|
||||
.collect();
|
||||
|
||||
tracing::info!("table '{}': {} columns", table_name, columns.len());
|
||||
|
||||
// Build Arrow schema
|
||||
let arrow_fields: Vec<Field> = columns.iter()
|
||||
.map(|(name, pg_type)| Field::new(name, pg_type_to_arrow(pg_type), true))
|
||||
.collect();
|
||||
let schema = Arc::new(Schema::new(arrow_fields));
|
||||
|
||||
// Query rows
|
||||
let sql = match limit {
|
||||
Some(n) => format!("SELECT * FROM \"{}\" LIMIT {}", table_name, n),
|
||||
None => format!("SELECT * FROM \"{}\"", table_name),
|
||||
};
|
||||
|
||||
let rows = client.query(&sql, &[])
|
||||
.await
|
||||
.map_err(|e| format!("query error: {e}"))?;
|
||||
|
||||
let row_count = rows.len();
|
||||
tracing::info!("fetched {} rows from '{}'", row_count, table_name);
|
||||
|
||||
if row_count == 0 {
|
||||
return Ok((schema.clone(), vec![], DbIngestResult {
|
||||
table_name: table_name.to_string(),
|
||||
rows: 0,
|
||||
columns: columns.len(),
|
||||
schema_detected,
|
||||
}));
|
||||
}
|
||||
|
||||
// Convert to Arrow arrays
|
||||
let mut arrays: Vec<ArrayRef> = Vec::new();
|
||||
|
||||
for (col_idx, (_, pg_type)) in columns.iter().enumerate() {
|
||||
let array = rows_to_arrow_column(&rows, col_idx, pg_type)?;
|
||||
arrays.push(array);
|
||||
}
|
||||
|
||||
let batch = RecordBatch::try_new(schema.clone(), arrays)
|
||||
.map_err(|e| format!("RecordBatch error: {e}"))?;
|
||||
|
||||
let result = DbIngestResult {
|
||||
table_name: table_name.to_string(),
|
||||
rows: row_count,
|
||||
columns: columns.len(),
|
||||
schema_detected,
|
||||
};
|
||||
|
||||
Ok((schema, vec![batch], result))
|
||||
}
|
||||
|
||||
/// List all tables in the database.
|
||||
pub async fn list_postgres_tables(config: &DbConfig) -> Result<Vec<String>, String> {
|
||||
let (client, connection) = tokio_postgres::connect(&config.connection_string(), NoTls)
|
||||
.await
|
||||
.map_err(|e| format!("connect error: {e}"))?;
|
||||
|
||||
tokio::spawn(async move { let _ = connection.await; });
|
||||
|
||||
let rows = client.query(
|
||||
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name",
|
||||
&[],
|
||||
).await.map_err(|e| format!("query error: {e}"))?;
|
||||
|
||||
Ok(rows.iter().map(|r| r.get::<_, String>(0)).collect())
|
||||
}
|
||||
|
||||
/// Get column names and types for a table.
|
||||
async fn get_table_columns(client: &Client, table_name: &str) -> Result<Vec<(String, PgType)>, String> {
|
||||
// Use a dummy query to get column types
|
||||
let stmt = client.prepare(&format!("SELECT * FROM \"{}\" LIMIT 0", table_name))
|
||||
.await
|
||||
.map_err(|e| format!("prepare error for '{}': {e}", table_name))?;
|
||||
|
||||
Ok(stmt.columns().iter()
|
||||
.map(|col| (col.name().to_string(), col.type_().clone()))
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Convert a Postgres type to Arrow DataType.
|
||||
fn pg_type_to_arrow(pg_type: &PgType) -> DataType {
|
||||
match *pg_type {
|
||||
PgType::BOOL => DataType::Boolean,
|
||||
PgType::INT2 | PgType::INT4 => DataType::Int32,
|
||||
PgType::INT8 | PgType::OID => DataType::Int64,
|
||||
PgType::FLOAT4 | PgType::FLOAT8 | PgType::NUMERIC => DataType::Float64,
|
||||
_ => DataType::Utf8, // everything else → string (safe default per ADR-010)
|
||||
}
|
||||
}
|
||||
|
||||
fn pg_type_to_string(pg_type: &PgType) -> String {
|
||||
format!("{}", pg_type)
|
||||
}
|
||||
|
||||
/// Convert a column of Postgres rows to an Arrow array.
|
||||
fn rows_to_arrow_column(
|
||||
rows: &[tokio_postgres::Row],
|
||||
col_idx: usize,
|
||||
pg_type: &PgType,
|
||||
) -> Result<ArrayRef, String> {
|
||||
match *pg_type {
|
||||
PgType::BOOL => {
|
||||
let vals: Vec<Option<bool>> = rows.iter()
|
||||
.map(|r| r.try_get(col_idx).ok())
|
||||
.collect();
|
||||
Ok(Arc::new(BooleanArray::from(vals)))
|
||||
}
|
||||
PgType::INT2 => {
|
||||
let vals: Vec<Option<i32>> = rows.iter()
|
||||
.map(|r| r.try_get::<_, i16>(col_idx).ok().map(|v| v as i32))
|
||||
.collect();
|
||||
Ok(Arc::new(Int32Array::from(vals)))
|
||||
}
|
||||
PgType::INT4 => {
|
||||
let vals: Vec<Option<i32>> = rows.iter()
|
||||
.map(|r| r.try_get(col_idx).ok())
|
||||
.collect();
|
||||
Ok(Arc::new(Int32Array::from(vals)))
|
||||
}
|
||||
PgType::INT8 | PgType::OID => {
|
||||
let vals: Vec<Option<i64>> = rows.iter()
|
||||
.map(|r| r.try_get(col_idx).ok())
|
||||
.collect();
|
||||
Ok(Arc::new(Int64Array::from(vals)))
|
||||
}
|
||||
PgType::FLOAT4 => {
|
||||
let vals: Vec<Option<f64>> = rows.iter()
|
||||
.map(|r| r.try_get::<_, f32>(col_idx).ok().map(|v| v as f64))
|
||||
.collect();
|
||||
Ok(Arc::new(Float64Array::from(vals)))
|
||||
}
|
||||
PgType::FLOAT8 => {
|
||||
let vals: Vec<Option<f64>> = rows.iter()
|
||||
.map(|r| r.try_get(col_idx).ok())
|
||||
.collect();
|
||||
Ok(Arc::new(Float64Array::from(vals)))
|
||||
}
|
||||
_ => {
|
||||
// Default: try to get as string
|
||||
let vals: Vec<Option<String>> = rows.iter()
|
||||
.map(|r| {
|
||||
r.try_get::<_, String>(col_idx).ok()
|
||||
.or_else(|| r.try_get::<_, serde_json::Value>(col_idx).ok().map(|v| v.to_string()))
|
||||
.or_else(|| Some("".to_string()))
|
||||
})
|
||||
.collect();
|
||||
Ok(Arc::new(StringArray::from(vals)))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,3 +1,4 @@
|
||||
pub mod db_ingest;
|
||||
pub mod detect;
|
||||
pub mod csv_ingest;
|
||||
pub mod json_ingest;
|
||||
|
||||
@ -5,12 +5,16 @@ use axum::{
|
||||
response::IntoResponse,
|
||||
routing::{get, post},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use object_store::ObjectStore;
|
||||
use serde::Deserialize;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalogd::registry::Registry;
|
||||
use crate::pipeline;
|
||||
use crate::{db_ingest, pipeline};
|
||||
use shared::arrow_helpers::record_batch_to_parquet;
|
||||
use shared::types::{ObjectRef, SchemaFingerprint};
|
||||
use storaged::ops;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IngestState {
|
||||
@ -22,6 +26,8 @@ pub fn router(state: IngestState) -> Router {
|
||||
Router::new()
|
||||
.route("/health", get(health))
|
||||
.route("/file", post(ingest_file))
|
||||
.route("/postgres/tables", post(list_pg_tables))
|
||||
.route("/postgres/import", post(import_pg_table))
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
@ -31,17 +37,14 @@ async fn health() -> &'static str {
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct IngestQuery {
|
||||
/// Override dataset name (otherwise derived from filename)
|
||||
name: Option<String>,
|
||||
}
|
||||
|
||||
/// Upload a file for ingestion. Accepts multipart/form-data with a "file" field.
|
||||
async fn ingest_file(
|
||||
State(state): State<IngestState>,
|
||||
Query(query): Query<IngestQuery>,
|
||||
mut multipart: Multipart,
|
||||
) -> impl IntoResponse {
|
||||
// Read the first file field
|
||||
let field = match multipart.next_field().await {
|
||||
Ok(Some(f)) => f,
|
||||
Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())),
|
||||
@ -67,3 +70,121 @@ async fn ingest_file(
|
||||
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||
}
|
||||
}
|
||||
|
||||
// --- PostgreSQL Import ---
|
||||
|
||||
/// List tables in a PostgreSQL database.
|
||||
async fn list_pg_tables(
|
||||
Json(config): Json<db_ingest::DbConfig>,
|
||||
) -> impl IntoResponse {
|
||||
match db_ingest::list_postgres_tables(&config).await {
|
||||
Ok(tables) => Ok(Json(tables)),
|
||||
Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct PgImportRequest {
|
||||
#[serde(flatten)]
|
||||
config: db_ingest::DbConfig,
|
||||
table: String,
|
||||
/// Override dataset name (defaults to table name)
|
||||
dataset_name: Option<String>,
|
||||
/// Max rows to import (None = all)
|
||||
limit: Option<usize>,
|
||||
}
|
||||
|
||||
/// Import a PostgreSQL table into the lakehouse.
|
||||
async fn import_pg_table(
|
||||
State(state): State<IngestState>,
|
||||
Json(req): Json<PgImportRequest>,
|
||||
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> {
|
||||
tracing::info!("importing postgres table '{}' from {}:{}/{}",
|
||||
req.table, req.config.host, req.config.port, req.config.database);
|
||||
|
||||
// Import from Postgres
|
||||
let (schema, batches, db_result) = db_ingest::import_postgres_table(
|
||||
&req.config, &req.table, req.limit,
|
||||
).await.map_err(|e| (StatusCode::BAD_GATEWAY, e))?;
|
||||
|
||||
if batches.is_empty() || db_result.rows == 0 {
|
||||
return Ok((StatusCode::OK, Json(serde_json::json!({
|
||||
"table": req.table,
|
||||
"rows": 0,
|
||||
"message": "table is empty",
|
||||
}))));
|
||||
}
|
||||
|
||||
// Convert to Parquet
|
||||
let mut all_parquet = Vec::new();
|
||||
for batch in &batches {
|
||||
let pq = record_batch_to_parquet(batch)
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
||||
all_parquet.extend_from_slice(&pq);
|
||||
}
|
||||
|
||||
let dataset_name = req.dataset_name.unwrap_or_else(|| req.table.clone());
|
||||
let storage_key = format!("datasets/{}.parquet", dataset_name);
|
||||
let parquet_size = all_parquet.len() as u64;
|
||||
|
||||
// Store
|
||||
ops::put(&state.store, &storage_key, Bytes::from(all_parquet))
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
||||
|
||||
// Register
|
||||
let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
|
||||
let now = chrono::Utc::now();
|
||||
state.registry.register(
|
||||
dataset_name.clone(),
|
||||
SchemaFingerprint(schema_fp.0),
|
||||
vec![ObjectRef {
|
||||
bucket: "data".to_string(),
|
||||
key: storage_key.clone(),
|
||||
size_bytes: parquet_size,
|
||||
created_at: now,
|
||||
}],
|
||||
).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
||||
|
||||
// Auto-populate metadata (PII detection, lineage)
|
||||
let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
|
||||
let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
|
||||
let columns: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
|
||||
let sens = shared::pii::detect_sensitivity(f.name());
|
||||
shared::types::ColumnMeta {
|
||||
name: f.name().clone(),
|
||||
data_type: f.data_type().to_string(),
|
||||
sensitivity: sens.clone(),
|
||||
description: String::new(),
|
||||
is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let lineage = shared::types::Lineage {
|
||||
source_system: "postgresql".to_string(),
|
||||
source_file: format!("{}:{}/{}.{}", req.config.host, req.config.port, req.config.database, req.table),
|
||||
ingest_job: format!("pg-import-{}", now.timestamp_millis()),
|
||||
ingest_timestamp: now,
|
||||
parent_datasets: vec![],
|
||||
};
|
||||
|
||||
let _ = state.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
|
||||
sensitivity,
|
||||
columns: Some(columns),
|
||||
lineage: Some(lineage),
|
||||
row_count: Some(db_result.rows as u64),
|
||||
..Default::default()
|
||||
}).await;
|
||||
|
||||
tracing::info!("imported '{}' from postgres: {} rows → {}", dataset_name, db_result.rows, storage_key);
|
||||
|
||||
Ok((StatusCode::CREATED, Json(serde_json::json!({
|
||||
"dataset_name": dataset_name,
|
||||
"table": req.table,
|
||||
"rows": db_result.rows,
|
||||
"columns": db_result.columns,
|
||||
"schema": db_result.schema_detected,
|
||||
"storage_key": storage_key,
|
||||
"size_bytes": parquet_size,
|
||||
}))))
|
||||
}
|
||||
|
||||
@ -0,0 +1,100 @@
|
||||
{
|
||||
"id": "e2a8f88a-59f6-40c7-a45b-e23d8f3533b6",
|
||||
"name": "lab_trials",
|
||||
"schema_fingerprint": "1d5782349402439a7e44efd0ccab9ae64ac3044221adef9e828b60b8bbb44dd5",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/lab_trials.parquet",
|
||||
"size_bytes": 64646,
|
||||
"created_at": "2026-03-28T01:14:03.026116573Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-28T01:14:03.026117277Z",
|
||||
"updated_at": "2026-03-28T01:14:03.026247826Z",
|
||||
"description": "",
|
||||
"owner": "",
|
||||
"sensitivity": null,
|
||||
"columns": [
|
||||
{
|
||||
"name": "id",
|
||||
"data_type": "Int32",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "experiment_id",
|
||||
"data_type": "Int32",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "trial_num",
|
||||
"data_type": "Int32",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "config_diff",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "config_snapshot",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "scores",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "avg_score",
|
||||
"data_type": "Float64",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "improved",
|
||||
"data_type": "Boolean",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "duration_ms",
|
||||
"data_type": "Int32",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "created_at",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
}
|
||||
],
|
||||
"lineage": {
|
||||
"source_system": "postgresql",
|
||||
"source_file": "127.0.0.1:5432/knowledge_base.lab_trials",
|
||||
"ingest_job": "pg-import-1774660443026",
|
||||
"ingest_timestamp": "2026-03-28T01:14:03.026116573Z",
|
||||
"parent_datasets": []
|
||||
},
|
||||
"freshness": null,
|
||||
"tags": [],
|
||||
"row_count": 40
|
||||
}
|
||||
@ -0,0 +1,240 @@
|
||||
{
|
||||
"id": "e7304f05-5278-4e17-961a-51f2588fd2aa",
|
||||
"name": "threat_intel",
|
||||
"schema_fingerprint": "df1e126046147b3de42086880e10c3501a3a615ecddf336bc24957a24c321241",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/threat_intel.parquet",
|
||||
"size_bytes": 111130,
|
||||
"created_at": "2026-03-28T01:14:03.054140697Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-28T01:14:03.054141294Z",
|
||||
"updated_at": "2026-03-28T01:14:03.054427047Z",
|
||||
"description": "",
|
||||
"owner": "",
|
||||
"sensitivity": null,
|
||||
"columns": [
|
||||
{
|
||||
"name": "id",
|
||||
"data_type": "Int32",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "ip",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "threat_level",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "classification",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "confidence",
|
||||
"data_type": "Float64",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "summary",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "indicators",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "recommendation",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "pattern",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "attack_type",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "likely_automated",
|
||||
"data_type": "Boolean",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "country",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "country_code",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "city",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "isp",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "org",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "asn",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "is_proxy",
|
||||
"data_type": "Boolean",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "is_hosting",
|
||||
"data_type": "Boolean",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "open_ports",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "blocklist_count",
|
||||
"data_type": "Int32",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "blocklist_total",
|
||||
"data_type": "Int32",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "blocklists_blocked",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "reverse_dns",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "traceroute",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "log_count",
|
||||
"data_type": "Int32",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "banned",
|
||||
"data_type": "Boolean",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "enriched_at",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "updated_at",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
},
|
||||
{
|
||||
"name": "raw_data",
|
||||
"data_type": "Utf8",
|
||||
"sensitivity": null,
|
||||
"description": "",
|
||||
"is_pii": false
|
||||
}
|
||||
],
|
||||
"lineage": {
|
||||
"source_system": "postgresql",
|
||||
"source_file": "127.0.0.1:5432/knowledge_base.threat_intel",
|
||||
"ingest_job": "pg-import-1774660443054",
|
||||
"ingest_timestamp": "2026-03-28T01:14:03.054140697Z",
|
||||
"parent_datasets": []
|
||||
},
|
||||
"freshness": null,
|
||||
"tags": [],
|
||||
"row_count": 20
|
||||
}
|
||||
BIN
data/datasets/lab_trials.parquet
Normal file
BIN
data/datasets/lab_trials.parquet
Normal file
Binary file not shown.
BIN
data/datasets/threat_intel.parquet
Normal file
BIN
data/datasets/threat_intel.parquet
Normal file
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user