//! Scheduled ingest — the production-substrate piece that turns the
//! lakehouse from "manual API toolkit" into a system that runs itself.
//!
//! A `ScheduleDef` declares "what to ingest" + "when to fire." A
//! background `Scheduler` task wakes periodically, asks each enabled
//! schedule whether it's due, and runs the matching ingest path. Outcomes
//! land back on the schedule so operators can see what happened without
//! `journalctl` archaeology.
//!
//! Storage shape: one JSON file per schedule at
//! `primary://_schedules/{id}.json`. Serialized via serde, rebuilt on
//! startup. Same write-once shape as the rest of the catalog — no DB.
//!
//! Concurrency model: single-flight per schedule_id. The scheduler holds
//! a `HashSet` of currently-running ids; a tick that finds a
//! schedule already in flight skips it (no queueing — a slow ingest
//! shouldn't pile up runs). Different schedules run concurrently.
//!
//! What's deliberately not in scope here:
//! - Sub-minute cron precision. The common 5-field Unix form is
//!   minute-granularity (croner also parses the 6-field seconds variant);
//!   intervals cover sub-minute cases.
//! - Backoff / retry policies. A failed run records the failure and
//!   schedules `next_run_at` as if it succeeded — no exponential backoff.
//! - Distributed coordination. Single-machine scheduler; no leader
//!   election. If you run two gateway instances they'll both fire.

use chrono::{DateTime, Duration, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;

use storaged::ops;

const PREFIX: &str = "_schedules";

// =================== Public types ===================

/// What kind of ingest the schedule fires.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ScheduleKind {
    /// Pull a MySQL/MariaDB table via the my_stream path.
    Mysql {
        /// `mysql://user:pass@host:port/db` — DSN with full credentials.
        /// Stored encrypted-at-rest is a future concern; for now it lives
        /// in plain JSON in the catalog. Don't put production root creds
        /// here without disk-level encryption.
        dsn: String,
        table: String,
        #[serde(default)]
        dataset_name: Option<String>,
        #[serde(default)]
        batch_size: Option<usize>,
        #[serde(default)]
        order_by: Option<String>,
    },
    /// Pull a Postgres table via the pg_stream path.
    Postgres {
        dsn: String,
        table: String,
        #[serde(default)]
        dataset_name: Option<String>,
        #[serde(default)]
        batch_size: Option<usize>,
        #[serde(default)]
        order_by: Option<String>,
    },
}

impl ScheduleKind {
    pub fn label(&self) -> String {
        match self {
            ScheduleKind::Mysql { table, .. } => format!("mysql:{table}"),
            ScheduleKind::Postgres { table, .. } => format!("postgres:{table}"),
        }
    }
}

/// When the schedule fires.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ScheduleTrigger {
    /// Run every N seconds. Time anchor = the `compute_next_run_at` decision.
    Interval { secs: u64 },
    /// 5-field Unix cron expression: "min hour dom month dow".
    /// Seconds are implicitly 0 (minute-granularity scheduling).
    /// Examples: "15 14 * * *" = 14:15 UTC daily, "0 */6 * * *" = every
    /// 6 hours on the hour, "0 9 * * 1-5" = 09:00 UTC weekdays.
    /// Timezone: UTC always — the scheduler's clock is UTC and we don't
    /// carry a per-schedule timezone field.
    Cron { expr: String },
}
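// A minimal sketch of the wire shape the serde attributes above produce;
// the literal values are made up for illustration, and only the tag and
// field names are asserted.
#[cfg(test)]
mod trigger_wire_shape {
    use super::*;

    #[test]
    fn interval_and_cron_serialize_with_type_tag() {
        // `tag = "type"` + `rename_all = "snake_case"` gives
        // {"type":"interval","secs":300} and {"type":"cron","expr":"..."}.
        let interval = serde_json::to_value(ScheduleTrigger::Interval { secs: 300 }).unwrap();
        assert_eq!(interval["type"], "interval");
        assert_eq!(interval["secs"], 300);

        let cron = serde_json::to_value(ScheduleTrigger::Cron { expr: "0 */6 * * *".into() }).unwrap();
        assert_eq!(cron["type"], "cron");
        assert_eq!(cron["expr"], "0 */6 * * *");
    }
}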
/// Outcome of a single run, kept on the schedule for observability.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunOutcome {
    pub at: DateTime<Utc>,
    pub success: bool,
    pub message: String,
    pub rows: Option<u64>,
    pub duration_secs: f32,
}

/// One scheduled ingest definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleDef {
    pub id: String,
    pub kind: ScheduleKind,
    pub trigger: ScheduleTrigger,
    pub enabled: bool,
    pub created_at: DateTime<Utc>,
    #[serde(default)]
    pub created_by: String,
    /// When the next run is due. Recomputed after each completion via
    /// `compute_next_run_at`. On creation: set to "now" so the schedule
    /// fires on the next scheduler tick.
    pub next_run_at: DateTime<Utc>,
    #[serde(default)]
    pub last_run_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub last_outcome: Option<RunOutcome>,
    /// Total successful runs and total failures since creation. Useful
    /// for at-a-glance "is this schedule healthy?" checks.
    #[serde(default)]
    pub run_count: u64,
    #[serde(default)]
    pub failure_count: u64,
}

impl ScheduleDef {
    /// True if this schedule should fire at `now`. Honors `enabled`.
    pub fn is_due(&self, now: DateTime<Utc>) -> bool {
        self.enabled && now >= self.next_run_at
    }
}

// =================== Trigger semantics ===================
//
// J: THIS IS YOURS TO IMPLEMENT
// =====================================================
//
// `compute_next_run_at` decides when a schedule fires next. The current
// state is the "since last completion" semantics — simple but has real
// trade-offs vs other strategies:
//
// 1. SINCE LAST COMPLETION (current default below):
//        next = completed_at + interval
//    Easy to reason about. A long-running ingest delays the next fire
//    by however long it took. Drift is bounded — the schedule never
//    "falls behind" trying to catch up.
//
// 2. ANCHORED INTERVAL:
//        next = previous_target + interval, regardless of when the run finished
//    Keeps wall-clock alignment ("every 30min on the :00 and :30").
//    Risk: if ingest takes longer than the interval, the next run is
//    "due immediately" and you can fire back-to-back forever.
//
// 3. ANCHORED + SKIP:
//    Anchored, but skip a tick if the previous one is still running.
//    Best of both, slightly more code. (A sketch follows
//    `compute_next_run_at` below.)
//
// 4. CRON:
//    Parse the expression, find the next match after `now`. Most
//    expressive; handled below via the croner parser.

pub fn compute_next_run_at(
    trigger: &ScheduleTrigger,
    completed_at: DateTime<Utc>,
    _previous_next: DateTime<Utc>,
) -> DateTime<Utc> {
    match trigger {
        ScheduleTrigger::Interval { secs } => {
            completed_at + Duration::seconds(*secs as i64)
        }
        ScheduleTrigger::Cron { expr } => {
            next_cron_fire(expr, completed_at)
                // Creation-time validation (see `validate_trigger`) rejects
                // unparseable expressions, so this fallback is only reached
                // if the schedule JSON was hand-edited on disk. One-hour
                // window keeps a bad expr from fire-looping the system.
                .unwrap_or(completed_at + Duration::hours(1))
        }
    }
}
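// Illustrative only: a minimal sketch of strategies 2/3 ("anchored
// interval" with catch-up skipping) from the comment block above. Not
// wired in anywhere; `compute_next_run_at` above stays the shipped
// "since last completion" behavior, and this function name is hypothetical.
#[allow(dead_code)]
fn compute_next_run_at_anchored(
    trigger: &ScheduleTrigger,
    completed_at: DateTime<Utc>,
    previous_target: DateTime<Utc>,
) -> DateTime<Utc> {
    match trigger {
        ScheduleTrigger::Interval { secs } => {
            // `validate_trigger` guarantees secs > 0; max(1) keeps the
            // sketch loop-safe even for hand-edited records.
            let step = Duration::seconds((*secs).max(1) as i64);
            // Advance from the previous target, not the completion time, so
            // fire times stay aligned to the original wall-clock grid.
            let mut next = previous_target + step;
            // Skip any targets a long-running ingest already blew past
            // rather than firing back-to-back to catch up.
            while next <= completed_at {
                next = next + step;
            }
            next
        }
        // Cron is inherently anchored: next match strictly after completion.
        ScheduleTrigger::Cron { expr } => next_cron_fire(expr, completed_at)
            .unwrap_or(completed_at + Duration::hours(1)),
    }
}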
/// Parse a Vixie/POSIX cron expression and return the next fire time
/// strictly after `after`. Accepts 5-field (`min hour dom month dow`) and
/// 6-field (with leading `sec`) forms natively — croner handles both.
/// Day-of-week follows Unix convention: 0=Sun, 1=Mon, …, 6=Sat (7=Sun as
/// a synonym). Returns None if the expression fails to parse or produces no
/// upcoming match (which can happen for year-bounded patterns in the past).
fn next_cron_fire(expr: &str, after: DateTime<Utc>) -> Option<DateTime<Utc>> {
    use std::str::FromStr;
    let cron = croner::Cron::from_str(expr).ok()?;
    cron.find_next_occurrence(&after, false).ok()
}

/// Reject a ScheduleTrigger whose content can't be interpreted — used by
/// the HTTP handlers to fail fast at create/patch time rather than falling
/// back silently at fire time. Interval triggers get a sanity gate
/// (no zero secs); cron triggers get a full parse through croner.
pub fn validate_trigger(trigger: &ScheduleTrigger) -> Result<(), String> {
    match trigger {
        ScheduleTrigger::Interval { secs } => {
            if *secs == 0 {
                return Err("interval secs must be > 0".into());
            }
            Ok(())
        }
        ScheduleTrigger::Cron { expr } => {
            use std::str::FromStr;
            croner::Cron::from_str(expr)
                .map_err(|e| format!("invalid cron expression '{expr}': {e}"))?;
            Ok(())
        }
    }
}

// =================== Persistence ===================

/// Storage path for a schedule's JSON record.
fn schedule_key(id: &str) -> String {
    let safe: String = id
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' { c } else { '_' })
        .collect();
    format!("{PREFIX}/{safe}.json")
}

/// In-memory + on-disk schedule registry. The Scheduler reads from here;
/// HTTP CRUD writes through here. (An illustrative create-flow sketch
/// follows the `Scheduler` impl below.)
#[derive(Clone)]
pub struct ScheduleStore {
    /// Bucket where schedule JSON lives. Always primary today — a
    /// future enhancement could move to per-profile schedule buckets.
    store: Arc<dyn ObjectStore>,
    schedules: Arc<RwLock<HashMap<String, ScheduleDef>>>,
}

impl ScheduleStore {
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            store,
            schedules: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Load every persisted schedule into memory. Call on startup.
    pub async fn rebuild(&self) -> Result<usize, String> {
        let keys = ops::list(&self.store, Some(&format!("{PREFIX}/")))
            .await
            .unwrap_or_default();
        let mut count = 0usize;
        for key in keys {
            if !key.ends_with(".json") {
                continue;
            }
            match ops::get(&self.store, &key).await {
                Ok(bytes) => match serde_json::from_slice::<ScheduleDef>(&bytes) {
                    Ok(def) => {
                        self.schedules.write().await.insert(def.id.clone(), def);
                        count += 1;
                    }
                    Err(e) => tracing::warn!("schedule {key}: malformed: {e}"),
                },
                Err(e) => tracing::warn!("schedule {key}: read failed: {e}"),
            }
        }
        Ok(count)
    }

    pub async fn list(&self) -> Vec<ScheduleDef> {
        let mut out: Vec<ScheduleDef> = self.schedules.read().await.values().cloned().collect();
        out.sort_by(|a, b| a.id.cmp(&b.id));
        out
    }

    pub async fn get(&self, id: &str) -> Option<ScheduleDef> {
        self.schedules.read().await.get(id).cloned()
    }

    pub async fn put(&self, def: ScheduleDef) -> Result<ScheduleDef, String> {
        let json = serde_json::to_vec_pretty(&def).map_err(|e| e.to_string())?;
        ops::put(&self.store, &schedule_key(&def.id), json.into()).await?;
        self.schedules.write().await.insert(def.id.clone(), def.clone());
        Ok(def)
    }

    pub async fn delete(&self, id: &str) -> Result<bool, String> {
        // Object_store's delete is idempotent — silently OKs missing keys.
        let _ = ops::delete(&self.store, &schedule_key(id)).await;
        Ok(self.schedules.write().await.remove(id).is_some())
    }
    /// Update completion metadata + persist. Used by the scheduler
    /// after a run finishes (success or failure).
    pub async fn record_run(
        &self,
        id: &str,
        outcome: RunOutcome,
    ) -> Result<(), String> {
        let mut guard = self.schedules.write().await;
        let Some(def) = guard.get_mut(id) else {
            return Err(format!("schedule '{id}' not registered"));
        };
        def.last_run_at = Some(outcome.at);
        if outcome.success {
            def.run_count += 1;
        } else {
            def.failure_count += 1;
        }
        def.next_run_at = compute_next_run_at(&def.trigger, outcome.at, def.next_run_at);
        def.last_outcome = Some(outcome);
        let json = serde_json::to_vec_pretty(def).map_err(|e| e.to_string())?;
        let key = schedule_key(id);
        let store = self.store.clone();
        // Drop the write lock before async I/O.
        drop(guard);
        ops::put(&store, &key, json.into()).await?;
        Ok(())
    }
}

// =================== Scheduler task ===================

/// Long-running task that fires due schedules. Spawned at gateway
/// startup. Stop semantics today: none; `spawn` detaches the loop and it
/// runs for the life of the process. Wire a stop signal if you need clean
/// shutdown.
pub struct Scheduler {
    pub store: ScheduleStore,
    pub ingest: SchedulerIngestDeps,
    /// Polling cadence for the loop itself. The schedules' own intervals
    /// are independent — this just controls how granularly we check
    /// "is anything due?" Default 10s.
    pub tick_secs: u64,
}

/// What the scheduler needs to actually run an ingest. We pass in only
/// what's needed — keeps the dep set narrow.
#[derive(Clone)]
pub struct SchedulerIngestDeps {
    pub store: Arc<dyn ObjectStore>,
    pub registry: catalogd::registry::Registry,
    pub buckets: Arc,
    /// Optional Phase 16.5 hook — push a DatasetAppended event if any
    /// HNSW index is bound to the dataset that just got refreshed.
    pub agent_handle: Option,
    pub index_registry: vectord::index_registry::IndexRegistry,
}

impl Scheduler {
    /// Spawn the loop in the background. Returns immediately.
    pub fn spawn(self) {
        tokio::spawn(async move {
            self.run().await;
        });
    }

    async fn run(self) {
        tracing::info!("scheduler started — tick={}s", self.tick_secs);
        let in_flight: Arc<RwLock<HashSet<String>>> = Arc::new(RwLock::new(HashSet::new()));
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(self.tick_secs)).await;
            let now = Utc::now();
            let due: Vec<ScheduleDef> = self.store.list().await
                .into_iter()
                .filter(|s| s.is_due(now))
                .collect();
            if due.is_empty() {
                continue;
            }
            for def in due {
                {
                    let in_flight_guard = in_flight.read().await;
                    if in_flight_guard.contains(&def.id) {
                        // Previous run still going — skip this tick.
                        // We don't reschedule; next tick re-evaluates.
                        continue;
                    }
                }
                let store = self.store.clone();
                let ingest = self.ingest.clone();
                let in_flight = in_flight.clone();
                let id = def.id.clone();
                in_flight.write().await.insert(id.clone());
                tokio::spawn(async move {
                    let t0 = std::time::Instant::now();
                    let outcome = run_schedule(&def, &ingest).await;
                    let elapsed = t0.elapsed().as_secs_f32();
                    let outcome = RunOutcome {
                        at: Utc::now(),
                        success: outcome.is_ok(),
                        message: match &outcome {
                            Ok(s) => s.clone(),
                            Err(e) => e.clone(),
                        },
                        rows: outcome.as_ref().ok().and_then(parse_rows_from_message),
                        duration_secs: elapsed,
                    };
                    if let Err(e) = store.record_run(&id, outcome).await {
                        tracing::warn!("scheduler: record_run({id}): {e}");
                    }
                    in_flight.write().await.remove(&id);
                });
            }
        }
    }
}
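// Illustrative only: the create-side flow the HTTP handlers are expected
// to follow against `ScheduleStore`, validating the trigger first and then
// persisting. The literal id, DSN, and field values are made up for the
// sketch.
#[allow(dead_code)]
async fn example_create_schedule(store: &ScheduleStore) -> Result<ScheduleDef, String> {
    let trigger = ScheduleTrigger::Interval { secs: 300 };
    // Fail fast at create time rather than falling back at fire time.
    validate_trigger(&trigger)?;
    let now = Utc::now();
    store
        .put(ScheduleDef {
            id: "orders-every-5m".into(),
            kind: ScheduleKind::Mysql {
                dsn: "mysql://reader:pass@db.internal:3306/shop".into(),
                table: "orders".into(),
                dataset_name: None,
                batch_size: None,
                order_by: None,
            },
            trigger,
            enabled: true,
            created_at: now,
            created_by: "example".into(),
            // "now" so the schedule fires on the next scheduler tick.
            next_run_at: now,
            last_run_at: None,
            last_outcome: None,
            run_count: 0,
            failure_count: 0,
        })
        .await
}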
/// Pluck a "rows=N" hint out of a success message — best-effort, no
/// hard contract. Lets us surface row counts in the schedule status
/// endpoint without forcing every ingest path to return structured
/// stats here.
fn parse_rows_from_message(msg: &String) -> Option<u64> {
    msg.split_whitespace()
        .find_map(|tok| tok.strip_prefix("rows=").and_then(|n| n.parse().ok()))
}

/// Fire one schedule's ingest. Returns Ok(success_message) or Err(reason).
pub async fn run_schedule(
    def: &ScheduleDef,
    deps: &SchedulerIngestDeps,
) -> Result<String, String> {
    tracing::info!("scheduler: firing '{}' ({})", def.id, def.kind.label());
    match &def.kind {
        ScheduleKind::Mysql { dsn, table, dataset_name, batch_size, order_by } => {
            let req = crate::my_stream::MyStreamRequest {
                dsn: dsn.clone(),
                table: table.clone(),
                dataset_name: dataset_name.clone(),
                batch_size: *batch_size,
                order_by: order_by.clone(),
                limit: None,
            };
            run_mysql(req, deps).await
        }
        ScheduleKind::Postgres { dsn, table, dataset_name, batch_size, order_by } => {
            let req = crate::pg_stream::PgStreamRequest {
                dsn: dsn.clone(),
                table: table.clone(),
                dataset_name: dataset_name.clone(),
                batch_size: *batch_size,
                order_by: order_by.clone(),
                limit: None,
            };
            run_postgres(req, deps).await
        }
    }
}

async fn run_mysql(
    req: crate::my_stream::MyStreamRequest,
    deps: &SchedulerIngestDeps,
) -> Result<String, String> {
    let (parquet, result) = crate::my_stream::stream_table_to_parquet(&req).await?;
    if result.rows == 0 {
        return Ok("rows=0 — table empty".to_string());
    }
    persist_and_register(
        &parquet,
        &result.table,
        req.dataset_name.as_deref(),
        result.rows,
        "mysql",
        &req.dsn,
        deps,
    ).await
}

async fn run_postgres(
    req: crate::pg_stream::PgStreamRequest,
    deps: &SchedulerIngestDeps,
) -> Result<String, String> {
    let (parquet, result) = crate::pg_stream::stream_table_to_parquet(&req).await?;
    if result.rows == 0 {
        return Ok("rows=0 — table empty".to_string());
    }
    persist_and_register(
        &parquet,
        &result.table,
        req.dataset_name.as_deref(),
        result.rows,
        "postgresql",
        &req.dsn,
        deps,
    ).await
}
/// Shared write+register path. Mirrors the HTTP ingest handlers in
/// service.rs so the scheduled path produces identical catalog output:
/// PII detection, lineage with redacted DSN, mark-stale + agent trigger.
async fn persist_and_register(
    parquet: &bytes::Bytes,
    table: &str,
    dataset_name_override: Option<&str>,
    rows: usize,
    source_system: &str,
    dsn: &str,
    deps: &SchedulerIngestDeps,
) -> Result<String, String> {
    use shared::types::{ColumnMeta, Lineage, ObjectRef, SchemaFingerprint, Sensitivity};

    let (schema, _) = shared::arrow_helpers::parquet_to_record_batches(parquet)
        .map_err(|e| format!("reparse parquet: {e}"))?;
    let dataset_name = dataset_name_override.unwrap_or(table).to_string();
    let storage_key = format!("datasets/{}.parquet", dataset_name);
    let parquet_size = parquet.len() as u64;
    let bucket = "primary".to_string();

    let target_store = deps.buckets.get(&bucket)?;
    ops::put(&target_store, &storage_key, parquet.clone()).await?;

    let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
    let now = Utc::now();
    deps.registry.register(
        dataset_name.clone(),
        SchemaFingerprint(schema_fp.0),
        vec![ObjectRef {
            bucket: bucket.clone(),
            key: storage_key.clone(),
            size_bytes: parquet_size,
            created_at: now,
        }],
    ).await?;

    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
    let columns: Vec<ColumnMeta> = schema.fields().iter().map(|f| {
        let sens = shared::pii::detect_sensitivity(f.name());
        ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(Sensitivity::Pii)),
        }
    }).collect();

    let lineage = Lineage {
        source_system: source_system.to_string(),
        source_file: format!("dsn: {}", redact_dsn(dsn)),
        ingest_job: format!("scheduled-{}-{}", source_system, now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };

    let _ = deps.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
        sensitivity,
        columns: Some(columns),
        lineage: Some(lineage),
        row_count: Some(rows as u64),
        ..Default::default()
    }).await;

    let _ = deps.registry.mark_embeddings_stale(&dataset_name).await;

    if let Some(agent) = &deps.agent_handle {
        let bound = deps.index_registry.list(Some(&dataset_name), None).await;
        for meta in bound {
            let event = vectord::agent::TriggerEvent::dataset_appended(
                meta.index_name.clone(),
                &dataset_name,
            );
            let _ = agent.enqueue(event).await;
        }
    }

    Ok(format!("rows={rows} dataset={dataset_name} bytes={parquet_size}"))
}
/// Same generic redactor used in service.rs — duplicated here to avoid
/// pulling service::redact_dsn behind a `pub` visibility just for this.
fn redact_dsn(dsn: &str) -> String {
    let scheme_end = match dsn.find("://") {
        Some(i) => i + 3,
        None => return dsn.to_string(),
    };
    let at_idx = match dsn.rfind('@') {
        Some(i) if i > scheme_end => i,
        _ => return dsn.to_string(),
    };
    let userpass = &dsn[scheme_end..at_idx];
    let colon_offset = match userpass.find(':') {
        Some(i) => i,
        None => return dsn.to_string(),
    };
    let colon_idx = scheme_end + colon_offset;
    format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..])
}

// =================== Tests ===================

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    fn mk(secs: u64) -> ScheduleDef {
        ScheduleDef {
            id: "test".into(),
            kind: ScheduleKind::Mysql {
                dsn: "mysql://x@host/db".into(),
                table: "t".into(),
                dataset_name: None,
                batch_size: None,
                order_by: None,
            },
            trigger: ScheduleTrigger::Interval { secs },
            enabled: true,
            created_at: Utc::now(),
            created_by: "tester".into(),
            next_run_at: Utc::now(),
            last_run_at: None,
            last_outcome: None,
            run_count: 0,
            failure_count: 0,
        }
    }

    #[test]
    fn is_due_respects_enabled() {
        let mut s = mk(60);
        s.next_run_at = Utc::now() - Duration::seconds(5);
        assert!(s.is_due(Utc::now()));
        s.enabled = false;
        assert!(!s.is_due(Utc::now()));
    }

    #[test]
    fn is_due_respects_future_time() {
        let mut s = mk(60);
        s.next_run_at = Utc::now() + Duration::seconds(60);
        assert!(!s.is_due(Utc::now()));
    }

    #[test]
    fn next_run_since_last_completion_advances() {
        let s = mk(120);
        let completed = Utc::now();
        let next = compute_next_run_at(&s.trigger, completed, completed);
        let delta = next - completed;
        assert_eq!(delta.num_seconds(), 120);
    }

    #[test]
    fn cron_every_minute_fires_within_sixty_seconds() {
        let trig = ScheduleTrigger::Cron { expr: "* * * * *".into() };
        let now = Utc::now();
        let next = compute_next_run_at(&trig, now, now);
        let delta = next - now;
        assert!(delta.num_seconds() > 0 && delta.num_seconds() <= 60,
            "expected next fire within 60s, got {}s", delta.num_seconds());
    }

    #[test]
    fn cron_daily_at_1415_utc_is_within_24h() {
        // 14:15 UTC daily — whether we're before or after 14:15 today, the
        // next fire is at most 24h out.
        let trig = ScheduleTrigger::Cron { expr: "15 14 * * *".into() };
        let now: DateTime<Utc> = "2026-04-20T10:00:00Z".parse().unwrap();
        let next = compute_next_run_at(&trig, now, now);
        assert!(next > now);
        assert!((next - now).num_hours() <= 24);
        // And the minute-of-hour is 15.
        assert_eq!(next.minute(), 15);
        assert_eq!(next.hour(), 14);
    }

    #[test]
    fn cron_weekday_skips_weekend() {
        // 09:00 UTC weekdays only. On a Saturday, the next fire is Monday.
        let trig = ScheduleTrigger::Cron { expr: "0 9 * * 1-5".into() };
        // 2026-04-18 is a Saturday.
        let sat: DateTime<Utc> = "2026-04-18T10:00:00Z".parse().unwrap();
        let next = compute_next_run_at(&trig, sat, sat);
        // Monday 2026-04-20 at 09:00 UTC.
        assert_eq!(next, "2026-04-20T09:00:00Z".parse::<DateTime<Utc>>().unwrap());
    }

    #[test]
    fn cron_six_field_seconds_granularity() {
        // 6-field (seconds included): fire every minute at 30s past the minute.
        let trig = ScheduleTrigger::Cron { expr: "30 * * * * *".into() };
        let now = Utc::now();
        let next = compute_next_run_at(&trig, now, now);
        assert_eq!(next.second(), 30);
    }

    #[test]
    fn validate_rejects_bad_cron() {
        let bad = ScheduleTrigger::Cron { expr: "not a cron".into() };
        let err = validate_trigger(&bad).unwrap_err();
        assert!(err.contains("cron"), "error should mention cron, got: {err}");
    }
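    // Spot checks for the two small helpers above. The ids and messages are
    // arbitrary; the expected values follow from `schedule_key` and
    // `parse_rows_from_message` as written.
    #[test]
    fn schedule_key_sanitizes_path_characters() {
        assert_eq!(schedule_key("daily-orders"), "_schedules/daily-orders.json");
        // '/' and ':' are not [A-Za-z0-9_-], so they collapse to '_'.
        assert_eq!(schedule_key("prod/orders:5m"), "_schedules/prod_orders_5m.json");
    }

    #[test]
    fn parse_rows_hint_is_best_effort() {
        assert_eq!(
            parse_rows_from_message(&"rows=1250 dataset=orders bytes=48211".to_string()),
            Some(1250)
        );
        assert_eq!(parse_rows_from_message(&"ingest finished with no counters".to_string()), None);
    }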
    #[test]
    fn validate_rejects_wrong_field_count() {
        // 4 fields — neither 5 nor 6.
        let bad = ScheduleTrigger::Cron { expr: "* * * *".into() };
        assert!(validate_trigger(&bad).is_err());
    }

    #[test]
    fn validate_rejects_zero_interval() {
        let bad = ScheduleTrigger::Interval { secs: 0 };
        assert!(validate_trigger(&bad).is_err());
    }

    #[test]
    fn validate_accepts_good_cron() {
        let good = ScheduleTrigger::Cron { expr: "0 */6 * * *".into() };
        assert!(validate_trigger(&good).is_ok());
    }

    #[test]
    fn schedule_kind_label() {
        let k = ScheduleKind::Mysql {
            dsn: "x".into(),
            table: "customers".into(),
            dataset_name: None,
            batch_size: None,
            order_by: None,
        };
        assert_eq!(k.label(), "mysql:customers");
    }

    #[test]
    fn redact_dsn_handles_mysql_and_postgres() {
        assert_eq!(redact_dsn("mysql://u:secret@h/db"), "mysql://u:***@h/db");
        assert_eq!(redact_dsn("postgresql://u:secret@h:5432/db"), "postgresql://u:***@h:5432/db");
        assert_eq!(redact_dsn("mysql://u@h/db"), "mysql://u@h/db"); // no password
    }
}