Background scheduler task fires due ingests on an interval, records
outcomes, and reschedules. Single-flight per schedule_id so a slow run
can't pile up. 10s tick cadence; schedules' own intervals are independent.
ScheduleDef persisted as JSON at primary://_schedules/{id}.json,
rebuilt on startup. ScheduleKind supports Mysql and Postgres (both
through existing streaming paths). ScheduleTrigger::Interval is
live; Cron variant defined in the enum but parsing stubbed with a
safe 1h fallback.
next_run_at set to "now" on creation so operators see success or
failure within one tick — no waiting for the first full interval.
run-now endpoint fires even when schedule is disabled (manual
override for testing). Full catalog integration: PII detection,
lineage with redacted DSN, mark-stale + autotune agent trigger.
Verified live: 20s MySQL schedule against MariaDB lh_demo.customers.
Source mutated between runs (added row + updated value). Second
auto-fire picked up both changes (10→11 rows). DataFusion SQL
confirmed mutations in the lakehouse. 6 unit tests pass.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
625 lines
22 KiB
Rust
625 lines
22 KiB
Rust
use axum::{
|
|
Json, Router,
|
|
extract::{Multipart, Path, Query, State},
|
|
http::{HeaderMap, StatusCode},
|
|
response::IntoResponse,
|
|
routing::{delete, get, patch, post},
|
|
};
|
|
use bytes::Bytes;
|
|
use object_store::ObjectStore;
|
|
use serde::Deserialize;
|
|
use std::sync::Arc;
|
|
|
|
use catalogd::registry::Registry;
|
|
use crate::{db_ingest, my_stream, pg_stream, pipeline, schedule};
|
|
use shared::arrow_helpers::record_batch_to_parquet;
|
|
use shared::types::{ObjectRef, SchemaFingerprint};
|
|
use storaged::ops;
|
|
use storaged::registry::BucketRegistry;
|
|
|
|
/// Shared state injected into every ingest HTTP handler via axum's
/// `State` extractor. `Clone` is cheap: every field is a handle
/// (`Arc` or registry handle) — presumably all internally ref-counted;
/// TODO confirm for the registry types.
#[derive(Clone)]
pub struct IngestState {
    /// Default object store. Used directly by `import_pg_table` and the
    /// scheduler deps in `run_schedule_now`; header-aware handlers go
    /// through `resolve_bucket` instead.
    pub store: Arc<dyn ObjectStore>,
    /// Dataset catalog: registration, metadata updates, stale-marking.
    pub registry: Registry,
    /// Federation layer 2: lookup target bucket from request headers.
    pub buckets: Arc<BucketRegistry>,
    /// Phase 16.5: when ingest marks a dataset's embeddings stale, push
    /// a `DatasetAppended` trigger so the autotune agent can schedule
    /// re-trials. Holds the agent handle (no-op when agent disabled).
    pub agent_handle: vectord::agent::AgentHandle,
    /// Used to look up which HNSW indexes are attached to the
    /// just-ingested dataset. Each matching index gets one trigger.
    pub index_registry: vectord::index_registry::IndexRegistry,
    /// Scheduled-ingest registry. The scheduler task runs against this
    /// store; HTTP CRUD endpoints write through it.
    pub schedules: schedule::ScheduleStore,
}
|
|
|
|
/// Push `DatasetAppended` triggers for every HNSW index bound to this
|
|
/// dataset. Called after a successful ingest. Logs failures but never
|
|
/// fails the ingest — the agent is best-effort, not load-bearing.
|
|
async fn notify_agent_on_append(state: &IngestState, dataset_name: &str) {
|
|
let indexes = state.index_registry.list(Some(dataset_name), None).await;
|
|
if indexes.is_empty() {
|
|
return;
|
|
}
|
|
for meta in indexes {
|
|
let event = vectord::agent::TriggerEvent::dataset_appended(
|
|
meta.index_name.clone(), dataset_name,
|
|
);
|
|
if let Err(e) = state.agent_handle.enqueue(event).await {
|
|
tracing::warn!("agent enqueue failed for '{}': {}", meta.index_name, e);
|
|
} else {
|
|
tracing::info!(
|
|
"agent: enqueued DatasetAppended({}) for index '{}'",
|
|
dataset_name, meta.index_name,
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Resolve the target bucket from `X-Lakehouse-Bucket` header.
|
|
/// Returns `(bucket_name, store_for_writes)`. Falls back to "primary"
|
|
/// when the header is absent. Returns Err with the canonical bucket
|
|
/// list when the header names an unknown bucket.
|
|
fn resolve_bucket(
|
|
headers: &HeaderMap,
|
|
buckets: &BucketRegistry,
|
|
) -> Result<(String, Arc<dyn ObjectStore>), (StatusCode, String)> {
|
|
let target = headers
|
|
.get("x-lakehouse-bucket")
|
|
.and_then(|h| h.to_str().ok())
|
|
.map(|s| s.to_string())
|
|
.unwrap_or_else(|| buckets.default_name().to_string());
|
|
|
|
match buckets.get(&target) {
|
|
Ok(store) => Ok((target, store)),
|
|
Err(_) => Err((
|
|
StatusCode::NOT_FOUND,
|
|
format!("unknown bucket '{}' — use GET /storage/buckets to list", target),
|
|
)),
|
|
}
|
|
}
|
|
|
|
pub fn router(state: IngestState) -> Router {
|
|
Router::new()
|
|
.route("/health", get(health))
|
|
.route("/file", post(ingest_file))
|
|
.route("/postgres/tables", post(list_pg_tables))
|
|
.route("/postgres/import", post(import_pg_table))
|
|
.route("/db", post(ingest_db_stream))
|
|
.route("/mysql", post(ingest_mysql_stream))
|
|
// Phase E: scheduled ingest
|
|
.route("/schedules", get(list_schedules).post(create_schedule))
|
|
.route("/schedules/{id}", get(get_schedule)
|
|
.patch(patch_schedule)
|
|
.delete(delete_schedule))
|
|
.route("/schedules/{id}/run-now", post(run_schedule_now))
|
|
.with_state(state)
|
|
}
|
|
|
|
/// Liveness probe — always answers with a static marker string.
async fn health() -> &'static str {
    const BODY: &'static str = "ingestd ok";
    BODY
}
|
|
|
|
/// Query parameters accepted by `POST /file`.
#[derive(Deserialize)]
struct IngestQuery {
    // Optional dataset name override, passed straight through to the
    // ingest pipeline; when absent the pipeline picks the name itself
    // (presumably from the uploaded filename — TODO confirm).
    name: Option<String>,
}
|
|
|
|
async fn ingest_file(
|
|
State(state): State<IngestState>,
|
|
Query(query): Query<IngestQuery>,
|
|
headers: HeaderMap,
|
|
mut multipart: Multipart,
|
|
) -> impl IntoResponse {
|
|
let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
|
|
|
|
let field = match multipart.next_field().await {
|
|
Ok(Some(f)) => f,
|
|
Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())),
|
|
Err(e) => return Err((StatusCode::BAD_REQUEST, format!("multipart error: {e}"))),
|
|
};
|
|
|
|
let filename = field.file_name().unwrap_or("unknown").to_string();
|
|
let content = field.bytes().await
|
|
.map_err(|e| (StatusCode::BAD_REQUEST, format!("read error: {e}")))?;
|
|
|
|
tracing::info!(
|
|
"ingest '{}' ({} bytes) -> bucket={}",
|
|
filename, content.len(), bucket,
|
|
);
|
|
|
|
let dataset_name = query.name.as_deref();
|
|
|
|
match pipeline::ingest_file_to_bucket(&filename, &content, dataset_name, &bucket, &store, &state.registry).await {
|
|
Ok(result) => {
|
|
if !result.deduplicated {
|
|
notify_agent_on_append(&state, &result.dataset_name).await;
|
|
}
|
|
if result.deduplicated {
|
|
Ok((StatusCode::OK, Json(result)))
|
|
} else {
|
|
Ok((StatusCode::CREATED, Json(result)))
|
|
}
|
|
}
|
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
|
}
|
|
}
|
|
|
|
// --- PostgreSQL Import ---
|
|
|
|
/// List tables in a PostgreSQL database.
|
|
async fn list_pg_tables(
|
|
Json(config): Json<db_ingest::DbConfig>,
|
|
) -> impl IntoResponse {
|
|
match db_ingest::list_postgres_tables(&config).await {
|
|
Ok(tables) => Ok(Json(tables)),
|
|
Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
|
|
}
|
|
}
|
|
|
|
/// Body for `POST /postgres/import`.
#[derive(Deserialize)]
struct PgImportRequest {
    // Connection fields are flattened into the top level of the JSON
    // body alongside `table`/`dataset_name`/`limit`.
    #[serde(flatten)]
    config: db_ingest::DbConfig,
    // Source table to import.
    table: String,
    /// Override dataset name (defaults to table name)
    dataset_name: Option<String>,
    /// Max rows to import (None = all)
    limit: Option<usize>,
}
|
|
|
|
/// Import a PostgreSQL table into the lakehouse.
///
/// Non-streaming path: `db_ingest::import_postgres_table` returns all
/// batches at once, unlike the paginated `/db` and `/mysql` handlers.
/// Flow: import → Parquet → store → register → best-effort metadata.
/// Responses: 200 with rows=0 for an empty table, 201 with an import
/// summary otherwise, 502 for upstream DB failures, 500 for
/// storage/registry failures.
async fn import_pg_table(
    State(state): State<IngestState>,
    Json(req): Json<PgImportRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> {
    tracing::info!("importing postgres table '{}' from {}:{}/{}",
        req.table, req.config.host, req.config.port, req.config.database);

    // Import from Postgres
    let (schema, batches, db_result) = db_ingest::import_postgres_table(
        &req.config, &req.table, req.limit,
    ).await.map_err(|e| (StatusCode::BAD_GATEWAY, e))?;

    // Empty table: nothing is stored or registered — report 200.
    if batches.is_empty() || db_result.rows == 0 {
        return Ok((StatusCode::OK, Json(serde_json::json!({
            "table": req.table,
            "rows": 0,
            "message": "table is empty",
        }))));
    }

    // Convert to Parquet.
    // NOTE(review): each batch becomes a standalone Parquet buffer and
    // the buffers are byte-concatenated. Single-batch imports are fine;
    // confirm downstream readers accept multi-buffer concatenation.
    let mut all_parquet = Vec::new();
    for batch in &batches {
        let pq = record_batch_to_parquet(batch)
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
        all_parquet.extend_from_slice(&pq);
    }

    let dataset_name = req.dataset_name.unwrap_or_else(|| req.table.clone());
    let storage_key = format!("datasets/{}.parquet", dataset_name);
    let parquet_size = all_parquet.len() as u64;

    // Store.
    // NOTE(review): writes to the default `state.store` and records
    // bucket "data" below, ignoring X-Lakehouse-Bucket — the streaming
    // handlers resolve the bucket from headers; confirm intentional.
    ops::put(&state.store, &storage_key, Bytes::from(all_parquet))
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    // Register the dataset under its schema fingerprint.
    let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
    let now = chrono::Utc::now();
    state.registry.register(
        dataset_name.clone(),
        SchemaFingerprint(schema_fp.0),
        vec![ObjectRef {
            bucket: "data".to_string(),
            key: storage_key.clone(),
            size_bytes: parquet_size,
            created_at: now,
        }],
    ).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    // Auto-populate metadata (PII detection, lineage)
    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
    let columns: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
        // Per-column sensitivity is inferred from the column name alone.
        let sens = shared::pii::detect_sensitivity(f.name());
        shared::types::ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
        }
    }).collect();

    // Lineage records host:port/db.table — DbConfig fields, so no
    // credentials appear here (unlike DSN-based handlers, which redact).
    let lineage = shared::types::Lineage {
        source_system: "postgresql".to_string(),
        source_file: format!("{}:{}/{}.{}", req.config.host, req.config.port, req.config.database, req.table),
        ingest_job: format!("pg-import-{}", now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };

    // Metadata update is best-effort: a failure here does not fail the
    // import (result deliberately discarded).
    let _ = state.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
        sensitivity,
        columns: Some(columns),
        lineage: Some(lineage),
        row_count: Some(db_result.rows as u64),
        ..Default::default()
    }).await;

    tracing::info!("imported '{}' from postgres: {} rows → {}", dataset_name, db_result.rows, storage_key);

    Ok((StatusCode::CREATED, Json(serde_json::json!({
        "dataset_name": dataset_name,
        "table": req.table,
        "rows": db_result.rows,
        "columns": db_result.columns,
        "schema": db_result.schema_detected,
        "storage_key": storage_key,
        "size_bytes": parquet_size,
    }))))
}
|
|
|
|
/// Streaming DSN-based ingest. Paginates via ORDER BY + LIMIT/OFFSET so
/// large tables don't blow up memory. This is the path the task spec calls
/// `POST /ingest/db` with `{dsn, table, dataset_name}`.
///
/// Flow: stream → store → register → best-effort metadata → stale-mark
/// → agent trigger. Responses: 200 with rows=0 for an empty table, 201
/// with a summary otherwise, 502 for upstream DB failures, 404 for an
/// unknown bucket, 500 for storage/registry/parse failures.
async fn ingest_db_stream(
    State(state): State<IngestState>,
    headers: HeaderMap,
    Json(req): Json<pg_stream::PgStreamRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> {
    // Honors X-Lakehouse-Bucket (unlike the legacy /postgres/import path).
    let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
    tracing::info!(
        "pg stream ingest: table='{}' dataset='{:?}' bucket='{}' batch_size={:?}",
        req.table, req.dataset_name, bucket, req.batch_size,
    );

    // Stream from Postgres into Parquet bytes.
    let (parquet, stream_result) = pg_stream::stream_table_to_parquet(&req)
        .await
        .map_err(|e| (StatusCode::BAD_GATEWAY, e))?;

    // Empty table: nothing stored or registered — report 200.
    if stream_result.rows == 0 {
        return Ok((StatusCode::OK, Json(serde_json::json!({
            "table": req.table,
            "rows": 0,
            "message": "table is empty",
        }))));
    }

    // Recover schema from the parquet footer — keeps a single source of truth
    // for the schema fingerprint + column metadata.
    let (schema, _) = shared::arrow_helpers::parquet_to_record_batches(&parquet)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("reparse parquet: {e}")))?;

    let dataset_name = req.dataset_name.clone().unwrap_or_else(|| req.table.clone());
    let storage_key = format!("datasets/{}.parquet", dataset_name);
    let size_bytes = parquet.len() as u64;

    // Write to the resolved bucket's store.
    ops::put(&store, &storage_key, parquet)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    // Register under the schema fingerprint, recording the resolved
    // bucket name (not a hardcoded one).
    let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
    let now = chrono::Utc::now();
    state.registry.register(
        dataset_name.clone(),
        SchemaFingerprint(schema_fp.0),
        vec![ObjectRef {
            bucket: bucket.clone(),
            key: storage_key.clone(),
            size_bytes,
            created_at: now,
        }],
    ).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    // Rich metadata: PII auto-detect, lineage, row_count, columns.
    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
    let columns: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
        // Per-column sensitivity is inferred from the column name alone.
        let sens = shared::pii::detect_sensitivity(f.name());
        shared::types::ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
        }
    }).collect();

    // Lineage stores the DSN with the password masked by redact_dsn.
    let lineage = shared::types::Lineage {
        source_system: "postgresql".to_string(),
        source_file: format!("dsn: {}", redact_dsn(&req.dsn)),
        ingest_job: format!("pg-stream-{}", now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };

    // Metadata update is best-effort: failure does not fail the ingest.
    let _ = state.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
        sensitivity,
        columns: Some(columns),
        lineage: Some(lineage),
        row_count: Some(stream_result.rows as u64),
        ..Default::default()
    }).await;

    // Phase C: mark embeddings stale if the dataset already had a vector
    // index. No-op for newly-created datasets.
    let _ = state.registry.mark_embeddings_stale(&dataset_name).await;

    // Phase 16.5: notify autotune agent. No-op if no indexes attached.
    notify_agent_on_append(&state, &dataset_name).await;

    Ok((StatusCode::CREATED, Json(serde_json::json!({
        "dataset_name": dataset_name,
        "table": stream_result.table,
        "rows": stream_result.rows,
        "batches": stream_result.batches,
        "columns": stream_result.columns,
        "schema": stream_result.schema,
        "storage_key": storage_key,
        "size_bytes": size_bytes,
        "duration_secs": stream_result.duration_secs,
    }))))
}
|
|
|
|
/// Streaming MySQL ingest. Mirrors `ingest_db_stream` — same pagination +
/// Parquet-footer schema recovery + PII/lineage metadata. Different driver,
/// otherwise identical.
///
/// Responses: 200 with rows=0 for an empty table, 201 with a summary
/// otherwise, 502 for upstream DB failures, 404 for an unknown bucket,
/// 500 for storage/registry/parse failures.
async fn ingest_mysql_stream(
    State(state): State<IngestState>,
    headers: HeaderMap,
    Json(req): Json<my_stream::MyStreamRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> {
    // Honors X-Lakehouse-Bucket, same as the Postgres streaming path.
    let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
    tracing::info!(
        "mysql stream ingest: table='{}' dataset='{:?}' bucket='{}' batch_size={:?}",
        req.table, req.dataset_name, bucket, req.batch_size,
    );

    // Stream from MySQL into Parquet bytes.
    let (parquet, stream_result) = my_stream::stream_table_to_parquet(&req)
        .await
        .map_err(|e| (StatusCode::BAD_GATEWAY, e))?;

    // Empty table: nothing stored or registered — report 200.
    if stream_result.rows == 0 {
        return Ok((StatusCode::OK, Json(serde_json::json!({
            "table": req.table,
            "rows": 0,
            "message": "table is empty",
        }))));
    }

    // Recover the schema from the Parquet footer — single source of
    // truth for fingerprint + column metadata (see ingest_db_stream).
    let (schema, _) = shared::arrow_helpers::parquet_to_record_batches(&parquet)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("reparse parquet: {e}")))?;

    let dataset_name = req.dataset_name.clone().unwrap_or_else(|| req.table.clone());
    let storage_key = format!("datasets/{}.parquet", dataset_name);
    let size_bytes = parquet.len() as u64;

    // Write to the resolved bucket's store.
    ops::put(&store, &storage_key, parquet)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    // Register under the schema fingerprint with the resolved bucket.
    let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
    let now = chrono::Utc::now();
    state.registry.register(
        dataset_name.clone(),
        SchemaFingerprint(schema_fp.0),
        vec![ObjectRef {
            bucket: bucket.clone(),
            key: storage_key.clone(),
            size_bytes,
            created_at: now,
        }],
    ).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    // Rich metadata: PII auto-detect, lineage, row_count, columns.
    let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
    let columns: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
        // Per-column sensitivity is inferred from the column name alone.
        let sens = shared::pii::detect_sensitivity(f.name());
        shared::types::ColumnMeta {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
            sensitivity: sens.clone(),
            description: String::new(),
            is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
        }
    }).collect();

    // Lineage stores the DSN with the password masked by redact_dsn.
    let lineage = shared::types::Lineage {
        source_system: "mysql".to_string(),
        source_file: format!("dsn: {}", redact_dsn(&req.dsn)),
        ingest_job: format!("mysql-stream-{}", now.timestamp_millis()),
        ingest_timestamp: now,
        parent_datasets: vec![],
    };

    // Metadata update is best-effort: failure does not fail the ingest.
    let _ = state.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
        sensitivity,
        columns: Some(columns),
        lineage: Some(lineage),
        row_count: Some(stream_result.rows as u64),
        ..Default::default()
    }).await;

    // Phase C: mark stale + Phase 16.5: notify agent.
    let _ = state.registry.mark_embeddings_stale(&dataset_name).await;
    notify_agent_on_append(&state, &dataset_name).await;

    Ok((StatusCode::CREATED, Json(serde_json::json!({
        "dataset_name": dataset_name,
        "table": stream_result.table,
        "rows": stream_result.rows,
        "batches": stream_result.batches,
        "columns": stream_result.columns,
        "schema": stream_result.schema,
        "storage_key": storage_key,
        "size_bytes": size_bytes,
        "duration_secs": stream_result.duration_secs,
    }))))
}
|
|
|
|
/// Redact the password in any `scheme://user:pass@host/...` DSN. Works
/// for postgres, mysql, and any other scheme using the same shape.
///
/// Only the authority component (between `scheme://` and the first
/// `/`, `?`, or `#`) is searched for the userinfo `@`, so an `@`
/// appearing later in the path or query string (e.g. an email address
/// in a parameter) is never mistaken for the userinfo terminator.
/// Inputs that don't match the expected shape come back unchanged.
fn redact_dsn(dsn: &str) -> String {
    let scheme_end = match dsn.find("://") {
        Some(i) => i + 3,
        None => return dsn.to_string(),
    };
    // The authority ends at the first path/query/fragment delimiter
    // after the scheme; absent any, it runs to the end of the string.
    let authority_end = dsn[scheme_end..]
        .find(|c| c == '/' || c == '?' || c == '#')
        .map_or(dsn.len(), |i| scheme_end + i);
    // Last '@' inside the authority separates userinfo from host
    // (rfind, because passwords may themselves contain '@').
    let at_idx = match dsn[scheme_end..authority_end].rfind('@') {
        Some(i) => scheme_end + i,
        None => return dsn.to_string(), // no userinfo at all
    };
    // First ':' in the userinfo separates user from password.
    let userinfo = &dsn[scheme_end..at_idx];
    match userinfo.find(':') {
        Some(off) => format!("{}:***{}", &dsn[..scheme_end + off], &dsn[at_idx..]),
        None => dsn.to_string(), // no password in DSN
    }
}
|
|
|
|
// --- Phase E: Scheduled ingest CRUD endpoints ---
|
|
|
|
/// Body for `POST /schedules`.
#[derive(Deserialize)]
struct CreateScheduleRequest {
    /// Stable id used in URLs and on disk. Required so callers can
    /// PATCH/DELETE deterministically. Sanitized on storage but the
    /// caller's id is what gets persisted as the canonical key.
    id: String,
    // What to ingest (presumably a MySQL or Postgres source spec —
    // defined in the schedule module; TODO confirm variants).
    kind: schedule::ScheduleKind,
    // When to fire (interval trigger; cron exists in the enum but is
    // stubbed per the module notes).
    trigger: schedule::ScheduleTrigger,
    // Defaults to true: new schedules are live immediately.
    #[serde(default = "default_enabled")]
    enabled: bool,
    // Free-form operator tag; defaults to "" when omitted.
    #[serde(default)]
    created_by: String,
}

// serde default for `CreateScheduleRequest::enabled`.
fn default_enabled() -> bool { true }
|
|
|
async fn create_schedule(
|
|
State(state): State<IngestState>,
|
|
Json(req): Json<CreateScheduleRequest>,
|
|
) -> impl IntoResponse {
|
|
if state.schedules.get(&req.id).await.is_some() {
|
|
return Err((StatusCode::CONFLICT, format!("schedule '{}' already exists", req.id)));
|
|
}
|
|
let now = chrono::Utc::now();
|
|
let def = schedule::ScheduleDef {
|
|
id: req.id,
|
|
kind: req.kind,
|
|
trigger: req.trigger,
|
|
enabled: req.enabled,
|
|
created_at: now,
|
|
created_by: req.created_by,
|
|
// Fire on the next tick — operators usually want to verify
|
|
// their schedule works without waiting for the full interval.
|
|
next_run_at: now,
|
|
last_run_at: None,
|
|
last_outcome: None,
|
|
run_count: 0,
|
|
failure_count: 0,
|
|
};
|
|
match state.schedules.put(def).await {
|
|
Ok(d) => Ok((StatusCode::CREATED, Json(d))),
|
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
|
}
|
|
}
|
|
|
|
async fn list_schedules(State(state): State<IngestState>) -> impl IntoResponse {
|
|
Json(state.schedules.list().await)
|
|
}
|
|
|
|
async fn get_schedule(
|
|
State(state): State<IngestState>,
|
|
Path(id): Path<String>,
|
|
) -> impl IntoResponse {
|
|
match state.schedules.get(&id).await {
|
|
Some(d) => Ok(Json(d)),
|
|
None => Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
|
|
}
|
|
}
|
|
|
|
/// Body for `PATCH /schedules/{id}`. Both fields optional; omitted
/// fields leave the stored definition untouched.
#[derive(Deserialize)]
struct PatchScheduleRequest {
    /// Toggle on/off without changing anything else.
    #[serde(default)]
    enabled: Option<bool>,
    /// Replace the trigger spec — useful for bumping intervals.
    #[serde(default)]
    trigger: Option<schedule::ScheduleTrigger>,
}
|
|
|
|
async fn patch_schedule(
|
|
State(state): State<IngestState>,
|
|
Path(id): Path<String>,
|
|
Json(req): Json<PatchScheduleRequest>,
|
|
) -> impl IntoResponse {
|
|
let Some(mut def) = state.schedules.get(&id).await else {
|
|
return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
|
|
};
|
|
if let Some(e) = req.enabled { def.enabled = e; }
|
|
if let Some(t) = req.trigger { def.trigger = t; }
|
|
match state.schedules.put(def).await {
|
|
Ok(d) => Ok(Json(d)),
|
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
|
}
|
|
}
|
|
|
|
async fn delete_schedule(
|
|
State(state): State<IngestState>,
|
|
Path(id): Path<String>,
|
|
) -> impl IntoResponse {
|
|
match state.schedules.delete(&id).await {
|
|
Ok(true) => Ok(Json(serde_json::json!({ "deleted": id }))),
|
|
Ok(false) => Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
|
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
|
}
|
|
}
|
|
|
|
/// Force a schedule to fire right now, regardless of its next_run_at.
/// Records the outcome and reschedules. Useful for "test my new schedule
/// works without waiting an hour."
///
/// Deliberately ignores `def.enabled` — there is no enabled check
/// below, so a manual run works even on a disabled schedule. 404 for
/// an unknown id; otherwise 200 with the `RunOutcome`, even when the
/// run itself failed (failure is reported in the body, not the status).
async fn run_schedule_now(
    State(state): State<IngestState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Some(def) = state.schedules.get(&id).await else {
        return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
    };
    // Build the same dependency bundle the background scheduler uses,
    // so a manual run exercises the identical ingest path.
    let deps = schedule::SchedulerIngestDeps {
        store: state.store.clone(),
        registry: state.registry.clone(),
        buckets: state.buckets.clone(),
        agent_handle: Some(state.agent_handle.clone()),
        index_registry: state.index_registry.clone(),
    };

    // Wall-clock the run for the outcome record.
    let t0 = std::time::Instant::now();
    let result = schedule::run_schedule(&def, &deps).await;
    let elapsed = t0.elapsed().as_secs_f32();
    let outcome = schedule::RunOutcome {
        at: chrono::Utc::now(),
        success: result.is_ok(),
        // Both arms carry a human-readable summary string.
        message: match &result {
            Ok(s) => s.clone(),
            Err(e) => e.clone(),
        },
        // Best-effort row count: scrape a "rows=N" token out of the
        // success message; None when absent or unparsable.
        rows: result.as_ref().ok().and_then(|m: &String| {
            m.split_whitespace()
                .find_map(|tok| tok.strip_prefix("rows=").and_then(|n| n.parse().ok()))
        }),
        duration_secs: elapsed,
    };
    // Persist run stats (counters, last outcome, reschedule); a
    // bookkeeping failure is logged but does not fail the request.
    if let Err(e) = state.schedules.record_run(&id, outcome.clone()).await {
        tracing::warn!("schedule {id}: record_run failed: {e}");
    }
    Ok(Json(outcome))
}
|