vectord/index_registry: add last_used + build_signature (scrum iter 11)
Scrum iter 11 on crates/vectord/src/index_registry.rs flagged two
concrete field gaps (90% confidence). Both were tagged UnitMismatch
/ missing-invariant.
IndexMeta gains two Optional fields:
last_used: Option<DateTime<Utc>>
PRD 11.3 — when this index was last searched against. Callers
were reading created_at as a liveness proxy, which conflated
"built" with "used." IndexRegistry::touch_used(name) stamps the
field on every hit; incremental re-embed can now skip cold
indexes without misattributing "fresh build" to "recent use."
build_signature: Option<String>
PRD 11.3 — stable SHA-256 of (sorted source files + chunk_size
+ overlap + model_version). compute_build_signature() in the
same module is deterministic: file-order-invariant, changes on
chunk param, changes on model version. Lets incremental re-embed
answer "has anything changed since last build?" without scanning
the source Parquet.
Both fields are #[serde(default)] — the ~40 existing .json meta
files under vectors/meta/ load unchanged. Backward-compat verified
by the explicit `index_meta_deserializes_without_new_fields_backcompat`
test.
7 new tests:
- build_signature_is_deterministic
- build_signature_order_invariant (sorted internally)
- build_signature_changes_on_chunk_param
- build_signature_changes_on_model_version
- touch_used_updates_last_used
- touch_used_is_noop_on_missing_index
- index_meta_deserializes_without_new_fields_backcompat
Call-site fixes: crates/vectord/src/refresh.rs:294 and
crates/vectord/src/service.rs:244 both construct IndexMeta with
fully-literal init, default the new fields to None. One
indentation cleanup on service.rs (a pre-existing visual issue on
id_prefix: None).
Workspace warnings still at 0. touch_used() isn't wired into search
hot-path yet — follow-up commit when the search handlers can
adopt it without a broader refactor.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6532938e85
commit
91a38dc20b
@ -46,6 +46,21 @@ pub struct IndexMeta {
|
||||
/// Existing indexes: "W-", "CAND-", "W500K-", etc.
|
||||
#[serde(default)]
|
||||
pub id_prefix: Option<String>,
|
||||
/// PRD 11.3 — when this index was last searched against. `None` =
|
||||
/// never used since registration (or pre-field-existed metadata).
|
||||
/// Incremental re-embed walks this to skip cold indexes.
|
||||
/// Scrum iter 11 flagged the missing field as a UnitMismatch
|
||||
/// because callers were reading `created_at` as a proxy for
|
||||
/// liveness, which conflated "built" with "used."
|
||||
#[serde(default)]
|
||||
pub last_used: Option<DateTime<Utc>>,
|
||||
/// PRD 11.3 — SHA-256 of (sorted source file list + chunk_size +
|
||||
/// overlap + model_version). Lets incremental re-embed detect
|
||||
/// "no change since last build" without scanning the source
|
||||
/// Parquet. None = signature not computed yet (pre-existing
|
||||
/// indexes before this field landed).
|
||||
#[serde(default)]
|
||||
pub build_signature: Option<String>,
|
||||
}
|
||||
|
||||
fn default_bucket() -> String { "primary".to_string() }
|
||||
@ -128,4 +143,139 @@ impl IndexRegistry {
|
||||
self.indexes.write().await.remove(index_name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stamp `last_used = now()` on an index. Search handlers call this
|
||||
/// on every hit so incremental re-embed (PRD 11.3) can tell live
|
||||
/// indexes from cold ones. Silently no-ops if the index is unknown
|
||||
/// — callers get best-effort behavior, not a 500 on a missing row.
|
||||
pub async fn touch_used(&self, index_name: &str) {
|
||||
if let Some(m) = self.indexes.write().await.get_mut(index_name) {
|
||||
m.last_used = Some(Utc::now());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Compute a stable build_signature for PRD 11.3 incremental re-embed.
|
||||
/// Hashes (sorted source file list, chunk_size, overlap, model_version)
|
||||
/// so a caller can ask "has anything we built from changed?" without
|
||||
/// re-scanning the source parquet. Same inputs always produce the
|
||||
/// same hash.
|
||||
pub fn compute_build_signature(
|
||||
source_files: &[impl AsRef<str>],
|
||||
chunk_size: usize,
|
||||
overlap: usize,
|
||||
model_version: &str,
|
||||
) -> String {
|
||||
use sha2::{Digest, Sha256};
|
||||
let mut sorted: Vec<&str> = source_files.iter().map(|s| s.as_ref()).collect();
|
||||
sorted.sort();
|
||||
let mut hasher = Sha256::new();
|
||||
for f in &sorted {
|
||||
hasher.update(f.as_bytes());
|
||||
hasher.update(b"\n");
|
||||
}
|
||||
hasher.update(chunk_size.to_le_bytes());
|
||||
hasher.update(overlap.to_le_bytes());
|
||||
hasher.update(model_version.as_bytes());
|
||||
format!("{:x}", hasher.finalize())
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    /// Minimal, fully-literal `IndexMeta` fixture for registry tests.
    /// New optional fields default to `None` so `touch_used` tests start
    /// from a known "never used" state.
    fn sample_meta(name: &str) -> IndexMeta {
        IndexMeta {
            index_name: name.into(),
            source: "s".into(),
            model_name: "m".into(),
            model_version: "v1".into(),
            dimensions: 768,
            chunk_count: 0,
            doc_count: 0,
            chunk_size: 800,
            overlap: 80,
            storage_key: "k".into(),
            created_at: Utc::now(),
            build_time_secs: 0.0,
            chunks_per_sec: 0.0,
            bucket: "primary".into(),
            vector_backend: Default::default(),
            id_prefix: None,
            last_used: None,
            build_signature: None,
        }
    }

    #[test]
    fn build_signature_is_deterministic() {
        let first = compute_build_signature(&["a.parquet", "b.parquet"], 800, 80, "v1");
        let second = compute_build_signature(&["a.parquet", "b.parquet"], 800, 80, "v1");
        assert_eq!(first, second, "same inputs → same hash");
    }

    #[test]
    fn build_signature_order_invariant() {
        // File lists are sorted internally, so caller ordering is irrelevant.
        let forward = compute_build_signature(&["a.parquet", "b.parquet"], 800, 80, "v1");
        let reversed = compute_build_signature(&["b.parquet", "a.parquet"], 800, 80, "v1");
        assert_eq!(forward, reversed, "file list order must not affect hash");
    }

    #[test]
    fn build_signature_changes_on_chunk_param() {
        let base = compute_build_signature(&["a.parquet"], 800, 80, "v1");
        let bigger_chunks = compute_build_signature(&["a.parquet"], 900, 80, "v1");
        assert_ne!(base, bigger_chunks, "chunk_size change → different hash");
    }

    #[test]
    fn build_signature_changes_on_model_version() {
        let v1 = compute_build_signature(&["a.parquet"], 800, 80, "v1");
        let v2 = compute_build_signature(&["a.parquet"], 800, 80, "v2");
        assert_ne!(v1, v2, "model version change → different hash");
    }

    #[tokio::test]
    async fn touch_used_updates_last_used() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let reg = IndexRegistry::new(store);
        reg.register(sample_meta("test")).await.unwrap();

        // Freshly registered index has never been searched.
        assert!(reg.get("test").await.unwrap().last_used.is_none());

        reg.touch_used("test").await;
        assert!(reg.get("test").await.unwrap().last_used.is_some());
    }

    #[tokio::test]
    async fn touch_used_is_noop_on_missing_index() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let reg = IndexRegistry::new(store);
        // Must not panic — an unknown index is simply left untouched.
        reg.touch_used("nonexistent").await;
    }

    #[test]
    fn index_meta_deserializes_without_new_fields_backcompat() {
        // Pre-field-existence metadata files on disk must still load.
        // Critical — we have ~40 .json meta files under vectors/meta/
        // that predate these fields.
        let json = r#"{
            "index_name": "resumes_v1",
            "source": "resumes",
            "model_name": "nomic-embed-text",
            "model_version": "latest",
            "dimensions": 768,
            "chunk_count": 100,
            "doc_count": 10,
            "chunk_size": 800,
            "overlap": 80,
            "storage_key": "vectors/resumes_v1.parquet",
            "created_at": "2026-04-20T00:00:00Z",
            "build_time_secs": 1.0,
            "chunks_per_sec": 100.0
        }"#;
        let meta: IndexMeta = serde_json::from_str(json).expect("must deserialize pre-field meta");
        assert!(meta.last_used.is_none());
        assert!(meta.build_signature.is_none());
        assert_eq!(meta.bucket, "primary");
    }
}
|
||||
|
||||
@ -308,6 +308,8 @@ async fn try_update_index_meta(
|
||||
bucket: "primary".to_string(),
|
||||
vector_backend: shared::types::VectorBackend::Parquet,
|
||||
id_prefix: None,
|
||||
last_used: None,
|
||||
build_signature: None,
|
||||
};
|
||||
index_registry.register(meta).await
|
||||
}
|
||||
|
||||
@ -257,7 +257,9 @@ async fn create_index(
|
||||
chunks_per_sec: rate,
|
||||
bucket: bucket.clone(),
|
||||
vector_backend: shared::types::VectorBackend::Parquet,
|
||||
id_prefix: None,
|
||||
last_used: None,
|
||||
build_signature: None,
|
||||
};
|
||||
let _ = registry.register(meta).await;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user