diff --git a/Cargo.lock b/Cargo.lock
index d88f471..6e12912 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -36,6 +36,7 @@ name = "aibridge"
 version = "0.1.0"
 dependencies = [
  "axum",
+ "reqwest",
  "serde",
  "serde_json",
  "shared",
@@ -1740,6 +1741,7 @@ dependencies = [
  "tokio",
  "tokio-rustls",
  "tower-service",
+ "webpki-roots",
 ]
 
 [[package]]
@@ -2734,6 +2736,7 @@ dependencies = [
  "wasm-bindgen-futures",
  "wasm-streams",
  "web-sys",
+ "webpki-roots",
 ]
 
 [[package]]
@@ -3672,6 +3675,15 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "webpki-roots"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed"
+dependencies = [
+ "rustls-pki-types",
+]
+
 [[package]]
 name = "winapi-util"
 version = "0.1.11"
diff --git a/crates/aibridge/Cargo.toml b/crates/aibridge/Cargo.toml
index 8f57876..5a1a93b 100644
--- a/crates/aibridge/Cargo.toml
+++ b/crates/aibridge/Cargo.toml
@@ -10,3 +10,4 @@ axum = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 tracing = { workspace = true }
+reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
diff --git a/crates/aibridge/src/client.rs b/crates/aibridge/src/client.rs
new file mode 100644
index 0000000..97e7cec
--- /dev/null
+++ b/crates/aibridge/src/client.rs
@@ -0,0 +1,137 @@
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
+use std::time::Duration;
+
+/// HTTP client for the Python AI sidecar.
+#[derive(Clone)]
+pub struct AiClient {
+    client: Client,
+    base_url: String,
+}
+
+// -- Request/Response types --
+
+#[derive(Serialize, Deserialize)]
+pub struct EmbedRequest {
+    pub texts: Vec<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub model: Option<String>,
+}
+
+#[derive(Deserialize, Serialize, Clone)]
+pub struct EmbedResponse {
+    pub embeddings: Vec<Vec<f32>>,
+    pub model: String,
+    pub dimensions: usize,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct GenerateRequest {
+    pub prompt: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub model: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub system: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub temperature: Option<f32>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub max_tokens: Option<u32>,
+}
+
+#[derive(Deserialize, Serialize, Clone)]
+pub struct GenerateResponse {
+    pub text: String,
+    pub model: String,
+    pub tokens_evaluated: Option<u64>,
+    pub tokens_generated: Option<u64>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct RerankRequest {
+    pub query: String,
+    pub documents: Vec<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub model: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub top_k: Option<usize>,
+}
+
+#[derive(Deserialize, Serialize, Clone)]
+pub struct ScoredDocument {
+    pub index: usize,
+    pub text: String,
+    pub score: f64,
+}
+
+#[derive(Deserialize, Serialize, Clone)]
+pub struct RerankResponse {
+    pub results: Vec<ScoredDocument>,
+    pub model: String,
+}
+
+impl AiClient {
+    pub fn new(base_url: &str) -> Self {
+        let client = Client::builder()
+            .timeout(Duration::from_secs(120))
+            .build()
+            .expect("failed to build HTTP client");
+        Self {
+            client,
+            base_url: base_url.trim_end_matches('/').to_string(),
+        }
+    }
+
+    pub async fn health(&self) -> Result<serde_json::Value, String> {
+        let resp = self.client
+            .get(format!("{}/health", self.base_url))
+            .send()
+            .await
+            .map_err(|e| format!("sidecar unreachable: {e}"))?;
+        resp.json().await.map_err(|e| format!("invalid response: {e}"))
+    }
+
+    pub async fn embed(&self, req: EmbedRequest) -> Result<EmbedResponse, String> {
+        let resp = self.client
+            .post(format!("{}/embed", self.base_url))
+            .json(&req)
+            .send()
+            .await
+            .map_err(|e| format!("embed request failed: {e}"))?;
+
+        if !resp.status().is_success() {
+            let text = resp.text().await.unwrap_or_default();
+            return Err(format!("embed error: {text}"));
+        }
+        resp.json().await.map_err(|e| format!("embed parse error: {e}"))
+    }
+
+    pub async fn generate(&self, req: GenerateRequest) -> Result<GenerateResponse, String> {
+        let resp = self.client
+            .post(format!("{}/generate", self.base_url))
+            .json(&req)
+            .send()
+            .await
+            .map_err(|e| format!("generate request failed: {e}"))?;
+
+        if !resp.status().is_success() {
+            let text = resp.text().await.unwrap_or_default();
+            return Err(format!("generate error: {text}"));
+        }
+        resp.json().await.map_err(|e| format!("generate parse error: {e}"))
+    }
+
+    pub async fn rerank(&self, req: RerankRequest) -> Result<RerankResponse, String> {
+        let resp = self.client
+            .post(format!("{}/rerank", self.base_url))
+            .json(&req)
+            .send()
+            .await
+            .map_err(|e| format!("rerank request failed: {e}"))?;
+
+        if !resp.status().is_success() {
+            let text = resp.text().await.unwrap_or_default();
+            return Err(format!("rerank error: {text}"));
+        }
+        resp.json().await.map_err(|e| format!("rerank parse error: {e}"))
+    }
+}
diff --git a/crates/aibridge/src/lib.rs b/crates/aibridge/src/lib.rs
index 1f278a4..431311a 100644
--- a/crates/aibridge/src/lib.rs
+++ b/crates/aibridge/src/lib.rs
@@ -1 +1,2 @@
+pub mod client;
 pub mod service;
diff --git a/crates/aibridge/src/service.rs b/crates/aibridge/src/service.rs
index 97c7655..dee4847 100644
--- a/crates/aibridge/src/service.rs
+++ b/crates/aibridge/src/service.rs
@@ -1,9 +1,60 @@
-use axum::{Router, routing::get};
+use axum::{
+    Json, Router,
+    extract::State,
+    http::StatusCode,
+    response::IntoResponse,
+    routing::{get, post},
+};
 
-pub fn router() -> Router {
-    Router::new().route("/health", get(health))
+use crate::client::{
+    AiClient, EmbedRequest, GenerateRequest, RerankRequest,
+};
+
+pub fn router(client: AiClient) -> Router {
+    Router::new()
+        .route("/health", get(health))
+        .route("/embed", post(embed))
+        .route("/generate", post(generate))
+        .route("/rerank", post(rerank))
+        .with_state(client)
 }
 
-async fn health() -> &'static str {
-    "aibridge ok"
+async fn health(State(client): State<AiClient>) -> impl IntoResponse {
+    match client.health().await {
+        Ok(info) => Ok(Json(info)),
+        Err(e) => Err((StatusCode::BAD_GATEWAY, format!("sidecar down: {e}"))),
+    }
+}
+
+async fn embed(
+    State(client): State<AiClient>,
+    Json(req): Json<EmbedRequest>,
+) -> impl IntoResponse {
+    tracing::info!("embedding {} texts", req.texts.len());
+    match client.embed(req).await {
+        Ok(resp) => Ok(Json(resp)),
+        Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
+    }
+}
+
+async fn generate(
+    State(client): State<AiClient>,
+    Json(req): Json<GenerateRequest>,
+) -> impl IntoResponse {
+    tracing::info!("generating with prompt len={}", req.prompt.len());
+    match client.generate(req).await {
+        Ok(resp) => Ok(Json(resp)),
+        Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
+    }
+}
+
+async fn rerank(
+    State(client): State<AiClient>,
+    Json(req): Json<RerankRequest>,
+) -> impl IntoResponse {
+    tracing::info!("reranking {} documents", req.documents.len());
+    match client.rerank(req).await {
+        Ok(resp) => Ok(Json(resp)),
+        Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
+    }
 }
diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs
index 7206092..4489629 100644
--- a/crates/gateway/src/main.rs
+++ b/crates/gateway/src/main.rs
@@ -22,12 +22,16 @@ async fn main() {
     // Query engine — DataFusion over catalog-registered Parquet
     let engine = queryd::context::QueryEngine::new(registry.clone(), store.clone());
 
+    // AI sidecar client
+    let sidecar_url = std::env::var("SIDECAR_URL").unwrap_or_else(|_| "http://localhost:3200".to_string());
+    let ai_client = aibridge::client::AiClient::new(&sidecar_url);
+
     let app = Router::new()
         .route("/health", get(health))
         .nest("/storage", storaged::service::router(store))
         .nest("/catalog", catalogd::service::router(registry))
         .nest("/query", queryd::service::router(engine))
-        .nest("/ai", aibridge::service::router())
+        .nest("/ai", aibridge::service::router(ai_client))
         .layer(TraceLayer::new_for_http());
 
     let addr = "0.0.0.0:3100";
diff --git a/docs/PHASES.md b/docs/PHASES.md
index 9cc412a..e16ac95 100644
--- a/docs/PHASES.md
+++ b/docs/PHASES.md
@@ -31,13 +31,13 @@
 **Gate: PASSED** — SELECT *, WHERE/ORDER BY, COUNT/AVG all return correct results via catalog.
 
 ## Phase 3: AI Integration
-- [ ] 3.1 — Python sidecar: FastAPI + Ollama (embed/generate/rerank)
-- [ ] 3.2 — Dockerfile for sidecar
-- [ ] 3.3 — aibridge/client.rs: HTTP client to sidecar
-- [ ] 3.4 — aibridge service: Axum proxy endpoints
-- [ ] 3.5 — Model config via env vars
+- [x] 3.1 — Python sidecar: FastAPI + Ollama (embed/generate/rerank) — real models, no mocks
+- [x] 3.2 — Dockerfile for sidecar
+- [x] 3.3 — aibridge/client.rs: reqwest HTTP client with 120s timeout
+- [x] 3.4 — aibridge service: Axum proxy endpoints (POST /ai/embed, /ai/generate, /ai/rerank)
+- [x] 3.5 — Model config via env vars (EMBED_MODEL, GEN_MODEL, RERANK_MODEL, SIDECAR_URL)
 
-**Gate:** Rust → Python → Ollama → real embeddings return.
+**Gate: PASSED** — Gateway → aibridge → sidecar → Ollama → real 768d embeddings + generation.
 
 ## Phase 4: Frontend
 - [ ] 4.1 — Dioxus scaffold, WASM build
diff --git a/sidecar/Dockerfile b/sidecar/Dockerfile
new file mode 100644
index 0000000..d0a0cd8
--- /dev/null
+++ b/sidecar/Dockerfile
@@ -0,0 +1,17 @@
+FROM python:3.13-slim
+
+WORKDIR /app
+
+COPY pyproject.toml .
+RUN pip install --no-cache-dir .
+
+COPY sidecar/ sidecar/
+
+ENV OLLAMA_URL=http://host.docker.internal:11434
+ENV EMBED_MODEL=nomic-embed-text
+ENV GEN_MODEL=qwen2.5
+ENV RERANK_MODEL=qwen2.5
+
+EXPOSE 3200
+
+CMD ["uvicorn", "sidecar.main:app", "--host", "0.0.0.0", "--port", "3200"]
diff --git a/sidecar/pyproject.toml b/sidecar/pyproject.toml
new file mode 100644
index 0000000..559e1b5
--- /dev/null
+++ b/sidecar/pyproject.toml
@@ -0,0 +1,10 @@
+[project]
+name = "lakehouse-sidecar"
+version = "0.1.0"
+requires-python = ">=3.11"
+dependencies = [
+    "fastapi>=0.115",
+    "uvicorn>=0.34",
+    "httpx>=0.28",
+    "pydantic>=2.0",
+]
diff --git a/sidecar/sidecar/__init__.py b/sidecar/sidecar/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/sidecar/sidecar/embed.py b/sidecar/sidecar/embed.py
new file mode 100644
index 0000000..2ff9d7b
--- /dev/null
+++ b/sidecar/sidecar/embed.py
@@ -0,0 +1,44 @@
+import os
+
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel
+
+from .ollama import client
+
+router = APIRouter()
+
+EMBED_MODEL = os.environ.get("EMBED_MODEL", "nomic-embed-text")
+
+
+class EmbedRequest(BaseModel):
+    texts: list[str]
+    model: str | None = None
+
+
+class EmbedResponse(BaseModel):
+    embeddings: list[list[float]]
+    model: str
+    dimensions: int
+
+
+@router.post("", response_model=EmbedResponse)
+async def embed(req: EmbedRequest):
+    model = req.model or EMBED_MODEL
+    embeddings = []
+
+    async with client() as c:
+        for text in req.texts:
+            resp = await c.post("/api/embed", json={"model": model, "input": text})
+            if resp.status_code != 200:
+                raise HTTPException(502, f"Ollama error: {resp.text}")
+            data = resp.json()
+            embeddings.extend(data.get("embeddings", []))
+
+    if not embeddings:
+        raise HTTPException(502, "No embeddings returned")
+
+    return EmbedResponse(
+        embeddings=embeddings,
+        model=model,
+        dimensions=len(embeddings[0]),
+    )
diff --git a/sidecar/sidecar/generate.py b/sidecar/sidecar/generate.py
new file mode 100644
index 0000000..e7bcc23
--- /dev/null
+++ b/sidecar/sidecar/generate.py
@@ -0,0 +1,55 @@
+import os
+
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel
+
+from .ollama import client
+
+router = APIRouter()
+
+GEN_MODEL = os.environ.get("GEN_MODEL", "qwen2.5")
+
+
+class GenerateRequest(BaseModel):
+    prompt: str
+    model: str | None = None
+    system: str | None = None
+    temperature: float = 0.7
+    max_tokens: int = 2048
+
+
+class GenerateResponse(BaseModel):
+    text: str
+    model: str
+    tokens_evaluated: int | None = None
+    tokens_generated: int | None = None
+
+
+@router.post("", response_model=GenerateResponse)
+async def generate(req: GenerateRequest):
+    model = req.model or GEN_MODEL
+
+    payload = {
+        "model": model,
+        "prompt": req.prompt,
+        "stream": False,
+        "options": {
+            "temperature": req.temperature,
+            "num_predict": req.max_tokens,
+        },
+    }
+    if req.system:
+        payload["system"] = req.system
+
+    async with client() as c:
+        resp = await c.post("/api/generate", json=payload)
+        if resp.status_code != 200:
+            raise HTTPException(502, f"Ollama error: {resp.text}")
+        data = resp.json()
+
+    return GenerateResponse(
+        text=data.get("response", ""),
+        model=model,
+        tokens_evaluated=data.get("prompt_eval_count"),
+        tokens_generated=data.get("eval_count"),
+    )
diff --git a/sidecar/sidecar/main.py b/sidecar/sidecar/main.py
new file mode 100644
index 0000000..9555de2
--- /dev/null
+++ b/sidecar/sidecar/main.py
@@ -0,0 +1,23 @@
+import os
+
+from fastapi import FastAPI
+
+from .embed import router as embed_router
+from .generate import router as generate_router
+from .rerank import router as rerank_router
+
+app = FastAPI(title="Lakehouse AI Sidecar")
+
+app.include_router(embed_router, prefix="/embed", tags=["embed"])
+app.include_router(generate_router, prefix="/generate", tags=["generate"])
+app.include_router(rerank_router, prefix="/rerank", tags=["rerank"])
+
+
+@app.get("/health")
+async def health():
+    return {
+        "status": "ok",
+        "ollama_url": os.environ.get("OLLAMA_URL", "http://localhost:11434"),
+        "embed_model": os.environ.get("EMBED_MODEL", "nomic-embed-text"),
+        "gen_model": os.environ.get("GEN_MODEL", "qwen2.5"),
+    }
diff --git a/sidecar/sidecar/ollama.py b/sidecar/sidecar/ollama.py
new file mode 100644
index 0000000..bccd7d3
--- /dev/null
+++ b/sidecar/sidecar/ollama.py
@@ -0,0 +1,12 @@
+"""Shared Ollama HTTP client."""
+
+import os
+
+import httpx
+
+OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
+TIMEOUT = float(os.environ.get("OLLAMA_TIMEOUT", "120"))
+
+
+def client() -> httpx.AsyncClient:
+    return httpx.AsyncClient(base_url=OLLAMA_URL, timeout=TIMEOUT)
diff --git a/sidecar/sidecar/rerank.py b/sidecar/sidecar/rerank.py
new file mode 100644
index 0000000..eb61a46
--- /dev/null
+++ b/sidecar/sidecar/rerank.py
@@ -0,0 +1,70 @@
+import os
+
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel
+
+from .ollama import client
+
+router = APIRouter()
+
+RERANK_MODEL = os.environ.get("RERANK_MODEL", "qwen2.5")
+
+
+class RerankRequest(BaseModel):
+    query: str
+    documents: list[str]
+    model: str | None = None
+    top_k: int | None = None
+
+
+class ScoredDocument(BaseModel):
+    index: int
+    text: str
+    score: float
+
+
+class RerankResponse(BaseModel):
+    results: list[ScoredDocument]
+    model: str
+
+
+@router.post("", response_model=RerankResponse)
+async def rerank(req: RerankRequest):
+    """Cross-encoder-style reranking via Ollama generate.
+
+    Scores each document against the query by asking the model to rate relevance 0-10,
+    then sorts by score descending.
+    """
+    model = req.model or RERANK_MODEL
+    scored = []
+
+    async with client() as c:
+        for i, doc in enumerate(req.documents):
+            prompt = (
+                f"Rate the relevance of the following document to the query on a scale of 0 to 10. "
+                f"Respond with ONLY a number.\n\n"
+                f"Query: {req.query}\n\n"
+                f"Document: {doc}\n\n"
+                f"Score:"
+            )
+            resp = await c.post(
+                "/api/generate",
+                json={"model": model, "prompt": prompt, "stream": False, "options": {"temperature": 0.0, "num_predict": 8}},
+            )
+            if resp.status_code != 200:
+                raise HTTPException(502, f"Ollama error: {resp.text}")
+
+            text = resp.json().get("response", "").strip()
+            try:
+                score = float(text.split()[0])
+                score = max(0.0, min(10.0, score))
+            except (ValueError, IndexError):
+                score = 0.0
+
+            scored.append(ScoredDocument(index=i, text=doc, score=score))
+
+    scored.sort(key=lambda x: x.score, reverse=True)
+    if req.top_k:
+        scored = scored[: req.top_k]
+
+    return RerankResponse(results=scored, model=model)
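
For a quick smoke test of the pieces above, here is a minimal sketch of driving AiClient directly, outside the Axum handlers. Only AiClient, EmbedRequest, and GenerateRequest come from this diff; the binary itself, the tokio main harness, and the hardcoded sidecar URL are illustrative assumptions.

// Hypothetical smoke test; not part of the diff above.
// Assumes the sidecar (sidecar/Dockerfile) is listening on :3200.
use aibridge::client::{AiClient, EmbedRequest, GenerateRequest};

#[tokio::main]
async fn main() -> Result<(), String> {
    let client = AiClient::new("http://localhost:3200");

    // EMBED_MODEL defaults to nomic-embed-text on the sidecar side.
    let embed = client
        .embed(EmbedRequest {
            texts: vec!["hello".into(), "world".into()],
            model: None,
        })
        .await?;
    println!("{} vectors, {} dims", embed.embeddings.len(), embed.dimensions);

    // Options left as None are omitted from the JSON (skip_serializing_if),
    // so the sidecar defaults (temperature 0.7, 2048 tokens) would apply.
    let generated = client
        .generate(GenerateRequest {
            prompt: "Say hi in one word.".into(),
            model: None,
            system: None,
            temperature: Some(0.2),
            max_tokens: Some(16),
        })
        .await?;
    println!("{}", generated.text);
    Ok(())
}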
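
The same surface is reachable over HTTP once the gateway nests this router under /ai. A sketch of the proxied rerank call, assuming the gateway from crates/gateway/src/main.rs is up on port 3100 and the caller has reqwest (json feature) plus serde_json available:

// Hypothetical end-to-end check of the proxied route; not part of the diff above.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Gateway nests the aibridge router at /ai, so rerank lives at /ai/rerank.
    let resp = reqwest::Client::new()
        .post("http://localhost:3100/ai/rerank")
        .json(&json!({
            "query": "lakehouse table formats",
            "documents": [
                "Parquet is a columnar storage file format.",
                "Bananas are yellow."
            ],
            "top_k": 1
        }))
        .send()
        .await?
        .error_for_status()?;

    // Body is a RerankResponse: {"results": [{"index", "text", "score"}], "model": "..."}
    println!("{}", resp.text().await?);
    Ok(())
}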