/// File watcher: monitors a directory for new files and auto-ingests them. /// Drop a CSV, JSON, PDF, or text file → auto-detected → Parquet → queryable. /// Processed files are moved to a "processed/" subdirectory. use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use tokio::time; use catalogd::registry::Registry; use object_store::ObjectStore; use crate::pipeline; /// Watch config. pub struct WatchConfig { pub watch_dir: PathBuf, pub processed_dir: PathBuf, pub failed_dir: PathBuf, pub poll_interval: Duration, } impl Default for WatchConfig { fn default() -> Self { Self { watch_dir: PathBuf::from("./inbox"), processed_dir: PathBuf::from("./inbox/processed"), failed_dir: PathBuf::from("./inbox/failed"), poll_interval: Duration::from_secs(10), } } } /// Start the file watcher as a background task. pub fn spawn_watcher( config: WatchConfig, store: Arc, registry: Registry, ) -> tokio::task::JoinHandle<()> { // Create directories std::fs::create_dir_all(&config.watch_dir).ok(); std::fs::create_dir_all(&config.processed_dir).ok(); std::fs::create_dir_all(&config.failed_dir).ok(); tracing::info!("file watcher started: watching {}", config.watch_dir.display()); tokio::spawn(async move { let mut interval = time::interval(config.poll_interval); loop { interval.tick().await; if let Err(e) = process_inbox(&config, &store, ®istry).await { tracing::error!("watcher error: {e}"); } } }) } /// Scan inbox directory and process any new files. async fn process_inbox( config: &WatchConfig, store: &Arc, registry: &Registry, ) -> Result<(), String> { let entries = std::fs::read_dir(&config.watch_dir) .map_err(|e| format!("can't read inbox: {e}"))?; for entry in entries { let entry = match entry { Ok(e) => e, Err(_) => continue, }; let path = entry.path(); // Skip directories and hidden files. Bind filename once via // let-else so the subsequent use is unwrap-free — previous // version relied on a map_or guard above + an .unwrap() here // being consistent, which is a fragile invariant. if path.is_dir() { continue; } let Some(fn_os) = path.file_name() else { continue; }; let filename = fn_os.to_string_lossy().to_string(); if filename.starts_with('.') { continue; } tracing::info!("watcher: found new file '{}'", filename); // Read file let content = match std::fs::read(&path) { Ok(c) => c, Err(e) => { tracing::error!("watcher: can't read '{}': {e}", filename); move_file(&path, &config.failed_dir.join(&filename)); continue; } }; // Ingest match pipeline::ingest_file(&filename, &content, None, store, registry).await { Ok(result) => { if result.deduplicated { tracing::info!("watcher: '{}' already ingested, skipping", filename); } else { tracing::info!("watcher: ingested '{}' → {} ({} rows, {} cols)", filename, result.dataset_name, result.rows, result.columns); } // Move to processed move_file(&path, &config.processed_dir.join(&filename)); } Err(e) => { tracing::error!("watcher: failed to ingest '{}': {e}", filename); move_file(&path, &config.failed_dir.join(&filename)); } } } Ok(()) } fn move_file(from: &Path, to: &Path) { if let Err(e) = std::fs::rename(from, to) { // rename fails across filesystems, try copy+delete if let Err(e2) = std::fs::copy(from, to).and_then(|_| std::fs::remove_file(from)) { tracing::warn!("can't move {} → {}: {e}, copy failed: {e2}", from.display(), to.display()); } } }