diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs
index 14e97f1..722c098 100644
--- a/crates/gateway/src/main.rs
+++ b/crates/gateway/src/main.rs
@@ -118,12 +118,34 @@ async fn main() {
         .nest("/catalog", catalogd::service::router(registry.clone()))
         .nest("/query", queryd::service::router(engine.clone()))
         .nest("/ai", aibridge::service::router(ai_client.clone()))
+        ;
+
+    // Phase E: scheduled ingest
+    let sched_store = ingestd::schedule::ScheduleStore::new(store.clone());
+    if let Ok(n) = sched_store.rebuild().await {
+        if n > 0 { tracing::info!("rebuilt {n} persisted schedule(s)"); }
+    }
+    let scheduler = ingestd::schedule::Scheduler {
+        store: sched_store.clone(),
+        ingest: ingestd::schedule::SchedulerIngestDeps {
+            store: store.clone(),
+            registry: registry.clone(),
+            buckets: bucket_registry.clone(),
+            agent_handle: Some(agent_handle.clone()),
+            index_registry: index_reg.clone(),
+        },
+        tick_secs: 10,
+    };
+    scheduler.spawn();
+
+    app = app
         .nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
             store: store.clone(),
             registry: registry.clone(),
             buckets: bucket_registry.clone(),
             agent_handle: agent_handle.clone(),
             index_registry: index_reg.clone(),
+            schedules: sched_store,
         }))
         .nest("/vectors", vectord::service::router(vectord::service::VectorState {
             store: store.clone(),
diff --git a/crates/ingestd/src/lib.rs b/crates/ingestd/src/lib.rs
index fc0c0f3..5c857a3 100644
--- a/crates/ingestd/src/lib.rs
+++ b/crates/ingestd/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod db_ingest;
 pub mod my_stream;
 pub mod pg_stream;
+pub mod schedule;
 pub mod detect;
 pub mod csv_ingest;
 pub mod json_ingest;
diff --git a/crates/ingestd/src/schedule.rs b/crates/ingestd/src/schedule.rs
new file mode 100644
index 0000000..b260ade
--- /dev/null
+++ b/crates/ingestd/src/schedule.rs
@@ -0,0 +1,626 @@
+//! Scheduled ingest — the production-substrate piece that turns the
+//! lakehouse from "manual API toolkit" into a system that runs itself.
+//!
+//! A `ScheduleDef` declares "what to ingest" + "when to fire." A
+//! background `Scheduler` task wakes periodically, asks each enabled
+//! schedule whether it's due, and runs the matching ingest path. Outcomes
+//! land back on the schedule so operators can see what happened without
+//! `journalctl` archaeology.
+//!
+//! Storage shape: one JSON file per schedule at
+//! `primary://_schedules/{id}.json`. Serializable via serde, rebuilt on
+//! startup. Same write-once shape as the rest of the catalog — no DB.
+//!
+//! Concurrency model: single-flight per schedule_id. The scheduler holds
+//! a `HashSet` of currently-running ids; a tick that finds a
+//! schedule already in flight skips it (no queueing — a slow ingest
+//! shouldn't pile up runs). Different schedules run concurrently.
+//!
+//! What's deliberately not in scope here:
+//! - Cron expressions (the trigger enum has the variant but parsing is
+//!   stubbed). Intervals cover 90% of operational scheduling; cron is
+//!   easy to bolt on later.
+//! - Backoff / retry policies. A failed run records the failure and
+//!   schedules `next_run_at` as if it succeeded — no exponential backoff.
+//! - Distributed coordination. Single-machine scheduler; no leader
+//!   election. If you run two gateway instances they'll both fire.
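+//!
+//! For orientation, a persisted record looks roughly like this (the shape
+//! follows the serde derives below; the id and values are invented):
+//!
+//! ```text
+//! {
+//!   "id": "orders_hourly",
+//!   "kind": { "type": "mysql", "dsn": "mysql://ro_user:pw@db:3306/shop", "table": "orders" },
+//!   "trigger": { "type": "interval", "secs": 3600 },
+//!   "enabled": true,
+//!   "created_at": "2025-01-01T00:00:00Z",
+//!   "created_by": "ops",
+//!   "next_run_at": "2025-01-01T01:00:00Z",
+//!   "last_run_at": null,
+//!   "last_outcome": null,
+//!   "run_count": 0,
+//!   "failure_count": 0
+//! }
+//! ```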
+
+use chrono::{DateTime, Duration, Utc};
+use object_store::ObjectStore;
+use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+use storaged::ops;
+
+const PREFIX: &str = "_schedules";
+
+// =================== Public types ===================
+
+/// What kind of ingest the schedule fires.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum ScheduleKind {
+    /// Pull a MySQL/MariaDB table via the my_stream path.
+    Mysql {
+        /// `mysql://user:pass@host:port/db` — DSN with full credentials.
+        /// Encrypting it at rest is a future concern; for now it lives
+        /// in plain JSON in the catalog. Don't put production root creds
+        /// here without disk-level encryption.
+        dsn: String,
+        table: String,
+        #[serde(default)]
+        dataset_name: Option<String>,
+        #[serde(default)]
+        batch_size: Option<usize>,
+        #[serde(default)]
+        order_by: Option<String>,
+    },
+    /// Pull a Postgres table via the pg_stream path.
+    Postgres {
+        dsn: String,
+        table: String,
+        #[serde(default)]
+        dataset_name: Option<String>,
+        #[serde(default)]
+        batch_size: Option<usize>,
+        #[serde(default)]
+        order_by: Option<String>,
+    },
+}
+
+impl ScheduleKind {
+    pub fn label(&self) -> String {
+        match self {
+            ScheduleKind::Mysql { table, .. } => format!("mysql:{table}"),
+            ScheduleKind::Postgres { table, .. } => format!("postgres:{table}"),
+        }
+    }
+}
+
+/// When the schedule fires.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum ScheduleTrigger {
+    /// Run every N seconds. How the interval is anchored is decided by
+    /// `compute_next_run_at` below.
+    Interval { secs: u64 },
+    /// Cron expression — parsing not implemented yet. Defining the
+    /// variant now so the JSON shape is forward-compatible.
+    Cron { expr: String },
+}
+
+/// Outcome of a single run, kept on the schedule for observability.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RunOutcome {
+    pub at: DateTime<Utc>,
+    pub success: bool,
+    pub message: String,
+    pub rows: Option<u64>,
+    pub duration_secs: f32,
+}
+
+/// One scheduled ingest definition.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ScheduleDef {
+    pub id: String,
+    pub kind: ScheduleKind,
+    pub trigger: ScheduleTrigger,
+    pub enabled: bool,
+    pub created_at: DateTime<Utc>,
+    #[serde(default)]
+    pub created_by: String,
+    /// When the next run is due. Recomputed after each completion via
+    /// `compute_next_run_at`. On creation: set to "now" so the schedule
+    /// fires on the next scheduler tick.
+    pub next_run_at: DateTime<Utc>,
+    #[serde(default)]
+    pub last_run_at: Option<DateTime<Utc>>,
+    #[serde(default)]
+    pub last_outcome: Option<RunOutcome>,
+    /// Total successful runs and total failures since creation. Useful
+    /// for at-a-glance "is this schedule healthy?" checks.
+    #[serde(default)]
+    pub run_count: u64,
+    #[serde(default)]
+    pub failure_count: u64,
+}
+
+impl ScheduleDef {
+    /// True if this schedule should fire at `now`. Honors `enabled`.
+    pub fn is_due(&self, now: DateTime<Utc>) -> bool {
+        self.enabled && now >= self.next_run_at
+    }
+}
+
+// =================== Trigger semantics ===================
+//
+// J: THIS IS YOURS TO IMPLEMENT
+// =====================================================
+//
+// `compute_next_run_at` decides when a schedule fires next. The current
+// implementation uses "since last completion" semantics — simple, but
+// with real trade-offs vs the other strategies:
+//
+// 1. SINCE LAST COMPLETION (current default below):
+//      next = completed_at + interval
+//    Easy to reason about. A long-running ingest delays the next fire
+//    by however long it took. Drift is bounded — the schedule never
+//    "falls behind" trying to catch up.
+//
+// 2. ANCHORED INTERVAL:
+//      next = previous_target + interval, regardless of when run finished
+//    Keeps wall-clock alignment ("every 30min on the :00 and :30").
+//    Risk: if ingest takes longer than the interval, the next run is
+//    "due immediately" and you can fire back-to-back forever.
+//
+// 3. ANCHORED + SKIP:
+//    Anchored, but skip a tick if the previous one is still running.
+//    Best of both, slightly more code. A sketch follows the function
+//    below.
+//
+// 4. CRON:
+//    Parse the expression, find the next match after `now`. Most
+//    expressive but pulls in a cron parser dep.
+//
+// Cron is currently stubbed — falling back to "now + 1 hour" so a
+// mistakenly-configured Cron schedule doesn't melt the GPU.
+pub fn compute_next_run_at(
+    trigger: &ScheduleTrigger,
+    completed_at: DateTime<Utc>,
+    _previous_next: DateTime<Utc>,
+) -> DateTime<Utc> {
+    // TODO(J): pick a strategy. Starter = since-last-completion.
+    match trigger {
+        ScheduleTrigger::Interval { secs } => {
+            completed_at + Duration::seconds(*secs as i64)
+        }
+        ScheduleTrigger::Cron { expr: _ } => {
+            // Cron parsing not implemented — fall back to a safe 1h
+            // window so a bad config can't fire-loop the system.
+            completed_at + Duration::hours(1)
+        }
+    }
+}
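+
+// Illustration only (nothing calls this): a minimal sketch of strategy 3,
+// "anchored + skip", so the trade-off described above stays concrete. It
+// keeps the wall-clock anchor from `previous_next` but never schedules a
+// run in the past, so a slow ingest skips missed ticks instead of firing
+// back-to-back. The function name is made up; if this strategy wins, the
+// body would replace the Interval arm of `compute_next_run_at`.
+#[allow(dead_code)]
+fn compute_next_run_at_anchored_skip(
+    trigger: &ScheduleTrigger,
+    completed_at: DateTime<Utc>,
+    previous_next: DateTime<Utc>,
+) -> DateTime<Utc> {
+    match trigger {
+        ScheduleTrigger::Interval { secs } => {
+            // Guard against a zero-second interval so the loop terminates.
+            let step = Duration::seconds((*secs).max(1) as i64);
+            // Advance from the previous target until we land in the future.
+            let mut next = previous_next + step;
+            while next <= completed_at {
+                next = next + step;
+            }
+            next
+        }
+        // Same stubbed fallback as the default strategy.
+        ScheduleTrigger::Cron { .. } => completed_at + Duration::hours(1),
+    }
+}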
+
+// =================== Persistence ===================
+
+/// Storage path for a schedule's JSON record.
+fn schedule_key(id: &str) -> String {
+    let safe: String = id
+        .chars()
+        .map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' { c } else { '_' })
+        .collect();
+    format!("{PREFIX}/{safe}.json")
+}
+
+/// In-memory + on-disk schedule registry. The Scheduler reads from here;
+/// HTTP CRUD writes through here.
+#[derive(Clone)]
+pub struct ScheduleStore {
+    /// Bucket where schedule JSON lives. Always primary today — a
+    /// future enhancement could move to per-profile schedule buckets.
+    store: Arc<dyn ObjectStore>,
+    schedules: Arc<RwLock<HashMap<String, ScheduleDef>>>,
+}
+
+impl ScheduleStore {
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            store,
+            schedules: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+
+    /// Load every persisted schedule into memory. Call on startup.
+    pub async fn rebuild(&self) -> Result<usize, String> {
+        let keys = ops::list(&self.store, Some(&format!("{PREFIX}/")))
+            .await
+            .unwrap_or_default();
+        let mut count = 0usize;
+        for key in keys {
+            if !key.ends_with(".json") { continue; }
+            match ops::get(&self.store, &key).await {
+                Ok(bytes) => match serde_json::from_slice::<ScheduleDef>(&bytes) {
+                    Ok(def) => {
+                        self.schedules.write().await.insert(def.id.clone(), def);
+                        count += 1;
+                    }
+                    Err(e) => tracing::warn!("schedule {key}: malformed: {e}"),
+                },
+                Err(e) => tracing::warn!("schedule {key}: read failed: {e}"),
+            }
+        }
+        Ok(count)
+    }
+
+    pub async fn list(&self) -> Vec<ScheduleDef> {
+        let mut out: Vec<ScheduleDef> = self.schedules.read().await.values().cloned().collect();
+        out.sort_by(|a, b| a.id.cmp(&b.id));
+        out
+    }
+
+    pub async fn get(&self, id: &str) -> Option<ScheduleDef> {
+        self.schedules.read().await.get(id).cloned()
+    }
+
+    pub async fn put(&self, def: ScheduleDef) -> Result<ScheduleDef, String> {
+        let json = serde_json::to_vec_pretty(&def).map_err(|e| e.to_string())?;
+        ops::put(&self.store, &schedule_key(&def.id), json.into()).await?;
+        self.schedules.write().await.insert(def.id.clone(), def.clone());
+        Ok(def)
+    }
+
+    pub async fn delete(&self, id: &str) -> Result<bool, String> {
+        // object_store's delete is idempotent — it silently OKs missing keys.
+        let _ = ops::delete(&self.store, &schedule_key(id)).await;
+        Ok(self.schedules.write().await.remove(id).is_some())
+    }
+
+    /// Update completion metadata + persist. Used by the scheduler
+    /// after a run finishes (success or failure).
+    pub async fn record_run(
+        &self,
+        id: &str,
+        outcome: RunOutcome,
+    ) -> Result<(), String> {
+        let mut guard = self.schedules.write().await;
+        let Some(def) = guard.get_mut(id) else {
+            return Err(format!("schedule '{id}' not registered"));
+        };
+        def.last_run_at = Some(outcome.at);
+        if outcome.success {
+            def.run_count += 1;
+        } else {
+            def.failure_count += 1;
+        }
+        def.next_run_at = compute_next_run_at(&def.trigger, outcome.at, def.next_run_at);
+        def.last_outcome = Some(outcome);
+
+        let json = serde_json::to_vec_pretty(def).map_err(|e| e.to_string())?;
+        let key = schedule_key(id);
+        let store = self.store.clone();
+        // Drop the write lock before async I/O.
+        drop(guard);
+        ops::put(&store, &key, json.into()).await?;
+        Ok(())
+    }
+}
+
+// =================== Scheduler task ===================
+
+/// Long-running task that fires due schedules. Spawned at gateway
+/// startup and runs for the lifetime of the process; there is no stop
+/// signal yet (see the sketch after this impl for one way to wire one).
+pub struct Scheduler {
+    pub store: ScheduleStore,
+    pub ingest: SchedulerIngestDeps,
+    /// Polling cadence for the loop itself. The schedules' own intervals
+    /// are independent — this just controls how granularly we check
+    /// "is anything due?" The gateway passes 10s.
+    pub tick_secs: u64,
+}
+
+/// What the scheduler needs to actually run an ingest. We pass in only
+/// what's needed — keeps the dep set narrow.
+#[derive(Clone)]
+pub struct SchedulerIngestDeps {
+    pub store: Arc<dyn ObjectStore>,
+    pub registry: catalogd::registry::Registry,
+    pub buckets: Arc,
+    /// Optional Phase 16.5 hook — push a DatasetAppended event if any
+    /// HNSW index is bound to the dataset that just got refreshed.
+    pub agent_handle: Option,
+    pub index_registry: vectord::index_registry::IndexRegistry,
+}
+
+impl Scheduler {
+    /// Spawn the loop in the background. Returns immediately.
+    pub fn spawn(self) {
+        tokio::spawn(async move {
+            self.run().await;
+        });
+    }
+
+    async fn run(self) {
+        tracing::info!("scheduler started — tick={}s", self.tick_secs);
+        let in_flight: Arc<RwLock<HashSet<String>>> = Arc::new(RwLock::new(HashSet::new()));
+
+        loop {
+            tokio::time::sleep(std::time::Duration::from_secs(self.tick_secs)).await;
+            let now = Utc::now();
+            let due: Vec<ScheduleDef> = self.store.list().await
+                .into_iter()
+                .filter(|s| s.is_due(now))
+                .collect();
+            if due.is_empty() { continue; }
+
+            for def in due {
+                {
+                    let in_flight_guard = in_flight.read().await;
+                    if in_flight_guard.contains(&def.id) {
+                        // Previous run still going — skip this tick.
+                        // We don't reschedule; next tick re-evaluates.
+                        continue;
+                    }
+                }
+
+                let store = self.store.clone();
+                let ingest = self.ingest.clone();
+                let in_flight = in_flight.clone();
+                let id = def.id.clone();
+
+                in_flight.write().await.insert(id.clone());
+
+                tokio::spawn(async move {
+                    let t0 = std::time::Instant::now();
+                    let outcome = run_schedule(&def, &ingest).await;
+                    let elapsed = t0.elapsed().as_secs_f32();
+                    let outcome = RunOutcome {
+                        at: Utc::now(),
+                        success: outcome.is_ok(),
+                        message: match &outcome {
+                            Ok(s) => s.clone(),
+                            Err(e) => e.clone(),
+                        },
+                        rows: outcome.as_ref().ok().and_then(parse_rows_from_message),
+                        duration_secs: elapsed,
+                    };
+                    if let Err(e) = store.record_run(&id, outcome).await {
+                        tracing::warn!("scheduler: record_run({id}): {e}");
+                    }
+                    in_flight.write().await.remove(&id);
+                });
+            }
+        }
+    }
+}
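+
+// Illustration only; nothing calls this. The doc comment above notes that
+// there is no stop signal today. If clean shutdown ever matters, one
+// low-cost option (a sketch, not wired into the gateway) is to spawn the
+// loop and abort it when a `tokio::sync::watch` sender flips:
+#[allow(dead_code)]
+async fn run_until_stopped(scheduler: Scheduler, mut stop: tokio::sync::watch::Receiver<bool>) {
+    let loop_task = tokio::spawn(scheduler.run());
+    // Wait for any change on the stop channel, then cancel the tick loop.
+    // Per-run tasks spawned by the loop are independent, so an in-flight
+    // ingest still finishes and records its outcome.
+    let _ = stop.changed().await;
+    loop_task.abort();
+}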
+
+/// Pluck a "rows=N" hint out of a success message — best-effort, no
+/// hard contract. Lets us surface row counts in the schedule status
+/// endpoint without forcing every ingest path to return structured
+/// stats here.
+fn parse_rows_from_message(msg: &String) -> Option<u64> {
+    msg.split_whitespace()
+        .find_map(|tok| tok.strip_prefix("rows=").and_then(|n| n.parse().ok()))
+}
+
+/// Fire one schedule's ingest. Returns Ok(success_message) or Err(reason).
+pub async fn run_schedule(
+    def: &ScheduleDef,
+    deps: &SchedulerIngestDeps,
+) -> Result<String, String> {
+    tracing::info!("scheduler: firing '{}' ({})", def.id, def.kind.label());
+    match &def.kind {
+        ScheduleKind::Mysql { dsn, table, dataset_name, batch_size, order_by } => {
+            let req = crate::my_stream::MyStreamRequest {
+                dsn: dsn.clone(),
+                table: table.clone(),
+                dataset_name: dataset_name.clone(),
+                batch_size: *batch_size,
+                order_by: order_by.clone(),
+                limit: None,
+            };
+            run_mysql(req, deps).await
+        }
+        ScheduleKind::Postgres { dsn, table, dataset_name, batch_size, order_by } => {
+            let req = crate::pg_stream::PgStreamRequest {
+                dsn: dsn.clone(),
+                table: table.clone(),
+                dataset_name: dataset_name.clone(),
+                batch_size: *batch_size,
+                order_by: order_by.clone(),
+                limit: None,
+            };
+            run_postgres(req, deps).await
+        }
+    }
+}
+
+async fn run_mysql(
+    req: crate::my_stream::MyStreamRequest,
+    deps: &SchedulerIngestDeps,
+) -> Result<String, String> {
+    let (parquet, result) = crate::my_stream::stream_table_to_parquet(&req).await?;
+    if result.rows == 0 {
+        return Ok("rows=0 — table empty".to_string());
+    }
+    persist_and_register(
+        &parquet, &result.table, req.dataset_name.as_deref(),
+        result.rows, "mysql", &req.dsn, deps,
+    ).await
+}
+
+async fn run_postgres(
+    req: crate::pg_stream::PgStreamRequest,
+    deps: &SchedulerIngestDeps,
+) -> Result<String, String> {
+    let (parquet, result) = crate::pg_stream::stream_table_to_parquet(&req).await?;
+    if result.rows == 0 {
+        return Ok("rows=0 — table empty".to_string());
+    }
+    persist_and_register(
+        &parquet, &result.table, req.dataset_name.as_deref(),
+        result.rows, "postgresql", &req.dsn, deps,
+    ).await
+}
+
+/// Shared write+register path. Mirrors the HTTP ingest handlers in
+/// service.rs so the scheduled path produces identical catalog output:
+/// PII detection, lineage with redacted DSN, mark-stale + agent trigger.
+async fn persist_and_register(
+    parquet: &bytes::Bytes,
+    table: &str,
+    dataset_name_override: Option<&str>,
+    rows: usize,
+    source_system: &str,
+    dsn: &str,
+    deps: &SchedulerIngestDeps,
+) -> Result<String, String> {
+    use shared::types::{ColumnMeta, Lineage, ObjectRef, SchemaFingerprint, Sensitivity};
+
+    let (schema, _) = shared::arrow_helpers::parquet_to_record_batches(parquet)
+        .map_err(|e| format!("reparse parquet: {e}"))?;
+
+    let dataset_name = dataset_name_override.unwrap_or(table).to_string();
+    let storage_key = format!("datasets/{}.parquet", dataset_name);
+    let parquet_size = parquet.len() as u64;
+    let bucket = "primary".to_string();
+
+    let target_store = deps.buckets.get(&bucket)?;
+    ops::put(&target_store, &storage_key, parquet.clone()).await?;
+
+    let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
+    let now = Utc::now();
+    deps.registry.register(
+        dataset_name.clone(),
+        SchemaFingerprint(schema_fp.0),
+        vec![ObjectRef {
+            bucket: bucket.clone(),
+            key: storage_key.clone(),
+            size_bytes: parquet_size,
+            created_at: now,
+        }],
+    ).await?;
+
+    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
+    let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
+    let columns: Vec<ColumnMeta> = schema.fields().iter().map(|f| {
+        let sens = shared::pii::detect_sensitivity(f.name());
+        ColumnMeta {
+            name: f.name().clone(),
+            data_type: f.data_type().to_string(),
+            sensitivity: sens.clone(),
+            description: String::new(),
+            is_pii: matches!(sens, Some(Sensitivity::Pii)),
+        }
+    }).collect();
+
+    let lineage = Lineage {
+        source_system: source_system.to_string(),
+        source_file: format!("dsn: {}", redact_dsn(dsn)),
+        ingest_job: format!("scheduled-{}-{}", source_system, now.timestamp_millis()),
+        ingest_timestamp: now,
+        parent_datasets: vec![],
+    };
+
+    let _ = deps.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
+        sensitivity,
+        columns: Some(columns),
+        lineage: Some(lineage),
+        row_count: Some(rows as u64),
+        ..Default::default()
+    }).await;
+
+    let _ = deps.registry.mark_embeddings_stale(&dataset_name).await;
+
+    if let Some(agent) = &deps.agent_handle {
+        let bound = deps.index_registry.list(Some(&dataset_name), None).await;
+        for meta in bound {
+            let event = vectord::agent::TriggerEvent::dataset_appended(
+                meta.index_name.clone(), &dataset_name,
+            );
+            let _ = agent.enqueue(event).await;
+        }
+    }
+
+    Ok(format!("rows={rows} dataset={dataset_name} bytes={parquet_size}"))
+}
+
+/// Same generic redactor as in service.rs — duplicated here rather than
+/// making service::redact_dsn `pub` just for this.
+fn redact_dsn(dsn: &str) -> String {
+    let scheme_end = match dsn.find("://") {
+        Some(i) => i + 3,
+        None => return dsn.to_string(),
+    };
+    let at_idx = match dsn.rfind('@') {
+        Some(i) if i > scheme_end => i,
+        _ => return dsn.to_string(),
+    };
+    let userpass = &dsn[scheme_end..at_idx];
+    let colon_offset = match userpass.find(':') {
+        Some(i) => i,
+        None => return dsn.to_string(),
+    };
+    let colon_idx = scheme_end + colon_offset;
+    format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..])
+}
+
+// =================== Tests ===================
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn mk(secs: u64) -> ScheduleDef {
+        ScheduleDef {
+            id: "test".into(),
+            kind: ScheduleKind::Mysql {
+                dsn: "mysql://x@host/db".into(),
+                table: "t".into(),
+                dataset_name: None,
+                batch_size: None,
+                order_by: None,
+            },
+            trigger: ScheduleTrigger::Interval { secs },
+            enabled: true,
+            created_at: Utc::now(),
+            created_by: "tester".into(),
+            next_run_at: Utc::now(),
+            last_run_at: None,
+            last_outcome: None,
+            run_count: 0,
+            failure_count: 0,
+        }
+    }
+
+    #[test]
+    fn is_due_respects_enabled() {
+        let mut s = mk(60);
+        s.next_run_at = Utc::now() - Duration::seconds(5);
+        assert!(s.is_due(Utc::now()));
+        s.enabled = false;
+        assert!(!s.is_due(Utc::now()));
+    }
+
+    #[test]
+    fn is_due_respects_future_time() {
+        let mut s = mk(60);
+        s.next_run_at = Utc::now() + Duration::seconds(60);
+        assert!(!s.is_due(Utc::now()));
+    }
+
+    #[test]
+    fn next_run_since_last_completion_advances() {
+        let s = mk(120);
+        let completed = Utc::now();
+        let next = compute_next_run_at(&s.trigger, completed, completed);
+        let delta = next - completed;
+        assert_eq!(delta.num_seconds(), 120);
+    }
+
+    #[test]
+    fn cron_falls_back_to_one_hour() {
+        let trig = ScheduleTrigger::Cron { expr: "* * * * *".into() };
+        let now = Utc::now();
+        let next = compute_next_run_at(&trig, now, now);
+        let delta = next - now;
+        assert_eq!(delta.num_seconds(), 3600);
+    }
+
+    #[test]
+    fn schedule_kind_label() {
+        let k = ScheduleKind::Mysql {
+            dsn: "x".into(), table: "customers".into(),
+            dataset_name: None, batch_size: None, order_by: None,
+        };
+        assert_eq!(k.label(), "mysql:customers");
+    }
+
+    #[test]
+    fn redact_dsn_handles_mysql_and_postgres() {
+        assert_eq!(redact_dsn("mysql://u:secret@h/db"), "mysql://u:***@h/db");
+        assert_eq!(redact_dsn("postgresql://u:secret@h:5432/db"), "postgresql://u:***@h:5432/db");
+        assert_eq!(redact_dsn("mysql://u@h/db"), "mysql://u@h/db"); // no password
+    }
+}
diff --git a/crates/ingestd/src/service.rs b/crates/ingestd/src/service.rs
index 90a8c80..1ae476f 100644
--- a/crates/ingestd/src/service.rs
+++ b/crates/ingestd/src/service.rs
@@ -1,9 +1,9 @@
 use axum::{
     Json, Router,
-    extract::{Multipart, Query, State},
+    extract::{Multipart, Path, Query, State},
     http::{HeaderMap, StatusCode},
     response::IntoResponse,
-    routing::{get, post},
+    routing::{delete, get, patch, post},
 };
 use bytes::Bytes;
 use object_store::ObjectStore;
@@ -11,7 +11,7 @@ use serde::Deserialize;
 use std::sync::Arc;
 
 use catalogd::registry::Registry;
-use crate::{db_ingest, my_stream, pg_stream, pipeline};
+use crate::{db_ingest, my_stream, pg_stream, pipeline, schedule};
 use shared::arrow_helpers::record_batch_to_parquet;
 use shared::types::{ObjectRef, SchemaFingerprint};
 use storaged::ops;
@@ -30,6 +30,9 @@ pub struct IngestState {
     /// Used to look up which HNSW indexes are attached to the
     /// just-ingested dataset. Each matching index gets one trigger.
     pub index_registry: vectord::index_registry::IndexRegistry,
+    /// Scheduled-ingest registry. The scheduler task runs against this
+    /// store; HTTP CRUD endpoints write through it.
+    pub schedules: schedule::ScheduleStore,
 }
 
 /// Push `DatasetAppended` triggers for every HNSW index bound to this
@@ -86,6 +89,12 @@ pub fn router(state: IngestState) -> Router {
         .route("/postgres/import", post(import_pg_table))
         .route("/db", post(ingest_db_stream))
         .route("/mysql", post(ingest_mysql_stream))
+        // Phase E: scheduled ingest
+        .route("/schedules", get(list_schedules).post(create_schedule))
+        .route("/schedules/{id}", get(get_schedule)
+            .patch(patch_schedule)
+            .delete(delete_schedule))
+        .route("/schedules/{id}/run-now", post(run_schedule_now))
         .with_state(state)
 }
@@ -475,3 +484,141 @@ fn redact_dsn(dsn: &str) -> String {
     let colon_idx = scheme_end + colon_offset;
     format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..])
 }
+
+// --- Phase E: Scheduled ingest CRUD endpoints ---
+
+#[derive(Deserialize)]
+struct CreateScheduleRequest {
+    /// Stable id used in URLs and on disk. Required so callers can
+    /// PATCH/DELETE deterministically. The on-disk file name is sanitized,
+    /// but the caller's id is kept verbatim as the canonical key.
+    id: String,
+    kind: schedule::ScheduleKind,
+    trigger: schedule::ScheduleTrigger,
+    #[serde(default = "default_enabled")]
+    enabled: bool,
+    #[serde(default)]
+    created_by: String,
+}
+
+fn default_enabled() -> bool { true }
+
+async fn create_schedule(
+    State(state): State<IngestState>,
+    Json(req): Json<CreateScheduleRequest>,
+) -> impl IntoResponse {
+    if state.schedules.get(&req.id).await.is_some() {
+        return Err((StatusCode::CONFLICT, format!("schedule '{}' already exists", req.id)));
+    }
+    let now = chrono::Utc::now();
+    let def = schedule::ScheduleDef {
+        id: req.id,
+        kind: req.kind,
+        trigger: req.trigger,
+        enabled: req.enabled,
+        created_at: now,
+        created_by: req.created_by,
+        // Fire on the next tick — operators usually want to verify
+        // their schedule works without waiting for the full interval.
+        next_run_at: now,
+        last_run_at: None,
+        last_outcome: None,
+        run_count: 0,
+        failure_count: 0,
+    };
+    match state.schedules.put(def).await {
+        Ok(d) => Ok((StatusCode::CREATED, Json(d))),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
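+
+// Illustrative request only (the endpoint path comes from the router above,
+// mounted under /ingest by the gateway; the id, DSN, and interval are made up):
+//
+//   POST /ingest/schedules
+//   {
+//     "id": "orders_hourly",
+//     "kind": { "type": "mysql", "dsn": "mysql://ro_user:pw@db:3306/shop", "table": "orders" },
+//     "trigger": { "type": "interval", "secs": 3600 },
+//     "created_by": "ops"
+//   }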
+
+async fn list_schedules(State(state): State<IngestState>) -> impl IntoResponse {
+    Json(state.schedules.list().await)
+}
+
+async fn get_schedule(
+    State(state): State<IngestState>,
+    Path(id): Path<String>,
+) -> impl IntoResponse {
+    match state.schedules.get(&id).await {
+        Some(d) => Ok(Json(d)),
+        None => Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
+    }
+}
+
+#[derive(Deserialize)]
+struct PatchScheduleRequest {
+    /// Toggle on/off without changing anything else.
+    #[serde(default)]
+    enabled: Option<bool>,
+    /// Replace the trigger spec — useful for bumping intervals.
+    #[serde(default)]
+    trigger: Option<schedule::ScheduleTrigger>,
+}
+
+async fn patch_schedule(
+    State(state): State<IngestState>,
+    Path(id): Path<String>,
+    Json(req): Json<PatchScheduleRequest>,
+) -> impl IntoResponse {
+    let Some(mut def) = state.schedules.get(&id).await else {
+        return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
+    };
+    if let Some(e) = req.enabled { def.enabled = e; }
+    if let Some(t) = req.trigger { def.trigger = t; }
+    match state.schedules.put(def).await {
+        Ok(d) => Ok(Json(d)),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
+
+async fn delete_schedule(
+    State(state): State<IngestState>,
+    Path(id): Path<String>,
+) -> impl IntoResponse {
+    match state.schedules.delete(&id).await {
+        Ok(true) => Ok(Json(serde_json::json!({ "deleted": id }))),
+        Ok(false) => Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
+
+/// Force a schedule to fire right now, regardless of its next_run_at.
+/// Records the outcome and reschedules. Useful for "test my new schedule
+/// works without waiting an hour."
+async fn run_schedule_now(
+    State(state): State<IngestState>,
+    Path(id): Path<String>,
+) -> impl IntoResponse {
+    let Some(def) = state.schedules.get(&id).await else {
+        return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
+    };
+    let deps = schedule::SchedulerIngestDeps {
+        store: state.store.clone(),
+        registry: state.registry.clone(),
+        buckets: state.buckets.clone(),
+        agent_handle: Some(state.agent_handle.clone()),
+        index_registry: state.index_registry.clone(),
+    };
+
+    let t0 = std::time::Instant::now();
+    let result = schedule::run_schedule(&def, &deps).await;
+    let elapsed = t0.elapsed().as_secs_f32();
+    let outcome = schedule::RunOutcome {
+        at: chrono::Utc::now(),
+        success: result.is_ok(),
+        message: match &result {
+            Ok(s) => s.clone(),
+            Err(e) => e.clone(),
+        },
+        rows: result.as_ref().ok().and_then(|m: &String| {
+            m.split_whitespace()
+                .find_map(|tok| tok.strip_prefix("rows=").and_then(|n| n.parse().ok()))
+        }),
+        duration_secs: elapsed,
+    };
+    if let Err(e) = state.schedules.record_run(&id, outcome.clone()).await {
+        tracing::warn!("schedule {id}: record_run failed: {e}");
+    }
+    Ok(Json(outcome))
+}
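+
+// For reference (illustrative values, not output from a real run): the
+// run-now response is the serialized RunOutcome, e.g.
+//   { "at": "2025-01-01T00:00:00Z", "success": true,
+//     "message": "rows=1234 dataset=orders bytes=88211",
+//     "rows": 1234, "duration_secs": 2.4 }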