root bf7cf96911 Phase 9: Event journal — append-only mutation history
- journald crate: immutable event log for every data mutation
- Events: entity_type, entity_id, field, action, old_value, new_value,
  actor, source, workspace_id, timestamp
- In-memory buffer with configurable flush threshold (default 100 events)
- Flush writes events as Parquet to journal/ directory
- Query: GET /journal/history/{entity_id} — full history of any record
- Query: GET /journal/recent?limit=50 — latest events across all entities
- Convenience methods: record_insert, record_update, record_ingest
- Stats: GET /journal/stats — buffer size, persisted file count
- Manual flush: POST /journal/flush
- Per ADR-012: events are never modified or deleted

This is the single most important future-proofing decision.
Once history is lost, it's gone forever.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 09:09:33 -05:00

132 lines
3.4 KiB
Rust

use axum::{
Json, Router,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
use serde::Deserialize;
use crate::journal::{Event, Journal};
pub fn router(journal: Journal) -> Router {
Router::new()
.route("/health", get(health))
.route("/event", post(record_event))
.route("/update", post(record_update))
.route("/flush", post(flush))
.route("/history/{entity_id}", get(entity_history))
.route("/recent", get(recent_events))
.route("/stats", get(stats))
.with_state(journal)
}
async fn health() -> &'static str {
"journald ok"
}
// --- Record events ---
#[derive(Deserialize)]
struct RecordEventRequest {
entity_type: String,
entity_id: String,
action: String,
field: Option<String>,
old_value: Option<String>,
new_value: Option<String>,
actor: String,
source: String,
workspace_id: Option<String>,
}
async fn record_event(
State(journal): State<Journal>,
Json(req): Json<RecordEventRequest>,
) -> impl IntoResponse {
let event = Event {
event_id: String::new(), // assigned by journal
timestamp: chrono::Utc::now(),
entity_type: req.entity_type,
entity_id: req.entity_id,
field: req.field.unwrap_or_else(|| "*".to_string()),
action: req.action,
old_value: req.old_value.unwrap_or_default(),
new_value: req.new_value.unwrap_or_default(),
actor: req.actor,
source: req.source,
workspace_id: req.workspace_id.unwrap_or_default(),
};
match journal.record(event).await {
Ok(()) => Ok((StatusCode::CREATED, "event recorded")),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct UpdateRequest {
entity_type: String,
entity_id: String,
field: String,
old_value: String,
new_value: String,
actor: String,
workspace_id: Option<String>,
}
async fn record_update(
State(journal): State<Journal>,
Json(req): Json<UpdateRequest>,
) -> impl IntoResponse {
match journal.record_update(
&req.entity_type, &req.entity_id, &req.field,
&req.old_value, &req.new_value, &req.actor, "api",
&req.workspace_id.unwrap_or_default(),
).await {
Ok(()) => Ok((StatusCode::CREATED, "update recorded")),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
// --- Flush ---
async fn flush(State(journal): State<Journal>) -> impl IntoResponse {
match journal.flush().await {
Ok(n) => Ok(format!("flushed {n} events")),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
// --- Query ---
async fn entity_history(
State(journal): State<Journal>,
Path(entity_id): Path<String>,
) -> impl IntoResponse {
match journal.get_entity_history(&entity_id).await {
Ok(events) => Ok(Json(events)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct RecentQuery {
limit: Option<usize>,
}
async fn recent_events(
State(journal): State<Journal>,
Query(q): Query<RecentQuery>,
) -> impl IntoResponse {
let limit = q.limit.unwrap_or(50);
match journal.get_recent(limit).await {
Ok(events) => Ok(Json(events)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
async fn stats(State(journal): State<Journal>) -> impl IntoResponse {
Json(journal.stats().await)
}