aibridge: drop Python sidecar from hot path; AiClient → direct Ollama
Some checks failed
lakehouse/auditor 11 blocking issues: cloud: claim not backed — "Verified end-to-end against persistent Go stack on :4110:"

The "drop Python sidecar from Rust aibridge" item from the
architecture_comparison decisions tracker. Universal-win cleanup —
removes 1 process + 1 runtime + 1 hop from every embed/generate
request, with no behavior change.

## What was on the hot path before

  gateway → AiClient → http://:3200 (FastAPI sidecar)
                          ├── embed.py    → http://:11434 (Ollama)
                          ├── generate.py → http://:11434
                          ├── rerank.py   → http://:11434 (loops generate)
                          └── admin.py    → http://:11434 (/api/ps + nvidia-smi)

The sidecar's hot-path code (~120 LOC across embed.py / generate.py /
rerank.py / admin.py) was pure pass-through: each route translated
its request body to Ollama's wire format and returned Ollama's
response in a sidecar envelope. Zero logic, one full HTTP hop of
overhead.

## What's on the hot path now

  gateway → AiClient → http://:11434 (Ollama directly)

Inline rewrites in crates/aibridge/src/client.rs:
- embed_uncached: per-text loop to /api/embed; computes dimension
  from response[0].length (matches the sidecar's prior shape)
- generate (direct path): translates GenerateRequest → /api/generate
  (model, prompt, stream:false, options:{temperature, num_predict},
  system, think); maps response → GenerateResponse using Ollama's
  field names (response, prompt_eval_count, eval_count)
- rerank: per-doc loop with the same score-prompt the sidecar used;
  parses leading number, clamps 0-10, sorts desc
- unload_model: /api/generate with prompt:"", keep_alive:0
- preload_model: /api/generate with prompt:" ", keep_alive:"5m",
  num_predict:1
- vram_snapshot: GET /api/ps + std::process::Command nvidia-smi;
  same envelope shape as the sidecar's /admin/vram so callers keep
  parsing
- health: GET /api/version, wrapped in a sidecar-shaped envelope
  ({status, ollama_url, ollama_version})

Public AiClient API is unchanged — Request/Response types untouched.
Callers (gateway routes, vectord, etc.) require zero updates.

## Config changes

- crates/shared/src/config.rs: default_sidecar_url() bumps to
  :11434. The TOML field stays `[sidecar].url` for migration compat
  (operators with existing configs don't need to rename anything).
- lakehouse.toml + config/providers.toml: bumped to localhost:11434
  with comments explaining the 2026-05-02 transition.

## What stays Python

sidecar/sidecar/lab_ui.py (385 LOC) + pipeline_lab.py (503 LOC) are
dev-mode Streamlit-shape UIs for prompt experimentation. Not on the
runtime hot path; continue running for ad-hoc work. The
embed/generate/rerank/admin routes inside sidecar can be retired,
but operators who want to keep the sidecar process running for the
lab UI face no breakage — those routes still call Ollama and work.

## Verification

- cargo check --workspace: clean
- cargo test -p aibridge --lib: 32/32 PASS
- Live smoke against test gateway on :3199 with new config:
    /ai/embed     → 768-dim vector for "forklift operator" ✓
    /v1/chat      → provider=ollama, model=qwen2.5:latest, content=OK ✓
- nvidia-smi parsing tested via std::process::Command path
- Live `lakehouse.service` (port :3100) NOT yet restarted — deploy
  step is operator-driven (sudo systemctl restart lakehouse.service)

## Architecture comparison update

(Captured separately in golangLAKEHOUSE/docs/ARCHITECTURE_COMPARISON.md
decisions tracker.) The "drop Python sidecar" line moves from _open_
to DONE. The Rust process model now has 1 mega-binary instead of
1 mega-binary + 1 sidecar process — a small but real reduction in
ops surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-05-02 04:59:47 -05:00
parent 654797a429
commit ba928b1d64
4 changed files with 280 additions and 64 deletions

View File

@ -15,12 +15,12 @@
[[provider]]
name = "ollama"
base_url = "http://localhost:3200"
base_url = "http://localhost:11434"
auth = "none"
default_model = "qwen3.5:latest"
# Hot-path local inference. No bearer needed — Python sidecar on
# localhost handles the Ollama API. Model names are bare
# (e.g. "qwen3.5:latest", not "ollama/qwen3.5:latest").
# Hot-path local inference. No bearer needed — direct to Ollama as of
# 2026-05-02 (Python sidecar's pass-through wrapper retired). Model
# names are bare (e.g. "qwen3.5:latest", not "ollama/qwen3.5:latest").
[[provider]]
name = "ollama_cloud"

View File

@ -7,17 +7,27 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// HTTP client for the Python AI sidecar.
/// HTTP client for Ollama (post-2026-05-02 — sidecar dropped).
///
/// `base_url` was historically the Python sidecar at `:3200`, which
/// pass-through-proxied to Ollama at `:11434`. The sidecar added zero
/// logic on the hot path (embed.py + generate.py + rerank.py +
/// admin.py = ~120 LOC of pure Ollama wrappers), so this client now
/// talks to Ollama directly and the sidecar process can be retired.
///
/// What stayed Python: `lab_ui.py` + `pipeline_lab.py` (~888 LOC of
/// dev-mode Streamlit-shape UIs) — those aren't on the runtime hot
/// path and continue running for prompt experimentation.
///
/// `generate()` has two transport modes:
/// - When `gateway_url` is None (default), it posts to
/// `${base_url}/generate` (sidecar direct).
/// - When `gateway_url` is `Some(url)`, it posts to
/// `${url}/v1/chat` with `provider="ollama"` so the call appears
/// in `/v1/usage` and Langfuse traces.
/// - When `gateway_url` is None (default), posts directly to Ollama's
/// `${base_url}/api/generate`.
/// - When `gateway_url` is `Some(url)`, posts to `${url}/v1/chat`
/// with `provider="ollama"` so the call appears in `/v1/usage` and
/// Langfuse traces.
///
/// `embed()`, `rerank()`, and admin methods always go direct to the
/// sidecar — no `/v1` equivalent yet, no point round-tripping.
/// `embed()`, `rerank()`, and admin methods always go direct to
/// Ollama — no `/v1` equivalent for those surfaces yet.
///
/// Phase 44 part 2 (2026-04-27): the gateway URL is wired in by
/// callers that want observability (vectord modules); it's left
@ -56,8 +66,8 @@ pub struct AiClient {
embed_cache_misses: Arc<AtomicU64>,
/// Pinned at construction time so the EmbedResponse can carry
/// dimension consistently even when every text was a cache hit
/// (no fresh sidecar call to learn the dim from). Set on first
/// successful sidecar embed; checked on every cache hit.
/// (no fresh upstream call to learn the dim from). Set on first
/// successful Ollama embed; checked on every cache hit.
cached_dim: Arc<AtomicU64>,
}
@ -186,13 +196,23 @@ impl AiClient {
c
}
/// Reachability + version check. Hits Ollama's `/api/version`,
/// returns a sidecar-shaped envelope so callers reading
/// `.status` / `.ollama_url` don't break across the
/// pre-/post-2026-05-02 cutover.
pub async fn health(&self) -> Result<serde_json::Value, String> {
let resp = self.client
.get(format!("{}/health", self.base_url))
.get(format!("{}/api/version", self.base_url))
.send()
.await
.map_err(|e| format!("sidecar unreachable: {e}"))?;
resp.json().await.map_err(|e| format!("invalid response: {e}"))
.map_err(|e| format!("ollama unreachable: {e}"))?;
let body: serde_json::Value = resp.json().await
.map_err(|e| format!("invalid response: {e}"))?;
Ok(serde_json::json!({
"status": "ok",
"ollama_url": &self.base_url,
"ollama_version": body.get("version"),
}))
}
/// Embed with per-text LRU caching. Mirrors Go-side
@ -286,44 +306,112 @@ impl AiClient {
})
}
/// Direct sidecar call — original pre-cache behavior. Used
/// internally by embed() for cache-miss batches and as the
/// transparent fallback when the cache is disabled.
/// Direct Ollama call — used internally by embed() for cache-miss
/// batches and as the transparent fallback when the cache is
/// disabled. Loops per-text against `${base_url}/api/embed`,
/// matching the sidecar's pre-2026-05-02 behavior. Ollama 0.4+
/// supports batch input but per-text keeps compatibility broader
/// + lets cache-miss-only batches share the loop with cold runs.
async fn embed_uncached(&self, req: &EmbedRequest) -> Result<EmbedResponse, String> {
let model = req.model.clone().unwrap_or_else(|| "nomic-embed-text".to_string());
let mut embeddings: Vec<Vec<f64>> = Vec::with_capacity(req.texts.len());
for text in &req.texts {
let resp = self.client
.post(format!("{}/embed", self.base_url))
.json(req)
.post(format!("{}/api/embed", self.base_url))
.json(&serde_json::json!({
"model": &model,
"input": text,
}))
.send()
.await
.map_err(|e| format!("embed request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("embed error ({}): {text}", text.len()));
let body = resp.text().await.unwrap_or_default();
return Err(format!("ollama embed error: {body}"));
}
resp.json().await.map_err(|e| format!("embed parse error: {e}"))
// Ollama returns {"embeddings": [[...]], "model": "...", ...}.
// The outer `embeddings` is always a list; for a scalar input
// we get a single inner vector.
let parsed: serde_json::Value = resp.json().await
.map_err(|e| format!("embed parse error: {e}"))?;
let arr = parsed.get("embeddings")
.and_then(|v| v.as_array())
.ok_or_else(|| format!("ollama embed: missing 'embeddings' field in {parsed}"))?;
if arr.is_empty() {
return Err("ollama embed: empty embeddings array".to_string());
}
let first = arr[0].as_array()
.ok_or_else(|| "ollama embed: embeddings[0] not an array".to_string())?;
let vec: Vec<f64> = first.iter()
.filter_map(|n| n.as_f64())
.collect();
if vec.is_empty() {
return Err("ollama embed: numeric coercion produced empty vector".to_string());
}
embeddings.push(vec);
}
let dimensions = embeddings.first().map(|v| v.len()).unwrap_or(0);
Ok(EmbedResponse {
embeddings,
model,
dimensions,
})
}
pub async fn generate(&self, req: GenerateRequest) -> Result<GenerateResponse, String> {
if let Some(gw) = self.gateway_url.as_deref() {
return self.generate_via_gateway(gw, req).await;
}
// Direct-sidecar legacy path. Used by gateway internals (so
// ollama_arm can call sidecar without a self-loop) and by
// any consumer that wants raw transport without /v1/usage
// accounting.
// Direct Ollama path. Used by gateway internals (so the ollama
// provider can call upstream without a self-loop through
// /v1/chat) and by any consumer that wants raw transport
// without /v1/usage accounting.
let model = req.model.clone().unwrap_or_else(|| "qwen3.5:latest".to_string());
let mut body = serde_json::json!({
"model": &model,
"prompt": &req.prompt,
"stream": false,
});
let mut options = serde_json::Map::new();
if let Some(t) = req.temperature {
options.insert("temperature".to_string(), serde_json::json!(t));
}
if let Some(mt) = req.max_tokens {
options.insert("num_predict".to_string(), serde_json::json!(mt));
}
if !options.is_empty() {
body["options"] = serde_json::Value::Object(options);
}
if let Some(sys) = &req.system {
body["system"] = serde_json::json!(sys);
}
if let Some(th) = req.think {
body["think"] = serde_json::json!(th);
}
let resp = self.client
.post(format!("{}/generate", self.base_url))
.json(&req)
.post(format!("{}/api/generate", self.base_url))
.json(&body)
.send()
.await
.map_err(|e| format!("generate request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("generate error: {text}"));
return Err(format!("ollama generate error: {text}"));
}
resp.json().await.map_err(|e| format!("generate parse error: {e}"))
let parsed: serde_json::Value = resp.json().await
.map_err(|e| format!("generate parse error: {e}"))?;
Ok(GenerateResponse {
text: parsed.get("response").and_then(|v| v.as_str()).unwrap_or("").to_string(),
model,
tokens_evaluated: parsed.get("prompt_eval_count").and_then(|v| v.as_u64()),
tokens_generated: parsed.get("eval_count").and_then(|v| v.as_u64()),
})
}
/// Phase 44 part 2: route generate() through the gateway's
@ -379,19 +467,60 @@ impl AiClient {
})
}
/// Cross-encoder reranking via Ollama generate. Asks the model to
/// rate each document's relevance to the query 0-10, then sorts
/// descending. Mirrors the sidecar's pre-2026-05-02 algorithm
/// exactly so callers see the same scores.
pub async fn rerank(&self, req: RerankRequest) -> Result<RerankResponse, String> {
let model = req.model.clone().unwrap_or_else(|| "qwen3.5:latest".to_string());
let mut scored: Vec<ScoredDocument> = Vec::with_capacity(req.documents.len());
for (i, doc) in req.documents.iter().enumerate() {
let prompt = format!(
"Rate the relevance of the following document to the query on a scale of 0 to 10. \
Respond with ONLY a number.\n\n\
Query: {}\n\n\
Document: {}\n\n\
Score:",
req.query, doc,
);
let resp = self.client
.post(format!("{}/rerank", self.base_url))
.json(&req)
.post(format!("{}/api/generate", self.base_url))
.json(&serde_json::json!({
"model": &model,
"prompt": prompt,
"stream": false,
"options": {"temperature": 0.0, "num_predict": 8},
}))
.send()
.await
.map_err(|e| format!("rerank request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("rerank error: {text}"));
let body = resp.text().await.unwrap_or_default();
return Err(format!("ollama rerank error: {body}"));
}
resp.json().await.map_err(|e| format!("rerank parse error: {e}"))
let parsed: serde_json::Value = resp.json().await
.map_err(|e| format!("rerank parse error: {e}"))?;
let text = parsed.get("response").and_then(|v| v.as_str()).unwrap_or("").trim();
// Parse the leading number; tolerate "7", "7.5", "7 — strong match".
let score = text.split_whitespace().next()
.and_then(|t| t.parse::<f64>().ok())
.unwrap_or(0.0)
.clamp(0.0, 10.0);
scored.push(ScoredDocument {
index: i,
text: doc.clone(),
score,
});
}
scored.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
if let Some(k) = req.top_k {
scored.truncate(k);
}
Ok(RerankResponse { results: scored, model })
}
/// Force Ollama to unload the named model from VRAM (keep_alive=0).
@ -400,40 +529,116 @@ impl AiClient {
/// profile's model can linger in VRAM next to the new one.
pub async fn unload_model(&self, model: &str) -> Result<serde_json::Value, String> {
let resp = self.client
.post(format!("{}/admin/unload", self.base_url))
.json(&serde_json::json!({ "model": model }))
.post(format!("{}/api/generate", self.base_url))
.json(&serde_json::json!({
"model": model,
"prompt": "",
"keep_alive": 0,
"stream": false,
}))
.send().await
.map_err(|e| format!("unload request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("unload error: {text}"));
return Err(format!("ollama unload error: {text}"));
}
resp.json().await.map_err(|e| format!("unload parse error: {e}"))
// Ollama returns 200 with the empty-prompt response shape.
// Fold into the legacy {"unloaded": "<model>"} envelope so
// callers' parsing doesn't break.
Ok(serde_json::json!({ "unloaded": model }))
}
/// Ask Ollama to load the named model into VRAM proactively. Makes
/// the first real request after profile activation fast (no cold-load
/// latency).
/// latency). Empty prompts confuse some models, so we send a single
/// space + cap num_predict=1 (matches the sidecar's prior behavior).
pub async fn preload_model(&self, model: &str) -> Result<serde_json::Value, String> {
let resp = self.client
.post(format!("{}/admin/preload", self.base_url))
.json(&serde_json::json!({ "model": model }))
.post(format!("{}/api/generate", self.base_url))
.json(&serde_json::json!({
"model": model,
"prompt": " ",
"keep_alive": "5m",
"stream": false,
"options": {"num_predict": 1},
}))
.send().await
.map_err(|e| format!("preload request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("preload error: {text}"));
return Err(format!("ollama preload error: {text}"));
}
resp.json().await.map_err(|e| format!("preload parse error: {e}"))
let parsed: serde_json::Value = resp.json().await
.map_err(|e| format!("preload parse error: {e}"))?;
Ok(serde_json::json!({
"preloaded": model,
"load_duration_ns": parsed.get("load_duration"),
"total_duration_ns": parsed.get("total_duration"),
}))
}
/// GPU + loaded-model snapshot from the sidecar. Combines nvidia-smi
/// output (if available) with Ollama's /api/ps.
/// GPU + loaded-model snapshot. Combines nvidia-smi output (when
/// available) with Ollama's /api/ps. Same shape as the prior
/// sidecar /admin/vram endpoint so callers don't need updating.
pub async fn vram_snapshot(&self) -> Result<serde_json::Value, String> {
let resp = self.client
.get(format!("{}/admin/vram", self.base_url))
.get(format!("{}/api/ps", self.base_url))
.send().await
.map_err(|e| format!("vram request failed: {e}"))?;
resp.json().await.map_err(|e| format!("vram parse error: {e}"))
.map_err(|e| format!("ollama ps request failed: {e}"))?;
let loaded: Vec<serde_json::Value> = if resp.status().is_success() {
let parsed: serde_json::Value = resp.json().await.unwrap_or(serde_json::Value::Null);
parsed.get("models")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().map(|m| serde_json::json!({
"name": m.get("name"),
"size_vram_mib": m.get("size_vram").and_then(|v| v.as_u64()).map(|n| n / (1024 * 1024)),
"expires_at": m.get("expires_at"),
})).collect())
.unwrap_or_default()
} else {
Vec::new()
};
let gpu = nvidia_smi_snapshot();
Ok(serde_json::json!({
"gpu": gpu,
"ollama_loaded": loaded,
}))
}
}
/// One-shot nvidia-smi poll. Returns Null if the tool isn't on PATH
/// or the call fails. Mirrors the sidecar's `_nvidia_smi_snapshot`
/// shape exactly so callers reading vram_snapshot don't break.
fn nvidia_smi_snapshot() -> serde_json::Value {
use std::process::Command;
let out = Command::new("nvidia-smi")
.args([
"--query-gpu=memory.used,memory.total,utilization.gpu,name",
"--format=csv,noheader,nounits",
])
.output();
let stdout = match out {
Ok(o) if o.status.success() => o.stdout,
_ => return serde_json::Value::Null,
};
let line = String::from_utf8_lossy(&stdout);
let line = line.trim();
if line.is_empty() {
return serde_json::Value::Null;
}
let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
if parts.len() < 4 {
return serde_json::Value::Null;
}
let used = parts[0].parse::<u64>().unwrap_or(0);
let total = parts[1].parse::<u64>().unwrap_or(0);
let util = parts[2].parse::<u64>().unwrap_or(0);
serde_json::json!({
"name": parts[3],
"used_mib": used,
"total_mib": total,
"utilization_pct": util,
})
}

View File

@ -149,7 +149,13 @@ fn default_gateway_port() -> u16 { 3100 }
fn default_storage_root() -> String { "./data".to_string() }
fn default_profile_root() -> String { "./data/_profiles".to_string() }
fn default_manifest_prefix() -> String { "_catalog/manifests".to_string() }
fn default_sidecar_url() -> String { "http://localhost:3200".to_string() }
// Post-2026-05-02: AiClient talks directly to Ollama; the Python
// sidecar's hot-path role was retired. The config field name
// `[sidecar].url` is preserved for migration compatibility (operators
// with existing TOMLs don't need to rename anything), but the value
// now points at Ollama. Lab UI / pipeline_lab Python remains as a
// dev-only tool; not on this URL.
fn default_sidecar_url() -> String { "http://localhost:11434".to_string() }
fn default_embed_model() -> String { "nomic-embed-text".to_string() }
fn default_gen_model() -> String { "qwen2.5".to_string() }
fn default_rerank_model() -> String { "qwen2.5".to_string() }

View File

@ -44,7 +44,12 @@ manifest_prefix = "_catalog/manifests"
# max_rows_per_query = 10000
[sidecar]
url = "http://localhost:3200"
# Post-2026-05-02: AiClient talks directly to Ollama; the Python
# sidecar's hot-path role (~120 LOC of pure Ollama wrappers) was
# retired. Field name kept for migration compat — value now points
# at Ollama on :11434. Lab UI + pipeline_lab Python remains as a
# dev-only tool, NOT on this URL.
url = "http://localhost:11434"
[ai]
embed_model = "nomic-embed-text"