root bb05c4412e Phase 6: Ingest pipeline — CSV, JSON, PDF, text file support
- ingestd crate: detect file type → parse → schema detection → Parquet → catalog
- CSV: auto-detect column types (int, float, bool, string), handles $, %, commas
  Strips dollar signs from amounts, flexible row parsing, sanitized column names
- JSON: array or newline-delimited, nested object flattening (a.b.c → a_b_c)
- PDF: text extraction via lopdf, one row per page (source_file, page_number, text)
- Text/SMS: line-based ingestion with line numbers
- Dedup: SHA-256 content hash, re-ingest same file = no-op
- Gateway: POST /ingest/file multipart upload, 256MB body limit
- Schema detection per ADR-010: ambiguous types default to String
- 12 unit tests passing (CSV parsing, JSON flattening, type inference, dedup)
- Tested: messy CSV with missing data, dollar amounts, N/A values → queryable

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 08:07:31 -05:00

104 lines
2.9 KiB
Rust

use sha2::{Digest, Sha256};
/// Detected file type from content inspection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileType {
    Csv,
    Json,
    NdJson, // newline-delimited JSON
    Pdf,
    Text, // plain text, SMS logs, etc.
    Unknown,
}

/// Detect file type from filename extension and content sniffing.
///
/// Extension checks run first (cheap and usually decisive); content
/// sniffing is the fallback for missing/unknown extensions. Content that
/// matches nothing else but is valid UTF-8 is treated as `Text`;
/// anything left over is `Unknown`.
pub fn detect_file_type(filename: &str, content: &[u8]) -> FileType {
    // Extension-based first.
    let lower = filename.to_lowercase();
    if lower.ends_with(".csv") || lower.ends_with(".tsv") {
        return FileType::Csv;
    }
    if lower.ends_with(".json") {
        // Distinguish newline-delimited JSON from ordinary JSON. In NDJSON
        // every line is a *complete* JSON object, so the first line must
        // both start with '{' and end with '}'. Checking only the prefix
        // (as before) misclassified pretty-printed JSON objects — whose
        // first line is just "{" — as NDJSON.
        if content.iter().take(4096).filter(|&&b| b == b'\n').count() > 2 {
            let first_line = content.split(|&b| b == b'\n').next().unwrap_or(b"");
            // Tolerate CRLF line endings: drop a trailing '\r' before testing.
            let first_line = first_line.strip_suffix(b"\r").unwrap_or(first_line);
            if first_line.starts_with(b"{") && first_line.ends_with(b"}") {
                return FileType::NdJson;
            }
        }
        return FileType::Json;
    }
    if lower.ends_with(".ndjson") || lower.ends_with(".jsonl") {
        return FileType::NdJson;
    }
    if lower.ends_with(".pdf") {
        return FileType::Pdf;
    }
    if lower.ends_with(".txt") || lower.ends_with(".log") || lower.ends_with(".sms") {
        return FileType::Text;
    }
    // Content sniffing fallback: PDF magic bytes, then JSON delimiters.
    if content.starts_with(b"%PDF") {
        return FileType::Pdf;
    }
    if content.starts_with(b"[") || content.starts_with(b"{") {
        return FileType::Json;
    }
    // Heuristic CSV check: enough commas and newlines in the first chunk.
    let sample = &content[..content.len().min(4096)];
    let comma_count = sample.iter().filter(|&&b| b == b',').count();
    let newline_count = sample.iter().filter(|&&b| b == b'\n').count();
    if comma_count > 3 && newline_count > 1 {
        return FileType::Csv;
    }
    // If it's valid UTF-8, treat as text.
    if std::str::from_utf8(sample).is_ok() {
        return FileType::Text;
    }
    FileType::Unknown
}
/// Compute SHA-256 hash of content for deduplication.
///
/// Returns the digest as a lowercase hex string (64 chars).
pub fn content_hash(content: &[u8]) -> String {
    Sha256::digest(content)
        .iter()
        .map(|byte| format!("{byte:02x}"))
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detect_csv_by_extension() {
        let body = b"a,b,c\n1,2,3";
        assert_eq!(detect_file_type("data.csv", body), FileType::Csv);
    }

    #[test]
    fn detect_json_by_extension() {
        let body = br#"[{"a":1}]"#;
        assert_eq!(detect_file_type("data.json", body), FileType::Json);
    }

    #[test]
    fn detect_pdf_by_magic() {
        // No useful extension: the %PDF magic bytes must decide.
        assert_eq!(detect_file_type("unknown", b"%PDF-1.4 blah"), FileType::Pdf);
    }

    #[test]
    fn detect_csv_by_content() {
        // Unknown extension: comma/newline density marks this as CSV.
        let rows = b"name,age,city\nAlice,30,NYC\nBob,25,LA\n";
        assert_eq!(detect_file_type("unknown.dat", rows), FileType::Csv);
    }

    #[test]
    fn content_hash_deterministic() {
        // Identical bytes must always produce the identical digest string.
        assert_eq!(content_hash(b"hello world"), content_hash(b"hello world"));
    }
}