Scheduled ingest: file watcher auto-ingests from ./inbox
- Drop CSV/JSON/PDF/text into ./inbox → auto-detected → Parquet → queryable
- Polls every 10 seconds (configurable)
- Processed files moved to ./inbox/processed/
- Failed files moved to ./inbox/failed/
- Dedup: same file dropped twice = no-op
- Watcher starts automatically on gateway boot
- Tested: CSV dropped → queryable in <15s

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
04770c97eb
commit
294f3f6a49
@ -108,6 +108,13 @@ async fn main() {
|
|||||||
.allow_headers(Any))
|
.allow_headers(Any))
|
||||||
.layer(TraceLayer::new_for_http());
|
.layer(TraceLayer::new_for_http());
|
||||||
|
|
||||||
|
// File watcher — auto-ingest from ./inbox
|
||||||
|
let _watcher = ingestd::watcher::spawn_watcher(
|
||||||
|
ingestd::watcher::WatchConfig::default(),
|
||||||
|
store.clone(),
|
||||||
|
registry.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
// Start gRPC server on port+1
|
// Start gRPC server on port+1
|
||||||
let grpc_port = config.gateway.port + 1;
|
let grpc_port = config.gateway.port + 1;
|
||||||
let catalog_grpc = catalogd::grpc::CatalogGrpc::new(registry);
|
let catalog_grpc = catalogd::grpc::CatalogGrpc::new(registry);
|
||||||
|
|||||||
@ -5,3 +5,4 @@ pub mod pdf_ingest;
|
|||||||
pub mod pipeline;
|
pub mod pipeline;
|
||||||
pub mod schema_evolution;
|
pub mod schema_evolution;
|
||||||
pub mod service;
|
pub mod service;
|
||||||
|
pub mod watcher;
|
||||||
|
|||||||
122
crates/ingestd/src/watcher.rs
Normal file
122
crates/ingestd/src/watcher.rs
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
/// File watcher: monitors a directory for new files and auto-ingests them.
|
||||||
|
/// Drop a CSV, JSON, PDF, or text file → auto-detected → Parquet → queryable.
|
||||||
|
/// Processed files are moved to a "processed/" subdirectory.
|
||||||
|
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
|
use catalogd::registry::Registry;
|
||||||
|
use object_store::ObjectStore;
|
||||||
|
|
||||||
|
use crate::pipeline;
|
||||||
|
|
||||||
|
/// Watch config.
///
/// Controls where the file watcher looks for new files, where it moves them
/// after processing, and how often it polls.
#[derive(Debug, Clone)]
pub struct WatchConfig {
    /// Directory scanned for new files to ingest.
    pub watch_dir: PathBuf,
    /// Successfully ingested (or deduplicated) files are moved here.
    pub processed_dir: PathBuf,
    /// Files that could not be read or ingested are moved here.
    pub failed_dir: PathBuf,
    /// How often the watch directory is rescanned.
    pub poll_interval: Duration,
}

impl Default for WatchConfig {
    /// Defaults: watch `./inbox`, move results into `./inbox/processed` and
    /// `./inbox/failed`, polling every 10 seconds.
    fn default() -> Self {
        let watch_dir = PathBuf::from("./inbox");
        Self {
            processed_dir: watch_dir.join("processed"),
            failed_dir: watch_dir.join("failed"),
            watch_dir,
            poll_interval: Duration::from_secs(10),
        }
    }
}
|
||||||
|
|
||||||
|
/// Start the file watcher as a background task.
|
||||||
|
pub fn spawn_watcher(
|
||||||
|
config: WatchConfig,
|
||||||
|
store: Arc<dyn ObjectStore>,
|
||||||
|
registry: Registry,
|
||||||
|
) -> tokio::task::JoinHandle<()> {
|
||||||
|
// Create directories
|
||||||
|
std::fs::create_dir_all(&config.watch_dir).ok();
|
||||||
|
std::fs::create_dir_all(&config.processed_dir).ok();
|
||||||
|
std::fs::create_dir_all(&config.failed_dir).ok();
|
||||||
|
|
||||||
|
tracing::info!("file watcher started: watching {}", config.watch_dir.display());
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = time::interval(config.poll_interval);
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
if let Err(e) = process_inbox(&config, &store, ®istry).await {
|
||||||
|
tracing::error!("watcher error: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Scan inbox directory and process any new files.
|
||||||
|
async fn process_inbox(
|
||||||
|
config: &WatchConfig,
|
||||||
|
store: &Arc<dyn ObjectStore>,
|
||||||
|
registry: &Registry,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
let entries = std::fs::read_dir(&config.watch_dir)
|
||||||
|
.map_err(|e| format!("can't read inbox: {e}"))?;
|
||||||
|
|
||||||
|
for entry in entries {
|
||||||
|
let entry = match entry {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let path = entry.path();
|
||||||
|
|
||||||
|
// Skip directories and hidden files
|
||||||
|
if path.is_dir() || path.file_name().map_or(true, |n| n.to_string_lossy().starts_with('.')) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let filename = path.file_name().unwrap().to_string_lossy().to_string();
|
||||||
|
tracing::info!("watcher: found new file '{}'", filename);
|
||||||
|
|
||||||
|
// Read file
|
||||||
|
let content = match std::fs::read(&path) {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("watcher: can't read '{}': {e}", filename);
|
||||||
|
move_file(&path, &config.failed_dir.join(&filename));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Ingest
|
||||||
|
match pipeline::ingest_file(&filename, &content, None, store, registry).await {
|
||||||
|
Ok(result) => {
|
||||||
|
if result.deduplicated {
|
||||||
|
tracing::info!("watcher: '{}' already ingested, skipping", filename);
|
||||||
|
} else {
|
||||||
|
tracing::info!("watcher: ingested '{}' → {} ({} rows, {} cols)",
|
||||||
|
filename, result.dataset_name, result.rows, result.columns);
|
||||||
|
}
|
||||||
|
// Move to processed
|
||||||
|
move_file(&path, &config.processed_dir.join(&filename));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("watcher: failed to ingest '{}': {e}", filename);
|
||||||
|
move_file(&path, &config.failed_dir.join(&filename));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn move_file(from: &Path, to: &Path) {
|
||||||
|
if let Err(e) = std::fs::rename(from, to) {
|
||||||
|
// rename fails across filesystems, try copy+delete
|
||||||
|
if let Err(e2) = std::fs::copy(from, to).and_then(|_| std::fs::remove_file(from)) {
|
||||||
|
tracing::warn!("can't move {} → {}: {e}, copy failed: {e2}", from.display(), to.display());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,79 @@
|
|||||||
|
{
|
||||||
|
"id": "f0b6f408-71a5-4365-bd1d-98c1e176096a",
|
||||||
|
"name": "new_candidates",
|
||||||
|
"schema_fingerprint": "0fcee1ac176b9bc45ab392bbd4401042803eb80d0de48b1fa6a5e20ffb27fa8d",
|
||||||
|
"objects": [
|
||||||
|
{
|
||||||
|
"bucket": "data",
|
||||||
|
"key": "datasets/new_candidates.parquet",
|
||||||
|
"size_bytes": 2731,
|
||||||
|
"created_at": "2026-03-28T01:04:22.787759218Z"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"created_at": "2026-03-28T01:04:22.787759835Z",
|
||||||
|
"updated_at": "2026-03-28T01:04:22.788113964Z",
|
||||||
|
"description": "",
|
||||||
|
"owner": "",
|
||||||
|
"sensitivity": "pii",
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"name": "name",
|
||||||
|
"data_type": "Utf8",
|
||||||
|
"sensitivity": "pii",
|
||||||
|
"description": "",
|
||||||
|
"is_pii": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "phone",
|
||||||
|
"data_type": "Utf8",
|
||||||
|
"sensitivity": "pii",
|
||||||
|
"description": "",
|
||||||
|
"is_pii": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "email",
|
||||||
|
"data_type": "Utf8",
|
||||||
|
"sensitivity": "pii",
|
||||||
|
"description": "",
|
||||||
|
"is_pii": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "city",
|
||||||
|
"data_type": "Utf8",
|
||||||
|
"sensitivity": null,
|
||||||
|
"description": "",
|
||||||
|
"is_pii": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "state",
|
||||||
|
"data_type": "Utf8",
|
||||||
|
"sensitivity": null,
|
||||||
|
"description": "",
|
||||||
|
"is_pii": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "skills",
|
||||||
|
"data_type": "Utf8",
|
||||||
|
"sensitivity": null,
|
||||||
|
"description": "",
|
||||||
|
"is_pii": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "years",
|
||||||
|
"data_type": "Int64",
|
||||||
|
"sensitivity": null,
|
||||||
|
"description": "",
|
||||||
|
"is_pii": false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"lineage": {
|
||||||
|
"source_system": "csv",
|
||||||
|
"source_file": "new_candidates.csv",
|
||||||
|
"ingest_job": "ingest-1774659862787",
|
||||||
|
"ingest_timestamp": "2026-03-28T01:04:22.787759218Z",
|
||||||
|
"parent_datasets": []
|
||||||
|
},
|
||||||
|
"freshness": null,
|
||||||
|
"tags": [],
|
||||||
|
"row_count": 3
|
||||||
|
}
|
||||||
BIN
data/datasets/new_candidates.parquet
Normal file
BIN
data/datasets/new_candidates.parquet
Normal file
Binary file not shown.
4
inbox/processed/new_candidates.csv
Normal file
4
inbox/processed/new_candidates.csv
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
Name,Phone,Email,City,State,Skills,Years
|
||||||
|
Sarah Connor,(312) 555-0001,sarah@gmail.com,Chicago,IL,"Python,AWS,Docker",8
|
||||||
|
John Wick,(212) 555-0002,john.wick@outlook.com,New York,NY,"Java,Spring Boot,Kubernetes",12
|
||||||
|
Ellen Ripley,(713) 555-0003,ripley@yahoo.com,Houston,TX,"C++,Linux,Embedded",15
|
||||||
|
Loading…
x
Reference in New Issue
Block a user