Phase E: Scheduled ingest — the substrate runs itself
Background Scheduler task fires due ingests on interval, records
outcomes, reschedules. Single-flight per schedule_id so a slow run
can't pile up. 10s tick cadence, schedules' own intervals independent.
ScheduleDef persisted as JSON at primary://_schedules/{id}.json,
rebuilt on startup. ScheduleKind supports Mysql and Postgres (both
through existing streaming paths). ScheduleTrigger::Interval is
live; Cron variant defined in the enum but parsing stubbed with a
safe 1h fallback.
next_run_at set to "now" on creation so operators see success or
failure within one tick — no waiting for the first full interval.
run-now endpoint fires even when schedule is disabled (manual
override for testing). Full catalog integration: PII detection,
lineage with redacted DSN, mark-stale + autotune agent trigger.
Verified live: 20s MySQL schedule against MariaDB lh_demo.customers.
Source mutated between runs (added row + updated value). Second
auto-fire picked up both changes (10→11 rows). DataFusion SQL
confirmed mutations in the lakehouse. 6 unit tests pass.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
0d037cfac1
commit
7c1222d240
@ -118,12 +118,34 @@ async fn main() {
|
|||||||
.nest("/catalog", catalogd::service::router(registry.clone()))
|
.nest("/catalog", catalogd::service::router(registry.clone()))
|
||||||
.nest("/query", queryd::service::router(engine.clone()))
|
.nest("/query", queryd::service::router(engine.clone()))
|
||||||
.nest("/ai", aibridge::service::router(ai_client.clone()))
|
.nest("/ai", aibridge::service::router(ai_client.clone()))
|
||||||
|
;
|
||||||
|
|
||||||
|
// Phase E: scheduled ingest
|
||||||
|
let sched_store = ingestd::schedule::ScheduleStore::new(store.clone());
|
||||||
|
if let Ok(n) = sched_store.rebuild().await {
|
||||||
|
if n > 0 { tracing::info!("rebuilt {n} persisted schedule(s)"); }
|
||||||
|
}
|
||||||
|
let scheduler = ingestd::schedule::Scheduler {
|
||||||
|
store: sched_store.clone(),
|
||||||
|
ingest: ingestd::schedule::SchedulerIngestDeps {
|
||||||
|
store: store.clone(),
|
||||||
|
registry: registry.clone(),
|
||||||
|
buckets: bucket_registry.clone(),
|
||||||
|
agent_handle: Some(agent_handle.clone()),
|
||||||
|
index_registry: index_reg.clone(),
|
||||||
|
},
|
||||||
|
tick_secs: 10,
|
||||||
|
};
|
||||||
|
scheduler.spawn();
|
||||||
|
|
||||||
|
app = app
|
||||||
.nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
|
.nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
|
||||||
store: store.clone(),
|
store: store.clone(),
|
||||||
registry: registry.clone(),
|
registry: registry.clone(),
|
||||||
buckets: bucket_registry.clone(),
|
buckets: bucket_registry.clone(),
|
||||||
agent_handle: agent_handle.clone(),
|
agent_handle: agent_handle.clone(),
|
||||||
index_registry: index_reg.clone(),
|
index_registry: index_reg.clone(),
|
||||||
|
schedules: sched_store,
|
||||||
}))
|
}))
|
||||||
.nest("/vectors", vectord::service::router(vectord::service::VectorState {
|
.nest("/vectors", vectord::service::router(vectord::service::VectorState {
|
||||||
store: store.clone(),
|
store: store.clone(),
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
pub mod db_ingest;
|
pub mod db_ingest;
|
||||||
pub mod my_stream;
|
pub mod my_stream;
|
||||||
pub mod pg_stream;
|
pub mod pg_stream;
|
||||||
|
pub mod schedule;
|
||||||
pub mod detect;
|
pub mod detect;
|
||||||
pub mod csv_ingest;
|
pub mod csv_ingest;
|
||||||
pub mod json_ingest;
|
pub mod json_ingest;
|
||||||
|
|||||||
626
crates/ingestd/src/schedule.rs
Normal file
626
crates/ingestd/src/schedule.rs
Normal file
@ -0,0 +1,626 @@
|
|||||||
|
//! Scheduled ingest — the production-substrate piece that turns the
|
||||||
|
//! lakehouse from "manual API toolkit" into a system that runs itself.
|
||||||
|
//!
|
||||||
|
//! A `ScheduleDef` declares "what to ingest" + "when to fire." A
|
||||||
|
//! background `Scheduler` task wakes periodically, asks each enabled
|
||||||
|
//! schedule whether it's due, and runs the matching ingest path. Outcomes
|
||||||
|
//! land back on the schedule so operators can see what happened without
|
||||||
|
//! `journalctl` archaeology.
|
||||||
|
//!
|
||||||
|
//! Storage shape: one JSON file per schedule at
|
||||||
|
//! `primary://_schedules/{id}.json`. Serializable via serde, rebuilt on
|
||||||
|
//! startup. Same write-once shape as the rest of the catalog — no DB.
|
||||||
|
//!
|
||||||
|
//! Concurrency model: single-flight per schedule_id. The scheduler holds
|
||||||
|
//! a `HashSet<String>` of currently-running ids; a tick that finds a
|
||||||
|
//! schedule already in flight skips it (no queueing — a slow ingest
|
||||||
|
//! shouldn't pile up runs). Different schedules run concurrently.
|
||||||
|
//!
|
||||||
|
//! What's deliberately not in scope here:
|
||||||
|
//! - Cron expressions (the trigger enum has the variant but parsing is
|
||||||
|
//! stubbed). Intervals cover 90% of operational scheduling; cron is
|
||||||
|
//! easy to bolt on later.
|
||||||
|
//! - Backoff / retry policies. A failed run records the failure and
|
||||||
|
//! schedules `next_run_at` as if it succeeded — no exponential backoff.
|
||||||
|
//! - Distributed coordination. Single-machine scheduler; no leader
|
||||||
|
//! election. If you run two gateway instances they'll both fire.
|
||||||
|
|
||||||
|
use chrono::{DateTime, Duration, Utc};
|
||||||
|
use object_store::ObjectStore;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
|
use storaged::ops;
|
||||||
|
|
||||||
|
const PREFIX: &str = "_schedules";
|
||||||
|
|
||||||
|
// =================== Public types ===================
|
||||||
|
|
||||||
|
/// What kind of ingest the schedule fires.
///
/// Serialized with an external `"type"` tag in snake_case, so the JSON
/// shape is `{"type": "mysql", "dsn": …}` / `{"type": "postgres", …}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ScheduleKind {
    /// Pull a MySQL/MariaDB table via the my_stream path.
    Mysql {
        /// `mysql://user:pass@host:port/db` — DSN with full credentials.
        /// Stored encrypted-at-rest is a future concern; for now it lives
        /// in plain JSON in the catalog. Don't put production root creds
        /// here without disk-level encryption.
        dsn: String,
        /// Source table to pull. Also used as the dataset name when
        /// `dataset_name` is not set.
        table: String,
        /// Optional catalog dataset name override; defaults to `table`.
        #[serde(default)]
        dataset_name: Option<String>,
        /// Forwarded to the streaming request; batch semantics are
        /// defined by my_stream (assumed rows-per-fetch — confirm there).
        #[serde(default)]
        batch_size: Option<usize>,
        /// Forwarded to the streaming request as the ordering column.
        #[serde(default)]
        order_by: Option<String>,
    },
    /// Pull a Postgres table via the pg_stream path. Fields mirror
    /// `Mysql` exactly; see those docs.
    Postgres {
        dsn: String,
        table: String,
        #[serde(default)]
        dataset_name: Option<String>,
        #[serde(default)]
        batch_size: Option<usize>,
        #[serde(default)]
        order_by: Option<String>,
    },
}
|
||||||
|
|
||||||
|
impl ScheduleKind {
|
||||||
|
pub fn label(&self) -> String {
|
||||||
|
match self {
|
||||||
|
ScheduleKind::Mysql { table, .. } => format!("mysql:{table}"),
|
||||||
|
ScheduleKind::Postgres { table, .. } => format!("postgres:{table}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When the schedule fires.
///
/// Same externally-tagged snake_case JSON shape as `ScheduleKind`, e.g.
/// `{"type": "interval", "secs": 20}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ScheduleTrigger {
    /// Run every N seconds. Time anchor = compute_next_run_at decision.
    Interval { secs: u64 },
    /// Cron expression — parsing not implemented yet. Defining the
    /// variant now so the JSON shape is forward-compatible.
    /// `compute_next_run_at` currently falls back to a 1h delay.
    Cron { expr: String },
}
|
||||||
|
|
||||||
|
/// Outcome of a single run, kept on the schedule for observability.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunOutcome {
    /// When the run completed (the scheduler stamps this after the
    /// ingest future resolves, not when it started).
    pub at: DateTime<Utc>,
    /// Whether the ingest path returned Ok.
    pub success: bool,
    /// Success message ("rows=… dataset=… bytes=…") or the error string.
    pub message: String,
    /// Row count parsed best-effort from a "rows=N" token in `message`;
    /// None when the message carries no such hint.
    pub rows: Option<usize>,
    /// Wall-clock duration of the run in seconds.
    pub duration_secs: f32,
}
|
||||||
|
|
||||||
|
/// One scheduled ingest definition.
///
/// Persisted as pretty-printed JSON at `primary://_schedules/{id}.json`
/// and mirrored in `ScheduleStore`'s in-memory map.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleDef {
    /// Stable identifier; doubles as the storage key (sanitized).
    pub id: String,
    pub kind: ScheduleKind,
    pub trigger: ScheduleTrigger,
    /// Disabled schedules are never considered due (`is_due`), but the
    /// run-now endpoint can still fire them manually.
    pub enabled: bool,
    pub created_at: DateTime<Utc>,
    /// Free-form creator tag; defaults to "" for records written before
    /// this field existed (hence `serde(default)`).
    #[serde(default)]
    pub created_by: String,
    /// When the next run is due. Recomputed after each completion via
    /// `compute_next_run_at`. On creation: set to "now" so the schedule
    /// fires on the next scheduler tick.
    pub next_run_at: DateTime<Utc>,
    /// Completion time of the most recent run (success or failure).
    #[serde(default)]
    pub last_run_at: Option<DateTime<Utc>>,
    /// Full record of the most recent run, for the status endpoint.
    #[serde(default)]
    pub last_outcome: Option<RunOutcome>,
    /// Total successful runs and total failures since creation. Useful
    /// for at-a-glance "is this schedule healthy?" checks.
    #[serde(default)]
    pub run_count: u64,
    #[serde(default)]
    pub failure_count: u64,
}
|
||||||
|
|
||||||
|
impl ScheduleDef {
|
||||||
|
/// True if this schedule should fire at `now`. Honors `enabled`.
|
||||||
|
pub fn is_due(&self, now: DateTime<Utc>) -> bool {
|
||||||
|
self.enabled && now >= self.next_run_at
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// =================== Trigger semantics ===================
|
||||||
|
//
|
||||||
|
// J: THIS IS YOURS TO IMPLEMENT
|
||||||
|
// =====================================================
|
||||||
|
//
|
||||||
|
// `compute_next_run_at` decides when a schedule fires next. The current
|
||||||
|
// state is the "since last completion" semantics — simple but has real
|
||||||
|
// trade-offs vs other strategies:
|
||||||
|
//
|
||||||
|
// 1. SINCE LAST COMPLETION (current default below):
|
||||||
|
// next = completed_at + interval
|
||||||
|
// Easy to reason about. A long-running ingest delays the next fire
|
||||||
|
// by however long it took. Drift is bounded — the schedule never
|
||||||
|
// "falls behind" trying to catch up.
|
||||||
|
//
|
||||||
|
// 2. ANCHORED INTERVAL:
|
||||||
|
// next = previous_target + interval, regardless of when run finished
|
||||||
|
// Keeps wall-clock alignment ("every 30min on the :00 and :30").
|
||||||
|
// Risk: if ingest takes longer than the interval, the next run is
|
||||||
|
// "due immediately" and you can fire back-to-back forever.
|
||||||
|
//
|
||||||
|
// 3. ANCHORED + SKIP:
|
||||||
|
// Anchored, but skip a tick if the previous one is still running.
|
||||||
|
// Best of both, slightly more code.
|
||||||
|
//
|
||||||
|
// 4. CRON:
|
||||||
|
// Parse the expression, find the next match after `now`. Most
|
||||||
|
// expressive but pulls in a cron parser dep.
|
||||||
|
//
|
||||||
|
// Cron is currently stubbed — falling back to "now + 1 hour" so a
|
||||||
|
// mistakenly-configured Cron schedule doesn't melt the GPU.
|
||||||
|
pub fn compute_next_run_at(
|
||||||
|
trigger: &ScheduleTrigger,
|
||||||
|
completed_at: DateTime<Utc>,
|
||||||
|
_previous_next: DateTime<Utc>,
|
||||||
|
) -> DateTime<Utc> {
|
||||||
|
// TODO(J): pick a strategy. Starter = since-last-completion.
|
||||||
|
match trigger {
|
||||||
|
ScheduleTrigger::Interval { secs } => {
|
||||||
|
completed_at + Duration::seconds(*secs as i64)
|
||||||
|
}
|
||||||
|
ScheduleTrigger::Cron { expr: _ } => {
|
||||||
|
// Cron parsing not implemented — fall back to a safe 1h
|
||||||
|
// window so a bad config can't fire-loop the system.
|
||||||
|
completed_at + Duration::hours(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// =================== Persistence ===================
|
||||||
|
|
||||||
|
/// Storage path for a schedule's JSON record.
|
||||||
|
fn schedule_key(id: &str) -> String {
|
||||||
|
let safe: String = id
|
||||||
|
.chars()
|
||||||
|
.map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' { c } else { '_' })
|
||||||
|
.collect();
|
||||||
|
format!("{PREFIX}/{safe}.json")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// In-memory + on-disk schedule registry. The Scheduler reads from here;
/// HTTP CRUD writes through here.
///
/// `Clone` is cheap: both fields are `Arc`s, so clones share the same
/// underlying map and object store.
#[derive(Clone)]
pub struct ScheduleStore {
    /// Bucket where schedule JSON lives. Always primary today — a
    /// future enhancement could move to per-profile schedule buckets.
    store: Arc<dyn ObjectStore>,
    /// In-memory mirror of every persisted `ScheduleDef`, keyed by id.
    /// Populated by `rebuild()` on startup and kept in sync by the
    /// CRUD methods.
    schedules: Arc<RwLock<HashMap<String, ScheduleDef>>>,
}
|
||||||
|
|
||||||
|
impl ScheduleStore {
    /// Empty store backed by `store`. Call `rebuild()` to load
    /// persisted schedules.
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            store,
            schedules: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Load every persisted schedule into memory. Call on startup.
    ///
    /// Returns the number of schedules loaded. Malformed or unreadable
    /// records are logged and skipped, never fatal.
    pub async fn rebuild(&self) -> Result<usize, String> {
        // NOTE(review): a listing failure is swallowed into an empty key
        // set, so this method currently never returns Err — confirm
        // that's intended before relying on the Result.
        let keys = ops::list(&self.store, Some(&format!("{PREFIX}/")))
            .await
            .unwrap_or_default();
        let mut count = 0usize;
        for key in keys {
            // Ignore anything that isn't a schedule record.
            if !key.ends_with(".json") { continue; }
            match ops::get(&self.store, &key).await {
                Ok(bytes) => match serde_json::from_slice::<ScheduleDef>(&bytes) {
                    Ok(def) => {
                        self.schedules.write().await.insert(def.id.clone(), def);
                        count += 1;
                    }
                    Err(e) => tracing::warn!("schedule {key}: malformed: {e}"),
                },
                Err(e) => tracing::warn!("schedule {key}: read failed: {e}"),
            }
        }
        Ok(count)
    }

    /// All schedules, sorted by id for stable listing output.
    pub async fn list(&self) -> Vec<ScheduleDef> {
        let mut out: Vec<ScheduleDef> = self.schedules.read().await.values().cloned().collect();
        out.sort_by(|a, b| a.id.cmp(&b.id));
        out
    }

    /// Single schedule by id, cloned out of the in-memory map.
    pub async fn get(&self, id: &str) -> Option<ScheduleDef> {
        self.schedules.read().await.get(id).cloned()
    }

    /// Persist `def` to object storage, then cache it in memory.
    /// Persisting first means the map never holds an unpersisted def.
    pub async fn put(&self, def: ScheduleDef) -> Result<ScheduleDef, String> {
        let json = serde_json::to_vec_pretty(&def).map_err(|e| e.to_string())?;
        ops::put(&self.store, &schedule_key(&def.id), json.into()).await?;
        self.schedules.write().await.insert(def.id.clone(), def.clone());
        Ok(def)
    }

    /// Remove from storage and memory. Returns whether the id was
    /// actually registered (storage deletion result is ignored).
    pub async fn delete(&self, id: &str) -> Result<bool, String> {
        // Object_store's delete is idempotent — silently OKs missing keys.
        let _ = ops::delete(&self.store, &schedule_key(id)).await;
        Ok(self.schedules.write().await.remove(id).is_some())
    }

    /// Update completion metadata + persist. Used by the scheduler
    /// after a run finishes (success or failure).
    ///
    /// Note: a failed run still advances `next_run_at` — there is no
    /// backoff/retry policy (see module docs).
    pub async fn record_run(
        &self,
        id: &str,
        outcome: RunOutcome,
    ) -> Result<(), String> {
        let mut guard = self.schedules.write().await;
        let Some(def) = guard.get_mut(id) else {
            return Err(format!("schedule '{id}' not registered"));
        };
        def.last_run_at = Some(outcome.at);
        if outcome.success {
            def.run_count += 1;
        } else {
            def.failure_count += 1;
        }
        def.next_run_at = compute_next_run_at(&def.trigger, outcome.at, def.next_run_at);
        def.last_outcome = Some(outcome);

        // Serialize while the lock is still held so the persisted bytes
        // match the in-memory state we just wrote.
        let json = serde_json::to_vec_pretty(def).map_err(|e| e.to_string())?;
        let key = schedule_key(id);
        let store = self.store.clone();
        // Drop the write lock before async I/O.
        // NOTE(review): two concurrent record_runs for *different* ids
        // could persist interleaved, which is fine; the scheduler's
        // single-flight rule prevents two for the *same* id.
        drop(guard);
        ops::put(&store, &key, json.into()).await?;
        Ok(())
    }
}
|
||||||
|
|
||||||
|
// =================== Scheduler task ===================
|
||||||
|
|
||||||
|
/// Long-running task that fires due schedules. Spawned at gateway
/// startup. Stop semantics: drop the handle (the loop exits when its
/// store is the only reference left, or sooner if you wire a stop signal).
pub struct Scheduler {
    /// Schedule registry to poll; also receives run outcomes.
    pub store: ScheduleStore,
    /// Everything an actual ingest run needs (storage, catalog, agent).
    pub ingest: SchedulerIngestDeps,
    /// Polling cadence for the loop itself. The schedules' own intervals
    /// are independent — this just controls how granularly we check
    /// "is anything due?" Default 10s.
    pub tick_secs: u64,
}
|
||||||
|
|
||||||
|
/// What the scheduler needs to actually run an ingest. We pass in only
/// what's needed — keeps the dep set narrow.
#[derive(Clone)]
pub struct SchedulerIngestDeps {
    /// Primary object store (unused directly here; kept for parity with
    /// the HTTP ingest state — confirm before removing).
    pub store: Arc<dyn ObjectStore>,
    /// Catalog registry: dataset registration + metadata updates.
    pub registry: catalogd::registry::Registry,
    /// Named bucket lookup; scheduled ingests write to "primary".
    pub buckets: Arc<storaged::registry::BucketRegistry>,
    /// Optional Phase 16.5 hook — push a DatasetAppended event if any
    /// HNSW index is bound to the dataset that just got refreshed.
    pub agent_handle: Option<vectord::agent::AgentHandle>,
    /// Used to find which HNSW indexes are bound to a refreshed dataset.
    pub index_registry: vectord::index_registry::IndexRegistry,
}
|
||||||
|
|
||||||
|
impl Scheduler {
    /// Spawn the loop in the background. Returns immediately.
    pub fn spawn(self) {
        tokio::spawn(async move {
            self.run().await;
        });
    }

    /// The scheduler loop: sleep one tick, collect due schedules, fire
    /// each in its own task. Never returns.
    async fn run(self) {
        tracing::info!("scheduler started — tick={}s", self.tick_secs);
        // Single-flight guard: ids whose run task is still executing.
        let in_flight: Arc<RwLock<HashSet<String>>> = Arc::new(RwLock::new(HashSet::new()));

        loop {
            // Sleep first: the initial pass happens one tick after startup.
            tokio::time::sleep(std::time::Duration::from_secs(self.tick_secs)).await;
            let now = Utc::now();
            let due: Vec<ScheduleDef> = self.store.list().await
                .into_iter()
                .filter(|s| s.is_due(now))
                .collect();
            if due.is_empty() { continue; }

            for def in due {
                {
                    let in_flight_guard = in_flight.read().await;
                    if in_flight_guard.contains(&def.id) {
                        // Previous run still going — skip this tick.
                        // We don't reschedule; next tick re-evaluates.
                        continue;
                    }
                }

                let store = self.store.clone();
                let ingest = self.ingest.clone();
                let in_flight = in_flight.clone();
                let id = def.id.clone();

                // Check-then-insert is not atomic, but this loop is the
                // only inserter, so no second run for `id` can sneak in
                // between the read above and this write.
                in_flight.write().await.insert(id.clone());

                tokio::spawn(async move {
                    let t0 = std::time::Instant::now();
                    let outcome = run_schedule(&def, &ingest).await;
                    let elapsed = t0.elapsed().as_secs_f32();
                    // Fold the Result into the observable RunOutcome record.
                    let outcome = RunOutcome {
                        at: Utc::now(),
                        success: outcome.is_ok(),
                        message: match &outcome {
                            Ok(s) => s.clone(),
                            Err(e) => e.clone(),
                        },
                        rows: outcome.as_ref().ok().and_then(parse_rows_from_message),
                        duration_secs: elapsed,
                    };
                    if let Err(e) = store.record_run(&id, outcome).await {
                        tracing::warn!("scheduler: record_run({id}): {e}");
                    }
                    // Always clear the single-flight flag, even when
                    // record_run failed, so the schedule isn't wedged.
                    in_flight.write().await.remove(&id);
                });
            }
        }
    }
}
|
||||||
|
|
||||||
|
/// Pluck a "rows=N" hint out of a success message — best-effort, no
/// hard contract. Lets us surface row counts in the schedule status
/// endpoint without forcing every ingest path to return structured
/// stats here.
///
/// Takes `&String` (not `&str`) because the scheduler passes this fn
/// item directly to `Option::and_then` over an `Option<&String>`.
fn parse_rows_from_message(msg: &String) -> Option<usize> {
    for token in msg.split_whitespace() {
        if let Some(count) = token.strip_prefix("rows=") {
            if let Ok(n) = count.parse::<usize>() {
                return Some(n);
            }
        }
    }
    None
}
|
||||||
|
|
||||||
|
/// Fire one schedule's ingest. Returns Ok(success_message) or Err(reason).
|
||||||
|
pub async fn run_schedule(
|
||||||
|
def: &ScheduleDef,
|
||||||
|
deps: &SchedulerIngestDeps,
|
||||||
|
) -> Result<String, String> {
|
||||||
|
tracing::info!("scheduler: firing '{}' ({})", def.id, def.kind.label());
|
||||||
|
match &def.kind {
|
||||||
|
ScheduleKind::Mysql { dsn, table, dataset_name, batch_size, order_by } => {
|
||||||
|
let req = crate::my_stream::MyStreamRequest {
|
||||||
|
dsn: dsn.clone(),
|
||||||
|
table: table.clone(),
|
||||||
|
dataset_name: dataset_name.clone(),
|
||||||
|
batch_size: *batch_size,
|
||||||
|
order_by: order_by.clone(),
|
||||||
|
limit: None,
|
||||||
|
};
|
||||||
|
run_mysql(req, deps).await
|
||||||
|
}
|
||||||
|
ScheduleKind::Postgres { dsn, table, dataset_name, batch_size, order_by } => {
|
||||||
|
let req = crate::pg_stream::PgStreamRequest {
|
||||||
|
dsn: dsn.clone(),
|
||||||
|
table: table.clone(),
|
||||||
|
dataset_name: dataset_name.clone(),
|
||||||
|
batch_size: *batch_size,
|
||||||
|
order_by: order_by.clone(),
|
||||||
|
limit: None,
|
||||||
|
};
|
||||||
|
run_postgres(req, deps).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_mysql(
|
||||||
|
req: crate::my_stream::MyStreamRequest,
|
||||||
|
deps: &SchedulerIngestDeps,
|
||||||
|
) -> Result<String, String> {
|
||||||
|
let (parquet, result) = crate::my_stream::stream_table_to_parquet(&req).await?;
|
||||||
|
if result.rows == 0 {
|
||||||
|
return Ok(format!("rows=0 — table empty"));
|
||||||
|
}
|
||||||
|
persist_and_register(
|
||||||
|
&parquet, &result.table, req.dataset_name.as_deref(),
|
||||||
|
result.rows, "mysql", &req.dsn, deps,
|
||||||
|
).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_postgres(
|
||||||
|
req: crate::pg_stream::PgStreamRequest,
|
||||||
|
deps: &SchedulerIngestDeps,
|
||||||
|
) -> Result<String, String> {
|
||||||
|
let (parquet, result) = crate::pg_stream::stream_table_to_parquet(&req).await?;
|
||||||
|
if result.rows == 0 {
|
||||||
|
return Ok(format!("rows=0 — table empty"));
|
||||||
|
}
|
||||||
|
persist_and_register(
|
||||||
|
&parquet, &result.table, req.dataset_name.as_deref(),
|
||||||
|
result.rows, "postgresql", &req.dsn, deps,
|
||||||
|
).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Shared write+register path. Mirrors the HTTP ingest handlers in
/// service.rs so the scheduled path produces identical catalog output:
/// PII detection, lineage with redacted DSN, mark-stale + autotune
/// agent trigger.
///
/// Errors from the write, register, or parquet-reparse steps propagate;
/// metadata update, mark-stale, and agent enqueue are best-effort
/// (their results are deliberately discarded).
async fn persist_and_register(
    parquet: &bytes::Bytes,
    table: &str,
    dataset_name_override: Option<&str>,
    rows: usize,
    source_system: &str,
    dsn: &str,
    deps: &SchedulerIngestDeps,
) -> Result<String, String> {
    use shared::types::{ColumnMeta, Lineage, ObjectRef, SchemaFingerprint, Sensitivity};

    // Re-read the schema out of the parquet we just produced so the
    // catalog entry reflects exactly what was written.
    let (schema, _) = shared::arrow_helpers::parquet_to_record_batches(parquet)
        .map_err(|e| format!("reparse parquet: {e}"))?;

    // Dataset name defaults to the source table name.
    let dataset_name = dataset_name_override.unwrap_or(table).to_string();
    let storage_key = format!("datasets/{}.parquet", dataset_name);
    let parquet_size = parquet.len() as u64;
    // Scheduled ingests always land in the primary bucket today.
    let bucket = "primary".to_string();

    let target_store = deps.buckets.get(&bucket)?;
    ops::put(&target_store, &storage_key, parquet.clone()).await?;

    let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
    let now = Utc::now();
    deps.registry.register(
        dataset_name.clone(),
        SchemaFingerprint(schema_fp.0),
        vec![ObjectRef {
            bucket: bucket.clone(),
            key: storage_key.clone(),
            size_bytes: parquet_size,
            created_at: now,
        }],
    ).await?;

    // Column-level PII detection by column name.
    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
    let columns: Vec<ColumnMeta> = schema.fields().iter().map(|f| {
        let sens = shared::pii::detect_sensitivity(f.name());
        ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(Sensitivity::Pii)),
        }
    }).collect();

    // Lineage carries the DSN with the password redacted — never store
    // credentials in catalog metadata.
    let lineage = Lineage {
        source_system: source_system.to_string(),
        source_file: format!("dsn: {}", redact_dsn(dsn)),
        ingest_job: format!("scheduled-{}-{}", source_system, now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };

    // Best-effort: a metadata failure shouldn't fail a run whose data
    // is already written and registered.
    let _ = deps.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
        sensitivity,
        columns: Some(columns),
        lineage: Some(lineage),
        row_count: Some(rows as u64),
        ..Default::default()
    }).await;

    // Flag embeddings as out-of-date now that the data changed.
    let _ = deps.registry.mark_embeddings_stale(&dataset_name).await;

    // Phase 16.5 hook: nudge the autotune agent for every HNSW index
    // bound to the refreshed dataset.
    if let Some(agent) = &deps.agent_handle {
        let bound = deps.index_registry.list(Some(&dataset_name), None).await;
        for meta in bound {
            let event = vectord::agent::TriggerEvent::dataset_appended(
                meta.index_name.clone(), &dataset_name,
            );
            let _ = agent.enqueue(event).await;
        }
    }

    Ok(format!("rows={rows} dataset={dataset_name} bytes={parquet_size}"))
}
|
||||||
|
|
||||||
|
/// Same generic redactor used in service.rs — duplicated here to avoid
/// pulling service::redact_dsn behind a `pub` visibility just for this.
///
/// Replaces the password in `scheme://user:pass@rest` with `***`.
/// Any DSN that doesn't match that shape (no scheme, no `@` after the
/// scheme, no `:` in the credentials) is returned unchanged.
fn redact_dsn(dsn: &str) -> String {
    let Some(scheme_pos) = dsn.find("://") else {
        return dsn.to_string();
    };
    let scheme_end = scheme_pos + 3;
    // The last '@' splits credentials from host; it must come after the
    // scheme for the DSN to contain credentials at all.
    let Some(at_idx) = dsn.rfind('@').filter(|&i| i > scheme_end) else {
        return dsn.to_string();
    };
    // No ':' inside user:pass means there's no password to hide.
    let Some(colon_offset) = dsn[scheme_end..at_idx].find(':') else {
        return dsn.to_string();
    };
    format!("{}:***{}", &dsn[..scheme_end + colon_offset], &dsn[at_idx..])
}
|
||||||
|
|
||||||
|
// =================== Tests ===================

#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal valid schedule with an Interval trigger of `secs`,
    /// enabled and due immediately (`next_run_at` = now).
    fn mk(secs: u64) -> ScheduleDef {
        ScheduleDef {
            id: "test".into(),
            kind: ScheduleKind::Mysql {
                dsn: "mysql://x@host/db".into(),
                table: "t".into(),
                dataset_name: None,
                batch_size: None,
                order_by: None,
            },
            trigger: ScheduleTrigger::Interval { secs },
            enabled: true,
            created_at: Utc::now(),
            created_by: "tester".into(),
            next_run_at: Utc::now(),
            last_run_at: None,
            last_outcome: None,
            run_count: 0,
            failure_count: 0,
        }
    }

    // Disabling a schedule makes it not-due even when past next_run_at.
    #[test]
    fn is_due_respects_enabled() {
        let mut s = mk(60);
        s.next_run_at = Utc::now() - Duration::seconds(5);
        assert!(s.is_due(Utc::now()));
        s.enabled = false;
        assert!(!s.is_due(Utc::now()));
    }

    // A future next_run_at is never due.
    #[test]
    fn is_due_respects_future_time() {
        let mut s = mk(60);
        s.next_run_at = Utc::now() + Duration::seconds(60);
        assert!(!s.is_due(Utc::now()));
    }

    // Interval trigger: next run = completion time + interval.
    #[test]
    fn next_run_since_last_completion_advances() {
        let s = mk(120);
        let completed = Utc::now();
        let next = compute_next_run_at(&s.trigger, completed, completed);
        let delta = next - completed;
        assert_eq!(delta.num_seconds(), 120);
    }

    // Cron is stubbed: any expression yields a 1h fallback delay.
    #[test]
    fn cron_falls_back_to_one_hour() {
        let trig = ScheduleTrigger::Cron { expr: "* * * * *".into() };
        let now = Utc::now();
        let next = compute_next_run_at(&trig, now, now);
        let delta = next - now;
        assert_eq!(delta.num_seconds(), 3600);
    }

    #[test]
    fn schedule_kind_label() {
        let k = ScheduleKind::Mysql {
            dsn: "x".into(), table: "customers".into(),
            dataset_name: None, batch_size: None, order_by: None,
        };
        assert_eq!(k.label(), "mysql:customers");
    }

    #[test]
    fn redact_dsn_handles_mysql_and_postgres() {
        assert_eq!(redact_dsn("mysql://u:secret@h/db"), "mysql://u:***@h/db");
        assert_eq!(redact_dsn("postgresql://u:secret@h:5432/db"), "postgresql://u:***@h:5432/db");
        assert_eq!(redact_dsn("mysql://u@h/db"), "mysql://u@h/db"); // no password
    }
}
|
||||||
@ -1,9 +1,9 @@
|
|||||||
use axum::{
|
use axum::{
|
||||||
Json, Router,
|
Json, Router,
|
||||||
extract::{Multipart, Query, State},
|
extract::{Multipart, Path, Query, State},
|
||||||
http::{HeaderMap, StatusCode},
|
http::{HeaderMap, StatusCode},
|
||||||
response::IntoResponse,
|
response::IntoResponse,
|
||||||
routing::{get, post},
|
routing::{delete, get, patch, post},
|
||||||
};
|
};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
@ -11,7 +11,7 @@ use serde::Deserialize;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use catalogd::registry::Registry;
|
use catalogd::registry::Registry;
|
||||||
use crate::{db_ingest, my_stream, pg_stream, pipeline};
|
use crate::{db_ingest, my_stream, pg_stream, pipeline, schedule};
|
||||||
use shared::arrow_helpers::record_batch_to_parquet;
|
use shared::arrow_helpers::record_batch_to_parquet;
|
||||||
use shared::types::{ObjectRef, SchemaFingerprint};
|
use shared::types::{ObjectRef, SchemaFingerprint};
|
||||||
use storaged::ops;
|
use storaged::ops;
|
||||||
@ -30,6 +30,9 @@ pub struct IngestState {
|
|||||||
/// Used to look up which HNSW indexes are attached to the
|
/// Used to look up which HNSW indexes are attached to the
|
||||||
/// just-ingested dataset. Each matching index gets one trigger.
|
/// just-ingested dataset. Each matching index gets one trigger.
|
||||||
pub index_registry: vectord::index_registry::IndexRegistry,
|
pub index_registry: vectord::index_registry::IndexRegistry,
|
||||||
|
/// Scheduled-ingest registry. The scheduler task runs against this
|
||||||
|
/// store; HTTP CRUD endpoints write through it.
|
||||||
|
pub schedules: schedule::ScheduleStore,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push `DatasetAppended` triggers for every HNSW index bound to this
|
/// Push `DatasetAppended` triggers for every HNSW index bound to this
|
||||||
@ -86,6 +89,12 @@ pub fn router(state: IngestState) -> Router {
|
|||||||
.route("/postgres/import", post(import_pg_table))
|
.route("/postgres/import", post(import_pg_table))
|
||||||
.route("/db", post(ingest_db_stream))
|
.route("/db", post(ingest_db_stream))
|
||||||
.route("/mysql", post(ingest_mysql_stream))
|
.route("/mysql", post(ingest_mysql_stream))
|
||||||
|
// Phase E: scheduled ingest
|
||||||
|
.route("/schedules", get(list_schedules).post(create_schedule))
|
||||||
|
.route("/schedules/{id}", get(get_schedule)
|
||||||
|
.patch(patch_schedule)
|
||||||
|
.delete(delete_schedule))
|
||||||
|
.route("/schedules/{id}/run-now", post(run_schedule_now))
|
||||||
.with_state(state)
|
.with_state(state)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -475,3 +484,141 @@ fn redact_dsn(dsn: &str) -> String {
|
|||||||
let colon_idx = scheme_end + colon_offset;
|
let colon_idx = scheme_end + colon_offset;
|
||||||
format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..])
|
format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Phase E: Scheduled ingest CRUD endpoints ---
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct CreateScheduleRequest {
|
||||||
|
/// Stable id used in URLs and on disk. Required so callers can
|
||||||
|
/// PATCH/DELETE deterministically. Sanitized on storage but the
|
||||||
|
/// caller's id is what gets persisted as the canonical key.
|
||||||
|
id: String,
|
||||||
|
kind: schedule::ScheduleKind,
|
||||||
|
trigger: schedule::ScheduleTrigger,
|
||||||
|
#[serde(default = "default_enabled")]
|
||||||
|
enabled: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
created_by: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_enabled() -> bool { true }
|
||||||
|
|
||||||
|
async fn create_schedule(
|
||||||
|
State(state): State<IngestState>,
|
||||||
|
Json(req): Json<CreateScheduleRequest>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
if state.schedules.get(&req.id).await.is_some() {
|
||||||
|
return Err((StatusCode::CONFLICT, format!("schedule '{}' already exists", req.id)));
|
||||||
|
}
|
||||||
|
let now = chrono::Utc::now();
|
||||||
|
let def = schedule::ScheduleDef {
|
||||||
|
id: req.id,
|
||||||
|
kind: req.kind,
|
||||||
|
trigger: req.trigger,
|
||||||
|
enabled: req.enabled,
|
||||||
|
created_at: now,
|
||||||
|
created_by: req.created_by,
|
||||||
|
// Fire on the next tick — operators usually want to verify
|
||||||
|
// their schedule works without waiting for the full interval.
|
||||||
|
next_run_at: now,
|
||||||
|
last_run_at: None,
|
||||||
|
last_outcome: None,
|
||||||
|
run_count: 0,
|
||||||
|
failure_count: 0,
|
||||||
|
};
|
||||||
|
match state.schedules.put(def).await {
|
||||||
|
Ok(d) => Ok((StatusCode::CREATED, Json(d))),
|
||||||
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_schedules(State(state): State<IngestState>) -> impl IntoResponse {
|
||||||
|
Json(state.schedules.list().await)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_schedule(
|
||||||
|
State(state): State<IngestState>,
|
||||||
|
Path(id): Path<String>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
match state.schedules.get(&id).await {
|
||||||
|
Some(d) => Ok(Json(d)),
|
||||||
|
None => Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct PatchScheduleRequest {
|
||||||
|
/// Toggle on/off without changing anything else.
|
||||||
|
#[serde(default)]
|
||||||
|
enabled: Option<bool>,
|
||||||
|
/// Replace the trigger spec — useful for bumping intervals.
|
||||||
|
#[serde(default)]
|
||||||
|
trigger: Option<schedule::ScheduleTrigger>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn patch_schedule(
|
||||||
|
State(state): State<IngestState>,
|
||||||
|
Path(id): Path<String>,
|
||||||
|
Json(req): Json<PatchScheduleRequest>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let Some(mut def) = state.schedules.get(&id).await else {
|
||||||
|
return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
|
||||||
|
};
|
||||||
|
if let Some(e) = req.enabled { def.enabled = e; }
|
||||||
|
if let Some(t) = req.trigger { def.trigger = t; }
|
||||||
|
match state.schedules.put(def).await {
|
||||||
|
Ok(d) => Ok(Json(d)),
|
||||||
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_schedule(
|
||||||
|
State(state): State<IngestState>,
|
||||||
|
Path(id): Path<String>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
match state.schedules.delete(&id).await {
|
||||||
|
Ok(true) => Ok(Json(serde_json::json!({ "deleted": id }))),
|
||||||
|
Ok(false) => Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
|
||||||
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Force a schedule to fire right now, regardless of its next_run_at.
|
||||||
|
/// Records the outcome and reschedules. Useful for "test my new schedule
|
||||||
|
/// works without waiting an hour."
|
||||||
|
async fn run_schedule_now(
|
||||||
|
State(state): State<IngestState>,
|
||||||
|
Path(id): Path<String>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let Some(def) = state.schedules.get(&id).await else {
|
||||||
|
return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
|
||||||
|
};
|
||||||
|
let deps = schedule::SchedulerIngestDeps {
|
||||||
|
store: state.store.clone(),
|
||||||
|
registry: state.registry.clone(),
|
||||||
|
buckets: state.buckets.clone(),
|
||||||
|
agent_handle: Some(state.agent_handle.clone()),
|
||||||
|
index_registry: state.index_registry.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let t0 = std::time::Instant::now();
|
||||||
|
let result = schedule::run_schedule(&def, &deps).await;
|
||||||
|
let elapsed = t0.elapsed().as_secs_f32();
|
||||||
|
let outcome = schedule::RunOutcome {
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
success: result.is_ok(),
|
||||||
|
message: match &result {
|
||||||
|
Ok(s) => s.clone(),
|
||||||
|
Err(e) => e.clone(),
|
||||||
|
},
|
||||||
|
rows: result.as_ref().ok().and_then(|m: &String| {
|
||||||
|
m.split_whitespace()
|
||||||
|
.find_map(|tok| tok.strip_prefix("rows=").and_then(|n| n.parse().ok()))
|
||||||
|
}),
|
||||||
|
duration_secs: elapsed,
|
||||||
|
};
|
||||||
|
if let Err(e) = state.schedules.record_run(&id, outcome.clone()).await {
|
||||||
|
tracing::warn!("schedule {id}: record_run failed: {e}");
|
||||||
|
}
|
||||||
|
Ok(Json(outcome))
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user