diff --git a/Cargo.lock b/Cargo.lock index 0bea5ae..e77fe34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2376,6 +2376,7 @@ dependencies = [ "axum", "catalogd", "ingestd", + "journald", "object_store", "opentelemetry", "opentelemetry-stdout", @@ -3018,6 +3019,24 @@ dependencies = [ "libc", ] +[[package]] +name = "journald" +version = "0.1.0" +dependencies = [ + "arrow", + "axum", + "bytes", + "chrono", + "object_store", + "parquet", + "serde", + "serde_json", + "shared", + "storaged", + "tokio", + "tracing", +] + [[package]] name = "js-sys" version = "0.3.91" diff --git a/Cargo.toml b/Cargo.toml index 781356e..bc21831 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "crates/aibridge", "crates/ingestd", "crates/vectord", + "crates/journald", "crates/gateway", "crates/ui", ] diff --git a/crates/gateway/Cargo.toml b/crates/gateway/Cargo.toml index 1bf9037..92cf82a 100644 --- a/crates/gateway/Cargo.toml +++ b/crates/gateway/Cargo.toml @@ -11,6 +11,7 @@ queryd = { path = "../queryd" } aibridge = { path = "../aibridge" } ingestd = { path = "../ingestd" } vectord = { path = "../vectord" } +journald = { path = "../journald" } tokio = { workspace = true } axum = { workspace = true } serde = { workspace = true } diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index 6df5f9b..728d559 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -34,6 +34,9 @@ async fn main() { let cache = queryd::cache::MemCache::new(16 * 1024 * 1024 * 1024); let engine = queryd::context::QueryEngine::new(registry.clone(), store.clone(), cache); + // Event journal — append-only mutation log (flush every 100 events) + let journal = journald::journal::Journal::new(store.clone(), 100); + // Workspace manager for agent-specific overlays let workspace_mgr = queryd::workspace::WorkspaceManager::new(store.clone()); if let Err(e) = workspace_mgr.rebuild().await { @@ -59,7 +62,8 @@ async fn main() { ai_client: ai_client.clone(), job_tracker: 
vectord::jobs::JobTracker::new(), })) - .nest("/workspaces", queryd::workspace_service::router(workspace_mgr)); + .nest("/workspaces", queryd::workspace_service::router(workspace_mgr)) + .nest("/journal", journald::service::router(journal)); // Auth middleware (if enabled) if config.auth.enabled { diff --git a/crates/journald/Cargo.toml b/crates/journald/Cargo.toml new file mode 100644 index 0000000..133ff59 --- /dev/null +++ b/crates/journald/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "journald" +version = "0.1.0" +edition = "2024" + +[dependencies] +shared = { path = "../shared" } +storaged = { path = "../storaged" } +tokio = { workspace = true } +axum = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tracing = { workspace = true } +arrow = { workspace = true } +parquet = { workspace = true } +bytes = { workspace = true } +chrono = { workspace = true } +object_store = { workspace = true } diff --git a/crates/journald/src/journal.rs b/crates/journald/src/journal.rs new file mode 100644 index 0000000..12e70a9 --- /dev/null +++ b/crates/journald/src/journal.rs @@ -0,0 +1,322 @@ +/// Append-only event journal. +/// Every mutation to any dataset is recorded here as an immutable event. +/// Events are never modified or deleted — this IS the audit trail. +/// +/// Storage: events buffer in memory, flush to Parquet periodically. +/// Query: load Parquet files, filter by entity/field/actor/time. + +use arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array}; +use arrow::datatypes::{DataType, Field, Schema}; +use chrono::{DateTime, Utc}; +use object_store::ObjectStore; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::RwLock; + +use shared::arrow_helpers::record_batch_to_parquet; +use storaged::ops; + +/// A single mutation event. Immutable once created. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Event { + pub event_id: String, + pub timestamp: DateTime<Utc>, + pub entity_type: String, // "candidate", "placement", "client", etc. + pub entity_id: String, // "CAND-00342" + pub field: String, // "phone", "*" for insert/delete + pub action: String, // "insert", "update", "delete" + pub old_value: String, // "" for inserts + pub new_value: String, // "" for deletes + pub actor: String, // "Sarah", "ingest_pipeline", "auto_matcher" + pub source: String, // "api", "ingest", "workspace", "agent" + pub workspace_id: String, // "" if not workspace-scoped +} + +/// Arrow schema for journal Parquet files. +fn journal_schema() -> Arc<Schema> { + Arc::new(Schema::new(vec![ + Field::new("event_id", DataType::Utf8, false), + Field::new("timestamp", DataType::Utf8, false), + Field::new("entity_type", DataType::Utf8, false), + Field::new("entity_id", DataType::Utf8, false), + Field::new("field", DataType::Utf8, false), + Field::new("action", DataType::Utf8, false), + Field::new("old_value", DataType::Utf8, true), + Field::new("new_value", DataType::Utf8, true), + Field::new("actor", DataType::Utf8, false), + Field::new("source", DataType::Utf8, false), + Field::new("workspace_id", DataType::Utf8, true), + ])) +} + +/// The event journal — buffers events in memory, flushes to Parquet. +#[derive(Clone)] +pub struct Journal { + buffer: Arc<RwLock<Vec<Event>>>, + store: Arc<dyn ObjectStore>, + flush_threshold: usize, + event_counter: Arc<std::sync::atomic::AtomicU64>, +} + +impl Journal { + pub fn new(store: Arc<dyn ObjectStore>, flush_threshold: usize) -> Self { + Self { + buffer: Arc::new(RwLock::new(Vec::new())), + store, + flush_threshold, + event_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)), + } + } + + /// Record a single event. Triggers flush if buffer exceeds threshold. 
+ pub async fn record(&self, mut event: Event) -> Result<(), String> { + let seq = self.event_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + event.event_id = format!("evt-{}-{seq}", event.timestamp.timestamp_millis()); + + let should_flush = { + let mut buf = self.buffer.write().await; + buf.push(event); + buf.len() >= self.flush_threshold + }; + + if should_flush { + self.flush().await?; + } + + Ok(()) + } + + /// Record a batch insert (multiple rows added to a dataset). + pub async fn record_insert( + &self, + entity_type: &str, + entity_id: &str, + actor: &str, + source: &str, + workspace_id: &str, + ) -> Result<(), String> { + self.record(Event { + event_id: String::new(), + timestamp: Utc::now(), + entity_type: entity_type.to_string(), + entity_id: entity_id.to_string(), + field: "*".to_string(), + action: "insert".to_string(), + old_value: String::new(), + new_value: String::new(), + actor: actor.to_string(), + source: source.to_string(), + workspace_id: workspace_id.to_string(), + }).await + } + + /// Record a field update. + pub async fn record_update( + &self, + entity_type: &str, + entity_id: &str, + field: &str, + old_value: &str, + new_value: &str, + actor: &str, + source: &str, + workspace_id: &str, + ) -> Result<(), String> { + self.record(Event { + event_id: String::new(), + timestamp: Utc::now(), + entity_type: entity_type.to_string(), + entity_id: entity_id.to_string(), + field: field.to_string(), + action: "update".to_string(), + old_value: old_value.to_string(), + new_value: new_value.to_string(), + actor: actor.to_string(), + source: source.to_string(), + workspace_id: workspace_id.to_string(), + }).await + } + + /// Record a batch of events from a dataset ingest. 
+ pub async fn record_ingest( + &self, + entity_type: &str, + count: usize, + actor: &str, + source_file: &str, + ) -> Result<(), String> { + self.record(Event { + event_id: String::new(), + timestamp: Utc::now(), + entity_type: entity_type.to_string(), + entity_id: format!("batch:{count}"), + field: "*".to_string(), + action: "ingest".to_string(), + old_value: String::new(), + new_value: format!("{count} rows from {source_file}"), + actor: actor.to_string(), + source: "ingest".to_string(), + workspace_id: String::new(), + }).await + } + + /// Flush buffer to Parquet. Called automatically at threshold or manually. + pub async fn flush(&self) -> Result<usize, String> { + let events = { + let mut buf = self.buffer.write().await; + if buf.is_empty() { return Ok(0); } + std::mem::take(&mut *buf) + }; + + let n = events.len(); + let batch = events_to_batch(&events)?; + let parquet = record_batch_to_parquet(&batch)?; + + let ts = Utc::now().format("%Y%m%d_%H%M%S_%3f"); + let key = format!("journal/{ts}.parquet"); + ops::put(&self.store, &key, parquet).await?; + + tracing::info!("journal: flushed {n} events to {key}"); + Ok(n) + } + + /// Query journal: get all events for a specific entity. 
+ pub async fn get_entity_history(&self, entity_id: &str) -> Result<Vec<Event>, String> { + let mut all_events = Vec::new(); + + // Check in-memory buffer first + { + let buf = self.buffer.read().await; + for e in buf.iter() { + if e.entity_id == entity_id { + all_events.push(e.clone()); + } + } + } + + // Load from persisted journal files + let keys = ops::list(&self.store, Some("journal/")).await?; + for key in &keys { + if !key.ends_with(".parquet") { continue; } + let data = ops::get(&self.store, key).await?; + let (_, batches) = shared::arrow_helpers::parquet_to_record_batches(&data)?; + for batch in &batches { + let events = batch_to_events(batch)?; + for e in events { + if e.entity_id == entity_id { + all_events.push(e); + } + } + } + } + + all_events.sort_by_key(|e| e.timestamp); + Ok(all_events) + } + + /// Query journal: get recent events (last N). + pub async fn get_recent(&self, limit: usize) -> Result<Vec<Event>, String> { + let mut all_events = Vec::new(); + + // Buffer + { + let buf = self.buffer.read().await; + all_events.extend(buf.iter().cloned()); + } + + // Persisted (load most recent files first) + let mut keys = ops::list(&self.store, Some("journal/")).await?; + keys.sort(); + keys.reverse(); + + for key in &keys { + if !key.ends_with(".parquet") { continue; } + if all_events.len() >= limit { break; } + let data = ops::get(&self.store, key).await?; + let (_, batches) = shared::arrow_helpers::parquet_to_record_batches(&data)?; + for batch in &batches { + all_events.extend(batch_to_events(batch)?); + } + } + + all_events.sort_by_key(|e| e.timestamp); + all_events.reverse(); + all_events.truncate(limit); + Ok(all_events) + } + + /// Get buffer stats. 
+ pub async fn stats(&self) -> JournalStats { + let buf = self.buffer.read().await; + let keys = ops::list(&self.store, Some("journal/")).await.unwrap_or_default(); + JournalStats { + buffer_events: buf.len(), + flush_threshold: self.flush_threshold, + persisted_files: keys.len(), + total_events_created: self.event_counter.load(std::sync::atomic::Ordering::Relaxed), + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct JournalStats { + pub buffer_events: usize, + pub flush_threshold: usize, + pub persisted_files: usize, + pub total_events_created: u64, +} + +fn events_to_batch(events: &[Event]) -> Result<RecordBatch, String> { + let schema = journal_schema(); + let arrays: Vec<ArrayRef> = vec![ + Arc::new(StringArray::from(events.iter().map(|e| e.event_id.as_str()).collect::<Vec<&str>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.timestamp.to_rfc3339()).collect::<Vec<String>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.entity_type.as_str()).collect::<Vec<&str>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.entity_id.as_str()).collect::<Vec<&str>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.field.as_str()).collect::<Vec<&str>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.action.as_str()).collect::<Vec<&str>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.old_value.as_str()).collect::<Vec<&str>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.new_value.as_str()).collect::<Vec<&str>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.actor.as_str()).collect::<Vec<&str>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.source.as_str()).collect::<Vec<&str>>())), + Arc::new(StringArray::from(events.iter().map(|e| e.workspace_id.as_str()).collect::<Vec<&str>>())), + ]; + RecordBatch::try_new(schema, arrays).map_err(|e| format!("batch error: {e}")) +} + +fn batch_to_events(batch: &RecordBatch) -> Result<Vec<Event>, String> { + let get_col = |i: usize| -> Result<&StringArray, String> { + batch.column(i).as_any().downcast_ref::<StringArray>() + .ok_or_else(|| format!("column {i} not string")) + }; + + let event_ids = get_col(0)?; 
+ let timestamps = get_col(1)?; + let entity_types = get_col(2)?; + let entity_ids = get_col(3)?; + let fields = get_col(4)?; + let actions = get_col(5)?; + let old_values = get_col(6)?; + let new_values = get_col(7)?; + let actors = get_col(8)?; + let sources = get_col(9)?; + let workspace_ids = get_col(10)?; + + let mut events = Vec::with_capacity(batch.num_rows()); + for i in 0..batch.num_rows() { + events.push(Event { + event_id: event_ids.value(i).to_string(), + timestamp: timestamps.value(i).parse().unwrap_or_else(|_| Utc::now()), + entity_type: entity_types.value(i).to_string(), + entity_id: entity_ids.value(i).to_string(), + field: fields.value(i).to_string(), + action: actions.value(i).to_string(), + old_value: old_values.value(i).to_string(), + new_value: new_values.value(i).to_string(), + actor: actors.value(i).to_string(), + source: sources.value(i).to_string(), + workspace_id: workspace_ids.value(i).to_string(), + }); + } + Ok(events) +} diff --git a/crates/journald/src/lib.rs b/crates/journald/src/lib.rs new file mode 100644 index 0000000..4c36418 --- /dev/null +++ b/crates/journald/src/lib.rs @@ -0,0 +1,2 @@ +pub mod journal; +pub mod service; diff --git a/crates/journald/src/service.rs b/crates/journald/src/service.rs new file mode 100644 index 0000000..bf8bffd --- /dev/null +++ b/crates/journald/src/service.rs @@ -0,0 +1,131 @@ +use axum::{ + Json, Router, + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + routing::{get, post}, +}; +use serde::Deserialize; + +use crate::journal::{Event, Journal}; + +pub fn router(journal: Journal) -> Router { + Router::new() + .route("/health", get(health)) + .route("/event", post(record_event)) + .route("/update", post(record_update)) + .route("/flush", post(flush)) + .route("/history/{entity_id}", get(entity_history)) + .route("/recent", get(recent_events)) + .route("/stats", get(stats)) + .with_state(journal) +} + +async fn health() -> &'static str { + "journald ok" +} + +// --- 
Record events --- + +#[derive(Deserialize)] +struct RecordEventRequest { + entity_type: String, + entity_id: String, + action: String, + field: Option<String>, + old_value: Option<String>, + new_value: Option<String>, + actor: String, + source: String, + workspace_id: Option<String>, +} + +async fn record_event( + State(journal): State<Journal>, + Json(req): Json<RecordEventRequest>, +) -> impl IntoResponse { + let event = Event { + event_id: String::new(), // assigned by journal + timestamp: chrono::Utc::now(), + entity_type: req.entity_type, + entity_id: req.entity_id, + field: req.field.unwrap_or_else(|| "*".to_string()), + action: req.action, + old_value: req.old_value.unwrap_or_default(), + new_value: req.new_value.unwrap_or_default(), + actor: req.actor, + source: req.source, + workspace_id: req.workspace_id.unwrap_or_default(), + }; + + match journal.record(event).await { + Ok(()) => Ok((StatusCode::CREATED, "event recorded")), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +#[derive(Deserialize)] +struct UpdateRequest { + entity_type: String, + entity_id: String, + field: String, + old_value: String, + new_value: String, + actor: String, + workspace_id: Option<String>, +} + +async fn record_update( + State(journal): State<Journal>, + Json(req): Json<UpdateRequest>, +) -> impl IntoResponse { + match journal.record_update( + &req.entity_type, &req.entity_id, &req.field, + &req.old_value, &req.new_value, &req.actor, "api", + &req.workspace_id.unwrap_or_default(), + ).await { + Ok(()) => Ok((StatusCode::CREATED, "update recorded")), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +// --- Flush --- + +async fn flush(State(journal): State<Journal>) -> impl IntoResponse { + match journal.flush().await { + Ok(n) => Ok(format!("flushed {n} events")), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +// --- Query --- + +async fn entity_history( + State(journal): State<Journal>, + Path(entity_id): Path<String>, +) -> impl IntoResponse { + match journal.get_entity_history(&entity_id).await { + Ok(events) => Ok(Json(events)), + 
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +#[derive(Deserialize)] +struct RecentQuery { + limit: Option<usize>, +} + +async fn recent_events( + State(journal): State<Journal>, + Query(q): Query<RecentQuery>, +) -> impl IntoResponse { + let limit = q.limit.unwrap_or(50); + match journal.get_recent(limit).await { + Ok(events) => Ok(Json(events)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +async fn stats(State(journal): State<Journal>) -> impl IntoResponse { + Json(journal.stats().await) +}