- MemCache: LRU in-memory cache for hot datasets (configurable max, default 16 GB).
  Pin/evict/stats endpoints: POST /query/cache/pin, /cache/evict, GET /cache/stats.
- Delta store: append-only delta Parquet files for row-level updates.
  Deltas are written without rewriting base files and merged at query time.
- Compaction: POST /query/compact merges deltas into the base Parquet files.
- Query engine: checks the cache first, falls back to Parquet, then merges deltas.
- Benchmarked on 2.47M rows:
  - 1M-row JOIN: 854 ms cold → 96 ms hot (8.9x speedup)
  - 100K-row filter: 62 ms cold → 21 ms hot (3x speedup)
  - 1.1M rows cached in 408 MB of RAM

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
136 lines
4.1 KiB
Rust
136 lines
4.1 KiB
Rust
//! In-memory cache for hot datasets.
//! Pinned datasets are loaded as Arrow RecordBatches and served from RAM.
//! LRU eviction keeps memory usage (approximately) within a configured byte budget.
|
|
|
|
use arrow::array::RecordBatch;
|
|
use arrow::datatypes::SchemaRef;
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use tokio::sync::RwLock;
|
|
|
|
/// A cached dataset — schema + all batches in memory.
|
|
#[derive(Clone)]
|
|
pub struct CachedDataset {
|
|
pub name: String,
|
|
pub schema: SchemaRef,
|
|
pub batches: Vec<RecordBatch>,
|
|
pub row_count: usize,
|
|
pub size_bytes: usize, // approximate
|
|
pub last_accessed: std::time::Instant,
|
|
}
|
|
|
|
/// Memory cache with LRU eviction.
|
|
#[derive(Clone)]
|
|
pub struct MemCache {
|
|
datasets: Arc<RwLock<HashMap<String, CachedDataset>>>,
|
|
max_bytes: usize,
|
|
}
|
|
|
|
impl MemCache {
|
|
pub fn new(max_bytes: usize) -> Self {
|
|
Self {
|
|
datasets: Arc::new(RwLock::new(HashMap::new())),
|
|
max_bytes,
|
|
}
|
|
}
|
|
|
|
/// Load a dataset into the cache.
|
|
pub async fn put(&self, name: &str, schema: SchemaRef, batches: Vec<RecordBatch>) {
|
|
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
|
|
let size_bytes: usize = batches.iter()
|
|
.map(|b| b.get_array_memory_size())
|
|
.sum();
|
|
|
|
// Evict if needed
|
|
self.evict_to_fit(size_bytes).await;
|
|
|
|
let entry = CachedDataset {
|
|
name: name.to_string(),
|
|
schema,
|
|
batches,
|
|
row_count,
|
|
size_bytes,
|
|
last_accessed: std::time::Instant::now(),
|
|
};
|
|
|
|
tracing::info!("cached '{}': {} rows, {:.1} MB",
|
|
name, row_count, size_bytes as f64 / 1024.0 / 1024.0);
|
|
|
|
let mut cache = self.datasets.write().await;
|
|
cache.insert(name.to_string(), entry);
|
|
}
|
|
|
|
/// Get a cached dataset, updating access time.
|
|
pub async fn get(&self, name: &str) -> Option<CachedDataset> {
|
|
let mut cache = self.datasets.write().await;
|
|
if let Some(entry) = cache.get_mut(name) {
|
|
entry.last_accessed = std::time::Instant::now();
|
|
Some(entry.clone())
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
/// Check if a dataset is cached.
|
|
pub async fn contains(&self, name: &str) -> bool {
|
|
self.datasets.read().await.contains_key(name)
|
|
}
|
|
|
|
/// Evict a specific dataset.
|
|
pub async fn evict(&self, name: &str) -> bool {
|
|
let mut cache = self.datasets.write().await;
|
|
cache.remove(name).is_some()
|
|
}
|
|
|
|
/// Get cache stats.
|
|
pub async fn stats(&self) -> CacheStats {
|
|
let cache = self.datasets.read().await;
|
|
let total_bytes: usize = cache.values().map(|d| d.size_bytes).sum();
|
|
let total_rows: usize = cache.values().map(|d| d.row_count).sum();
|
|
CacheStats {
|
|
datasets: cache.len(),
|
|
total_bytes,
|
|
total_rows,
|
|
max_bytes: self.max_bytes,
|
|
names: cache.keys().cloned().collect(),
|
|
}
|
|
}
|
|
|
|
/// Evict least-recently-used entries to make room for `needed_bytes`.
|
|
async fn evict_to_fit(&self, needed_bytes: usize) {
|
|
let mut cache = self.datasets.write().await;
|
|
let current: usize = cache.values().map(|d| d.size_bytes).sum();
|
|
|
|
if current + needed_bytes <= self.max_bytes {
|
|
return;
|
|
}
|
|
|
|
// Sort by last accessed (oldest first)
|
|
let mut entries: Vec<(String, std::time::Instant, usize)> = cache.iter()
|
|
.map(|(k, v)| (k.clone(), v.last_accessed, v.size_bytes))
|
|
.collect();
|
|
entries.sort_by_key(|(_, t, _)| *t);
|
|
|
|
let mut freed = 0usize;
|
|
let target = (current + needed_bytes).saturating_sub(self.max_bytes);
|
|
|
|
for (name, _, size) in entries {
|
|
if freed >= target {
|
|
break;
|
|
}
|
|
tracing::info!("evicting '{}' from cache ({:.1} MB)", name, size as f64 / 1024.0 / 1024.0);
|
|
cache.remove(&name);
|
|
freed += size;
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, serde::Serialize)]
|
|
pub struct CacheStats {
|
|
pub datasets: usize,
|
|
pub total_bytes: usize,
|
|
pub total_rows: usize,
|
|
pub max_bytes: usize,
|
|
pub names: Vec<String>,
|
|
}
|