- journald crate: immutable event log for every data mutation
- Events: entity_type, entity_id, field, action, old_value, new_value,
actor, source, workspace_id, timestamp
- In-memory buffer with configurable flush threshold (default 100 events)
- Flush writes events as Parquet to journal/ directory
- Query: GET /journal/history/{entity_id} — full history of any record
- Query: GET /journal/recent?limit=50 — latest events across all entities
- Convenience methods: record_insert, record_update, record_ingest
- Stats: GET /journal/stats — buffer size, persisted file count
- Manual flush: POST /journal/flush
- Per ADR-012: events are never modified or deleted
This is the single most important future-proofing decision.
Once history is lost, it's gone forever.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
132 lines
3.4 KiB
Rust
132 lines
3.4 KiB
Rust
use axum::{
|
|
Json, Router,
|
|
extract::{Path, Query, State},
|
|
http::StatusCode,
|
|
response::IntoResponse,
|
|
routing::{get, post},
|
|
};
|
|
use serde::Deserialize;
|
|
|
|
use crate::journal::{Event, Journal};
|
|
|
|
pub fn router(journal: Journal) -> Router {
|
|
Router::new()
|
|
.route("/health", get(health))
|
|
.route("/event", post(record_event))
|
|
.route("/update", post(record_update))
|
|
.route("/flush", post(flush))
|
|
.route("/history/{entity_id}", get(entity_history))
|
|
.route("/recent", get(recent_events))
|
|
.route("/stats", get(stats))
|
|
.with_state(journal)
|
|
}
|
|
|
|
async fn health() -> &'static str {
|
|
"journald ok"
|
|
}
|
|
|
|
// --- Record events ---
|
|
|
|
#[derive(Deserialize)]
|
|
struct RecordEventRequest {
|
|
entity_type: String,
|
|
entity_id: String,
|
|
action: String,
|
|
field: Option<String>,
|
|
old_value: Option<String>,
|
|
new_value: Option<String>,
|
|
actor: String,
|
|
source: String,
|
|
workspace_id: Option<String>,
|
|
}
|
|
|
|
async fn record_event(
|
|
State(journal): State<Journal>,
|
|
Json(req): Json<RecordEventRequest>,
|
|
) -> impl IntoResponse {
|
|
let event = Event {
|
|
event_id: String::new(), // assigned by journal
|
|
timestamp: chrono::Utc::now(),
|
|
entity_type: req.entity_type,
|
|
entity_id: req.entity_id,
|
|
field: req.field.unwrap_or_else(|| "*".to_string()),
|
|
action: req.action,
|
|
old_value: req.old_value.unwrap_or_default(),
|
|
new_value: req.new_value.unwrap_or_default(),
|
|
actor: req.actor,
|
|
source: req.source,
|
|
workspace_id: req.workspace_id.unwrap_or_default(),
|
|
};
|
|
|
|
match journal.record(event).await {
|
|
Ok(()) => Ok((StatusCode::CREATED, "event recorded")),
|
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct UpdateRequest {
|
|
entity_type: String,
|
|
entity_id: String,
|
|
field: String,
|
|
old_value: String,
|
|
new_value: String,
|
|
actor: String,
|
|
workspace_id: Option<String>,
|
|
}
|
|
|
|
async fn record_update(
|
|
State(journal): State<Journal>,
|
|
Json(req): Json<UpdateRequest>,
|
|
) -> impl IntoResponse {
|
|
match journal.record_update(
|
|
&req.entity_type, &req.entity_id, &req.field,
|
|
&req.old_value, &req.new_value, &req.actor, "api",
|
|
&req.workspace_id.unwrap_or_default(),
|
|
).await {
|
|
Ok(()) => Ok((StatusCode::CREATED, "update recorded")),
|
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
|
}
|
|
}
|
|
|
|
// --- Flush ---
|
|
|
|
async fn flush(State(journal): State<Journal>) -> impl IntoResponse {
|
|
match journal.flush().await {
|
|
Ok(n) => Ok(format!("flushed {n} events")),
|
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
|
}
|
|
}
|
|
|
|
// --- Query ---
|
|
|
|
async fn entity_history(
|
|
State(journal): State<Journal>,
|
|
Path(entity_id): Path<String>,
|
|
) -> impl IntoResponse {
|
|
match journal.get_entity_history(&entity_id).await {
|
|
Ok(events) => Ok(Json(events)),
|
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct RecentQuery {
|
|
limit: Option<usize>,
|
|
}
|
|
|
|
async fn recent_events(
|
|
State(journal): State<Journal>,
|
|
Query(q): Query<RecentQuery>,
|
|
) -> impl IntoResponse {
|
|
let limit = q.limit.unwrap_or(50);
|
|
match journal.get_recent(limit).await {
|
|
Ok(events) => Ok(Json(events)),
|
|
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
|
}
|
|
}
|
|
|
|
async fn stats(State(journal): State<Journal>) -> impl IntoResponse {
|
|
Json(journal.stats().await)
|
|
}
|