//! Paged query results with a server-side cursor.
//!
//! Large result sets are kept server-side; the client fetches pages on demand.
//! The pattern matches the embedding supervisor: chunk → process → retry.

use arrow::array::RecordBatch;
use arrow::json::writer::{JsonArray, Writer as JsonWriter};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::context::QueryEngine;

/// A stored query result that clients page through.
#[derive(Clone)]
pub struct StoredResult {
    pub query_id: String,
    pub sql: String,
    pub columns: Vec<ColumnInfo>,
    pub total_rows: usize,
    pub batches: Vec<RecordBatch>,
    pub created_at: std::time::Instant,
}

#[derive(Clone, Serialize)]
pub struct ColumnInfo {
    pub name: String,
    pub data_type: String,
}

/// Response for the initial query: metadata only, no rows.
#[derive(Serialize)]
pub struct QueryHandle {
    pub query_id: String,
    pub total_rows: usize,
    pub columns: Vec<ColumnInfo>,
    pub total_pages: usize,
    pub page_size: usize,
}

/// A page of results.
#[derive(Serialize)]
pub struct PageResult {
    pub query_id: String,
    pub page: usize,
    pub page_size: usize,
    pub total_rows: usize,
    pub total_pages: usize,
    pub rows: serde_json::Value,
    pub row_count: usize,
}

/// Server-side result store with capacity-based eviction (oldest result first).
/// `created_at` is retained so callers can also expire results by age (TTL).
#[derive(Clone)]
pub struct ResultStore {
    results: Arc<RwLock<HashMap<String, StoredResult>>>,
    default_page_size: usize,
    max_results: usize,
}

impl ResultStore {
    pub fn new(default_page_size: usize, max_results: usize) -> Self {
        Self {
            results: Arc::new(RwLock::new(HashMap::new())),
            default_page_size,
            max_results,
        }
    }

    /// Execute a query and store its results. Returns a handle with metadata only.
    ///
    /// Assumes `QueryEngine::query` returns `Result<Vec<RecordBatch>, String>`
    /// (or an error convertible into `String`), matching the error type used
    /// elsewhere in this module.
    pub async fn execute_and_store(
        &self,
        engine: &QueryEngine,
        sql: &str,
    ) -> Result<QueryHandle, String> {
        let batches = engine.query(sql).await?;

        // Empty result: hand back an empty handle without storing anything.
        if batches.is_empty() {
            let qid = format!("qr-{}", chrono::Utc::now().timestamp_millis());
            return Ok(QueryHandle {
                query_id: qid,
                total_rows: 0,
                columns: vec![],
                total_pages: 0,
                page_size: self.default_page_size,
            });
        }

        let schema = batches[0].schema();
        let columns: Vec<ColumnInfo> = schema
            .fields()
            .iter()
            .map(|f| ColumnInfo {
                name: f.name().clone(),
                data_type: f.data_type().to_string(),
            })
            .collect();

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        let total_pages = (total_rows + self.default_page_size - 1) / self.default_page_size;

        let qid = format!("qr-{}", chrono::Utc::now().timestamp_millis());

        let result = StoredResult {
            query_id: qid.clone(),
            sql: sql.to_string(),
            columns: columns.clone(),
            total_rows,
            batches,
            created_at: std::time::Instant::now(),
        };

        // Store the result, evicting the oldest entry if at capacity.
        let mut store = self.results.write().await;
        if store.len() >= self.max_results {
            if let Some(oldest_key) = store
                .iter()
                .min_by_key(|(_, v)| v.created_at)
                .map(|(k, _)| k.clone())
            {
                store.remove(&oldest_key);
            }
        }
        store.insert(qid.clone(), result);

        Ok(QueryHandle {
            query_id: qid,
            total_rows,
            columns,
            total_pages,
            page_size: self.default_page_size,
        })
    }

    /// Get a page of results. Pages are sliced out of the stored batches,
    /// so re-requesting the same page after a failed fetch is safe.
    pub async fn get_page(
        &self,
        query_id: &str,
        page: usize,
        page_size: Option<usize>,
    ) -> Result<PageResult, String> {
        let store = self.results.read().await;
        let result = store
            .get(query_id)
            .ok_or_else(|| format!("query result not found: {query_id} (may have expired)"))?;

        // Guard against a zero page size, which would divide by zero below.
        let ps = page_size.unwrap_or(self.default_page_size).max(1);
        let start_row = page * ps;

        // Past the end: return an empty page rather than an error.
        if start_row >= result.total_rows {
            return Ok(PageResult {
                query_id: query_id.to_string(),
                page,
                page_size: ps,
                total_rows: result.total_rows,
                total_pages: (result.total_rows + ps - 1) / ps,
                rows: serde_json::Value::Array(vec![]),
                row_count: 0,
            });
        }

        let end_row = (start_row + ps).min(result.total_rows);

        // Extract the requested rows from the stored batches.
        let page_batches = slice_batches(&result.batches, start_row, end_row);
        let rows = batches_to_json(&page_batches)?;
        let row_count = rows.as_array().map(|a| a.len()).unwrap_or(0);

        Ok(PageResult {
            query_id: query_id.to_string(),
            page,
            page_size: ps,
            total_rows: result.total_rows,
            total_pages: (result.total_rows + ps - 1) / ps,
            rows,
            row_count,
        })
    }

    pub fn default_page_size(&self) -> usize {
        self.default_page_size
    }
}

/// Slice record batches to extract rows `[start, end)`.
fn slice_batches(batches: &[RecordBatch], start: usize, end: usize) -> Vec<RecordBatch> {
    let mut result = Vec::new();
    let mut current_offset = 0;

    for batch in batches {
        let batch_end = current_offset + batch.num_rows();

        // Batch ends before the requested range starts: skip it.
        if batch_end <= start {
            current_offset = batch_end;
            continue;
        }
        // Batch starts after the requested range ends: done.
        if current_offset >= end {
            break;
        }

        // Translate the global [start, end) range into this batch's local indices.
        let local_start = if start > current_offset {
            start - current_offset
        } else {
            0
        };
        let local_end = if end < batch_end {
            end - current_offset
        } else {
            batch.num_rows()
        };
        let length = local_end - local_start;

        if length > 0 {
            result.push(batch.slice(local_start, length));
        }
        current_offset = batch_end;
    }

    result
}

/// Serialize record batches to a JSON array of row objects.
fn batches_to_json(batches: &[RecordBatch]) -> Result<serde_json::Value, String> {
    if batches.is_empty() {
        return Ok(serde_json::Value::Array(vec![]));
    }

    let mut buf = Vec::new();
    let mut writer = JsonWriter::<_, JsonArray>::new(&mut buf);
    for batch in batches {
        writer.write(batch).map_err(|e| format!("JSON write: {e}"))?;
    }
    writer.finish().map_err(|e| format!("JSON finish: {e}"))?;
    drop(writer);

    serde_json::from_slice(&buf).map_err(|e| format!("JSON parse: {e}"))
}
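
// --- Usage sketch ------------------------------------------------------------
// A minimal, illustrative sketch of how a handler might drive the store: run
// the SQL once, return the `query_id` to the client, then serve pages on
// demand. It only uses the API defined above; the table name, page size, and
// capacity are hypothetical, and it assumes `QueryEngine::query` returns
// `Result<Vec<RecordBatch>, String>` as noted on `execute_and_store`.
#[allow(dead_code)]
async fn paged_query_example(engine: &QueryEngine) -> Result<(), String> {
    // Keep at most 32 cached results, 500 rows per page by default.
    let store = ResultStore::new(500, 32);

    // First call: executes the query and returns metadata only (no rows).
    let handle = store
        .execute_and_store(engine, "SELECT * FROM events ORDER BY ts")
        .await?;
    println!("{} rows across {} pages", handle.total_rows, handle.total_pages);

    // Clients then walk the pages; each call slices the cached batches.
    for page in 0..handle.total_pages {
        let result = store.get_page(&handle.query_id, page, None).await?;
        println!("page {page}: {} rows", result.row_count);
    }

    Ok(())
}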