lakehouse/crates/ingestd/src/pipeline.rs
root 24f1249a62 Federation layer 2: header routing + cross-bucket SQL
Three pieces of the multi-bucket federation made real:

1. Catalog migration (POST /catalog/migrate-buckets)
   - One-shot normalizer for the ObjectRef.bucket field
   - Empty -> "primary"; legacy "data"/"local" -> "primary"
   - Idempotent; re-running on canonical state is a no-op (sketch below)
   - Ran on existing catalog: 12 refs renamed from "data", 2 already
     "primary", all 14 now canonical

2. X-Lakehouse-Bucket header middleware on ingest
   - resolve_bucket() helper extracts the header and returns
     (bucket_name, store), or 404 with the list of valid buckets
     (sketched below)
   - ingest_file and ingest_db_stream now route writes per-request
   - Defaults to "primary" when header absent
   - pipeline::ingest_file_to_bucket records the actual bucket on the
     ObjectRef so the catalog stays the source of truth for "where does
     this data live"
   - Verified: ingest with X-Lakehouse-Bucket: testing lands in
     data/_testing/, ingest without the header lands in data/, and a bad
     header returns 404 with a hint
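
   A minimal sketch of the resolution step, assuming the buckets live in a
   plain HashMap keyed by name; the service's real BucketRegistry and error
   types will differ, but the fallback and 404-hint behavior are as above:

     use object_store::ObjectStore;
     use std::collections::HashMap;
     use std::sync::Arc;

     /// Resolve the X-Lakehouse-Bucket header value to a (name, store) pair.
     /// A missing header falls back to "primary"; an unknown name returns the
     /// list of valid buckets so the handler can answer 404 with a hint.
     fn resolve_bucket(
         buckets: &HashMap<String, Arc<dyn ObjectStore>>,
         header: Option<&str>,
     ) -> Result<(String, Arc<dyn ObjectStore>), Vec<String>> {
         let name = header.unwrap_or("primary");
         match buckets.get(name) {
             Some(store) => Ok((name.to_string(), Arc::clone(store))),
             None => Err(buckets.keys().cloned().collect()),
         }
     }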

3. queryd registers every bucket with DataFusion
   - QueryEngine now holds Arc<BucketRegistry> instead of a single store
   - build_context iterates all buckets and registers each one as a
     separate ObjectStore under URL scheme "lakehouse-{bucket}://"
     (sketched below)
   - ListingTable URLs include the per-object bucket scheme so
     DataFusion routes scans automatically based on ObjectRef.bucket
   - Profile bucket names like "profile:user" sanitized to
     "lakehouse-profile-user" since URL host segments can't contain ":"
   - Tolerant of duplicate manifest entries (pre-existing
     pipeline::ingest_file behavior creates a fresh dataset id per
     ingest); duplicates skipped with debug log
   - Backward compat: legacy "lakehouse://data/" URL still registered
     pointing at primary
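
   A hedged sketch of the per-bucket registration; register_object_store is
   the real DataFusion API, but the registry shape, the sanitize rule, and the
   error handling shown here are illustrative only:

     use datafusion::prelude::SessionContext;
     use object_store::ObjectStore;
     use std::{collections::HashMap, sync::Arc};
     use url::Url;

     /// ':' can't appear in the lakehouse-{bucket} URL, so "profile:user"
     /// becomes "profile-user".
     fn sanitize(bucket: &str) -> String {
         bucket.chars().map(|c| if c == ':' { '-' } else { c }).collect()
     }

     fn build_context(
         buckets: &HashMap<String, Arc<dyn ObjectStore>>,
     ) -> Result<SessionContext, Box<dyn std::error::Error>> {
         let ctx = SessionContext::new();
         for (name, store) in buckets {
             // Each bucket gets its own scheme, e.g. lakehouse-testing://
             let url = Url::parse(&format!("lakehouse-{}://", sanitize(name)))?;
             ctx.register_object_store(&url, Arc::clone(store));
         }
         // Backward compat: legacy lakehouse://data/ keeps pointing at primary.
         if let Some(primary) = buckets.get("primary") {
             ctx.register_object_store(&Url::parse("lakehouse://data/")?, Arc::clone(primary));
         }
         Ok(ctx)
     }

   A dataset whose ObjectRef.bucket is "testing" then gets a table URL along
   the lines of lakehouse-testing:///datasets/people_test.parquet, which is
   how DataFusion picks the matching store for each scan.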

Success gate: cross-bucket CROSS JOIN
  SELECT p.name, p.role, a.species
  FROM people_test p          (bucket: testing)
  CROSS JOIN animals a        (bucket: primary)
  LIMIT 5
returns rows correctly. DataFusion routed each scan to its bucket's
ObjectStore based on the URL scheme.

No regressions: SELECT COUNT(*) FROM candidates still returns 100000
from the primary bucket.

Deferred to Phase 17:
- POST /profile/{user}/activate (HNSW hot-load on profile switch)
- vectord storage paths becoming bucket-scoped (trial journals,
  eval sets per-profile)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 08:52:32 -05:00

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use object_store::ObjectStore;
use std::sync::Arc;
use catalogd::registry::Registry;
use shared::arrow_helpers::{fingerprint_schema, record_batch_to_parquet};
use shared::types::{ObjectRef, SchemaFingerprint};
use storaged::ops;
use crate::detect::{FileType, content_hash, detect_file_type};
use crate::csv_ingest;
use crate::json_ingest;
use crate::pdf_ingest;

/// Result of an ingest operation.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IngestResult {
    pub dataset_name: String,
    pub file_type: String,
    pub rows: usize,
    pub columns: usize,
    pub storage_key: String,
    pub content_hash: String,
    pub schema_fingerprint: String,
    pub deduplicated: bool,
}

/// Full ingest pipeline: detect → parse → dedup → store → register.
///
/// Bucket name (federation layer 2) is recorded on the ObjectRef so the
/// catalog knows which bucket holds the data; it defaults to "primary" via
/// the ingest service's resolve_bucket helper.
pub async fn ingest_file(
    filename: &str,
    content: &[u8],
    dataset_name: Option<&str>,
    store: &Arc<dyn ObjectStore>,
    registry: &Registry,
) -> Result<IngestResult, String> {
    ingest_file_to_bucket(filename, content, dataset_name, "primary", store, registry).await
}

/// Same as `ingest_file` but with an explicit target bucket name on the
/// ObjectRef. Header-aware ingest endpoints call this directly.
pub async fn ingest_file_to_bucket(
    filename: &str,
    content: &[u8],
    dataset_name: Option<&str>,
    bucket: &str,
    store: &Arc<dyn ObjectStore>,
    registry: &Registry,
) -> Result<IngestResult, String> {
    // 1. Detect file type
    let file_type = detect_file_type(filename, content);
    tracing::info!("ingesting '{}' as {:?} ({} bytes)", filename, file_type, content.len());

    // 2. Parse into Arrow
    let (schema, batches) = match file_type {
        FileType::Csv => csv_ingest::parse_csv(content)?,
        FileType::Json | FileType::NdJson => json_ingest::parse_json(content)?,
        FileType::Pdf => pdf_ingest::parse_pdf(content, filename)?,
        FileType::Text => parse_text_file(content, filename)?,
        FileType::Unknown => return Err(format!("unknown file type for '{filename}'")),
    };
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    let total_cols = schema.fields().len();
    if total_rows == 0 {
        return Err("file contains no data".into());
    }

    // 3. Content hash for dedup
    let hash = content_hash(content);

    // 4. Check if already ingested. The registry stores the content hash as
    //    the SchemaFingerprint (see step 7), so comparing against it here is
    //    a content-level dedup check.
    let existing = registry.list().await;
    if existing.iter().any(|d| d.schema_fingerprint.0 == hash) {
        tracing::info!("file already ingested (hash: {}), skipping", &hash[..12]);
        return Ok(IngestResult {
            dataset_name: dataset_name.unwrap_or(filename).to_string(),
            file_type: format!("{:?}", file_type),
            rows: total_rows,
            columns: total_cols,
            storage_key: String::new(),
            content_hash: hash,
            schema_fingerprint: String::new(),
            deduplicated: true,
        });
    }

    // 5. Convert to Parquet. Concatenate batches into a single RecordBatch
    //    first: appending independently serialized Parquet files byte-by-byte
    //    would not produce one valid Parquet file.
    let combined = arrow::compute::concat_batches(&schema, &batches)
        .map_err(|e| format!("failed to concatenate batches: {e}"))?;
    let parquet_bytes = Bytes::from(record_batch_to_parquet(&combined)?);
    let parquet_size = parquet_bytes.len() as u64;

    // 6. Store in object storage. Derive a dataset name from the filename
    //    when none is given: basename, with everything from the first '.'
    //    onward dropped.
    let name = dataset_name.unwrap_or_else(|| {
        filename.rsplit('/').next().unwrap_or(filename)
            .split('.').next().unwrap_or(filename)
    });
    let safe_name = sanitize_dataset_name(name);
    let storage_key = format!("datasets/{}.parquet", safe_name);
    ops::put(store, &storage_key, parquet_bytes).await?;
    tracing::info!("stored {} as {} ({} bytes)", filename, storage_key, parquet_size);

    // 7. Register in catalog with rich metadata
    let schema_fp = fingerprint_schema(&schema);
    let now = chrono::Utc::now();
    let obj_ref = ObjectRef {
        bucket: bucket.to_string(),
        key: storage_key.clone(),
        size_bytes: parquet_size,
        created_at: now,
    };

    // Register base dataset
    registry.register(
        safe_name.clone(),
        SchemaFingerprint(hash.clone()),
        vec![obj_ref],
    ).await?;

    // Auto-populate metadata: PII detection, column info, lineage, row count
    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let dataset_sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
    let column_meta: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
        let sens = shared::pii::detect_sensitivity(f.name());
        shared::types::ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
        }
    }).collect();
    let lineage = shared::types::Lineage {
        source_system: format!("{:?}", file_type).to_lowercase(),
        source_file: filename.to_string(),
        ingest_job: format!("ingest-{}", now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };
    let pii_count = column_meta.iter().filter(|c| c.is_pii).count();
    if pii_count > 0 {
        tracing::info!("auto-detected {} PII columns in '{}'", pii_count, safe_name);
    }
    // Metadata enrichment is best-effort: an update failure doesn't fail the ingest.
    let _ = registry.update_metadata(&safe_name, catalogd::registry::MetadataUpdate {
        sensitivity: dataset_sensitivity,
        columns: Some(column_meta),
        lineage: Some(lineage),
        row_count: Some(total_rows as u64),
        ..Default::default()
    }).await;

    // Phase C: if this dataset already had embeddings, they're now stale.
    // mark_embeddings_stale is a no-op for never-embedded datasets.
    let _ = registry.mark_embeddings_stale(&safe_name).await;

    Ok(IngestResult {
        dataset_name: safe_name,
        file_type: format!("{:?}", file_type),
        rows: total_rows,
        columns: total_cols,
        storage_key,
        content_hash: hash,
        schema_fingerprint: schema_fp.0,
        deduplicated: false,
    })
}

/// Parse plain text / SMS logs into a simple table.
fn parse_text_file(content: &[u8], filename: &str) -> Result<(SchemaRef, Vec<RecordBatch>), String> {
    let text = String::from_utf8_lossy(content);
    let lines: Vec<&str> = text.lines().filter(|l| !l.trim().is_empty()).collect();
    if lines.is_empty() {
        return Err("text file is empty".into());
    }
    let schema = Arc::new(arrow::datatypes::Schema::new(vec![
        arrow::datatypes::Field::new("source_file", arrow::datatypes::DataType::Utf8, false),
        arrow::datatypes::Field::new("line_number", arrow::datatypes::DataType::Int32, false),
        arrow::datatypes::Field::new("text", arrow::datatypes::DataType::Utf8, false),
    ]));
    let sources: Vec<&str> = vec![filename; lines.len()];
    let line_nums: Vec<i32> = (1..=lines.len() as i32).collect();
    let arrays: Vec<arrow::array::ArrayRef> = vec![
        Arc::new(arrow::array::StringArray::from(sources)),
        Arc::new(arrow::array::Int32Array::from(line_nums)),
        Arc::new(arrow::array::StringArray::from(lines)),
    ];
    let batch = RecordBatch::try_new(schema.clone(), arrays)
        .map_err(|e| format!("RecordBatch error: {e}"))?;
    Ok((schema, vec![batch]))
}

fn sanitize_dataset_name(name: &str) -> String {
    let clean: String = name.chars()
        .map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' })
        .collect();
    let trimmed = clean.trim_matches('_').to_string();
    if trimmed.is_empty() { "unnamed_dataset".to_string() } else { trimmed }
}