The Phase 44 PRD's "AiClient becomes a thin /v1/chat client" posed a
chicken-and-egg problem: the ollama_arm inside the gateway's own /v1/chat
handler calls AiClient.generate() to reach the sidecar. If AiClient
unconditionally routed through /v1/chat, gateway → /v1/chat → ollama →
AiClient → /v1/chat would loop forever.
Solution: opt-in routing.
- `AiClient::new(base_url)` — direct-sidecar, gateway-internal use
(gateway's own /v1/chat handlers, ollama::chat in mod.rs)
- `AiClient::new_with_gateway(base_url, gateway_url)` — routes
generate() through ${gateway_url}/v1/chat with provider="ollama"
so the call lands in /v1/usage + Langfuse traces
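A minimal usage sketch of the two construction modes (URLs below are
placeholders, not the deployed addresses):

    // Gateway internals: straight to the sidecar, no detour through /v1/chat.
    let internal = AiClient::new("http://127.0.0.1:8500");

    // External callers such as vectord: same sidecar for embed()/rerank(),
    // but generate() is routed through the gateway for usage + traces.
    let observed = AiClient::new_with_gateway(
        "http://127.0.0.1:8500",  // sidecar (placeholder)
        "http://127.0.0.1:8080",  // gateway (placeholder)
    );

    // The call site is identical either way; only the transport differs.
    let resp = observed.generate(GenerateRequest {
        prompt: "ping".into(),
        system: None,
        model: None,
        temperature: None,
        max_tokens: Some(32),
        think: Some(false),
    }).await?;
    println!("{} ({:?} tokens out)", resp.text, resp.tokens_generated);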
Shape translation in generate_via_gateway():
  GenerateRequest {prompt, system, model, temperature, max_tokens, think}
    → /v1/chat {messages: [system?, user], provider: "ollama", ...}
  /v1/chat response: choices[0].message.content + usage.{prompt,completion}_tokens
    → GenerateResponse {text, model, tokens_evaluated, tokens_generated}
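For concreteness, a request translated by that mapping (prompt, system
prompt, and values here are illustrative, not captured traffic) goes out
roughly as this json! literal, and only four fields are read back:

    // GenerateRequest { prompt: "ping", system: Some("You are a ping responder."),
    //                   max_tokens: Some(64), think: Some(false), model/temperature: None }
    let outbound = serde_json::json!({
        "messages": [
            {"role": "system", "content": "You are a ping responder."},
            {"role": "user",   "content": "ping"}
        ],
        "provider": "ollama",
        "max_tokens": 64,
        "think": false
    });

    // Read back from the /v1/chat response:
    //   choices[0].message.content  -> GenerateResponse.text
    //   model                       -> GenerateResponse.model
    //   usage.prompt_tokens         -> tokens_evaluated
    //   usage.completion_tokens     -> tokens_generated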
embed(), rerank(), and admin methods (health, unload_model, etc.) stay
direct-to-sidecar — no /v1/embed equivalent yet, so no point round-tripping.
Transitive migration: aibridge::continuation::generate_continuable
goes through TextGenerator::generate_text() → AiClient.generate(), so
every caller of generate_continuable inherits the routing decision
made at AiClient construction. Phase 21's continuation loop, hot-
path JSON emitters, etc. all gain observability for free when the
construction site opts in.
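To illustrate that inheritance, a hypothetical downstream helper (not the
real TextGenerator / generate_continuable signatures): it only ever sees
&AiClient, so the transport choice is invisible to it.

    // Stands in for anything downstream of generate_continuable: it calls
    // generate() and picks up whatever routing was baked in at construction.
    async fn summarize(client: &AiClient, text: &str) -> Result<String, String> {
        let resp = client.generate(GenerateRequest {
            prompt: format!("Summarize:\n{text}"),
            system: None,
            model: None,
            temperature: Some(0.2),
            max_tokens: Some(256),
            think: Some(false),
        }).await?;
        Ok(resp.text)
    }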
Verified end-to-end:
curl /v1/chat with the exact JSON shape AiClient sends
→ "PONG-AIBRIDGE", finish=stop, 27/7 tokens
/v1/usage after the call
→ requests=1, by_provider.ollama.requests=1, tokens tracked
Phase 44 part 3 (next):
- Migrate vectord's AiClient construction site so vectord modules
(rag, autotune, harness, refresh, supervisor, playbook_memory)
flow through /v1/chat. Currently the gateway's main.rs constructs
one AiClient via `new()` and shares it via V1State; vectord
inherits direct-sidecar transport. Migration requires constructing
a SEPARATE AiClient with `new_with_gateway` for vectord's state
bag (V1State.ai_client must stay direct to avoid the self-loop).
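  A hedged sketch of the intended split (config field names here are
  assumptions; only V1State.ai_client comes from the current code):

    // Gateway-internal state keeps the direct client so /v1/chat never
    // calls back into itself through its own AiClient.
    let direct = AiClient::new(&cfg.sidecar_url);

    // vectord's state bag gets a second, gateway-routed client:
    // embed()/rerank() still hit the sidecar, generate() lands in
    // /v1/usage and Langfuse via /v1/chat.
    let routed = AiClient::new_with_gateway(&cfg.sidecar_url, &cfg.gateway_url);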
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;

/// HTTP client for the Python AI sidecar.
///
/// `generate()` has two transport modes:
/// - When `gateway_url` is None (default), it posts to
///   `${base_url}/generate` (sidecar direct).
/// - When `gateway_url` is `Some(url)`, it posts to
///   `${url}/v1/chat` with `provider="ollama"` so the call appears
///   in `/v1/usage` and Langfuse traces.
///
/// `embed()`, `rerank()`, and admin methods always go direct to the
/// sidecar — no `/v1` equivalent yet, no point round-tripping.
///
/// Phase 44 part 2 (2026-04-27): the gateway URL is wired in by
/// callers that want observability (vectord modules); it's left
/// unset by callers that ARE the gateway internals (avoids self-loops
/// + redundant hops).
#[derive(Clone)]
pub struct AiClient {
    client: Client,
    base_url: String,
    gateway_url: Option<String>,
}

// -- Request/Response types --

#[derive(Serialize, Deserialize)]
pub struct EmbedRequest {
    pub texts: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct EmbedResponse {
    pub embeddings: Vec<Vec<f64>>,
    pub model: String,
    pub dimensions: usize,
}

#[derive(Clone, Serialize, Deserialize)]
pub struct GenerateRequest {
    pub prompt: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub system: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_tokens: Option<u32>,
    /// Phase 21 — per-call opt-out of hidden reasoning. Thinking models
    /// (qwen3.5, gpt-oss, etc) burn tokens on reasoning before the
    /// visible response starts; setting this to `false` on hot-path
    /// JSON emitters avoids empty returns when the budget is tight.
    /// Sidecar forwards this to Ollama's `think` parameter; if the
    /// sidecar drops an unknown field the request still succeeds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub think: Option<bool>,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct GenerateResponse {
    pub text: String,
    pub model: String,
    pub tokens_evaluated: Option<u64>,
    pub tokens_generated: Option<u64>,
}

#[derive(Serialize, Deserialize)]
pub struct RerankRequest {
    pub query: String,
    pub documents: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub top_k: Option<usize>,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct ScoredDocument {
    pub index: usize,
    pub text: String,
    pub score: f64,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct RerankResponse {
    pub results: Vec<ScoredDocument>,
    pub model: String,
}

impl AiClient {
    pub fn new(base_url: &str) -> Self {
        let client = Client::builder()
            .timeout(Duration::from_secs(120))
            .build()
            .expect("failed to build HTTP client");
        Self {
            client,
            base_url: base_url.trim_end_matches('/').to_string(),
            gateway_url: None,
        }
    }

    /// Same as `new`, but every `generate()` is routed through
    /// `${gateway_url}/v1/chat` (provider=ollama) for observability.
    /// Use this for callers OUTSIDE the gateway. Inside the gateway
    /// itself, prefer `new()` — calling /v1/chat from /v1/chat works
    /// (no infinite loop, ollama_arm doesn't use AiClient) but adds
    /// a wasted localhost hop.
    pub fn new_with_gateway(base_url: &str, gateway_url: &str) -> Self {
        let mut c = Self::new(base_url);
        c.gateway_url = Some(gateway_url.trim_end_matches('/').to_string());
        c
    }

    pub async fn health(&self) -> Result<serde_json::Value, String> {
        let resp = self.client
            .get(format!("{}/health", self.base_url))
            .send()
            .await
            .map_err(|e| format!("sidecar unreachable: {e}"))?;
        resp.json().await.map_err(|e| format!("invalid response: {e}"))
    }

    pub async fn embed(&self, req: EmbedRequest) -> Result<EmbedResponse, String> {
        let resp = self.client
            .post(format!("{}/embed", self.base_url))
            .json(&req)
            .send()
            .await
            .map_err(|e| format!("embed request failed: {e}"))?;

        if !resp.status().is_success() {
            // Capture the HTTP status before .text() consumes the response,
            // so the error reports the status rather than the body length.
            let status = resp.status();
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("embed error ({status}): {text}"));
        }
        resp.json().await.map_err(|e| format!("embed parse error: {e}"))
    }

    pub async fn generate(&self, req: GenerateRequest) -> Result<GenerateResponse, String> {
        if let Some(gw) = self.gateway_url.as_deref() {
            return self.generate_via_gateway(gw, req).await;
        }
        // Direct-sidecar legacy path. Used by gateway internals (so
        // ollama_arm can call sidecar without a self-loop) and by
        // any consumer that wants raw transport without /v1/usage
        // accounting.
        let resp = self.client
            .post(format!("{}/generate", self.base_url))
            .json(&req)
            .send()
            .await
            .map_err(|e| format!("generate request failed: {e}"))?;

        if !resp.status().is_success() {
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("generate error: {text}"));
        }
        resp.json().await.map_err(|e| format!("generate parse error: {e}"))
    }

    /// Phase 44 part 2: route generate() through the gateway's
    /// /v1/chat with provider="ollama" so the call lands in
    /// /v1/usage + Langfuse. Translates between the sidecar
    /// GenerateRequest/Response shape and the OpenAI-compat
    /// chat shape on the wire.
    async fn generate_via_gateway(&self, gateway_url: &str, req: GenerateRequest) -> Result<GenerateResponse, String> {
        let mut messages = Vec::with_capacity(2);
        if let Some(sys) = &req.system {
            messages.push(serde_json::json!({"role": "system", "content": sys}));
        }
        messages.push(serde_json::json!({"role": "user", "content": req.prompt}));
        let mut body = serde_json::json!({
            "messages": messages,
            "provider": "ollama",
        });
        if let Some(m) = &req.model { body["model"] = serde_json::json!(m); }
        if let Some(t) = req.temperature { body["temperature"] = serde_json::json!(t); }
        if let Some(mt) = req.max_tokens { body["max_tokens"] = serde_json::json!(mt); }
        if let Some(th) = req.think { body["think"] = serde_json::json!(th); }

        let resp = self.client
            .post(format!("{}/v1/chat", gateway_url))
            .json(&body)
            .send()
            .await
            .map_err(|e| format!("/v1/chat request failed: {e}"))?;
        if !resp.status().is_success() {
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("/v1/chat error: {text}"));
        }
        let parsed: serde_json::Value = resp.json().await
            .map_err(|e| format!("/v1/chat parse error: {e}"))?;

        let text = parsed
            .pointer("/choices/0/message/content")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string();
        let model = parsed.get("model")
            .and_then(|v| v.as_str())
            .unwrap_or_else(|| req.model.as_deref().unwrap_or(""))
            .to_string();
        let prompt_tokens = parsed.pointer("/usage/prompt_tokens").and_then(|v| v.as_u64());
        let completion_tokens = parsed.pointer("/usage/completion_tokens").and_then(|v| v.as_u64());

        Ok(GenerateResponse {
            text,
            model,
            tokens_evaluated: prompt_tokens,
            tokens_generated: completion_tokens,
        })
    }

    pub async fn rerank(&self, req: RerankRequest) -> Result<RerankResponse, String> {
        let resp = self.client
            .post(format!("{}/rerank", self.base_url))
            .json(&req)
            .send()
            .await
            .map_err(|e| format!("rerank request failed: {e}"))?;

        if !resp.status().is_success() {
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("rerank error: {text}"));
        }
        resp.json().await.map_err(|e| format!("rerank parse error: {e}"))
    }

    /// Force Ollama to unload the named model from VRAM (keep_alive=0).
    /// Used for predictable profile swaps — without this, Ollama holds a
    /// model for its configured TTL (default 5min) and the previous
    /// profile's model can linger in VRAM next to the new one.
    pub async fn unload_model(&self, model: &str) -> Result<serde_json::Value, String> {
        let resp = self.client
            .post(format!("{}/admin/unload", self.base_url))
            .json(&serde_json::json!({ "model": model }))
            .send().await
            .map_err(|e| format!("unload request failed: {e}"))?;
        if !resp.status().is_success() {
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("unload error: {text}"));
        }
        resp.json().await.map_err(|e| format!("unload parse error: {e}"))
    }

    /// Ask Ollama to load the named model into VRAM proactively. Makes
    /// the first real request after profile activation fast (no cold-load
    /// latency).
    pub async fn preload_model(&self, model: &str) -> Result<serde_json::Value, String> {
        let resp = self.client
            .post(format!("{}/admin/preload", self.base_url))
            .json(&serde_json::json!({ "model": model }))
            .send().await
            .map_err(|e| format!("preload request failed: {e}"))?;
        if !resp.status().is_success() {
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("preload error: {text}"));
        }
        resp.json().await.map_err(|e| format!("preload parse error: {e}"))
    }

    /// GPU + loaded-model snapshot from the sidecar. Combines nvidia-smi
    /// output (if available) with Ollama's /api/ps.
    pub async fn vram_snapshot(&self) -> Result<serde_json::Value, String> {
        let resp = self.client
            .get(format!("{}/admin/vram", self.base_url))
            .send().await
            .map_err(|e| format!("vram request failed: {e}"))?;
        resp.json().await.map_err(|e| format!("vram parse error: {e}"))
    }
}