root 294f3f6a49 Scheduled ingest: file watcher auto-ingests from ./inbox
- Drop CSV/JSON/PDF/text into ./inbox → auto-detected → Parquet → queryable
- Polls every 10 seconds (configurable)
- Processed files moved to ./inbox/processed/
- Failed files moved to ./inbox/failed/
- Dedup: dropping the same file twice does not re-ingest it (the duplicate is still moved to ./inbox/processed/)
- Watcher starts automatically on gateway boot
- Tested: CSV dropped → queryable in <15s

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 20:04:40 -05:00

123 lines
3.9 KiB
Rust

//! File watcher: monitors a directory for new files and auto-ingests them.
//! Drop a CSV, JSON, PDF, or text file → auto-detected → Parquet → queryable.
//! Successfully processed files are moved to a "processed/" subdirectory;
//! files that cannot be read or ingested are moved to a "failed/" subdirectory.
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use catalogd::registry::Registry;
use object_store::ObjectStore;
use crate::pipeline;
/// Configuration for the inbox file watcher.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatchConfig {
    /// Directory scanned for new files on every poll.
    pub watch_dir: PathBuf,
    /// Destination for files that were ingested (or reported as already ingested).
    pub processed_dir: PathBuf,
    /// Destination for files that could not be read or ingested.
    pub failed_dir: PathBuf,
    /// Time between two scans of `watch_dir`.
    pub poll_interval: Duration,
}

impl Default for WatchConfig {
    /// Watch `./inbox`, move successes to `./inbox/processed`, failures to
    /// `./inbox/failed`, and poll every ten seconds.
    fn default() -> Self {
        const POLL_SECS: u64 = 10;
        WatchConfig {
            watch_dir: "./inbox".into(),
            processed_dir: "./inbox/processed".into(),
            failed_dir: "./inbox/failed".into(),
            poll_interval: Duration::from_secs(POLL_SECS),
        }
    }
}
/// Start the file watcher as a background task.
pub fn spawn_watcher(
config: WatchConfig,
store: Arc<dyn ObjectStore>,
registry: Registry,
) -> tokio::task::JoinHandle<()> {
// Create directories
std::fs::create_dir_all(&config.watch_dir).ok();
std::fs::create_dir_all(&config.processed_dir).ok();
std::fs::create_dir_all(&config.failed_dir).ok();
tracing::info!("file watcher started: watching {}", config.watch_dir.display());
tokio::spawn(async move {
let mut interval = time::interval(config.poll_interval);
loop {
interval.tick().await;
if let Err(e) = process_inbox(&config, &store, &registry).await {
tracing::error!("watcher error: {e}");
}
}
})
}
/// Scan inbox directory and process any new files.
async fn process_inbox(
config: &WatchConfig,
store: &Arc<dyn ObjectStore>,
registry: &Registry,
) -> Result<(), String> {
let entries = std::fs::read_dir(&config.watch_dir)
.map_err(|e| format!("can't read inbox: {e}"))?;
for entry in entries {
let entry = match entry {
Ok(e) => e,
Err(_) => continue,
};
let path = entry.path();
// Skip directories and hidden files
if path.is_dir() || path.file_name().map_or(true, |n| n.to_string_lossy().starts_with('.')) {
continue;
}
let filename = path.file_name().unwrap().to_string_lossy().to_string();
tracing::info!("watcher: found new file '{}'", filename);
// Read file
let content = match std::fs::read(&path) {
Ok(c) => c,
Err(e) => {
tracing::error!("watcher: can't read '{}': {e}", filename);
move_file(&path, &config.failed_dir.join(&filename));
continue;
}
};
// Ingest
match pipeline::ingest_file(&filename, &content, None, store, registry).await {
Ok(result) => {
if result.deduplicated {
tracing::info!("watcher: '{}' already ingested, skipping", filename);
} else {
tracing::info!("watcher: ingested '{}' → {} ({} rows, {} cols)",
filename, result.dataset_name, result.rows, result.columns);
}
// Move to processed
move_file(&path, &config.processed_dir.join(&filename));
}
Err(e) => {
tracing::error!("watcher: failed to ingest '{}': {e}", filename);
move_file(&path, &config.failed_dir.join(&filename));
}
}
}
Ok(())
}
fn move_file(from: &Path, to: &Path) {
if let Err(e) = std::fs::rename(from, to) {
// rename fails across filesystems, try copy+delete
if let Err(e2) = std::fs::copy(from, to).and_then(|_| std::fs::remove_file(from)) {
tracing::warn!("can't move {} → {}: {e}, copy failed: {e2}", from.display(), to.display());
}
}
}