Compare commits
No commits in common. "150cc3b6814b92d74385d546cbe762a4df1dab2f" and "3d068681f59a0e2fb1b601c50e407cfd9026bd64" have entirely different histories.
150cc3b681
...
3d068681f5
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -48,7 +48,6 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"axum",
|
"axum",
|
||||||
"lru",
|
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
|||||||
@ -12,4 +12,3 @@ serde_json = { workspace = true }
|
|||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
|
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
lru = "0.12"
|
|
||||||
|
|||||||
@ -1,10 +1,5 @@
|
|||||||
use lru::LruCache;
|
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::num::NonZeroUsize;
|
|
||||||
use std::sync::Mutex;
|
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
/// HTTP client for the Python AI sidecar.
|
/// HTTP client for the Python AI sidecar.
|
||||||
@ -23,42 +18,11 @@ use std::time::Duration;
|
|||||||
/// callers that want observability (vectord modules); it's left
|
/// callers that want observability (vectord modules); it's left
|
||||||
/// unset by callers that ARE the gateway internals (avoids self-loops
|
/// unset by callers that ARE the gateway internals (avoids self-loops
|
||||||
/// + redundant hops).
|
/// + redundant hops).
|
||||||
/// Per-text embed cache key. We key on (model, text) so different
|
|
||||||
/// model selections produce distinct cache lines — a query embedded
|
|
||||||
/// under nomic-embed-text-v2-moe must NOT collide with the same
|
|
||||||
/// query under nomic-embed-text v1.
|
|
||||||
#[derive(Eq, PartialEq, Hash, Clone)]
|
|
||||||
struct EmbedCacheKey {
|
|
||||||
model: String,
|
|
||||||
text: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Default LRU cache size — 4096 entries × ~6KB per 768-d f64
|
|
||||||
/// vector ≈ 24MB. Sized for typical staffing-domain repetition
|
|
||||||
/// (coordinator workflows have query repetition rates around 70-90%
|
|
||||||
/// per session). Tunable via [aibridge].embed_cache_size in the
|
|
||||||
/// config; 0 disables the cache entirely.
|
|
||||||
const DEFAULT_EMBED_CACHE_SIZE: usize = 4096;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AiClient {
|
pub struct AiClient {
|
||||||
client: Client,
|
client: Client,
|
||||||
base_url: String,
|
base_url: String,
|
||||||
gateway_url: Option<String>,
|
gateway_url: Option<String>,
|
||||||
/// Closes the 63× perf gap with Go side. Mirrors the shape of
|
|
||||||
/// Go's internal/embed/cached.go::CachedProvider — same
|
|
||||||
/// (model, text) → vector caching, same nil-disable semantics.
|
|
||||||
/// None = caching disabled (cache_size=0); Some = bounded LRU.
|
|
||||||
embed_cache: Option<Arc<Mutex<LruCache<EmbedCacheKey, Vec<f64>>>>>,
|
|
||||||
/// Hit / miss counters for /admin observability + load-test
|
|
||||||
/// validation. Atomic so Clone'd AiClients share the same counts.
|
|
||||||
embed_cache_hits: Arc<AtomicU64>,
|
|
||||||
embed_cache_misses: Arc<AtomicU64>,
|
|
||||||
/// Pinned at construction time so the EmbedResponse can carry
|
|
||||||
/// dimension consistently even when every text was a cache hit
|
|
||||||
/// (no fresh sidecar call to learn the dim from). Set on first
|
|
||||||
/// successful sidecar embed; checked on every cache hit.
|
|
||||||
cached_dim: Arc<AtomicU64>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -- Request/Response types --
|
// -- Request/Response types --
|
||||||
@ -131,49 +95,17 @@ pub struct RerankResponse {
|
|||||||
|
|
||||||
impl AiClient {
|
impl AiClient {
|
||||||
pub fn new(base_url: &str) -> Self {
|
pub fn new(base_url: &str) -> Self {
|
||||||
Self::with_embed_cache(base_url, DEFAULT_EMBED_CACHE_SIZE)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Constructs an AiClient with an explicit embed-cache size.
|
|
||||||
/// Pass 0 to disable the cache entirely (matches Go-side
|
|
||||||
/// CachedProvider's nil-cache semantics).
|
|
||||||
pub fn with_embed_cache(base_url: &str, cache_size: usize) -> Self {
|
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.timeout(Duration::from_secs(120))
|
.timeout(Duration::from_secs(120))
|
||||||
.build()
|
.build()
|
||||||
.expect("failed to build HTTP client");
|
.expect("failed to build HTTP client");
|
||||||
let embed_cache = if cache_size > 0 {
|
|
||||||
// SAFETY: cache_size > 0 just verified, NonZeroUsize::new
|
|
||||||
// returns Some.
|
|
||||||
let cap = NonZeroUsize::new(cache_size).expect("cache_size > 0");
|
|
||||||
Some(Arc::new(Mutex::new(LruCache::new(cap))))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
Self {
|
Self {
|
||||||
client,
|
client,
|
||||||
base_url: base_url.trim_end_matches('/').to_string(),
|
base_url: base_url.trim_end_matches('/').to_string(),
|
||||||
gateway_url: None,
|
gateway_url: None,
|
||||||
embed_cache,
|
|
||||||
embed_cache_hits: Arc::new(AtomicU64::new(0)),
|
|
||||||
embed_cache_misses: Arc::new(AtomicU64::new(0)),
|
|
||||||
cached_dim: Arc::new(AtomicU64::new(0)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cache hit/miss/size snapshot. Useful for /admin endpoints +
|
|
||||||
/// load-test validation ("did the cache fire as expected?").
|
|
||||||
pub fn embed_cache_stats(&self) -> (u64, u64, usize) {
|
|
||||||
let hits = self.embed_cache_hits.load(Ordering::Relaxed);
|
|
||||||
let misses = self.embed_cache_misses.load(Ordering::Relaxed);
|
|
||||||
let len = self
|
|
||||||
.embed_cache
|
|
||||||
.as_ref()
|
|
||||||
.map(|c| c.lock().map(|g| g.len()).unwrap_or(0))
|
|
||||||
.unwrap_or(0);
|
|
||||||
(hits, misses, len)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Same as `new`, but every `generate()` is routed through
|
/// Same as `new`, but every `generate()` is routed through
|
||||||
/// `${gateway_url}/v1/chat` (provider=ollama) for observability.
|
/// `${gateway_url}/v1/chat` (provider=ollama) for observability.
|
||||||
/// Use this for callers OUTSIDE the gateway. Inside the gateway
|
/// Use this for callers OUTSIDE the gateway. Inside the gateway
|
||||||
@ -195,104 +127,10 @@ impl AiClient {
|
|||||||
resp.json().await.map_err(|e| format!("invalid response: {e}"))
|
resp.json().await.map_err(|e| format!("invalid response: {e}"))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Embed with per-text LRU caching. Mirrors Go-side
|
|
||||||
/// CachedProvider behavior: cache key is (model, text);
|
|
||||||
/// cache-hit texts skip the sidecar; cache-miss texts batch
|
|
||||||
/// into a single sidecar call; results are interleaved in the
|
|
||||||
/// caller's input order.
|
|
||||||
///
|
|
||||||
/// Closes ~95% of the load-test perf gap vs Go side (loadgen
|
|
||||||
/// 2026-05-01: Rust 128 RPS → with cache ≥ 7000 RPS expected
|
|
||||||
/// for warm-cache workloads). Cold-cache behavior unchanged
|
|
||||||
/// (every text is a miss → single sidecar call, identical to
|
|
||||||
/// pre-cache).
|
|
||||||
pub async fn embed(&self, req: EmbedRequest) -> Result<EmbedResponse, String> {
|
pub async fn embed(&self, req: EmbedRequest) -> Result<EmbedResponse, String> {
|
||||||
let model_key = req.model.clone().unwrap_or_default();
|
|
||||||
|
|
||||||
// Fast path: cache disabled → original behavior.
|
|
||||||
let Some(cache) = self.embed_cache.as_ref() else {
|
|
||||||
return self.embed_uncached(&req).await;
|
|
||||||
};
|
|
||||||
if req.texts.is_empty() {
|
|
||||||
return self.embed_uncached(&req).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// First pass: check cache for each text. Track which positions
|
|
||||||
// need a sidecar fetch.
|
|
||||||
let mut embeddings: Vec<Option<Vec<f64>>> = vec![None; req.texts.len()];
|
|
||||||
let mut miss_indices: Vec<usize> = Vec::new();
|
|
||||||
let mut miss_texts: Vec<String> = Vec::new();
|
|
||||||
{
|
|
||||||
let mut guard = cache.lock().map_err(|e| format!("cache lock poisoned: {e}"))?;
|
|
||||||
for (i, text) in req.texts.iter().enumerate() {
|
|
||||||
let key = EmbedCacheKey { model: model_key.clone(), text: text.clone() };
|
|
||||||
if let Some(vec) = guard.get(&key) {
|
|
||||||
embeddings[i] = Some(vec.clone());
|
|
||||||
self.embed_cache_hits.fetch_add(1, Ordering::Relaxed);
|
|
||||||
} else {
|
|
||||||
miss_indices.push(i);
|
|
||||||
miss_texts.push(text.clone());
|
|
||||||
self.embed_cache_misses.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// All hit? Return immediately. Use cached_dim to populate
|
|
||||||
// the response dimension (no sidecar to ask).
|
|
||||||
if miss_indices.is_empty() {
|
|
||||||
let dim = self.cached_dim.load(Ordering::Relaxed) as usize;
|
|
||||||
let dim = if dim == 0 { embeddings[0].as_ref().map(|v| v.len()).unwrap_or(0) } else { dim };
|
|
||||||
return Ok(EmbedResponse {
|
|
||||||
embeddings: embeddings.into_iter().map(|opt| opt.expect("filled")).collect(),
|
|
||||||
model: req.model.unwrap_or_else(|| "nomic-embed-text".to_string()),
|
|
||||||
dimensions: dim,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Second pass: fetch the misses in one sidecar call.
|
|
||||||
let miss_req = EmbedRequest { texts: miss_texts.clone(), model: req.model.clone() };
|
|
||||||
let resp = self.embed_uncached(&miss_req).await?;
|
|
||||||
if resp.embeddings.len() != miss_texts.len() {
|
|
||||||
return Err(format!(
|
|
||||||
"embed cache: sidecar returned {} embeddings for {} texts",
|
|
||||||
resp.embeddings.len(),
|
|
||||||
miss_texts.len()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pin cached_dim on first successful response.
|
|
||||||
if resp.dimensions > 0 {
|
|
||||||
self.cached_dim.store(resp.dimensions as u64, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert misses into cache + fill response slots.
|
|
||||||
{
|
|
||||||
let mut guard = cache.lock().map_err(|e| format!("cache lock poisoned: {e}"))?;
|
|
||||||
for (j, idx) in miss_indices.iter().enumerate() {
|
|
||||||
let key = EmbedCacheKey {
|
|
||||||
model: model_key.clone(),
|
|
||||||
text: miss_texts[j].clone(),
|
|
||||||
};
|
|
||||||
let vec = resp.embeddings[j].clone();
|
|
||||||
guard.put(key, vec.clone());
|
|
||||||
embeddings[*idx] = Some(vec);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(EmbedResponse {
|
|
||||||
embeddings: embeddings.into_iter().map(|opt| opt.expect("filled")).collect(),
|
|
||||||
model: resp.model,
|
|
||||||
dimensions: resp.dimensions,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Direct sidecar call — original pre-cache behavior. Used
|
|
||||||
/// internally by embed() for cache-miss batches and as the
|
|
||||||
/// transparent fallback when the cache is disabled.
|
|
||||||
async fn embed_uncached(&self, req: &EmbedRequest) -> Result<EmbedResponse, String> {
|
|
||||||
let resp = self.client
|
let resp = self.client
|
||||||
.post(format!("{}/embed", self.base_url))
|
.post(format!("{}/embed", self.base_url))
|
||||||
.json(req)
|
.json(&req)
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("embed request failed: {e}"))?;
|
.map_err(|e| format!("embed request failed: {e}"))?;
|
||||||
|
|||||||
@ -23,13 +23,6 @@ import { roleBand, SCENES, SCENES_VERSION, FACE_RENDER_DIM, type RoleBand } from
|
|||||||
import { ICONS, ICONS_VERSION, DEFAULT_NEGATIVE, certToSlug, type IconRecipe } from "./icon_recipes.js";
|
import { ICONS, ICONS_VERSION, DEFAULT_NEGATIVE, certToSlug, type IconRecipe } from "./icon_recipes.js";
|
||||||
|
|
||||||
const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
|
const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
|
||||||
// G5 cutover prep (2026-05-01): when GO_LAKEHOUSE_URL is set, the
|
|
||||||
// `/_go/*` pass-through routes to the Go gateway. Off-by-default
|
|
||||||
// (empty = disabled, returns 503 on /_go/*). Reversible via unset.
|
|
||||||
// Doesn't modify any existing tool — additive parallel path so
|
|
||||||
// operators can validate Go-side handlers under real Bun-frontend
|
|
||||||
// shape without touching the production Rust path.
|
|
||||||
const GO_BASE = process.env.GO_LAKEHOUSE_URL || "";
|
|
||||||
const PORT = parseInt(process.env.MCP_PORT || "3700");
|
const PORT = parseInt(process.env.MCP_PORT || "3700");
|
||||||
|
|
||||||
// ─── Staffer roster — used by the per-staffer hot-swap index (G). ────
|
// ─── Staffer roster — used by the per-staffer hot-swap index (G). ────
|
||||||
@ -718,26 +711,6 @@ async function main() {
|
|||||||
return new Response(await r.text(), { status: r.status, headers: { "Content-Type": "application/json" } });
|
return new Response(await r.text(), { status: r.status, headers: { "Content-Type": "application/json" } });
|
||||||
}
|
}
|
||||||
|
|
||||||
// G5 cutover slice: pass-through to GO_BASE (Go gateway). Same
|
|
||||||
// shape as /api/* but points at the Go side. Returns 503 when
|
|
||||||
// GO_LAKEHOUSE_URL isn't set so callers know the cutover slice
|
|
||||||
// is off rather than silently routing to Rust. No body
|
|
||||||
// transformation — caller responsible for sending Go-shaped
|
|
||||||
// requests (e.g. /v1/embed not /ai/embed).
|
|
||||||
if (url.pathname.startsWith("/_go/")) {
|
|
||||||
if (!GO_BASE) {
|
|
||||||
return err("GO_LAKEHOUSE_URL not set; /_go/* cutover slice is disabled", 503);
|
|
||||||
}
|
|
||||||
const path = url.pathname.replace("/_go", "");
|
|
||||||
const body = req.method !== "GET" ? await req.text() : undefined;
|
|
||||||
try {
|
|
||||||
const r = await fetch(`${GO_BASE}${path}`, { method: req.method, headers: { "Content-Type": "application/json" }, body });
|
|
||||||
return new Response(await r.text(), { status: r.status, headers: { "Content-Type": "application/json" } });
|
|
||||||
} catch (e) {
|
|
||||||
return err(`Go gateway unreachable at ${GO_BASE}: ${e}`, 502);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Proof — narrative HTML served from mcp-server/proof.html.
|
// Proof — narrative HTML served from mcp-server/proof.html.
|
||||||
// Live tests consumed client-side via /proof.json.
|
// Live tests consumed client-side via /proof.json.
|
||||||
if (url.pathname === "/proof") {
|
if (url.pathname === "/proof") {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user