diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index 4a30b62..283683b 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -108,6 +108,13 @@ async fn main() { .allow_headers(Any)) .layer(TraceLayer::new_for_http()); + // File watcher — auto-ingest from ./inbox + let _watcher = ingestd::watcher::spawn_watcher( + ingestd::watcher::WatchConfig::default(), + store.clone(), + registry.clone(), + ); + // Start gRPC server on port+1 let grpc_port = config.gateway.port + 1; let catalog_grpc = catalogd::grpc::CatalogGrpc::new(registry); diff --git a/crates/ingestd/src/lib.rs b/crates/ingestd/src/lib.rs index b4485c7..e21ee5b 100644 --- a/crates/ingestd/src/lib.rs +++ b/crates/ingestd/src/lib.rs @@ -5,3 +5,4 @@ pub mod pdf_ingest; pub mod pipeline; pub mod schema_evolution; pub mod service; +pub mod watcher; diff --git a/crates/ingestd/src/watcher.rs b/crates/ingestd/src/watcher.rs new file mode 100644 index 0000000..cf342e1 --- /dev/null +++ b/crates/ingestd/src/watcher.rs @@ -0,0 +1,122 @@ +/// File watcher: monitors a directory for new files and auto-ingests them. +/// Drop a CSV, JSON, PDF, or text file → auto-detected → Parquet → queryable. +/// Processed files are moved to a "processed/" subdirectory. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; +use tokio::time; + +use catalogd::registry::Registry; +use object_store::ObjectStore; + +use crate::pipeline; + +/// Watch config. +pub struct WatchConfig { + pub watch_dir: PathBuf, + pub processed_dir: PathBuf, + pub failed_dir: PathBuf, + pub poll_interval: Duration, +} + +impl Default for WatchConfig { + fn default() -> Self { + Self { + watch_dir: PathBuf::from("./inbox"), + processed_dir: PathBuf::from("./inbox/processed"), + failed_dir: PathBuf::from("./inbox/failed"), + poll_interval: Duration::from_secs(10), + } + } +} + +/// Start the file watcher as a background task. 
+pub fn spawn_watcher(
+    config: WatchConfig,
+    store: Arc<dyn ObjectStore>,
+    registry: Registry,
+) -> tokio::task::JoinHandle<()> {
+    // Create directories
+    std::fs::create_dir_all(&config.watch_dir).ok();
+    std::fs::create_dir_all(&config.processed_dir).ok();
+    std::fs::create_dir_all(&config.failed_dir).ok();
+
+    tracing::info!("file watcher started: watching {}", config.watch_dir.display());
+
+    tokio::spawn(async move {
+        let mut interval = time::interval(config.poll_interval);
+        loop {
+            interval.tick().await;
+            if let Err(e) = process_inbox(&config, &store, &registry).await {
+                tracing::error!("watcher error: {e}");
+            }
+        }
+    })
+}
+
+/// Scan inbox directory and process any new files.
+async fn process_inbox(
+    config: &WatchConfig,
+    store: &Arc<dyn ObjectStore>,
+    registry: &Registry,
+) -> Result<(), String> {
+    let entries = std::fs::read_dir(&config.watch_dir)
+        .map_err(|e| format!("can't read inbox: {e}"))?;
+
+    for entry in entries {
+        let entry = match entry {
+            Ok(e) => e,
+            Err(_) => continue,
+        };
+
+        let path = entry.path();
+
+        // Skip directories and hidden files
+        if path.is_dir() || path.file_name().map_or(true, |n| n.to_string_lossy().starts_with('.')) {
+            continue;
+        }
+
+        let filename = path.file_name().unwrap().to_string_lossy().to_string();
+        tracing::info!("watcher: found new file '{}'", filename);
+
+        // Read file
+        let content = match std::fs::read(&path) {
+            Ok(c) => c,
+            Err(e) => {
+                tracing::error!("watcher: can't read '{}': {e}", filename);
+                move_file(&path, &config.failed_dir.join(&filename));
+                continue;
+            }
+        };
+
+        // Ingest
+        match pipeline::ingest_file(&filename, &content, None, store, registry).await {
+            Ok(result) => {
+                if result.deduplicated {
+                    tracing::info!("watcher: '{}' already ingested, skipping", filename);
+                } else {
+                    tracing::info!("watcher: ingested '{}' → {} ({} rows, {} cols)",
+                        filename, result.dataset_name, result.rows, result.columns);
+                }
+                // Move to processed
+                move_file(&path, &config.processed_dir.join(&filename));
+            }
+            Err(e) 
=> { + tracing::error!("watcher: failed to ingest '{}': {e}", filename); + move_file(&path, &config.failed_dir.join(&filename)); + } + } + } + + Ok(()) +} + +fn move_file(from: &Path, to: &Path) { + if let Err(e) = std::fs::rename(from, to) { + // rename fails across filesystems, try copy+delete + if let Err(e2) = std::fs::copy(from, to).and_then(|_| std::fs::remove_file(from)) { + tracing::warn!("can't move {} → {}: {e}, copy failed: {e2}", from.display(), to.display()); + } + } +} diff --git a/data/_catalog/manifests/f0b6f408-71a5-4365-bd1d-98c1e176096a.json b/data/_catalog/manifests/f0b6f408-71a5-4365-bd1d-98c1e176096a.json new file mode 100644 index 0000000..f3bb6ca --- /dev/null +++ b/data/_catalog/manifests/f0b6f408-71a5-4365-bd1d-98c1e176096a.json @@ -0,0 +1,79 @@ +{ + "id": "f0b6f408-71a5-4365-bd1d-98c1e176096a", + "name": "new_candidates", + "schema_fingerprint": "0fcee1ac176b9bc45ab392bbd4401042803eb80d0de48b1fa6a5e20ffb27fa8d", + "objects": [ + { + "bucket": "data", + "key": "datasets/new_candidates.parquet", + "size_bytes": 2731, + "created_at": "2026-03-28T01:04:22.787759218Z" + } + ], + "created_at": "2026-03-28T01:04:22.787759835Z", + "updated_at": "2026-03-28T01:04:22.788113964Z", + "description": "", + "owner": "", + "sensitivity": "pii", + "columns": [ + { + "name": "name", + "data_type": "Utf8", + "sensitivity": "pii", + "description": "", + "is_pii": true + }, + { + "name": "phone", + "data_type": "Utf8", + "sensitivity": "pii", + "description": "", + "is_pii": true + }, + { + "name": "email", + "data_type": "Utf8", + "sensitivity": "pii", + "description": "", + "is_pii": true + }, + { + "name": "city", + "data_type": "Utf8", + "sensitivity": null, + "description": "", + "is_pii": false + }, + { + "name": "state", + "data_type": "Utf8", + "sensitivity": null, + "description": "", + "is_pii": false + }, + { + "name": "skills", + "data_type": "Utf8", + "sensitivity": null, + "description": "", + "is_pii": false + }, + { + "name": "years", + 
"data_type": "Int64", + "sensitivity": null, + "description": "", + "is_pii": false + } + ], + "lineage": { + "source_system": "csv", + "source_file": "new_candidates.csv", + "ingest_job": "ingest-1774659862787", + "ingest_timestamp": "2026-03-28T01:04:22.787759218Z", + "parent_datasets": [] + }, + "freshness": null, + "tags": [], + "row_count": 3 +} \ No newline at end of file diff --git a/data/datasets/new_candidates.parquet b/data/datasets/new_candidates.parquet new file mode 100644 index 0000000..34753b7 Binary files /dev/null and b/data/datasets/new_candidates.parquet differ diff --git a/inbox/processed/new_candidates.csv b/inbox/processed/new_candidates.csv new file mode 100644 index 0000000..872ad12 --- /dev/null +++ b/inbox/processed/new_candidates.csv @@ -0,0 +1,4 @@ +Name,Phone,Email,City,State,Skills,Years +Sarah Connor,(312) 555-0001,sarah@gmail.com,Chicago,IL,"Python,AWS,Docker",8 +John Wick,(212) 555-0002,john.wick@outlook.com,New York,NY,"Java,Spring Boot,Kubernetes",12 +Ellen Ripley,(713) 555-0003,ripley@yahoo.com,Houston,TX,"C++,Linux,Embedded",15