lakehouse/crates/queryd/src/service.rs
root 4e1c400f5d Phase E.2: Compaction integrates tombstones — physical deletion closes GDPR loop
Phase E gave us soft-delete at query time (tombstones hide rows via a
DataFusion filter view). This completes the invariant: after compact,
tombstoned rows are PHYSICALLY absent from the parquet on disk.

delta::compact changes:
- Signature adds tombstones: &[Tombstone]
- After merging base + deltas, apply_tombstone_filter builds a
  BooleanArray keep-mask per batch (True where row_key_value is NOT
  in the tombstone set) and applies arrow::compute::filter_record_batch
- Supports Utf8, Int32, Int64 key columns (matches refresh.rs coverage
  for pg- and csv-derived schemas)
- CompactResult gains tombstones_applied + rows_dropped_by_tombstones
- Caller clears tombstone store on success

Critical correctness fix surfaced during E2E testing:
The original Phase 8 compact concatenated N independent Parquet byte
streams from record_batch_to_parquet() — each with its own footer.
Parquet readers only see the FIRST footer's data; the rest is invisible.
Latent since Phase 8 shipped; triggered here because tombstone-filtering
produces multiple batches. Corrupted candidates.parquet on the first test
run (restored from a UI fixture copy — a good argument for keeping test
data in the repo).

Fix:
- Single ArrowWriter per compaction, writes every batch into one
  properly-footered Parquet
- Snappy compression to match ingest defaults (otherwise rewrite
  inflated file 3× — 10.5MB → 34MB — because no compression was set)
- Verify-before-swap: parse written buf back to confirm row count
  matches expected; refuses to overwrite base_key if verification fails
- Write to {base_key}.compact-{ts}.tmp first, then to base_key; delete
  temp; only then delete delta files. Any error along the way leaves
  the original base intact.

TombstoneStore::clear(dataset) drops all tombstone batch files and
evicts the per-dataset AppendLog from cache. Called after successful
compact.

QueryEngine::catalog() accessor exposes the Registry so queryd
handlers can reach the tombstone store without routing through gateway
state.

E2E on candidates (100K rows, 15 cols):
- Baseline: 10.59 MB, 100000 rows
- Tombstone CAND-000001/2/3 (soft-delete): 99997 visible, 100000 raw
- Compact: tombstones_applied=3, rows_dropped=3, final_rows=99997
- Post: 10.72 MB (Snappy), valid parquet (1 row_group), 99997 rows
- Restart: persists, tombstones list empty, __raw__candidates also
  99997 (the 3 IDs are physically gone from disk)

PRD invariant close: deletion is now actually deletion, not just
masking. GDPR erasure request → tombstone + schedule compact → data
gone.

Deferred:
- Compact-all-datasets cron (currently manual per-dataset via
  POST /query/compact)
- Compaction of tombstone batch files themselves (they grow at
  flush_threshold=1 per tombstone; TombstoneStore::compact exists
  but not auto-called)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 10:38:30 -05:00

215 lines
5.9 KiB
Rust

use arrow::array::RecordBatch;
use arrow::json::writer::{JsonArray, Writer as JsonWriter};
use axum::{
Json, Router,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use crate::cache::CacheStats;
use crate::context::QueryEngine;
use crate::delta;
use crate::paged::ResultStore;
#[derive(Clone)]
/// Shared per-request state for all queryd routes.
///
/// Cloned cheaply by axum for each handler invocation (both fields are
/// expected to be handle-like; `QueryEngine` is cloned in `router`).
pub struct QueryState {
    // DataFusion-backed query engine; also exposes cache, store, and catalog
    // accessors used by the cache/compact handlers below.
    pub engine: QueryEngine,
    // Server-side store for paged query results (see `paged_query`/`get_page`).
    pub result_store: ResultStore,
}
/// Build the queryd HTTP router with every handler bound to a shared
/// [`QueryState`].
///
/// Routes:
/// - GET  /health                   liveness probe
/// - POST /sql                      run SQL, full result inline
/// - POST /paged                    run SQL, store result server-side
/// - GET  /page/{query_id}/{page}   fetch one page of a stored result
/// - POST /cache/pin|evict, GET /cache/stats   dataset cache management
/// - POST /compact                  delta + tombstone compaction
pub fn router(engine: QueryEngine) -> Router {
    let state = QueryState {
        // `engine` is moved in directly: the original `engine.clone()` was a
        // redundant clone — the parameter is never used again afterwards.
        engine,
        result_store: ResultStore::new(100, 50), // 100 rows/page, keep 50 results
    };
    Router::new()
        .route("/health", get(health))
        .route("/sql", post(execute_query))
        .route("/paged", post(paged_query))
        .route("/page/{query_id}/{page}", get(get_page))
        .route("/cache/pin", post(pin_dataset))
        .route("/cache/evict", post(evict_dataset))
        .route("/cache/stats", get(cache_stats))
        .route("/compact", post(compact_dataset))
        .with_state(state)
}
/// GET /health — liveness probe; always replies with a static marker string.
async fn health() -> &'static str {
    const BODY: &str = "queryd ok";
    BODY
}
// --- SQL Query ---
#[derive(Deserialize)]
/// Request body for POST /sql and POST /paged: a single SQL statement.
struct QueryRequest {
    // Raw SQL text, passed verbatim to `QueryEngine::query`.
    sql: String,
}
#[derive(Serialize)]
/// Response body for POST /sql: schema description plus the full result set.
struct QueryResponse {
    // One entry per output column, in schema order (taken from the first batch).
    columns: Vec<ColumnInfo>,
    // JSON array of row objects produced by `batches_to_json`.
    rows: serde_json::Value,
    // Number of rows in `rows` (length of the JSON array).
    row_count: usize,
}
#[derive(Serialize)]
/// Name and Arrow data type of one output column, serialized for clients.
struct ColumnInfo {
    name: String,
    // Display form of the Arrow `DataType` (e.g. "Utf8", "Int64").
    data_type: String,
}
/// Serialize Arrow record batches into a single `serde_json::Value` array.
///
/// Streams every batch through arrow's JSON writer (JsonArray layout) into an
/// in-memory buffer, then parses the buffer back into a dynamic value so the
/// HTTP layer can embed it directly in a response body.
///
/// Returns a human-readable error string if any write/finish/parse step fails.
fn batches_to_json(batches: &[RecordBatch]) -> Result<serde_json::Value, String> {
    let mut buf = Vec::new();
    let mut writer = JsonWriter::<_, JsonArray>::new(&mut buf);
    for batch in batches {
        writer.write(batch).map_err(|e| format!("JSON write error: {e}"))?;
    }
    writer.finish().map_err(|e| format!("JSON finish error: {e}"))?;
    drop(writer); // release the &mut borrow of buf before reading it back
    if buf.is_empty() {
        // Nothing was emitted (no batches / no rows). An empty buffer is not
        // valid JSON, so normalize to an empty array instead of erroring.
        return Ok(serde_json::Value::Array(vec![]));
    }
    serde_json::from_slice(&buf).map_err(|e| format!("JSON parse error: {e}"))
}
/// POST /sql — execute a SQL statement and return the complete result inline.
///
/// Responds 400 with the engine's error text on query failure, and 500 if
/// result serialization fails. An empty batch set yields an empty response.
async fn execute_query(
    State(state): State<QueryState>,
    Json(req): Json<QueryRequest>,
) -> impl IntoResponse {
    tracing::info!("executing query: {}", req.sql);
    let batches = match state.engine.query(&req.sql).await {
        Ok(b) => b,
        Err(e) => return Err((StatusCode::BAD_REQUEST, e)),
    };
    // No batches at all: report an empty result rather than indexing batches[0].
    if batches.is_empty() {
        return Ok(Json(QueryResponse {
            columns: vec![],
            rows: serde_json::Value::Array(vec![]),
            row_count: 0,
        }));
    }
    // Column metadata comes from the first batch's schema (all batches share it).
    let columns: Vec<ColumnInfo> = batches[0]
        .schema()
        .fields()
        .iter()
        .map(|f| ColumnInfo {
            name: f.name().clone(),
            data_type: f.data_type().to_string(),
        })
        .collect();
    let rows = batches_to_json(&batches)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    // Row count is derived from the serialized JSON array length.
    let row_count = rows.as_array().map(|a| a.len()).unwrap_or(0);
    Ok(Json(QueryResponse { columns, rows, row_count }))
}
// --- Paged Queries (large result sets) ---
async fn paged_query(
State(state): State<QueryState>,
Json(req): Json<QueryRequest>,
) -> impl IntoResponse {
tracing::info!("paged query: {}", req.sql);
match state.result_store.execute_and_store(&state.engine, &req.sql).await {
Ok(handle) => Ok(Json(handle)),
Err(e) => Err((StatusCode::BAD_REQUEST, e)),
}
}
#[derive(Deserialize)]
/// Query-string parameters for GET /page/{query_id}/{page}.
struct PageQuery {
    // Optional page-size override; `None` falls back to the store's default
    // (100 rows/page, set in `router`).
    size: Option<usize>,
}
async fn get_page(
State(state): State<QueryState>,
Path((query_id, page)): Path<(String, usize)>,
Query(q): Query<PageQuery>,
) -> impl IntoResponse {
match state.result_store.get_page(&query_id, page, q.size).await {
Ok(result) => Ok(Json(result)),
Err(e) => Err((StatusCode::NOT_FOUND, e)),
}
}
// --- Cache Management ---
#[derive(Deserialize)]
/// Request body for the cache pin/evict endpoints: the target dataset name.
struct CacheRequest {
    dataset: String,
}
/// POST /cache/pin — pin a dataset into the query cache so it is not evicted.
///
/// Responds 200 with a confirmation message, or 500 with the engine's error.
async fn pin_dataset(
    State(state): State<QueryState>,
    Json(req): Json<CacheRequest>,
) -> impl IntoResponse {
    let outcome = state.engine.pin_dataset(&req.dataset).await;
    match outcome {
        Ok(()) => Ok((StatusCode::OK, format!("pinned: {}", req.dataset))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
/// POST /cache/evict — drop a dataset from the query cache.
///
/// Responds 200 if something was evicted, 404 if the dataset was not cached.
async fn evict_dataset(
    State(state): State<QueryState>,
    Json(req): Json<CacheRequest>,
) -> impl IntoResponse {
    let evicted = state.engine.cache().evict(&req.dataset).await;
    let (code, verb) = if evicted {
        (StatusCode::OK, "evicted")
    } else {
        (StatusCode::NOT_FOUND, "not cached")
    };
    (code, format!("{verb}: {}", req.dataset))
}
/// GET /cache/stats — report current dataset-cache statistics as JSON.
async fn cache_stats(State(state): State<QueryState>) -> impl IntoResponse {
    Json(state.engine.cache().stats().await)
}
// --- Compaction ---
#[derive(Deserialize)]
/// Request body for POST /compact.
struct CompactRequest {
    // Dataset whose deltas (and tombstones) are folded into the base file.
    dataset: String,
    // Object-store key of the base parquet to rewrite.
    base_key: String,
    // Key column used to match tombstones / dedupe deltas; when absent,
    // `delta::compact` receives `None` — presumably tombstones cannot be
    // applied in that case (confirm against delta.rs).
    primary_key: Option<String>,
}
/// POST /compact — fold delta files (and tombstoned rows) into the base
/// parquet, then clear the tombstone log once the rows are physically gone.
///
/// Responds with the `CompactResult` JSON on success, 500 with the error
/// text otherwise. Tombstone listing failures degrade to "no tombstones"
/// rather than blocking compaction.
async fn compact_dataset(
    State(state): State<QueryState>,
    Json(req): Json<CompactRequest>,
) -> impl IntoResponse {
    // Phase E: pull tombstones for this dataset and let compact physically
    // drop those rows. After a successful rewrite, clear the tombstone log
    // — the rows are gone from disk, the tombstones have done their job.
    let tombstones = state
        .engine
        .catalog()
        .list_tombstones(&req.dataset)
        .await
        .unwrap_or_default();
    match delta::compact(
        state.engine.store(),
        &req.dataset,
        &req.base_key,
        req.primary_key.as_deref(),
        &tombstones,
    ).await {
        Ok(result) => {
            // Clear whenever the tombstone filter actually ran, not only when
            // it dropped rows: a tombstone whose row is already absent from
            // the rewritten base would otherwise linger forever and be
            // re-applied on every future compact. Gating on `tombstones_applied`
            // (rather than `!tombstones.is_empty()`) stays safe when the key
            // column is missing/unsupported and no filtering happened.
            if result.tombstones_applied > 0 {
                if let Err(e) = state.engine.catalog().tombstones().clear(&req.dataset).await {
                    // Non-fatal: rows are gone from disk; stale tombstones only
                    // mask already-deleted keys until the next successful clear.
                    tracing::warn!("post-compact tombstone clear failed: {e}");
                }
            }
            Ok(Json(result))
        }
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}