Phase 9: Event journal — append-only mutation history
- journald crate: immutable event log for every data mutation
- Events: entity_type, entity_id, field, action, old_value, new_value,
actor, source, workspace_id, timestamp
- In-memory buffer with configurable flush threshold (default 100 events)
- Flush writes events as Parquet to journal/ directory
- Query: GET /journal/history/{entity_id} — full history of any record
- Query: GET /journal/recent?limit=50 — latest events across all entities
- Convenience methods: record_insert, record_update, record_ingest
- Stats: GET /journal/stats — buffer size, persisted file count
- Manual flush: POST /journal/flush
- Per ADR-012: events are never modified or deleted
This is the single most important future-proofing decision.
Once history is lost, it's gone forever.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
3b695cd592
commit
bf7cf96911
19
Cargo.lock
generated
19
Cargo.lock
generated
@ -2376,6 +2376,7 @@ dependencies = [
|
|||||||
"axum",
|
"axum",
|
||||||
"catalogd",
|
"catalogd",
|
||||||
"ingestd",
|
"ingestd",
|
||||||
|
"journald",
|
||||||
"object_store",
|
"object_store",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"opentelemetry-stdout",
|
"opentelemetry-stdout",
|
||||||
@ -3018,6 +3019,24 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "journald"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"arrow",
|
||||||
|
"axum",
|
||||||
|
"bytes",
|
||||||
|
"chrono",
|
||||||
|
"object_store",
|
||||||
|
"parquet",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"shared",
|
||||||
|
"storaged",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "js-sys"
|
name = "js-sys"
|
||||||
version = "0.3.91"
|
version = "0.3.91"
|
||||||
|
|||||||
@ -9,6 +9,7 @@ members = [
|
|||||||
"crates/aibridge",
|
"crates/aibridge",
|
||||||
"crates/ingestd",
|
"crates/ingestd",
|
||||||
"crates/vectord",
|
"crates/vectord",
|
||||||
|
"crates/journald",
|
||||||
"crates/gateway",
|
"crates/gateway",
|
||||||
"crates/ui",
|
"crates/ui",
|
||||||
]
|
]
|
||||||
|
|||||||
@ -11,6 +11,7 @@ queryd = { path = "../queryd" }
|
|||||||
aibridge = { path = "../aibridge" }
|
aibridge = { path = "../aibridge" }
|
||||||
ingestd = { path = "../ingestd" }
|
ingestd = { path = "../ingestd" }
|
||||||
vectord = { path = "../vectord" }
|
vectord = { path = "../vectord" }
|
||||||
|
journald = { path = "../journald" }
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
axum = { workspace = true }
|
axum = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
|||||||
@ -34,6 +34,9 @@ async fn main() {
|
|||||||
let cache = queryd::cache::MemCache::new(16 * 1024 * 1024 * 1024);
|
let cache = queryd::cache::MemCache::new(16 * 1024 * 1024 * 1024);
|
||||||
let engine = queryd::context::QueryEngine::new(registry.clone(), store.clone(), cache);
|
let engine = queryd::context::QueryEngine::new(registry.clone(), store.clone(), cache);
|
||||||
|
|
||||||
|
// Event journal — append-only mutation log (flush every 100 events)
|
||||||
|
let journal = journald::journal::Journal::new(store.clone(), 100);
|
||||||
|
|
||||||
// Workspace manager for agent-specific overlays
|
// Workspace manager for agent-specific overlays
|
||||||
let workspace_mgr = queryd::workspace::WorkspaceManager::new(store.clone());
|
let workspace_mgr = queryd::workspace::WorkspaceManager::new(store.clone());
|
||||||
if let Err(e) = workspace_mgr.rebuild().await {
|
if let Err(e) = workspace_mgr.rebuild().await {
|
||||||
@ -59,7 +62,8 @@ async fn main() {
|
|||||||
ai_client: ai_client.clone(),
|
ai_client: ai_client.clone(),
|
||||||
job_tracker: vectord::jobs::JobTracker::new(),
|
job_tracker: vectord::jobs::JobTracker::new(),
|
||||||
}))
|
}))
|
||||||
.nest("/workspaces", queryd::workspace_service::router(workspace_mgr));
|
.nest("/workspaces", queryd::workspace_service::router(workspace_mgr))
|
||||||
|
.nest("/journal", journald::service::router(journal));
|
||||||
|
|
||||||
// Auth middleware (if enabled)
|
// Auth middleware (if enabled)
|
||||||
if config.auth.enabled {
|
if config.auth.enabled {
|
||||||
|
|||||||
18
crates/journald/Cargo.toml
Normal file
18
crates/journald/Cargo.toml
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
[package]
|
||||||
|
name = "journald"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
shared = { path = "../shared" }
|
||||||
|
storaged = { path = "../storaged" }
|
||||||
|
tokio = { workspace = true }
|
||||||
|
axum = { workspace = true }
|
||||||
|
serde = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
arrow = { workspace = true }
|
||||||
|
parquet = { workspace = true }
|
||||||
|
bytes = { workspace = true }
|
||||||
|
chrono = { workspace = true }
|
||||||
|
object_store = { workspace = true }
|
||||||
322
crates/journald/src/journal.rs
Normal file
322
crates/journald/src/journal.rs
Normal file
@ -0,0 +1,322 @@
|
|||||||
|
/// Append-only event journal.
|
||||||
|
/// Every mutation to any dataset is recorded here as an immutable event.
|
||||||
|
/// Events are never modified or deleted — this IS the audit trail.
|
||||||
|
///
|
||||||
|
/// Storage: events buffer in memory, flush to Parquet periodically.
|
||||||
|
/// Query: load Parquet files, filter by entity/field/actor/time.
|
||||||
|
|
||||||
|
use arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
|
||||||
|
use arrow::datatypes::{DataType, Field, Schema};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use object_store::ObjectStore;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
|
use shared::arrow_helpers::record_batch_to_parquet;
|
||||||
|
use storaged::ops;
|
||||||
|
|
||||||
|
/// A single mutation event. Immutable once created.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Event {
|
||||||
|
pub event_id: String,
|
||||||
|
pub timestamp: DateTime<Utc>,
|
||||||
|
pub entity_type: String, // "candidate", "placement", "client", etc.
|
||||||
|
pub entity_id: String, // "CAND-00342"
|
||||||
|
pub field: String, // "phone", "*" for insert/delete
|
||||||
|
pub action: String, // "insert", "update", "delete"
|
||||||
|
pub old_value: String, // "" for inserts
|
||||||
|
pub new_value: String, // "" for deletes
|
||||||
|
pub actor: String, // "Sarah", "ingest_pipeline", "auto_matcher"
|
||||||
|
pub source: String, // "api", "ingest", "workspace", "agent"
|
||||||
|
pub workspace_id: String, // "" if not workspace-scoped
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Arrow schema for journal Parquet files.
|
||||||
|
fn journal_schema() -> Arc<Schema> {
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("event_id", DataType::Utf8, false),
|
||||||
|
Field::new("timestamp", DataType::Utf8, false),
|
||||||
|
Field::new("entity_type", DataType::Utf8, false),
|
||||||
|
Field::new("entity_id", DataType::Utf8, false),
|
||||||
|
Field::new("field", DataType::Utf8, false),
|
||||||
|
Field::new("action", DataType::Utf8, false),
|
||||||
|
Field::new("old_value", DataType::Utf8, true),
|
||||||
|
Field::new("new_value", DataType::Utf8, true),
|
||||||
|
Field::new("actor", DataType::Utf8, false),
|
||||||
|
Field::new("source", DataType::Utf8, false),
|
||||||
|
Field::new("workspace_id", DataType::Utf8, true),
|
||||||
|
]))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The event journal — buffers events in memory, flushes to Parquet.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Journal {
|
||||||
|
buffer: Arc<RwLock<Vec<Event>>>,
|
||||||
|
store: Arc<dyn ObjectStore>,
|
||||||
|
flush_threshold: usize,
|
||||||
|
event_counter: Arc<std::sync::atomic::AtomicU64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Journal {
|
||||||
|
pub fn new(store: Arc<dyn ObjectStore>, flush_threshold: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
buffer: Arc::new(RwLock::new(Vec::new())),
|
||||||
|
store,
|
||||||
|
flush_threshold,
|
||||||
|
event_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record a single event. Triggers flush if buffer exceeds threshold.
|
||||||
|
pub async fn record(&self, mut event: Event) -> Result<(), String> {
|
||||||
|
let seq = self.event_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
event.event_id = format!("evt-{}-{seq}", event.timestamp.timestamp_millis());
|
||||||
|
|
||||||
|
let should_flush = {
|
||||||
|
let mut buf = self.buffer.write().await;
|
||||||
|
buf.push(event);
|
||||||
|
buf.len() >= self.flush_threshold
|
||||||
|
};
|
||||||
|
|
||||||
|
if should_flush {
|
||||||
|
self.flush().await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record a batch insert (multiple rows added to a dataset).
|
||||||
|
pub async fn record_insert(
|
||||||
|
&self,
|
||||||
|
entity_type: &str,
|
||||||
|
entity_id: &str,
|
||||||
|
actor: &str,
|
||||||
|
source: &str,
|
||||||
|
workspace_id: &str,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
self.record(Event {
|
||||||
|
event_id: String::new(),
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
entity_type: entity_type.to_string(),
|
||||||
|
entity_id: entity_id.to_string(),
|
||||||
|
field: "*".to_string(),
|
||||||
|
action: "insert".to_string(),
|
||||||
|
old_value: String::new(),
|
||||||
|
new_value: String::new(),
|
||||||
|
actor: actor.to_string(),
|
||||||
|
source: source.to_string(),
|
||||||
|
workspace_id: workspace_id.to_string(),
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record a field update.
|
||||||
|
pub async fn record_update(
|
||||||
|
&self,
|
||||||
|
entity_type: &str,
|
||||||
|
entity_id: &str,
|
||||||
|
field: &str,
|
||||||
|
old_value: &str,
|
||||||
|
new_value: &str,
|
||||||
|
actor: &str,
|
||||||
|
source: &str,
|
||||||
|
workspace_id: &str,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
self.record(Event {
|
||||||
|
event_id: String::new(),
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
entity_type: entity_type.to_string(),
|
||||||
|
entity_id: entity_id.to_string(),
|
||||||
|
field: field.to_string(),
|
||||||
|
action: "update".to_string(),
|
||||||
|
old_value: old_value.to_string(),
|
||||||
|
new_value: new_value.to_string(),
|
||||||
|
actor: actor.to_string(),
|
||||||
|
source: source.to_string(),
|
||||||
|
workspace_id: workspace_id.to_string(),
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record a batch of events from a dataset ingest.
|
||||||
|
pub async fn record_ingest(
|
||||||
|
&self,
|
||||||
|
entity_type: &str,
|
||||||
|
count: usize,
|
||||||
|
actor: &str,
|
||||||
|
source_file: &str,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
self.record(Event {
|
||||||
|
event_id: String::new(),
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
entity_type: entity_type.to_string(),
|
||||||
|
entity_id: format!("batch:{count}"),
|
||||||
|
field: "*".to_string(),
|
||||||
|
action: "ingest".to_string(),
|
||||||
|
old_value: String::new(),
|
||||||
|
new_value: format!("{count} rows from {source_file}"),
|
||||||
|
actor: actor.to_string(),
|
||||||
|
source: "ingest".to_string(),
|
||||||
|
workspace_id: String::new(),
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Flush buffer to Parquet. Called automatically at threshold or manually.
|
||||||
|
pub async fn flush(&self) -> Result<usize, String> {
|
||||||
|
let events = {
|
||||||
|
let mut buf = self.buffer.write().await;
|
||||||
|
if buf.is_empty() { return Ok(0); }
|
||||||
|
std::mem::take(&mut *buf)
|
||||||
|
};
|
||||||
|
|
||||||
|
let n = events.len();
|
||||||
|
let batch = events_to_batch(&events)?;
|
||||||
|
let parquet = record_batch_to_parquet(&batch)?;
|
||||||
|
|
||||||
|
let ts = Utc::now().format("%Y%m%d_%H%M%S_%3f");
|
||||||
|
let key = format!("journal/{ts}.parquet");
|
||||||
|
ops::put(&self.store, &key, parquet).await?;
|
||||||
|
|
||||||
|
tracing::info!("journal: flushed {n} events to {key}");
|
||||||
|
Ok(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Query journal: get all events for a specific entity.
|
||||||
|
pub async fn get_entity_history(&self, entity_id: &str) -> Result<Vec<Event>, String> {
|
||||||
|
let mut all_events = Vec::new();
|
||||||
|
|
||||||
|
// Check in-memory buffer first
|
||||||
|
{
|
||||||
|
let buf = self.buffer.read().await;
|
||||||
|
for e in buf.iter() {
|
||||||
|
if e.entity_id == entity_id {
|
||||||
|
all_events.push(e.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load from persisted journal files
|
||||||
|
let keys = ops::list(&self.store, Some("journal/")).await?;
|
||||||
|
for key in &keys {
|
||||||
|
if !key.ends_with(".parquet") { continue; }
|
||||||
|
let data = ops::get(&self.store, key).await?;
|
||||||
|
let (_, batches) = shared::arrow_helpers::parquet_to_record_batches(&data)?;
|
||||||
|
for batch in &batches {
|
||||||
|
let events = batch_to_events(batch)?;
|
||||||
|
for e in events {
|
||||||
|
if e.entity_id == entity_id {
|
||||||
|
all_events.push(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
all_events.sort_by_key(|e| e.timestamp);
|
||||||
|
Ok(all_events)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Query journal: get recent events (last N).
|
||||||
|
pub async fn get_recent(&self, limit: usize) -> Result<Vec<Event>, String> {
|
||||||
|
let mut all_events = Vec::new();
|
||||||
|
|
||||||
|
// Buffer
|
||||||
|
{
|
||||||
|
let buf = self.buffer.read().await;
|
||||||
|
all_events.extend(buf.iter().cloned());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Persisted (load most recent files first)
|
||||||
|
let mut keys = ops::list(&self.store, Some("journal/")).await?;
|
||||||
|
keys.sort();
|
||||||
|
keys.reverse();
|
||||||
|
|
||||||
|
for key in &keys {
|
||||||
|
if !key.ends_with(".parquet") { continue; }
|
||||||
|
if all_events.len() >= limit { break; }
|
||||||
|
let data = ops::get(&self.store, key).await?;
|
||||||
|
let (_, batches) = shared::arrow_helpers::parquet_to_record_batches(&data)?;
|
||||||
|
for batch in &batches {
|
||||||
|
all_events.extend(batch_to_events(batch)?);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
all_events.sort_by_key(|e| e.timestamp);
|
||||||
|
all_events.reverse();
|
||||||
|
all_events.truncate(limit);
|
||||||
|
Ok(all_events)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get buffer stats.
|
||||||
|
pub async fn stats(&self) -> JournalStats {
|
||||||
|
let buf = self.buffer.read().await;
|
||||||
|
let keys = ops::list(&self.store, Some("journal/")).await.unwrap_or_default();
|
||||||
|
JournalStats {
|
||||||
|
buffer_events: buf.len(),
|
||||||
|
flush_threshold: self.flush_threshold,
|
||||||
|
persisted_files: keys.len(),
|
||||||
|
total_events_created: self.event_counter.load(std::sync::atomic::Ordering::Relaxed),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct JournalStats {
|
||||||
|
pub buffer_events: usize,
|
||||||
|
pub flush_threshold: usize,
|
||||||
|
pub persisted_files: usize,
|
||||||
|
pub total_events_created: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn events_to_batch(events: &[Event]) -> Result<RecordBatch, String> {
|
||||||
|
let schema = journal_schema();
|
||||||
|
let arrays: Vec<ArrayRef> = vec![
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.event_id.as_str()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.timestamp.to_rfc3339()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.entity_type.as_str()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.entity_id.as_str()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.field.as_str()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.action.as_str()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.old_value.as_str()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.new_value.as_str()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.actor.as_str()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.source.as_str()).collect::<Vec<_>>())),
|
||||||
|
Arc::new(StringArray::from(events.iter().map(|e| e.workspace_id.as_str()).collect::<Vec<_>>())),
|
||||||
|
];
|
||||||
|
RecordBatch::try_new(schema, arrays).map_err(|e| format!("batch error: {e}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn batch_to_events(batch: &RecordBatch) -> Result<Vec<Event>, String> {
|
||||||
|
let get_col = |i: usize| -> Result<&StringArray, String> {
|
||||||
|
batch.column(i).as_any().downcast_ref::<StringArray>()
|
||||||
|
.ok_or_else(|| format!("column {i} not string"))
|
||||||
|
};
|
||||||
|
|
||||||
|
let event_ids = get_col(0)?;
|
||||||
|
let timestamps = get_col(1)?;
|
||||||
|
let entity_types = get_col(2)?;
|
||||||
|
let entity_ids = get_col(3)?;
|
||||||
|
let fields = get_col(4)?;
|
||||||
|
let actions = get_col(5)?;
|
||||||
|
let old_values = get_col(6)?;
|
||||||
|
let new_values = get_col(7)?;
|
||||||
|
let actors = get_col(8)?;
|
||||||
|
let sources = get_col(9)?;
|
||||||
|
let workspace_ids = get_col(10)?;
|
||||||
|
|
||||||
|
let mut events = Vec::with_capacity(batch.num_rows());
|
||||||
|
for i in 0..batch.num_rows() {
|
||||||
|
events.push(Event {
|
||||||
|
event_id: event_ids.value(i).to_string(),
|
||||||
|
timestamp: timestamps.value(i).parse().unwrap_or_else(|_| Utc::now()),
|
||||||
|
entity_type: entity_types.value(i).to_string(),
|
||||||
|
entity_id: entity_ids.value(i).to_string(),
|
||||||
|
field: fields.value(i).to_string(),
|
||||||
|
action: actions.value(i).to_string(),
|
||||||
|
old_value: old_values.value(i).to_string(),
|
||||||
|
new_value: new_values.value(i).to_string(),
|
||||||
|
actor: actors.value(i).to_string(),
|
||||||
|
source: sources.value(i).to_string(),
|
||||||
|
workspace_id: workspace_ids.value(i).to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(events)
|
||||||
|
}
|
||||||
2
crates/journald/src/lib.rs
Normal file
2
crates/journald/src/lib.rs
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
pub mod journal;
|
||||||
|
pub mod service;
|
||||||
131
crates/journald/src/service.rs
Normal file
131
crates/journald/src/service.rs
Normal file
@ -0,0 +1,131 @@
|
|||||||
|
use axum::{
|
||||||
|
Json, Router,
|
||||||
|
extract::{Path, Query, State},
|
||||||
|
http::StatusCode,
|
||||||
|
response::IntoResponse,
|
||||||
|
routing::{get, post},
|
||||||
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
use crate::journal::{Event, Journal};
|
||||||
|
|
||||||
|
pub fn router(journal: Journal) -> Router {
|
||||||
|
Router::new()
|
||||||
|
.route("/health", get(health))
|
||||||
|
.route("/event", post(record_event))
|
||||||
|
.route("/update", post(record_update))
|
||||||
|
.route("/flush", post(flush))
|
||||||
|
.route("/history/{entity_id}", get(entity_history))
|
||||||
|
.route("/recent", get(recent_events))
|
||||||
|
.route("/stats", get(stats))
|
||||||
|
.with_state(journal)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn health() -> &'static str {
|
||||||
|
"journald ok"
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Record events ---
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct RecordEventRequest {
|
||||||
|
entity_type: String,
|
||||||
|
entity_id: String,
|
||||||
|
action: String,
|
||||||
|
field: Option<String>,
|
||||||
|
old_value: Option<String>,
|
||||||
|
new_value: Option<String>,
|
||||||
|
actor: String,
|
||||||
|
source: String,
|
||||||
|
workspace_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn record_event(
|
||||||
|
State(journal): State<Journal>,
|
||||||
|
Json(req): Json<RecordEventRequest>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let event = Event {
|
||||||
|
event_id: String::new(), // assigned by journal
|
||||||
|
timestamp: chrono::Utc::now(),
|
||||||
|
entity_type: req.entity_type,
|
||||||
|
entity_id: req.entity_id,
|
||||||
|
field: req.field.unwrap_or_else(|| "*".to_string()),
|
||||||
|
action: req.action,
|
||||||
|
old_value: req.old_value.unwrap_or_default(),
|
||||||
|
new_value: req.new_value.unwrap_or_default(),
|
||||||
|
actor: req.actor,
|
||||||
|
source: req.source,
|
||||||
|
workspace_id: req.workspace_id.unwrap_or_default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
match journal.record(event).await {
|
||||||
|
Ok(()) => Ok((StatusCode::CREATED, "event recorded")),
|
||||||
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct UpdateRequest {
|
||||||
|
entity_type: String,
|
||||||
|
entity_id: String,
|
||||||
|
field: String,
|
||||||
|
old_value: String,
|
||||||
|
new_value: String,
|
||||||
|
actor: String,
|
||||||
|
workspace_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn record_update(
|
||||||
|
State(journal): State<Journal>,
|
||||||
|
Json(req): Json<UpdateRequest>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
match journal.record_update(
|
||||||
|
&req.entity_type, &req.entity_id, &req.field,
|
||||||
|
&req.old_value, &req.new_value, &req.actor, "api",
|
||||||
|
&req.workspace_id.unwrap_or_default(),
|
||||||
|
).await {
|
||||||
|
Ok(()) => Ok((StatusCode::CREATED, "update recorded")),
|
||||||
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Flush ---
|
||||||
|
|
||||||
|
async fn flush(State(journal): State<Journal>) -> impl IntoResponse {
|
||||||
|
match journal.flush().await {
|
||||||
|
Ok(n) => Ok(format!("flushed {n} events")),
|
||||||
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Query ---
|
||||||
|
|
||||||
|
async fn entity_history(
|
||||||
|
State(journal): State<Journal>,
|
||||||
|
Path(entity_id): Path<String>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
match journal.get_entity_history(&entity_id).await {
|
||||||
|
Ok(events) => Ok(Json(events)),
|
||||||
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct RecentQuery {
|
||||||
|
limit: Option<usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn recent_events(
|
||||||
|
State(journal): State<Journal>,
|
||||||
|
Query(q): Query<RecentQuery>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let limit = q.limit.unwrap_or(50);
|
||||||
|
match journal.get_recent(limit).await {
|
||||||
|
Ok(events) => Ok(Json(events)),
|
||||||
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn stats(State(journal): State<Journal>) -> impl IntoResponse {
|
||||||
|
Json(journal.stats().await)
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user