/// Append-only event journal.
/// Every mutation to any dataset is recorded here as an immutable event.
/// Events are never modified or deleted — this IS the audit trail.
///
/// Storage: events buffer in memory, flush to Parquet periodically.
/// Query: load Parquet files, filter by entity/field/actor/time.

use arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

use shared::arrow_helpers::record_batch_to_parquet;
use storaged::ops;

/// A single mutation event. Immutable once created.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    pub event_id: String,
    pub timestamp: DateTime<Utc>,
    pub entity_type: String,  // "candidate", "placement", "client", etc.
    pub entity_id: String,    // "CAND-00342"
    pub field: String,        // "phone", "*" for insert/delete
    pub action: String,       // "insert", "update", "delete"
    pub old_value: String,    // "" for inserts
    pub new_value: String,    // "" for deletes
    pub actor: String,        // "Sarah", "ingest_pipeline", "auto_matcher"
    pub source: String,       // "api", "ingest", "workspace", "agent"
    pub workspace_id: String, // "" if not workspace-scoped
}

/// Arrow schema for journal Parquet files.
fn journal_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        Field::new("event_id", DataType::Utf8, false),
        Field::new("timestamp", DataType::Utf8, false),
        Field::new("entity_type", DataType::Utf8, false),
        Field::new("entity_id", DataType::Utf8, false),
        Field::new("field", DataType::Utf8, false),
        Field::new("action", DataType::Utf8, false),
        Field::new("old_value", DataType::Utf8, true),
        Field::new("new_value", DataType::Utf8, true),
        Field::new("actor", DataType::Utf8, false),
        Field::new("source", DataType::Utf8, false),
        Field::new("workspace_id", DataType::Utf8, true),
    ]))
}

/// The event journal — buffers events in memory, flushes to Parquet.
#[derive(Clone)]
pub struct Journal {
    buffer: Arc<RwLock<Vec<Event>>>,
    store: Arc<dyn ObjectStore>,
    flush_threshold: usize,
    event_counter: Arc<std::sync::atomic::AtomicU64>,
}

impl Journal {
    pub fn new(store: Arc<dyn ObjectStore>, flush_threshold: usize) -> Self {
        Self {
            buffer: Arc::new(RwLock::new(Vec::new())),
            store,
            flush_threshold,
            event_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
        }
    }

    /// Record a single event. Triggers a flush once the buffer reaches the threshold.
    pub async fn record(&self, mut event: Event) -> Result<(), String> {
        let seq = self
            .event_counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        event.event_id = format!("evt-{}-{seq}", event.timestamp.timestamp_millis());

        let should_flush = {
            let mut buf = self.buffer.write().await;
            buf.push(event);
            buf.len() >= self.flush_threshold
        };
        if should_flush {
            self.flush().await?;
        }
        Ok(())
    }

    /// Record a row insert (a new entity added to a dataset).
    pub async fn record_insert(
        &self,
        entity_type: &str,
        entity_id: &str,
        actor: &str,
        source: &str,
        workspace_id: &str,
    ) -> Result<(), String> {
        self.record(Event {
            event_id: String::new(),
            timestamp: Utc::now(),
            entity_type: entity_type.to_string(),
            entity_id: entity_id.to_string(),
            field: "*".to_string(),
            action: "insert".to_string(),
            old_value: String::new(),
            new_value: String::new(),
            actor: actor.to_string(),
            source: source.to_string(),
            workspace_id: workspace_id.to_string(),
        })
        .await
    }

    /// Record a field update.
    pub async fn record_update(
        &self,
        entity_type: &str,
        entity_id: &str,
        field: &str,
        old_value: &str,
        new_value: &str,
        actor: &str,
        source: &str,
        workspace_id: &str,
    ) -> Result<(), String> {
        self.record(Event {
            event_id: String::new(),
            timestamp: Utc::now(),
            entity_type: entity_type.to_string(),
            entity_id: entity_id.to_string(),
            field: field.to_string(),
            action: "update".to_string(),
            old_value: old_value.to_string(),
            new_value: new_value.to_string(),
            actor: actor.to_string(),
            source: source.to_string(),
            workspace_id: workspace_id.to_string(),
        })
        .await
    }

    /// Record a single summary event for a dataset ingest (a batch of rows).
    pub async fn record_ingest(
        &self,
        entity_type: &str,
        count: usize,
        actor: &str,
        source_file: &str,
    ) -> Result<(), String> {
        self.record(Event {
            event_id: String::new(),
            timestamp: Utc::now(),
            entity_type: entity_type.to_string(),
            entity_id: format!("batch:{count}"),
            field: "*".to_string(),
            action: "ingest".to_string(),
            old_value: String::new(),
            new_value: format!("{count} rows from {source_file}"),
            actor: actor.to_string(),
            source: "ingest".to_string(),
            workspace_id: String::new(),
        })
        .await
    }

    /// Flush buffer to Parquet. Called automatically at threshold or manually.
    pub async fn flush(&self) -> Result<usize, String> {
        let events = {
            let mut buf = self.buffer.write().await;
            if buf.is_empty() {
                return Ok(0);
            }
            std::mem::take(&mut *buf)
        };
        let n = events.len();

        let batch = events_to_batch(&events)?;
        let parquet = record_batch_to_parquet(&batch)?;

        let ts = Utc::now().format("%Y%m%d_%H%M%S_%3f");
        let key = format!("journal/{ts}.parquet");
        ops::put(&self.store, &key, parquet).await?;

        tracing::info!("journal: flushed {n} events to {key}");
        Ok(n)
    }

    /// Query journal: get all events for a specific entity.
    pub async fn get_entity_history(&self, entity_id: &str) -> Result<Vec<Event>, String> {
        let mut all_events = Vec::new();

        // Check in-memory buffer first
        {
            let buf = self.buffer.read().await;
            for e in buf.iter() {
                if e.entity_id == entity_id {
                    all_events.push(e.clone());
                }
            }
        }

        // Load from persisted journal files
        let keys = ops::list(&self.store, Some("journal/")).await?;
        for key in &keys {
            if !key.ends_with(".parquet") {
                continue;
            }
            let data = ops::get(&self.store, key).await?;
            let (_, batches) = shared::arrow_helpers::parquet_to_record_batches(&data)?;
            for batch in &batches {
                let events = batch_to_events(batch)?;
                for e in events {
                    if e.entity_id == entity_id {
                        all_events.push(e);
                    }
                }
            }
        }

        all_events.sort_by_key(|e| e.timestamp);
        Ok(all_events)
    }

    /// Query journal: get recent events (last N).
    pub async fn get_recent(&self, limit: usize) -> Result<Vec<Event>, String> {
        let mut all_events = Vec::new();

        // Buffer
        {
            let buf = self.buffer.read().await;
            all_events.extend(buf.iter().cloned());
        }

        // Persisted (load most recent files first)
        let mut keys = ops::list(&self.store, Some("journal/")).await?;
        keys.sort();
        keys.reverse();
        for key in &keys {
            if !key.ends_with(".parquet") {
                continue;
            }
            if all_events.len() >= limit {
                break;
            }
            let data = ops::get(&self.store, key).await?;
            let (_, batches) = shared::arrow_helpers::parquet_to_record_batches(&data)?;
            for batch in &batches {
                all_events.extend(batch_to_events(batch)?);
            }
        }

        all_events.sort_by_key(|e| e.timestamp);
        all_events.reverse();
        all_events.truncate(limit);
        Ok(all_events)
    }

    /// Get buffer stats.
    pub async fn stats(&self) -> JournalStats {
        let buf = self.buffer.read().await;
        let keys = ops::list(&self.store, Some("journal/"))
            .await
            .unwrap_or_default();
        JournalStats {
            buffer_events: buf.len(),
            flush_threshold: self.flush_threshold,
            persisted_files: keys.len(),
            total_events_created: self
                .event_counter
                .load(std::sync::atomic::Ordering::Relaxed),
        }
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct JournalStats {
    pub buffer_events: usize,
    pub flush_threshold: usize,
    pub persisted_files: usize,
    pub total_events_created: u64,
}

fn events_to_batch(events: &[Event]) -> Result<RecordBatch, String> {
    let schema = journal_schema();
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(
            events.iter().map(|e| e.event_id.as_str()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.timestamp.to_rfc3339()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.entity_type.as_str()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.entity_id.as_str()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.field.as_str()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.action.as_str()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.old_value.as_str()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.new_value.as_str()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.actor.as_str()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.source.as_str()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            events.iter().map(|e| e.workspace_id.as_str()).collect::<Vec<_>>(),
        )),
    ];
    RecordBatch::try_new(schema, arrays).map_err(|e| format!("batch error: {e}"))
}

fn batch_to_events(batch: &RecordBatch) -> Result<Vec<Event>, String> {
    let get_col = |i: usize| -> Result<&StringArray, String> {
        batch
            .column(i)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| format!("column {i} not string"))
    };
    let event_ids = get_col(0)?;
    let timestamps = get_col(1)?;
    let entity_types = get_col(2)?;
    let entity_ids = get_col(3)?;
    let fields = get_col(4)?;
    let actions = get_col(5)?;
    let old_values = get_col(6)?;
    let new_values = get_col(7)?;
    let actors = get_col(8)?;
    let sources = get_col(9)?;
    let workspace_ids = get_col(10)?;

    let mut events = Vec::with_capacity(batch.num_rows());
    for i in 0..batch.num_rows() {
        events.push(Event {
            event_id: event_ids.value(i).to_string(),
            timestamp: timestamps.value(i).parse().unwrap_or_else(|_| Utc::now()),
            entity_type: entity_types.value(i).to_string(),
            entity_id: entity_ids.value(i).to_string(),
            field: fields.value(i).to_string(),
            action: actions.value(i).to_string(),
            old_value: old_values.value(i).to_string(),
            new_value: new_values.value(i).to_string(),
            actor: actors.value(i).to_string(),
            source: sources.value(i).to_string(),
            workspace_id: workspace_ids.value(i).to_string(),
        });
    }
    Ok(events)
}
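// A minimal usage sketch of the record/flush/query flow described in the module
// docs. Assumptions: `object_store::memory::InMemory` is acceptable as a backing
// store, tokio's test macro is available, and the internal `storaged::ops` /
// `shared::arrow_helpers` helpers round-trip Parquet as the calls above imply.
#[cfg(test)]
mod usage_sketch {
    use super::*;
    use object_store::memory::InMemory;

    #[tokio::test]
    async fn record_flush_and_query() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        // High threshold so flushing only happens when requested explicitly.
        let journal = Journal::new(store, 1_000);

        journal
            .record_update(
                "candidate", "CAND-00342", "phone",
                "555-0100", "555-0199",
                "Sarah", "api", "",
            )
            .await
            .expect("record_update");

        // One buffered event, nothing persisted yet.
        assert_eq!(journal.stats().await.buffer_events, 1);

        // Force a flush and confirm the event round-trips through Parquet.
        assert_eq!(journal.flush().await.expect("flush"), 1);
        let history = journal
            .get_entity_history("CAND-00342")
            .await
            .expect("history");
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].action, "update");
    }
}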