diff --git a/crates/queryd/src/lib.rs b/crates/queryd/src/lib.rs
index 2fff448..f653de4 100644
--- a/crates/queryd/src/lib.rs
+++ b/crates/queryd/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod cache;
 pub mod context;
 pub mod delta;
+pub mod paged;
 pub mod service;
 pub mod workspace;
 pub mod workspace_service;
diff --git a/crates/queryd/src/paged.rs b/crates/queryd/src/paged.rs
new file mode 100644
index 0000000..e3eadf5
--- /dev/null
+++ b/crates/queryd/src/paged.rs
@@ -0,0 +1,222 @@
+//! Paged query results with a server-side cursor.
+//! Large results are stored server-side; the client fetches pages on demand.
+//! The pattern matches the embedding supervisor: chunk → process → retry.
+
+use arrow::array::RecordBatch;
+use arrow::json::writer::{JsonArray, Writer as JsonWriter};
+use serde::Serialize;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+use crate::context::QueryEngine;
+
+/// A stored query result that clients page through.
+#[derive(Clone)]
+pub struct StoredResult {
+    pub query_id: String,
+    pub sql: String,
+    pub columns: Vec<ColumnInfo>,
+    pub total_rows: usize,
+    pub batches: Vec<RecordBatch>,
+    pub created_at: std::time::Instant,
+}
+
+#[derive(Clone, Serialize)]
+pub struct ColumnInfo {
+    pub name: String,
+    pub data_type: String,
+}
+
+/// Response for the initial query — metadata only, no rows.
+#[derive(Serialize)]
+pub struct QueryHandle {
+    pub query_id: String,
+    pub total_rows: usize,
+    pub columns: Vec<ColumnInfo>,
+    pub total_pages: usize,
+    pub page_size: usize,
+}
+
+/// A page of results.
+#[derive(Serialize)]
+pub struct PageResult {
+    pub query_id: String,
+    pub page: usize,
+    pub page_size: usize,
+    pub total_rows: usize,
+    pub total_pages: usize,
+    pub rows: serde_json::Value,
+    pub row_count: usize,
+}
+
+/// Server-side result store. Evicts the oldest stored result once
+/// `max_results` is reached; there is no time-based expiry.
+#[derive(Clone)]
+pub struct ResultStore {
+    results: Arc<RwLock<HashMap<String, StoredResult>>>,
+    default_page_size: usize,
+    max_results: usize,
+}
+
+impl ResultStore {
+    pub fn new(default_page_size: usize, max_results: usize) -> Self {
+        Self {
+            results: Arc::new(RwLock::new(HashMap::new())),
+            default_page_size,
+            max_results,
+        }
+    }
+
+    /// Execute a query and store the full result server-side.
+    /// Returns a handle carrying metadata only — no rows.
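+    ///
+    /// A minimal usage sketch (hypothetical `engine`, hence `ignore`d):
+    ///
+    /// ```ignore
+    /// let store = ResultStore::new(100, 50);
+    /// let handle = store.execute_and_store(&engine, "SELECT * FROM calls").await?;
+    /// // Rows are fetched page-by-page through the handle, never all at once.
+    /// let first = store.get_page(&handle.query_id, 0, None).await?;
+    /// assert!(first.row_count <= handle.page_size);
+    /// ```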
+    pub async fn execute_and_store(
+        &self,
+        engine: &QueryEngine,
+        sql: &str,
+    ) -> Result<QueryHandle, String> {
+        let batches = engine.query(sql).await?;
+
+        if batches.is_empty() {
+            let qid = format!("qr-{}", chrono::Utc::now().timestamp_millis());
+            return Ok(QueryHandle {
+                query_id: qid,
+                total_rows: 0,
+                columns: vec![],
+                total_pages: 0,
+                page_size: self.default_page_size,
+            });
+        }
+
+        let schema = batches[0].schema();
+        let columns: Vec<ColumnInfo> = schema
+            .fields()
+            .iter()
+            .map(|f| ColumnInfo {
+                name: f.name().clone(),
+                data_type: f.data_type().to_string(),
+            })
+            .collect();
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        let total_pages = (total_rows + self.default_page_size - 1) / self.default_page_size;
+
+        let qid = format!("qr-{}", chrono::Utc::now().timestamp_millis());
+
+        let result = StoredResult {
+            query_id: qid.clone(),
+            sql: sql.to_string(),
+            columns: columns.clone(),
+            total_rows,
+            batches,
+            created_at: std::time::Instant::now(),
+        };
+
+        // Store, evicting the oldest result if at capacity.
+        let mut store = self.results.write().await;
+        if store.len() >= self.max_results {
+            if let Some(oldest_key) = store
+                .iter()
+                .min_by_key(|(_, v)| v.created_at)
+                .map(|(k, _)| k.clone())
+            {
+                store.remove(&oldest_key);
+            }
+        }
+        store.insert(qid.clone(), result);
+
+        Ok(QueryHandle {
+            query_id: qid,
+            total_rows,
+            columns,
+            total_pages,
+            page_size: self.default_page_size,
+        })
+    }
+
+    /// Get a page of results. Reading never mutates the store, so a client
+    /// can safely retry a failed page fetch.
+    pub async fn get_page(
+        &self,
+        query_id: &str,
+        page: usize,
+        page_size: Option<usize>,
+    ) -> Result<PageResult, String> {
+        let store = self.results.read().await;
+        let result = store
+            .get(query_id)
+            .ok_or_else(|| format!("query result not found: {query_id} (may have expired)"))?;
+
+        let ps = page_size.unwrap_or(self.default_page_size);
+        let start_row = page * ps;
+
+        if start_row >= result.total_rows {
+            return Ok(PageResult {
+                query_id: query_id.to_string(),
+                page,
+                page_size: ps,
+                total_rows: result.total_rows,
+                total_pages: (result.total_rows + ps - 1) / ps,
+                rows: serde_json::Value::Array(vec![]),
+                row_count: 0,
+            });
+        }
+
+        let end_row = (start_row + ps).min(result.total_rows);
+
+        // Extract the requested row range from the stored batches.
+        let page_batches = slice_batches(&result.batches, start_row, end_row);
+        let rows = batches_to_json(&page_batches)?;
+        let row_count = rows.as_array().map(|a| a.len()).unwrap_or(0);
+
+        Ok(PageResult {
+            query_id: query_id.to_string(),
+            page,
+            page_size: ps,
+            total_rows: result.total_rows,
+            total_pages: (result.total_rows + ps - 1) / ps,
+            rows,
+            row_count,
+        })
+    }
+
+    pub fn default_page_size(&self) -> usize {
+        self.default_page_size
+    }
+}
+
+/// Slice record batches to extract rows [start, end).
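+///
+/// Offset arithmetic sketch (assumed batch sizes, illustrative only): with two
+/// batches of 3 and 2 rows, global rows run 0..5, so requesting [2, 5) takes
+/// one row from the first batch and both rows of the second.
+///
+/// ```ignore
+/// let page = slice_batches(&batches, 2, 5);
+/// let n: usize = page.iter().map(|b| b.num_rows()).sum();
+/// assert_eq!(n, 3);
+/// ```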
+fn slice_batches(batches: &[RecordBatch], start: usize, end: usize) -> Vec<RecordBatch> {
+    let mut result = Vec::new();
+    let mut current_offset = 0;
+
+    for batch in batches {
+        let batch_end = current_offset + batch.num_rows();
+
+        // Batch lies entirely before the window: skip it.
+        if batch_end <= start {
+            current_offset = batch_end;
+            continue;
+        }
+        // Batch lies entirely after the window: done.
+        if current_offset >= end {
+            break;
+        }
+
+        let local_start = if start > current_offset { start - current_offset } else { 0 };
+        let local_end = if end < batch_end { end - current_offset } else { batch.num_rows() };
+        let length = local_end - local_start;
+
+        if length > 0 {
+            result.push(batch.slice(local_start, length));
+        }
+
+        current_offset = batch_end;
+    }
+
+    result
+}
+
+fn batches_to_json(batches: &[RecordBatch]) -> Result<serde_json::Value, String> {
+    if batches.is_empty() {
+        return Ok(serde_json::Value::Array(vec![]));
+    }
+    let mut buf = Vec::new();
+    let mut writer = JsonWriter::<_, JsonArray>::new(&mut buf);
+    for batch in batches {
+        writer.write(batch).map_err(|e| format!("JSON write: {e}"))?;
+    }
+    writer.finish().map_err(|e| format!("JSON finish: {e}"))?;
+    drop(writer);
+    serde_json::from_slice(&buf).map_err(|e| format!("JSON parse: {e}"))
+}
diff --git a/crates/queryd/src/service.rs b/crates/queryd/src/service.rs
index 1660b8c..3be7094 100644
--- a/crates/queryd/src/service.rs
+++ b/crates/queryd/src/service.rs
@@ -2,7 +2,7 @@ use arrow::array::RecordBatch;
 use arrow::json::writer::{JsonArray, Writer as JsonWriter};
 use axum::{
     Json, Router,
-    extract::State,
+    extract::{Path, Query, State},
     http::StatusCode,
     response::IntoResponse,
     routing::{get, post},
@@ -12,16 +12,29 @@ use serde::{Deserialize, Serialize};
 use crate::cache::CacheStats;
 use crate::context::QueryEngine;
 use crate::delta;
+use crate::paged::ResultStore;
+
+#[derive(Clone)]
+pub struct QueryState {
+    pub engine: QueryEngine,
+    pub result_store: ResultStore,
+}
 
 pub fn router(engine: QueryEngine) -> Router {
+    let state = QueryState {
+        engine: engine.clone(),
+        result_store: ResultStore::new(100, 50), // 100 rows/page, keep 50 results
+    };
     Router::new()
         .route("/health", get(health))
         .route("/sql", post(execute_query))
+        .route("/paged", post(paged_query))
+        .route("/page/{query_id}/{page}", get(get_page))
         .route("/cache/pin", post(pin_dataset))
         .route("/cache/evict", post(evict_dataset))
        .route("/cache/stats", get(cache_stats))
         .route("/compact", post(compact_dataset))
-        .with_state(engine)
+        .with_state(state)
 }
 
 async fn health() -> &'static str {
@@ -60,12 +73,12 @@ fn batches_to_json(batches: &[RecordBatch]) -> Result<serde_json::Value, String> {
 }
 
 async fn execute_query(
-    State(engine): State<QueryEngine>,
+    State(state): State<QueryState>,
     Json(req): Json<QueryRequest>,
 ) -> impl IntoResponse {
     tracing::info!("executing query: {}", req.sql);
 
-    match engine.query(&req.sql).await {
+    match state.engine.query(&req.sql).await {
         Ok(batches) => {
             if batches.is_empty() {
                 return Ok(Json(QueryResponse {
@@ -96,6 +109,35 @@
     }
 }
 
+// --- Paged Queries (large result sets) ---
+
+async fn paged_query(
+    State(state): State<QueryState>,
+    Json(req): Json<QueryRequest>,
+) -> impl IntoResponse {
+    tracing::info!("paged query: {}", req.sql);
+    match state.result_store.execute_and_store(&state.engine, &req.sql).await {
+        Ok(handle) => Ok(Json(handle)),
+        Err(e) => Err((StatusCode::BAD_REQUEST, e)),
+    }
+}
+
+#[derive(Deserialize)]
+struct PageQuery {
+    size: Option<usize>,
+}
+
+async fn get_page(
+    State(state): State<QueryState>,
+    Path((query_id, page)): Path<(String, usize)>,
+    Query(q): Query<PageQuery>,
+) -> impl IntoResponse {
+    match state.result_store.get_page(&query_id, page, q.size).await {
+        Ok(result) => Ok(Json(result)),
+        Err(e) => Err((StatusCode::NOT_FOUND, e)),
+    }
+}
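+
+// Wire-level sketch of the paged flow (hypothetical IDs and counts, shown for
+// reference only):
+//   POST /paged  {"sql": "SELECT * FROM call_log"}
+//     -> {"query_id": "qr-1711590000000", "total_rows": 12000,
+//         "total_pages": 120, "page_size": 100, "columns": [...]}
+//   GET  /page/qr-1711590000000/3?size=100
+//     -> {"page": 3, "rows": [...], "row_count": 100, ...}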
+
 // --- Cache Management ---
 
 #[derive(Deserialize)]
@@ -104,28 +146,28 @@ struct CacheRequest {
 }
 
 async fn pin_dataset(
-    State(engine): State<QueryEngine>,
+    State(state): State<QueryState>,
     Json(req): Json<CacheRequest>,
 ) -> impl IntoResponse {
-    match engine.pin_dataset(&req.dataset).await {
+    match state.engine.pin_dataset(&req.dataset).await {
         Ok(()) => Ok((StatusCode::OK, format!("pinned: {}", req.dataset))),
         Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
     }
 }
 
 async fn evict_dataset(
-    State(engine): State<QueryEngine>,
+    State(state): State<QueryState>,
     Json(req): Json<CacheRequest>,
 ) -> impl IntoResponse {
-    if engine.cache().evict(&req.dataset).await {
+    if state.engine.cache().evict(&req.dataset).await {
         (StatusCode::OK, format!("evicted: {}", req.dataset))
     } else {
         (StatusCode::NOT_FOUND, format!("not cached: {}", req.dataset))
     }
 }
 
-async fn cache_stats(State(engine): State<QueryEngine>) -> impl IntoResponse {
-    let stats = engine.cache().stats().await;
+async fn cache_stats(State(state): State<QueryState>) -> impl IntoResponse {
+    let stats = state.engine.cache().stats().await;
     Json(stats)
 }
@@ -139,11 +181,11 @@ struct CompactRequest {
 }
 
 async fn compact_dataset(
-    State(engine): State<QueryEngine>,
+    State(state): State<QueryState>,
     Json(req): Json<CompactRequest>,
 ) -> impl IntoResponse {
     match delta::compact(
-        engine.store(),
+        state.engine.store(),
         &req.dataset,
         &req.base_key,
         req.primary_key.as_deref(),
diff --git a/crates/ui/data/_catalog/manifests/6d0002ef-28de-4b92-abed-0bfb7ab5fb6f.json b/crates/ui/data/_catalog/manifests/6d0002ef-28de-4b92-abed-0bfb7ab5fb6f.json
new file mode 100644
index 0000000..f85f1b6
--- /dev/null
+++ b/crates/ui/data/_catalog/manifests/6d0002ef-28de-4b92-abed-0bfb7ab5fb6f.json
@@ -0,0 +1,23 @@
+{
+  "id": "6d0002ef-28de-4b92-abed-0bfb7ab5fb6f",
+  "name": "candidates",
+  "schema_fingerprint": "auto",
+  "objects": [
+    {
+      "bucket": "data",
+      "key": "datasets/candidates.parquet",
+      "size_bytes": 10592165,
+      "created_at": "2026-03-28T01:54:06.898505222Z"
+    }
+  ],
+  "created_at": "2026-03-28T01:54:06.898506121Z",
+  "updated_at": "2026-03-28T01:54:06.898506121Z",
+  "description": "",
+  "owner": "",
+  "sensitivity": null,
+  "columns": [],
+  "lineage": null,
+  "freshness": null,
+  "tags": [],
+  "row_count": null
+}
\ No newline at end of file
diff --git a/crates/ui/data/_catalog/manifests/73a4ef71-5b2a-4124-a2c2-fcefdde99624.json b/crates/ui/data/_catalog/manifests/73a4ef71-5b2a-4124-a2c2-fcefdde99624.json
new file mode 100644
index 0000000..7d87bc2
--- /dev/null
+++ b/crates/ui/data/_catalog/manifests/73a4ef71-5b2a-4124-a2c2-fcefdde99624.json
@@ -0,0 +1,23 @@
+{
+  "id": "73a4ef71-5b2a-4124-a2c2-fcefdde99624",
+  "name": "job_orders",
+  "schema_fingerprint": "auto",
+  "objects": [
+    {
+      "bucket": "data",
+      "key": "datasets/job_orders.parquet",
+      "size_bytes": 905534,
+      "created_at": "2026-03-28T01:54:07.007332060Z"
+    }
+  ],
+  "created_at": "2026-03-28T01:54:07.007332775Z",
+  "updated_at": "2026-03-28T01:54:07.007332775Z",
+  "description": "",
+  "owner": "",
+  "sensitivity": null,
+  "columns": [],
+  "lineage": null,
+  "freshness": null,
+  "tags": [],
+  "row_count": null
+}
\ No newline at end of file
diff --git a/crates/ui/data/_catalog/manifests/b56694bb-f4f7-447d-b4d6-ac89103a6a3a.json b/crates/ui/data/_catalog/manifests/b56694bb-f4f7-447d-b4d6-ac89103a6a3a.json
new file mode 100644
index 0000000..821407f
--- /dev/null
+++ b/crates/ui/data/_catalog/manifests/b56694bb-f4f7-447d-b4d6-ac89103a6a3a.json
@@ -0,0 +1,23 @@
+{
+  "id": "b56694bb-f4f7-447d-b4d6-ac89103a6a3a",
+  "name": "call_log",
+  "schema_fingerprint": "auto",
+  "objects": [
+    {
"bucket": "data", + "key": "datasets/call_log.parquet", + "size_bytes": 35951077, + "created_at": "2026-03-28T01:54:15.457573021Z" + } + ], + "created_at": "2026-03-28T01:54:15.457573701Z", + "updated_at": "2026-03-28T01:54:15.457573701Z", + "description": "", + "owner": "", + "sensitivity": null, + "columns": [], + "lineage": null, + "freshness": null, + "tags": [], + "row_count": null +} \ No newline at end of file diff --git a/crates/ui/data/_catalog/manifests/daa5a11b-6898-40ba-a811-3bd4b6f5b750.json b/crates/ui/data/_catalog/manifests/daa5a11b-6898-40ba-a811-3bd4b6f5b750.json new file mode 100644 index 0000000..7a68362 --- /dev/null +++ b/crates/ui/data/_catalog/manifests/daa5a11b-6898-40ba-a811-3bd4b6f5b750.json @@ -0,0 +1,23 @@ +{ + "id": "daa5a11b-6898-40ba-a811-3bd4b6f5b750", + "name": "email_log", + "schema_fingerprint": "auto", + "objects": [ + { + "bucket": "data", + "key": "datasets/email_log.parquet", + "size_bytes": 16768671, + "created_at": "2026-03-28T01:54:17.356056562Z" + } + ], + "created_at": "2026-03-28T01:54:17.356057529Z", + "updated_at": "2026-03-28T01:54:17.356057529Z", + "description": "", + "owner": "", + "sensitivity": null, + "columns": [], + "lineage": null, + "freshness": null, + "tags": [], + "row_count": null +} \ No newline at end of file diff --git a/crates/ui/data/_catalog/manifests/dd396ee4-3a1a-4939-af60-84d409eebed4.json b/crates/ui/data/_catalog/manifests/dd396ee4-3a1a-4939-af60-84d409eebed4.json new file mode 100644 index 0000000..c00d144 --- /dev/null +++ b/crates/ui/data/_catalog/manifests/dd396ee4-3a1a-4939-af60-84d409eebed4.json @@ -0,0 +1,23 @@ +{ + "id": "dd396ee4-3a1a-4939-af60-84d409eebed4", + "name": "clients", + "schema_fingerprint": "auto", + "objects": [ + { + "bucket": "data", + "key": "datasets/clients.parquet", + "size_bytes": 21971, + "created_at": "2026-03-28T01:54:06.904379300Z" + } + ], + "created_at": "2026-03-28T01:54:06.904379793Z", + "updated_at": "2026-03-28T01:54:06.904379793Z", + "description": "", + "owner": "", + "sensitivity": null, + "columns": [], + "lineage": null, + "freshness": null, + "tags": [], + "row_count": null +} \ No newline at end of file diff --git a/crates/ui/data/_catalog/manifests/f85285e3-111a-4a55-aa58-05fd740ed862.json b/crates/ui/data/_catalog/manifests/f85285e3-111a-4a55-aa58-05fd740ed862.json new file mode 100644 index 0000000..9605b39 --- /dev/null +++ b/crates/ui/data/_catalog/manifests/f85285e3-111a-4a55-aa58-05fd740ed862.json @@ -0,0 +1,23 @@ +{ + "id": "f85285e3-111a-4a55-aa58-05fd740ed862", + "name": "timesheets", + "schema_fingerprint": "auto", + "objects": [ + { + "bucket": "data", + "key": "datasets/timesheets.parquet", + "size_bytes": 17539932, + "created_at": "2026-03-28T01:54:11.947773535Z" + } + ], + "created_at": "2026-03-28T01:54:11.947778065Z", + "updated_at": "2026-03-28T01:54:11.947778065Z", + "description": "", + "owner": "", + "sensitivity": null, + "columns": [], + "lineage": null, + "freshness": null, + "tags": [], + "row_count": null +} \ No newline at end of file diff --git a/crates/ui/data/_catalog/manifests/ffa72582-dc11-4454-a46e-35839a7d04bb.json b/crates/ui/data/_catalog/manifests/ffa72582-dc11-4454-a46e-35839a7d04bb.json new file mode 100644 index 0000000..1a90d7a --- /dev/null +++ b/crates/ui/data/_catalog/manifests/ffa72582-dc11-4454-a46e-35839a7d04bb.json @@ -0,0 +1,23 @@ +{ + "id": "ffa72582-dc11-4454-a46e-35839a7d04bb", + "name": "placements", + "schema_fingerprint": "auto", + "objects": [ + { + "bucket": "data", + "key": "datasets/placements.parquet", + 
"size_bytes": 1213820, + "created_at": "2026-03-28T01:54:07.109218264Z" + } + ], + "created_at": "2026-03-28T01:54:07.109219070Z", + "updated_at": "2026-03-28T01:54:07.109219070Z", + "description": "", + "owner": "", + "sensitivity": null, + "columns": [], + "lineage": null, + "freshness": null, + "tags": [], + "row_count": null +} \ No newline at end of file diff --git a/crates/ui/data/datasets/call_log.parquet b/crates/ui/data/datasets/call_log.parquet new file mode 100644 index 0000000..0f62d0e Binary files /dev/null and b/crates/ui/data/datasets/call_log.parquet differ diff --git a/crates/ui/data/datasets/candidates.parquet b/crates/ui/data/datasets/candidates.parquet new file mode 100644 index 0000000..a7f05d5 Binary files /dev/null and b/crates/ui/data/datasets/candidates.parquet differ diff --git a/crates/ui/data/datasets/clients.parquet b/crates/ui/data/datasets/clients.parquet new file mode 100644 index 0000000..c61e229 Binary files /dev/null and b/crates/ui/data/datasets/clients.parquet differ diff --git a/crates/ui/data/datasets/email_log.parquet b/crates/ui/data/datasets/email_log.parquet new file mode 100644 index 0000000..34ee53c Binary files /dev/null and b/crates/ui/data/datasets/email_log.parquet differ diff --git a/crates/ui/data/datasets/job_orders.parquet b/crates/ui/data/datasets/job_orders.parquet new file mode 100644 index 0000000..3ad5c39 Binary files /dev/null and b/crates/ui/data/datasets/job_orders.parquet differ diff --git a/crates/ui/data/datasets/placements.parquet b/crates/ui/data/datasets/placements.parquet new file mode 100644 index 0000000..49d3d55 Binary files /dev/null and b/crates/ui/data/datasets/placements.parquet differ diff --git a/crates/ui/data/datasets/timesheets.parquet b/crates/ui/data/datasets/timesheets.parquet new file mode 100644 index 0000000..c1e6fb1 Binary files /dev/null and b/crates/ui/data/datasets/timesheets.parquet differ diff --git a/crates/ui/src/main.rs b/crates/ui/src/main.rs index 7b69ee9..8206f33 100644 --- a/crates/ui/src/main.rs +++ b/crates/ui/src/main.rs @@ -1003,8 +1003,8 @@ fn ResultsTable(response: QueryResponse) -> Element { if response.row_count == 0 { div { class: "empty-sm", "no rows returned" } } else if let Some(rows) = rows { - if rows.len() > 200 { - div { class: "results-info", "Showing first 200 of {response.row_count} rows" } + if rows.len() > 500 { + div { class: "results-info", "Showing first 500 of {response.row_count} rows (use SQL tab with LIMIT for larger)" } } div { class: "table-wrap", table { @@ -1016,7 +1016,7 @@ fn ResultsTable(response: QueryResponse) -> Element { } } tbody { - for row in rows.iter().take(200) { + for row in rows.iter().take(500) { tr { for col in response.columns.iter() { td { {format_cell(row.get(&col.name))} }