use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use object_store::ObjectStore;
use std::sync::Arc;

use catalogd::registry::Registry;
use shared::arrow_helpers::{fingerprint_schema, record_batch_to_parquet};
use shared::types::{ObjectRef, SchemaFingerprint};
use storaged::ops;

use crate::detect::{FileType, content_hash, detect_file_type};
use crate::csv_ingest;
use crate::json_ingest;
use crate::pdf_ingest;

/// Result of an ingest operation.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IngestResult {
    pub dataset_name: String,
    pub file_type: String,
    pub rows: usize,
    pub columns: usize,
    pub storage_key: String,
    pub content_hash: String,
    pub schema_fingerprint: String,
    pub deduplicated: bool,
}

/// Full ingest pipeline: detect → parse → dedup → store → register.
///
/// The bucket name (federation layer 2) is recorded on the ObjectRef so the
/// catalog knows which bucket holds the data; it defaults to "primary" via
/// the ingest service's resolve_bucket helper.
pub async fn ingest_file(
    filename: &str,
    content: &[u8],
    dataset_name: Option<&str>,
    store: &Arc<dyn ObjectStore>,
    registry: &Registry,
) -> Result<IngestResult, String> {
    ingest_file_to_bucket(filename, content, dataset_name, "primary", store, registry).await
}

/// Same as `ingest_file`, but with an explicit target bucket name recorded on
/// the ObjectRef. Header-aware ingest endpoints call this directly.
pub async fn ingest_file_to_bucket(
    filename: &str,
    content: &[u8],
    dataset_name: Option<&str>,
    bucket: &str,
    store: &Arc<dyn ObjectStore>,
    registry: &Registry,
) -> Result<IngestResult, String> {
    // 1. Detect file type
    let file_type = detect_file_type(filename, content);
    tracing::info!("ingesting '{}' as {:?} ({} bytes)", filename, file_type, content.len());

    // 2. Parse into Arrow
    let (schema, batches) = match file_type {
        FileType::Csv => csv_ingest::parse_csv(content)?,
        FileType::Json | FileType::NdJson => json_ingest::parse_json(content)?,
        FileType::Pdf => pdf_ingest::parse_pdf(content, filename)?,
        FileType::Text => parse_text_file(content, filename)?,
        FileType::Unknown => return Err(format!("unknown file type for '{filename}'")),
    };
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    let total_cols = schema.fields().len();
    if total_rows == 0 {
        return Err("file contains no data".into());
    }

    // 3. Content hash for dedup
    let hash = content_hash(content);

    // 4. Check if already ingested (same content hash)
    let existing = registry.list().await;
    if existing.iter().any(|d| d.schema_fingerprint.0 == hash) {
        tracing::info!("file already ingested (hash: {}), skipping", &hash[..12]);
        return Ok(IngestResult {
            dataset_name: dataset_name.unwrap_or(filename).to_string(),
            file_type: format!("{:?}", file_type),
            rows: total_rows,
            columns: total_cols,
            storage_key: String::new(),
            content_hash: hash,
            schema_fingerprint: String::new(),
            deduplicated: true,
        });
    }

    // 5. Convert to Parquet
    let mut all_parquet = Vec::new();
    for batch in &batches {
        let pq = record_batch_to_parquet(batch)?;
        all_parquet.extend_from_slice(&pq);
    }
    let parquet_bytes = Bytes::from(all_parquet);
    let parquet_size = parquet_bytes.len() as u64;

    // 6. Store in object storage
    // Fall back to the file name with the path and extension stripped.
    let name = dataset_name.unwrap_or_else(|| {
        filename.rsplit('/').next().unwrap_or(filename)
            .rsplit('.').last().unwrap_or(filename)
    });
    let safe_name = sanitize_dataset_name(name);
    let storage_key = format!("datasets/{}.parquet", safe_name);
    ops::put(store, &storage_key, parquet_bytes).await?;
    tracing::info!("stored {} as {} ({} bytes)", filename, storage_key, parquet_size);
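
    // Note: `register` below stores the file's *content hash* in the dataset's
    // SchemaFingerprint slot, which is exactly what the dedup check in step 4
    // compares against. The schema fingerprint proper is computed next and is
    // only reported back to the caller via IngestResult.schema_fingerprint.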
    // 7. Register in catalog with rich metadata
    let schema_fp = fingerprint_schema(&schema);
    let now = chrono::Utc::now();
    let obj_ref = ObjectRef {
        bucket: bucket.to_string(),
        key: storage_key.clone(),
        size_bytes: parquet_size,
        created_at: now,
    };

    // Register base dataset
    registry.register(
        safe_name.clone(),
        SchemaFingerprint(hash.clone()),
        vec![obj_ref],
    ).await?;

    // Auto-populate metadata: PII detection, column info, lineage, row count
    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let dataset_sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);

    let column_meta: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
        let sens = shared::pii::detect_sensitivity(f.name());
        shared::types::ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
        }
    }).collect();

    let lineage = shared::types::Lineage {
        source_system: format!("{:?}", file_type).to_lowercase(),
        source_file: filename.to_string(),
        ingest_job: format!("ingest-{}", now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };

    let pii_count = column_meta.iter().filter(|c| c.is_pii).count();
    if pii_count > 0 {
        tracing::info!("auto-detected {} PII columns in '{}'", pii_count, safe_name);
    }

    let _ = registry.update_metadata(&safe_name, catalogd::registry::MetadataUpdate {
        sensitivity: dataset_sensitivity,
        columns: Some(column_meta),
        lineage: Some(lineage),
        row_count: Some(total_rows as u64),
        ..Default::default()
    }).await;

    // Phase C: if this dataset already had embeddings, they're now stale.
    // mark_embeddings_stale is a no-op for never-embedded datasets.
    let _ = registry.mark_embeddings_stale(&safe_name).await;

    Ok(IngestResult {
        dataset_name: safe_name,
        file_type: format!("{:?}", file_type),
        rows: total_rows,
        columns: total_cols,
        storage_key,
        content_hash: hash,
        schema_fingerprint: schema_fp.0,
        deduplicated: false,
    })
}

/// Parse plain text / SMS logs into a simple (source_file, line_number, text) table.
fn parse_text_file(content: &[u8], filename: &str) -> Result<(SchemaRef, Vec<RecordBatch>), String> {
    let text = String::from_utf8_lossy(content);
    let lines: Vec<&str> = text.lines().filter(|l| !l.trim().is_empty()).collect();
    if lines.is_empty() {
        return Err("text file is empty".into());
    }

    let schema = Arc::new(arrow::datatypes::Schema::new(vec![
        arrow::datatypes::Field::new("source_file", arrow::datatypes::DataType::Utf8, false),
        arrow::datatypes::Field::new("line_number", arrow::datatypes::DataType::Int32, false),
        arrow::datatypes::Field::new("text", arrow::datatypes::DataType::Utf8, false),
    ]));

    let sources: Vec<&str> = vec![filename; lines.len()];
    let line_nums: Vec<i32> = (1..=lines.len() as i32).collect();
    let arrays: Vec<arrow::array::ArrayRef> = vec![
        Arc::new(arrow::array::StringArray::from(sources)),
        Arc::new(arrow::array::Int32Array::from(line_nums)),
        Arc::new(arrow::array::StringArray::from(lines)),
    ];

    let batch = RecordBatch::try_new(schema.clone(), arrays)
        .map_err(|e| format!("RecordBatch error: {e}"))?;
    Ok((schema, vec![batch]))
}

/// Lowercase a dataset name, replace anything that is not alphanumeric or an
/// underscore with '_', and trim leading/trailing underscores.
fn sanitize_dataset_name(name: &str) -> String {
    let clean: String = name.chars()
        .map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' })
        .collect();
    let trimmed = clean.trim_matches('_').to_string();
    if trimmed.is_empty() { "unnamed_dataset".to_string() } else { trimmed }
}
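
// A minimal test sketch for the two pure helpers above. It assumes nothing
// beyond what is visible in this file (no Registry or ObjectStore is needed);
// the expected values follow directly from how `sanitize_dataset_name` and
// `parse_text_file` are written.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sanitize_lowercases_and_replaces_symbols() {
        assert_eq!(sanitize_dataset_name("My Data-2024.csv"), "my_data_2024_csv");
        assert_eq!(sanitize_dataset_name("___"), "unnamed_dataset");
        assert_eq!(sanitize_dataset_name("orders"), "orders");
    }

    #[test]
    fn parse_text_file_skips_blank_lines_and_numbers_rows() {
        let (schema, batches) = parse_text_file(b"hello\n\nworld\n", "notes.txt").unwrap();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(batches.len(), 1);
        // The blank line is filtered out, so only two rows remain.
        assert_eq!(batches[0].num_rows(), 2);
    }

    #[test]
    fn parse_text_file_rejects_empty_input() {
        assert!(parse_text_file(b"   \n\n", "empty.txt").is_err());
    }
}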