Phase 3: AI integration with Ollama via Python sidecar

- sidecar: FastAPI app with /embed, /generate, /rerank hitting Ollama
- sidecar: Dockerfile, env var config (EMBED_MODEL, GEN_MODEL, RERANK_MODEL)
- aibridge: reqwest HTTP client with typed request/response structs
- aibridge: Axum proxy endpoints (POST /ai/embed, /ai/generate, /ai/rerank)
- gateway: wires AiClient with SIDECAR_URL env var
- e2e verified: nomic-embed-text returns 768d vectors, qwen2.5 generates text

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
root 2026-03-27 05:53:56 -05:00
parent 19bdfab227
commit 239e471223
21 changed files with 453 additions and 12 deletions

Cargo.lock (generated, 12 changed lines)

@@ -36,6 +36,7 @@ name = "aibridge"
version = "0.1.0"
dependencies = [
"axum",
"reqwest",
"serde",
"serde_json",
"shared",
@@ -1740,6 +1741,7 @@ dependencies = [
"tokio",
"tokio-rustls",
"tower-service",
"webpki-roots",
]
[[package]]
@@ -2734,6 +2736,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
]
[[package]]
@@ -3672,6 +3675,15 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "winapi-util"
version = "0.1.11"


@@ -10,3 +10,4 @@ axum = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }


@@ -0,0 +1,137 @@
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// HTTP client for the Python AI sidecar.
#[derive(Clone)]
pub struct AiClient {
client: Client,
base_url: String,
}
// -- Request/Response types --
#[derive(Serialize, Deserialize)]
pub struct EmbedRequest {
pub texts: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct EmbedResponse {
pub embeddings: Vec<Vec<f64>>,
pub model: String,
pub dimensions: usize,
}
#[derive(Serialize, Deserialize)]
pub struct GenerateRequest {
pub prompt: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub system: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub temperature: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_tokens: Option<u32>,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct GenerateResponse {
pub text: String,
pub model: String,
pub tokens_evaluated: Option<u64>,
pub tokens_generated: Option<u64>,
}
#[derive(Serialize, Deserialize)]
pub struct RerankRequest {
pub query: String,
pub documents: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub top_k: Option<usize>,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct ScoredDocument {
pub index: usize,
pub text: String,
pub score: f64,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct RerankResponse {
pub results: Vec<ScoredDocument>,
pub model: String,
}
impl AiClient {
pub fn new(base_url: &str) -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(120))
.build()
.expect("failed to build HTTP client");
Self {
client,
base_url: base_url.trim_end_matches('/').to_string(),
}
}
pub async fn health(&self) -> Result<serde_json::Value, String> {
let resp = self.client
.get(format!("{}/health", self.base_url))
.send()
.await
.map_err(|e| format!("sidecar unreachable: {e}"))?;
resp.json().await.map_err(|e| format!("invalid response: {e}"))
}
pub async fn embed(&self, req: EmbedRequest) -> Result<EmbedResponse, String> {
let resp = self.client
.post(format!("{}/embed", self.base_url))
.json(&req)
.send()
.await
.map_err(|e| format!("embed request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("embed error ({}): {text}", text.len()));
}
resp.json().await.map_err(|e| format!("embed parse error: {e}"))
}
pub async fn generate(&self, req: GenerateRequest) -> Result<GenerateResponse, String> {
let resp = self.client
.post(format!("{}/generate", self.base_url))
.json(&req)
.send()
.await
.map_err(|e| format!("generate request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("generate error: {text}"));
}
resp.json().await.map_err(|e| format!("generate parse error: {e}"))
}
pub async fn rerank(&self, req: RerankRequest) -> Result<RerankResponse, String> {
let resp = self.client
.post(format!("{}/rerank", self.base_url))
.json(&req)
.send()
.await
.map_err(|e| format!("rerank request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("rerank error: {text}"));
}
resp.json().await.map_err(|e| format!("rerank parse error: {e}"))
}
}
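
Not part of the diff: a minimal usage sketch of the new client, assuming the sidecar from this commit is running locally on its default port 3200 and the calling crate has tokio with the macros/rt features available.

use aibridge::client::{AiClient, EmbedRequest};

#[tokio::main]
async fn main() -> Result<(), String> {
    // Same default the gateway falls back to when SIDECAR_URL is unset.
    let client = AiClient::new("http://localhost:3200");

    let resp = client
        .embed(EmbedRequest {
            texts: vec!["hello lakehouse".into(), "parquet and arrow".into()],
            model: None, // sidecar falls back to EMBED_MODEL (nomic-embed-text)
        })
        .await?;

    println!(
        "{} vectors, {} dims, model {}",
        resp.embeddings.len(),
        resp.dimensions,
        resp.model
    );
    Ok(())
}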


@@ -1 +1,2 @@
pub mod client;
pub mod service;


@@ -1,9 +1,60 @@
use axum::{Router, routing::get};
use axum::{
Json, Router,
extract::State,
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
pub fn router() -> Router {
Router::new().route("/health", get(health))
use crate::client::{
AiClient, EmbedRequest, GenerateRequest, RerankRequest,
};
pub fn router(client: AiClient) -> Router {
Router::new()
.route("/health", get(health))
.route("/embed", post(embed))
.route("/generate", post(generate))
.route("/rerank", post(rerank))
.with_state(client)
}
async fn health() -> &'static str {
"aibridge ok"
async fn health(State(client): State<AiClient>) -> impl IntoResponse {
match client.health().await {
Ok(info) => Ok(Json(info)),
Err(e) => Err((StatusCode::BAD_GATEWAY, format!("sidecar down: {e}"))),
}
}
async fn embed(
State(client): State<AiClient>,
Json(req): Json<EmbedRequest>,
) -> impl IntoResponse {
tracing::info!("embedding {} texts", req.texts.len());
match client.embed(req).await {
Ok(resp) => Ok(Json(resp)),
Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
}
}
async fn generate(
State(client): State<AiClient>,
Json(req): Json<GenerateRequest>,
) -> impl IntoResponse {
tracing::info!("generating with prompt len={}", req.prompt.len());
match client.generate(req).await {
Ok(resp) => Ok(Json(resp)),
Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
}
}
async fn rerank(
State(client): State<AiClient>,
Json(req): Json<RerankRequest>,
) -> impl IntoResponse {
tracing::info!("reranking {} documents", req.documents.len());
match client.rerank(req).await {
Ok(resp) => Ok(Json(resp)),
Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
}
}
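
Not part of the diff: a sketch of how the Bad Gateway mapping above could be exercised in a unit test. It assumes dev-dependencies on tokio (macros) and tower (util feature, for ServiceExt::oneshot); neither is added by this commit.

use aibridge::{client::AiClient, service::router};
use axum::{body::Body, http::{Request, StatusCode}};
use tower::ServiceExt; // provides `oneshot`

#[tokio::test]
async fn health_maps_sidecar_failure_to_bad_gateway() {
    // Point the client at a port where nothing should be listening.
    let client = AiClient::new("http://127.0.0.1:9");
    let app = router(client);

    let req = Request::builder().uri("/health").body(Body::empty()).unwrap();
    let resp = app.oneshot(req).await.unwrap();

    assert_eq!(resp.status(), StatusCode::BAD_GATEWAY);
}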


@@ -22,12 +22,16 @@ async fn main() {
// Query engine — DataFusion over catalog-registered Parquet
let engine = queryd::context::QueryEngine::new(registry.clone(), store.clone());
// AI sidecar client
let sidecar_url = std::env::var("SIDECAR_URL").unwrap_or_else(|_| "http://localhost:3200".to_string());
let ai_client = aibridge::client::AiClient::new(&sidecar_url);
let app = Router::new()
.route("/health", get(health))
.nest("/storage", storaged::service::router(store))
.nest("/catalog", catalogd::service::router(registry))
.nest("/query", queryd::service::router(engine))
.nest("/ai", aibridge::service::router())
.nest("/ai", aibridge::service::router(ai_client))
.layer(TraceLayer::new_for_http());
let addr = "0.0.0.0:3100";


@@ -31,13 +31,13 @@
**Gate: PASSED** — SELECT *, WHERE/ORDER BY, COUNT/AVG all return correct results via catalog.
## Phase 3: AI Integration
- [ ] 3.1 — Python sidecar: FastAPI + Ollama (embed/generate/rerank)
- [ ] 3.2 — Dockerfile for sidecar
- [ ] 3.3 — aibridge/client.rs: HTTP client to sidecar
- [ ] 3.4 — aibridge service: Axum proxy endpoints
- [ ] 3.5 — Model config via env vars
- [x] 3.1 — Python sidecar: FastAPI + Ollama (embed/generate/rerank) — real models, no mocks
- [x] 3.2 — Dockerfile for sidecar
- [x] 3.3 — aibridge/client.rs: reqwest HTTP client with 120s timeout
- [x] 3.4 — aibridge service: Axum proxy endpoints (POST /ai/embed, /ai/generate, /ai/rerank)
- [x] 3.5 — Model config via env vars (EMBED_MODEL, GEN_MODEL, RERANK_MODEL, SIDECAR_URL)
**Gate:** Rust → Python → Ollama → real embeddings return.
**Gate: PASSED** — Gateway → aibridge → sidecar → Ollama → real 768d embeddings + generation.
## Phase 4: Frontend
- [ ] 4.1 — Dioxus scaffold, WASM build
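
Not part of the diff: a minimal sketch of reproducing the Phase 3 gate check above from the Rust side with plain reqwest + serde_json (both present in the tree after this commit), assuming the gateway is running on localhost:3100 with the sidecar and Ollama (nomic-embed-text pulled) behind it.

use serde_json::{Value, json};

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let http = reqwest::Client::new();

    // POST /ai/embed is proxied gateway -> aibridge -> sidecar -> Ollama.
    let resp: Value = http
        .post("http://localhost:3100/ai/embed")
        .json(&json!({ "texts": ["hello lakehouse"] }))
        .send()
        .await?
        .json()
        .await?;

    let dims = resp["dimensions"].as_u64().unwrap_or(0);
    assert_eq!(dims, 768, "expected 768d vectors from nomic-embed-text");
    println!("embed ok: {dims} dims from {}", resp["model"]);
    Ok(())
}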

sidecar/Dockerfile (new file, 17 lines)

@@ -0,0 +1,17 @@
FROM python:3.13-slim
WORKDIR /app
COPY pyproject.toml .
RUN pip install --no-cache-dir .
COPY sidecar/ sidecar/
ENV OLLAMA_URL=http://host.docker.internal:11434
ENV EMBED_MODEL=nomic-embed-text
ENV GEN_MODEL=qwen2.5
ENV RERANK_MODEL=qwen2.5
EXPOSE 3200
CMD ["uvicorn", "sidecar.main:app", "--host", "0.0.0.0", "--port", "3200"]

sidecar/pyproject.toml (new file, 14 lines)

@@ -0,0 +1,14 @@
[project]
name = "lakehouse-sidecar"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
"fastapi>=0.115",
"uvicorn>=0.34",
"httpx>=0.28",
"pydantic>=2.0",
]
[tool.uvicorn]
host = "0.0.0.0"
port = 3200

(6 binary files not shown)

sidecar/sidecar/embed.py (new file, 44 lines)

@@ -0,0 +1,44 @@
import os
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from .ollama import client
router = APIRouter()
EMBED_MODEL = os.environ.get("EMBED_MODEL", "nomic-embed-text")
class EmbedRequest(BaseModel):
texts: list[str]
model: str | None = None
class EmbedResponse(BaseModel):
embeddings: list[list[float]]
model: str
dimensions: int
@router.post("", response_model=EmbedResponse)
async def embed(req: EmbedRequest):
model = req.model or EMBED_MODEL
embeddings = []
async with client() as c:
for text in req.texts:
resp = await c.post("/api/embed", json={"model": model, "input": text})
if resp.status_code != 200:
raise HTTPException(502, f"Ollama error: {resp.text}")
data = resp.json()
embeddings.extend(data.get("embeddings", []))
if not embeddings:
raise HTTPException(502, "No embeddings returned")
return EmbedResponse(
embeddings=embeddings,
model=model,
dimensions=len(embeddings[0]),
)


@@ -0,0 +1,55 @@
import os
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from .ollama import client
router = APIRouter()
GEN_MODEL = os.environ.get("GEN_MODEL", "qwen2.5")
class GenerateRequest(BaseModel):
prompt: str
model: str | None = None
system: str | None = None
temperature: float = 0.7
max_tokens: int = 2048
class GenerateResponse(BaseModel):
text: str
model: str
tokens_evaluated: int | None = None
tokens_generated: int | None = None
@router.post("", response_model=GenerateResponse)
async def generate(req: GenerateRequest):
model = req.model or GEN_MODEL
payload = {
"model": model,
"prompt": req.prompt,
"stream": False,
"options": {
"temperature": req.temperature,
"num_predict": req.max_tokens,
},
}
if req.system:
payload["system"] = req.system
async with client() as c:
resp = await c.post("/api/generate", json=payload)
if resp.status_code != 200:
raise HTTPException(502, f"Ollama error: {resp.text}")
data = resp.json()
return GenerateResponse(
text=data.get("response", ""),
model=model,
tokens_evaluated=data.get("prompt_eval_count"),
tokens_generated=data.get("eval_count"),
)
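
Not part of the diff: a sketch of how the optional fields on the Rust GenerateRequest line up with this endpoint, where temperature maps to options.temperature and max_tokens to options.num_predict, with the sidecar defaults (0.7 / 2048) applying when they are omitted. Assumes a sidecar on localhost:3200 and a tokio runtime.

use aibridge::client::{AiClient, GenerateRequest};

#[tokio::main]
async fn main() -> Result<(), String> {
    let client = AiClient::new("http://localhost:3200");

    let resp = client
        .generate(GenerateRequest {
            prompt: "Summarize what a lakehouse is in one sentence.".into(),
            model: None, // falls back to GEN_MODEL (qwen2.5)
            system: Some("You are a concise technical writer.".into()),
            temperature: Some(0.2),
            max_tokens: Some(128),
        })
        .await?;

    println!("{} ({:?} tokens generated)", resp.text, resp.tokens_generated);
    Ok(())
}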

sidecar/sidecar/main.py (new file, 23 lines)

@@ -0,0 +1,23 @@
import os
from fastapi import FastAPI
from .embed import router as embed_router
from .generate import router as generate_router
from .rerank import router as rerank_router
app = FastAPI(title="Lakehouse AI Sidecar")
app.include_router(embed_router, prefix="/embed", tags=["embed"])
app.include_router(generate_router, prefix="/generate", tags=["generate"])
app.include_router(rerank_router, prefix="/rerank", tags=["rerank"])
@app.get("/health")
async def health():
return {
"status": "ok",
"ollama_url": os.environ.get("OLLAMA_URL", "http://localhost:11434"),
"embed_model": os.environ.get("EMBED_MODEL", "nomic-embed-text"),
"gen_model": os.environ.get("GEN_MODEL", "qwen2.5"),
}

sidecar/sidecar/ollama.py (new file, 12 lines)

@@ -0,0 +1,12 @@
"""Shared Ollama HTTP client."""
import os
import httpx
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
TIMEOUT = float(os.environ.get("OLLAMA_TIMEOUT", "120"))
def client() -> httpx.AsyncClient:
return httpx.AsyncClient(base_url=OLLAMA_URL, timeout=TIMEOUT)

sidecar/sidecar/rerank.py (new file, 70 lines)

@@ -0,0 +1,70 @@
import os
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from .ollama import client
router = APIRouter()
RERANK_MODEL = os.environ.get("RERANK_MODEL", "qwen2.5")
class RerankRequest(BaseModel):
query: str
documents: list[str]
model: str | None = None
top_k: int | None = None
class ScoredDocument(BaseModel):
index: int
text: str
score: float
class RerankResponse(BaseModel):
results: list[ScoredDocument]
model: str
@router.post("", response_model=RerankResponse)
async def rerank(req: RerankRequest):
"""Cross-encoder reranking via Ollama generate.
Scores each document against the query by asking the model to rate relevance 0-10,
then sorts by score descending.
"""
model = req.model or RERANK_MODEL
scored = []
async with client() as c:
for i, doc in enumerate(req.documents):
prompt = (
f"Rate the relevance of the following document to the query on a scale of 0 to 10. "
f"Respond with ONLY a number.\n\n"
f"Query: {req.query}\n\n"
f"Document: {doc}\n\n"
f"Score:"
)
resp = await c.post(
"/api/generate",
json={"model": model, "prompt": prompt, "stream": False, "options": {"temperature": 0.0, "num_predict": 8}},
)
if resp.status_code != 200:
raise HTTPException(502, f"Ollama error: {resp.text}")
text = resp.json().get("response", "").strip()
try:
score = float(text.split()[0])
score = max(0.0, min(10.0, score))
except (ValueError, IndexError):
score = 0.0
scored.append(ScoredDocument(index=i, text=doc, score=score))
scored.sort(key=lambda x: x.score, reverse=True)
if req.top_k:
scored = scored[: req.top_k]
return RerankResponse(results=scored, model=model)
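
Not part of the diff: a usage sketch of the rerank flow driven from the Rust client added earlier, assuming the sidecar is reachable on localhost:3200. Scores come back in the 0-10 range described in the docstring, sorted descending and truncated to top_k.

use aibridge::client::{AiClient, RerankRequest};

#[tokio::main]
async fn main() -> Result<(), String> {
    let client = AiClient::new("http://localhost:3200");

    let resp = client
        .rerank(RerankRequest {
            query: "columnar storage formats".into(),
            documents: vec![
                "Parquet is a columnar file format for analytics.".into(),
                "The mitochondria is the powerhouse of the cell.".into(),
                "Arrow defines an in-memory columnar layout.".into(),
            ],
            model: None, // falls back to RERANK_MODEL (qwen2.5)
            top_k: Some(2),
        })
        .await?;

    for doc in &resp.results {
        println!("[{:>4.1}] #{} {}", doc.score, doc.index, doc.text);
    }
    Ok(())
}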