diff --git a/crates/gateway/src/bin/parity_session_log.rs b/crates/gateway/src/bin/parity_session_log.rs new file mode 100644 index 0000000..eb4f61a --- /dev/null +++ b/crates/gateway/src/bin/parity_session_log.rs @@ -0,0 +1,71 @@ +//! Cross-runtime parity helper for `SessionRecord` JSON shape. +//! +//! Reads a fixture JSON on stdin, builds a `SessionRecord`, emits +//! one JSONL row on stdout. Used by +//! `golangLAKEHOUSE/scripts/cutover/parity/session_log_parity.sh` +//! to verify the Rust gateway's session log shape stays byte-equal +//! to the Go-side validatord's `validator.SessionRecord` (commit +//! 1a3a82a in golangLAKEHOUSE). + +use gateway::v1::session_log::{SessionAttemptRecord, SessionRecord, SESSION_RECORD_SCHEMA}; +use serde::Deserialize; +use std::io::Read; + +#[derive(Deserialize)] +struct FixtureInput { + session_id: String, + kind: String, + model: String, + provider: String, + prompt: String, + iterations: u32, + max_iterations: u32, + final_verdict: String, + attempts: Vec, + #[serde(default)] + artifact: Option, + #[serde(default)] + grounded_in_roster: Option, + duration_ms: u64, +} + +fn main() { + let mut buf = String::new(); + if let Err(e) = std::io::stdin().read_to_string(&mut buf) { + eprintln!("read stdin: {e}"); + std::process::exit(1); + } + let input: FixtureInput = match serde_json::from_str(&buf) { + Ok(v) => v, + Err(e) => { + eprintln!("parse stdin: {e}"); + std::process::exit(1); + } + }; + let rec = SessionRecord { + schema: SESSION_RECORD_SCHEMA.to_string(), + session_id: input.session_id, + // Pinned timestamp so both runtimes' rows compare byte-equal + // when the test wrapper normalizes on `daemon` only. 
+ timestamp: "2026-01-01T00:00:00+00:00".to_string(), + daemon: "gateway".to_string(), + kind: input.kind, + model: input.model, + provider: input.provider, + prompt: input.prompt, + iterations: input.iterations, + max_iterations: input.max_iterations, + final_verdict: input.final_verdict, + attempts: input.attempts, + artifact: input.artifact, + grounded_in_roster: input.grounded_in_roster, + duration_ms: input.duration_ms, + }; + match serde_json::to_string(&rec) { + Ok(s) => println!("{s}"), + Err(e) => { + eprintln!("marshal: {e}"); + std::process::exit(1); + } + } +} diff --git a/crates/gateway/src/execution_loop/mod.rs b/crates/gateway/src/execution_loop/mod.rs index 57cb86f..4d5d1f3 100644 --- a/crates/gateway/src/execution_loop/mod.rs +++ b/crates/gateway/src/execution_loop/mod.rs @@ -438,6 +438,10 @@ impl ExecutionLoop { start_time: start_time.to_rfc3339(), end_time: end_time.to_rfc3339(), latency_ms: elapsed_ms, + // Internal execution-loop traffic is its own top-level + // trace per call. If a future caller threads a parent + // trace into self.state, lift this to Some(parent_id). + parent_trace_id: None, }); } @@ -654,6 +658,7 @@ impl ExecutionLoop { start_time: start_time.to_rfc3339(), end_time: end_time.to_rfc3339(), latency_ms, + parent_trace_id: None, }); } diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index eae6adc..ff923d0 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -362,6 +362,22 @@ async fn main() { } c }, + // Coordinator session JSONL — one row per /v1/iterate + // session for offline DuckDB analysis. Cross-runtime + // parity with Go-side validatord (commit 1a3a82a). 
+ session_log: { + let path = &config.gateway.session_log_path; + let s = v1::session_log::SessionLogger::from_path(path); + if s.is_some() { + tracing::info!( + "v1: session log enabled — coordinator sessions written to {}", + path + ); + } else { + tracing::info!("v1: session log disabled (set [gateway].session_log_path to enable)"); + } + s + }, })); // Auth middleware (if enabled) — P5-001 fix 2026-04-23: diff --git a/crates/gateway/src/v1/iterate.rs b/crates/gateway/src/v1/iterate.rs index 1b62b71..38e6538 100644 --- a/crates/gateway/src/v1/iterate.rs +++ b/crates/gateway/src/v1/iterate.rs @@ -21,12 +21,19 @@ //! re-implementation. Staffing executors, agent loops, and future //! validators all reach the same code path. -use axum::{extract::State, http::StatusCode, response::IntoResponse, Json}; +use axum::{extract::State, http::{HeaderMap, StatusCode}, response::IntoResponse, Json}; use serde::{Deserialize, Serialize}; const DEFAULT_MAX_ITERATIONS: u32 = 3; const LOOPBACK_TIMEOUT_SECS: u64 = 240; +/// Header name used to propagate a Langfuse parent trace id across +/// daemon boundaries. Matches Go's `shared.TraceIDHeader` constant +/// byte-for-byte (commit d6d2fdf in golangLAKEHOUSE) — same wire +/// format means a Go caller can hit Rust's /v1/iterate (or vice +/// versa) and the resulting Langfuse trees nest correctly. +pub const TRACE_ID_HEADER: &str = "x-lakehouse-trace-id"; + #[derive(Deserialize)] pub struct IterateRequest { /// "fill" | "email" | "playbook" — picks which validator runs. 
@@ -90,26 +97,47 @@ pub struct IterateFailure { } pub async fn iterate( - State(_state): State, + State(state): State, + headers: HeaderMap, Json(req): Json, ) -> impl IntoResponse { let max_iter = req.max_iterations.unwrap_or(DEFAULT_MAX_ITERATIONS).max(1); let temperature = req.temperature.unwrap_or(0.2); let max_tokens = req.max_tokens.unwrap_or(4096); let mut history: Vec = Vec::with_capacity(max_iter as usize); + let mut attempt_records: Vec = Vec::with_capacity(max_iter as usize); let mut current_prompt = req.prompt.clone(); + // Resolve the parent Langfuse trace id. Caller-forwarded header + // wins (cross-daemon tree linkage); otherwise mint a fresh id so + // the iterate session is its own tree. Same shape as the Go-side + // validatord trace propagation. + let trace_id: String = headers + .get(TRACE_ID_HEADER) + .and_then(|v| v.to_str().ok()) + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()) + .unwrap_or_else(new_trace_id); + let client = match reqwest::Client::builder() .timeout(std::time::Duration::from_secs(LOOPBACK_TIMEOUT_SECS)) .build() { Ok(c) => c, - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("client build: {e}")).into_response(), + Err(e) => { + // Even infrastructure failures get a session row so a + // missing /v1/iterate event never silently disappears + // from the longitudinal log. + write_infra_error(&state, &req, &trace_id, max_iter, 0, format!("client build: {e}")).await; + return (StatusCode::INTERNAL_SERVER_ERROR, format!("client build: {e}")).into_response(); + } }; // Self-loopback to the gateway port. Carries gateway internal // calls through /v1/chat + /v1/validate so /v1/usage tracks them. 
let gateway = "http://127.0.0.1:3100"; + let t0 = std::time::Instant::now(); for iteration in 0..max_iter { + let attempt_started = chrono::Utc::now(); // ── Generate ── let mut messages = Vec::with_capacity(2); if let Some(sys) = &req.system { @@ -123,20 +151,33 @@ pub async fn iterate( "temperature": temperature, "max_tokens": max_tokens, }); - let raw = match call_chat(&client, gateway, &chat_body).await { + let raw = match call_chat(&client, gateway, &chat_body, &trace_id).await { Ok(r) => r, - Err(e) => return (StatusCode::BAD_GATEWAY, format!("/v1/chat hop failed at iter {iteration}: {e}")).into_response(), + Err(e) => { + write_infra_error(&state, &req, &trace_id, max_iter, t0.elapsed().as_millis() as u64, format!("/v1/chat hop failed at iter {iteration}: {e}")).await; + return (StatusCode::BAD_GATEWAY, format!("/v1/chat hop failed at iter {iteration}: {e}")).into_response(); + } }; // ── Extract JSON ── let artifact = match extract_json(&raw) { Some(a) => a, None => { + let span_id = emit_attempt_span( + &state, &trace_id, iteration, &req, &current_prompt, &raw, "no_json", None, + attempt_started, chrono::Utc::now(), + ); history.push(IterateAttempt { iteration, raw: raw.chars().take(2000).collect(), status: AttemptStatus::NoJson, }); + attempt_records.push(super::session_log::SessionAttemptRecord { + iteration, + verdict_kind: "no_json".to_string(), + error: None, + span_id, + }); current_prompt = format!( "{}\n\nYour previous attempt did not contain a JSON object. 
Reply with ONLY a valid JSON object matching the requested artifact shape.", req.prompt, @@ -151,13 +192,26 @@ pub async fn iterate( "artifact": artifact, "context": req.context.clone().unwrap_or(serde_json::Value::Null), }); - match call_validate(&client, gateway, &validate_body).await { + match call_validate(&client, gateway, &validate_body, &trace_id).await { Ok(report) => { + let span_id = emit_attempt_span( + &state, &trace_id, iteration, &req, &current_prompt, &raw, "accepted", None, + attempt_started, chrono::Utc::now(), + ); history.push(IterateAttempt { iteration, raw: raw.chars().take(2000).collect(), status: AttemptStatus::Accepted, }); + attempt_records.push(super::session_log::SessionAttemptRecord { + iteration, + verdict_kind: "accepted".to_string(), + error: None, + span_id, + }); + let duration_ms = t0.elapsed().as_millis() as u64; + let grounded = grounded_in_roster(&state, &req.kind, &artifact); + write_session_accepted(&state, &req, &trace_id, iteration + 1, max_iter, attempt_records, &artifact, grounded, duration_ms).await; return (StatusCode::OK, Json(IterateResponse { artifact, validation: report, @@ -167,6 +221,11 @@ pub async fn iterate( } Err(err) => { let err_summary = err.to_string(); + let span_id = emit_attempt_span( + &state, &trace_id, iteration, &req, &current_prompt, &raw, "validation_failed", + Some(err_summary.clone()), + attempt_started, chrono::Utc::now(), + ); history.push(IterateAttempt { iteration, raw: raw.chars().take(2000).collect(), @@ -174,6 +233,12 @@ pub async fn iterate( error: serde_json::to_value(&err_summary).unwrap_or(serde_json::Value::Null), }, }); + attempt_records.push(super::session_log::SessionAttemptRecord { + iteration, + verdict_kind: "validation_failed".to_string(), + error: Some(err_summary.clone()), + span_id, + }); // Append validation feedback to prompt for next iter. // The model sees concrete failure mode + retries with // corrective context. 
This is the "observer correction" @@ -188,6 +253,8 @@ pub async fn iterate( } } + let duration_ms = t0.elapsed().as_millis() as u64; + write_session_failure(&state, &req, &trace_id, max_iter, max_iter, attempt_records, duration_ms).await; (StatusCode::UNPROCESSABLE_ENTITY, Json(IterateFailure { error: format!("max iterations reached ({max_iter}) without passing validation"), iterations: max_iter, @@ -195,12 +262,157 @@ pub async fn iterate( })).into_response() } -async fn call_chat(client: &reqwest::Client, gateway: &str, body: &serde_json::Value) -> Result { - let resp = client.post(format!("{gateway}/v1/chat")) - .json(body) - .send() - .await - .map_err(|e| format!("chat hop: {e}"))?; +// ─── Helpers — Langfuse spans + session log + roster check ───────── + +fn emit_attempt_span( + state: &super::V1State, + trace_id: &str, + iteration: u32, + req: &IterateRequest, + prompt: &str, + raw: &str, + verdict: &str, + error: Option, + started: chrono::DateTime, + ended: chrono::DateTime, +) -> Option { + let lf = state.langfuse.as_ref()?; + Some(lf.emit_attempt_span(super::langfuse_trace::AttemptSpan { + trace_id: trace_id.to_string(), + iteration, + model: req.model.clone(), + provider: req.provider.clone(), + prompt: prompt.to_string(), + raw: raw.to_string(), + verdict: verdict.to_string(), + error, + start_time: started.to_rfc3339(), + end_time: ended.to_rfc3339(), + })) +} + +/// Verify every fill artifact's candidate IDs exist in the roster. +/// Returns Some(true)/Some(false) on the fill kind, None otherwise +/// (other kinds don't have worker IDs to ground). Same semantics as +/// Go's `handlers.rosterCheckFor("fill")`. 
+fn grounded_in_roster( + state: &super::V1State, + kind: &str, + artifact: &serde_json::Value, +) -> Option { + if kind != "fill" { + return None; + } + let fills = artifact.get("fills").and_then(|v| v.as_array())?; + for f in fills { + let id = match f.get("candidate_id").and_then(|v| v.as_str()) { + Some(s) if !s.is_empty() => s, + _ => return Some(false), + }; + if state.validate_workers.find(id).is_none() { + return Some(false); + } + } + Some(true) +} + +async fn write_session_accepted( + state: &super::V1State, + req: &IterateRequest, + trace_id: &str, + iterations: u32, + max_iter: u32, + attempts: Vec, + artifact: &serde_json::Value, + grounded: Option, + duration_ms: u64, +) { + let Some(logger) = state.session_log.as_ref() else { return }; + let rec = build_session_record(req, trace_id, "accepted", iterations, max_iter, attempts, Some(artifact.clone()), grounded, duration_ms); + logger.append(rec).await; +} + +async fn write_session_failure( + state: &super::V1State, + req: &IterateRequest, + trace_id: &str, + iterations: u32, + max_iter: u32, + attempts: Vec, + duration_ms: u64, +) { + let Some(logger) = state.session_log.as_ref() else { return }; + let rec = build_session_record(req, trace_id, "max_iter_exhausted", iterations, max_iter, attempts, None, None, duration_ms); + logger.append(rec).await; +} + +async fn write_infra_error( + state: &super::V1State, + req: &IterateRequest, + trace_id: &str, + max_iter: u32, + duration_ms: u64, + error: String, +) { + let Some(logger) = state.session_log.as_ref() else { return }; + let attempts = vec![super::session_log::SessionAttemptRecord { + iteration: 0, + verdict_kind: "infra_error".to_string(), + error: Some(error), + span_id: None, + }]; + let rec = build_session_record(req, trace_id, "infra_error", 0, max_iter, attempts, None, None, duration_ms); + logger.append(rec).await; +} + +fn build_session_record( + req: &IterateRequest, + trace_id: &str, + final_verdict: &str, + iterations: u32, + max_iter: 
u32, + attempts: Vec, + artifact: Option, + grounded: Option, + duration_ms: u64, +) -> super::session_log::SessionRecord { + super::session_log::SessionRecord { + schema: super::session_log::SESSION_RECORD_SCHEMA.to_string(), + session_id: trace_id.to_string(), + timestamp: chrono::Utc::now().to_rfc3339(), + daemon: "gateway".to_string(), + kind: req.kind.clone(), + model: req.model.clone(), + provider: req.provider.clone(), + prompt: super::session_log::truncate(&req.prompt, 4000), + iterations, + max_iterations: max_iter, + final_verdict: final_verdict.to_string(), + attempts, + artifact, + grounded_in_roster: grounded, + duration_ms, + } +} + +/// Generate a fresh trace id when no parent was forwarded: hex +/// nanosecond timestamp plus a per-process sequence number, so ids +/// minted in the same clock tick still differ (`{:016x}-{:08x}`). +fn new_trace_id() -> String { + // The suffix was previously subsec_nanos() of the same wall clock as `ts`, + // which added no entropy; a process-wide counter keeps concurrent sessions apart. + static SEQ: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0); + let ts = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0); + let seq = SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + format!("{:016x}-{:08x}", ts, seq) +} + +async fn call_chat(client: &reqwest::Client, gateway: &str, body: &serde_json::Value, trace_id: &str) -> Result { + let mut req = client.post(format!("{gateway}/v1/chat")).json(body); + if !trace_id.is_empty() { + req = req.header(TRACE_ID_HEADER, trace_id); + } + let resp = req.send().await.map_err(|e| format!("chat hop: {e}"))?; let status = resp.status(); if !status.is_success() { let body = resp.text().await.unwrap_or_default(); @@ -213,12 +425,12 @@ async fn call_chat(client: &reqwest::Client, gateway: &str, body: &serde_json::V .to_string()) } -async fn call_validate(client: &reqwest::Client, gateway: &str, body: &serde_json::Value) -> Result { - let resp = client.post(format!("{gateway}/v1/validate")) - .json(body) - .send() - .await - .map_err(|e| format!("validate hop: {e}"))?; +async fn call_validate(client: &reqwest::Client, gateway: &str, body: &serde_json::Value, 
trace_id: &str) -> Result { + let mut req = client.post(format!("{gateway}/v1/validate")).json(body); + if !trace_id.is_empty() { + req = req.header(TRACE_ID_HEADER, trace_id); + } + let resp = req.send().await.map_err(|e| format!("validate hop: {e}"))?; let status = resp.status(); let parsed: serde_json::Value = resp.json().await.map_err(|e| format!("validate parse: {e}"))?; if status.is_success() { diff --git a/crates/gateway/src/v1/langfuse_trace.rs b/crates/gateway/src/v1/langfuse_trace.rs index fe976f4..98c5c1c 100644 --- a/crates/gateway/src/v1/langfuse_trace.rs +++ b/crates/gateway/src/v1/langfuse_trace.rs @@ -76,63 +76,54 @@ impl LangfuseClient { }); } - async fn emit_chat_inner(&self, ev: ChatTrace) -> Result<(), String> { - let trace_id = uuid_v7_like(); - let gen_id = uuid_v7_like(); - let trace_ts = ev.start_time.clone(); + /// Fire-and-forget per-iteration span emit. Returns the generated + /// span id synchronously so the caller can stamp it on + /// `SessionAttemptRecord.span_id` before the network round-trip resolves. + /// Mirrors Go's `validator.Tracer` callback shape. 
+ pub fn emit_attempt_span(&self, sp: AttemptSpan) -> String { + let span_id = uuid_v7_like(); + let span_id_for_caller = span_id.clone(); + let this = self.clone(); + tokio::spawn(async move { + if let Err(e) = this.emit_attempt_span_inner(span_id, sp).await { + tracing::warn!(target: "v1.langfuse", "iterate span drop: {e}"); + } + }); + span_id_for_caller + } + async fn emit_attempt_span_inner(&self, span_id: String, sp: AttemptSpan) -> Result<(), String> { + let level = if sp.verdict == "accepted" { "DEFAULT" } else { "WARNING" }; let batch = IngestionBatch { - batch: vec![ - IngestionEvent { - id: uuid_v7_like(), - timestamp: trace_ts.clone(), - kind: "trace-create", - body: serde_json::json!({ - "id": trace_id, - "name": format!("v1.chat:{}", ev.provider), - "input": serde_json::json!({ - "model": ev.model, - "messages": ev.input, - }), - "metadata": serde_json::json!({ - "provider": ev.provider, - "think": ev.think, - }), + batch: vec![IngestionEvent { + id: uuid_v7_like(), + timestamp: sp.end_time.clone(), + kind: "span-create", + body: serde_json::json!({ + "id": span_id, + "traceId": sp.trace_id, + "name": format!("iterate.attempt[{}]", sp.iteration), + "input": serde_json::json!({ + "iteration": sp.iteration, + "model": sp.model, + "provider": sp.provider, + "prompt": truncate(&sp.prompt, 4000), }), - }, - IngestionEvent { - id: uuid_v7_like(), - timestamp: ev.end_time.clone(), - kind: "generation-create", - body: serde_json::json!({ - "id": gen_id, - "traceId": trace_id, - "name": "chat", - "model": ev.model, - "modelParameters": serde_json::json!({ - "temperature": ev.temperature, - "max_tokens": ev.max_tokens, - "think": ev.think, - }), - "input": ev.input, - "output": ev.output, - "usage": serde_json::json!({ - "input": ev.prompt_tokens, - "output": ev.completion_tokens, - "total": ev.prompt_tokens + ev.completion_tokens, - "unit": "TOKENS", - }), - "startTime": ev.start_time, - "endTime": ev.end_time, - "metadata": serde_json::json!({ - "provider": 
ev.provider, - "latency_ms": ev.latency_ms, - }), + "output": serde_json::json!({ + "verdict": sp.verdict, + "error": sp.error, + "raw": truncate(&sp.raw, 4000), }), - }, - ], + "level": level, + "startTime": sp.start_time, + "endTime": sp.end_time, + }), + }], }; + self.post_batch(batch).await + } + async fn post_batch(&self, batch: IngestionBatch) -> Result<(), String> { let url = format!("{}{}", self.inner.base_url.trim_end_matches('/'), INGESTION_PATH); let resp = self.inner.http .post(url) @@ -146,6 +137,81 @@ impl LangfuseClient { } Ok(()) } + + async fn emit_chat_inner(&self, ev: ChatTrace) -> Result<(), String> { + // When the caller forwarded a parent trace id (via the + // X-Lakehouse-Trace-Id header → V1State plumbing), attach the + // generation as a child of that trace. Without a parent we + // mint a new top-level trace per call (Phase 40 default). + let trace_id = ev.parent_trace_id.clone().unwrap_or_else(uuid_v7_like); + let nested = ev.parent_trace_id.is_some(); + let gen_id = uuid_v7_like(); + let trace_ts = ev.start_time.clone(); + + let mut events = Vec::with_capacity(2); + if !nested { + // Only mint a fresh trace-create when we don't have a parent. + // Reusing a parent trace id without re-creating it is the + // contract that lets validatord's iterate-session show up + // as one tree in Langfuse. 
+ events.push(IngestionEvent { + id: uuid_v7_like(), + timestamp: trace_ts.clone(), + kind: "trace-create", + body: serde_json::json!({ + "id": trace_id, + "name": format!("v1.chat:{}", ev.provider), + "input": serde_json::json!({ + "model": ev.model, + "messages": ev.input, + }), + "metadata": serde_json::json!({ + "provider": ev.provider, + "think": ev.think, + }), + }), + }); + } + events.push(IngestionEvent { + id: uuid_v7_like(), + timestamp: ev.end_time.clone(), + kind: "generation-create", + body: serde_json::json!({ + "id": gen_id, + "traceId": trace_id, + "name": "chat", + "model": ev.model, + "modelParameters": serde_json::json!({ + "temperature": ev.temperature, + "max_tokens": ev.max_tokens, + "think": ev.think, + }), + "input": ev.input, + "output": ev.output, + "usage": serde_json::json!({ + "input": ev.prompt_tokens, + "output": ev.completion_tokens, + "total": ev.prompt_tokens + ev.completion_tokens, + "unit": "TOKENS", + }), + "startTime": ev.start_time, + "endTime": ev.end_time, + "metadata": serde_json::json!({ + "provider": ev.provider, + "latency_ms": ev.latency_ms, + }), + }), + }); + + self.post_batch(IngestionBatch { batch: events }).await + } +} + +/// Truncate a string to at most `n` chars (NOT bytes). Matches the Go +/// `trim` helper used in session log + attempt-span emission so an +/// operator reading two cross-runtime traces sees the same boundary. +fn truncate(s: &str, n: usize) -> String { + s.chars().take(n).collect() } /// Everything the v1.chat handler collects for one completed call. @@ -162,6 +228,32 @@ pub struct ChatTrace { pub start_time: String, pub end_time: String, pub latency_ms: u64, + /// When set, attach this chat trace as a child of the named + /// Langfuse trace instead of starting a new top-level trace. Used + /// by `/v1/iterate` to nest its inner /v1/chat hops under the + /// iterate-session trace so a multi-call session shows in + /// Langfuse as ONE trace tree, not N+1 disconnected traces. 
+ /// Matches the Go-side `X-Lakehouse-Trace-Id` propagation + /// (commit d6d2fdf in golangLAKEHOUSE). + pub parent_trace_id: Option, +} + +/// One iteration attempt inside `/v1/iterate`'s loop. Becomes one +/// span on the parent trace when emitted via `emit_attempt_span`. +/// Matches Go's `validator.AttemptSpan` shape so the cross-runtime +/// observability surface is consistent. +pub struct AttemptSpan { + pub trace_id: String, + pub iteration: u32, + pub model: String, + pub provider: String, + pub prompt: String, + pub raw: String, + /// Verdict kind: "no_json" | "validation_failed" | "accepted" + pub verdict: String, + pub error: Option, + pub start_time: String, + pub end_time: String, } #[derive(Serialize)] diff --git a/crates/gateway/src/v1/mod.rs b/crates/gateway/src/v1/mod.rs index 052e5cc..6ab9ef0 100644 --- a/crates/gateway/src/v1/mod.rs +++ b/crates/gateway/src/v1/mod.rs @@ -21,6 +21,7 @@ pub mod opencode; pub mod validate; pub mod iterate; pub mod langfuse_trace; +pub mod session_log; pub mod mode; pub mod respond; pub mod truth; @@ -83,6 +84,13 @@ pub struct V1State { /// disabled (keys missing or container unreachable). Traces are /// fire-and-forget: never block the response path. pub langfuse: Option, + /// Coordinator session JSONL writer (path from + /// `[gateway].session_log_path`). One row per `/v1/iterate` + /// session for offline DuckDB analysis. None = disabled. + /// Cross-runtime parity with the Go-side `validatord` + /// `[validatord].session_log_path` (commit 1a3a82a in + /// golangLAKEHOUSE). + pub session_log: Option, } #[derive(Default, Clone, Serialize)] @@ -361,6 +369,7 @@ mod resolve_provider_tests { async fn chat( State(state): State, + headers: axum::http::HeaderMap, Json(req): Json, ) -> Result, (StatusCode, String)> { if req.messages.is_empty() { @@ -490,6 +499,17 @@ async fn chat( let output = resp.choices.first() .map(|c| c.message.text()) .unwrap_or_default(); + // Cross-runtime trace linkage. 
When a caller (validatord on + // Go side, /v1/iterate on Rust side) forwards a parent trace + // id via X-Lakehouse-Trace-Id, attach this generation to that + // trace so the iterate session and its inner chat hops show + // up as ONE trace tree in Langfuse. Header name matches the + // Go-side `shared.TraceIDHeader` constant byte-for-byte. + let parent_trace_id = headers + .get(crate::v1::iterate::TRACE_ID_HEADER) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()) + .filter(|s| !s.is_empty()); lf.emit_chat(langfuse_trace::ChatTrace { provider: used_provider.clone(), model: resp.model.clone(), @@ -503,6 +523,7 @@ async fn chat( start_time: start_time.to_rfc3339(), end_time: end_time.to_rfc3339(), latency_ms, + parent_trace_id, }); } diff --git a/crates/gateway/src/v1/session_log.rs b/crates/gateway/src/v1/session_log.rs new file mode 100644 index 0000000..62282d2 --- /dev/null +++ b/crates/gateway/src/v1/session_log.rs @@ -0,0 +1,235 @@ +//! Coordinator session JSONL writer — Rust parity with the Go-side +//! `internal/validator/session_log.go` (commit 1a3a82a in +//! golangLAKEHOUSE). Same schema, same field names, same producer +//! semantics, so a unified longitudinal log can pull from either +//! runtime via DuckDB. +//! +//! Schema: `session.iterate.v1`. One row per `/v1/iterate` session. +//! Append-only. Best-effort posture: errors warn and the iterate +//! response always ships. +//! +//! See `golangLAKEHOUSE/docs/SESSION_LOG.md` for the full schema +//! reference + DuckDB query examples. This module produces rows +//! with `daemon: "gateway"`; the Go side produces `daemon: +//! "validatord"`. Operators who want a unified stream can point both +//! to the same path (the OS write-append is atomic for the row sizes +//! we produce) or query both files together via duckdb's `read_json` +//! glob support. 
+ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::Mutex; + +pub const SESSION_RECORD_SCHEMA: &str = "session.iterate.v1"; + +/// One row in coordinator_sessions.jsonl. Field names are the on-wire +/// names — must stay byte-equal to the Go side's +/// `validator.SessionRecord` (proven by the cross-runtime parity +/// probe at golangLAKEHOUSE/scripts/cutover/parity/). +// Deserialize is supported so the parity helper binary can round-trip +// fixture inputs through serde without hand-rolling a parser. Production +// emit path uses Serialize only; SessionRecord rows are written by the +// gateway and consumed by DuckDB / external tooling, never re-read by us. +#[derive(Serialize, Deserialize)] +pub struct SessionRecord { + pub schema: String, + pub session_id: String, + pub timestamp: String, + pub daemon: String, + pub kind: String, + pub model: String, + pub provider: String, + pub prompt: String, + pub iterations: u32, + pub max_iterations: u32, + pub final_verdict: String, // "accepted" | "max_iter_exhausted" | "infra_error" + pub attempts: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub artifact: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub grounded_in_roster: Option, + pub duration_ms: u64, +} + +#[derive(Serialize, Deserialize)] +pub struct SessionAttemptRecord { + pub iteration: u32, + pub verdict_kind: String, // "no_json" | "validation_failed" | "accepted" | "infra_error" + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub span_id: Option, +} + +/// Append-only writer. Cloneable handle — internal state is Arc'd so +/// V1State can keep its own clone and per-request clones are cheap. +#[derive(Clone)] +pub struct SessionLogger { + inner: Arc, +} + +struct Inner { + path: String, + /// tokio::Mutex (not std) because we hold it across the async + /// fs write. Contention is low (one row per /v1/iterate session). 
+ mu: Mutex<()>, +} + +impl SessionLogger { + /// Construct a logger writing to `path`. Empty path → None + /// (skip the wiring in the iterate handler entirely). + pub fn from_path(path: &str) -> Option<Self> { + if path.is_empty() { + return None; + } + Some(Self { + inner: Arc::new(Inner { + path: path.to_string(), + mu: Mutex::new(()), + }), + }) + } + + /// Append one record as a single JSONL row. Best-effort: marshal + /// and write failures land in `tracing::warn!` and nothing is + /// returned to the caller — observability is a witness, never a + /// gate. Appends through one logger are serialized by `mu`, which + /// is held until the row has been written and flushed. + pub async fn append(&self, rec: SessionRecord) { + let body = match serde_json::to_string(&rec) { + Ok(s) => s, + Err(e) => { + tracing::warn!(target: "v1.session_log", "marshal: {e}"); + return; + } + }; + let _guard = self.inner.mu.lock().await; + if let Err(e) = self.write(&body).await { + tracing::warn!(target: "v1.session_log", "write {}: {e}", self.inner.path); + } + } + + async fn write(&self, body: &str) -> std::io::Result<()> { + use tokio::fs::OpenOptions; + use tokio::io::AsyncWriteExt; + + // Lazy mkdir on first write so a not-yet-mounted volume at + // startup doesn't kill the daemon. + if let Some(parent) = std::path::Path::new(&self.inner.path).parent() { + if !parent.as_os_str().is_empty() { + tokio::fs::create_dir_all(parent).await?; + } + } + let mut f = OpenOptions::new().append(true).create(true).open(&self.inner.path).await?; + // Row and newline go out as one buffer so they are submitted together. + let row = format!("{body}\n"); + f.write_all(row.as_bytes()).await?; + // tokio::fs::File completes writes in the background; flush so the row + // is handed to the OS before the caller releases `mu` and `f` is dropped. + f.flush().await?; + Ok(()) + } +} + +/// Best-effort UTF-8 char truncation. Matches Go's `trim` helper so +/// rows produced by either runtime cap fields at the same boundaries. 
+pub fn truncate(s: &str, n: usize) -> String { + s.chars().take(n).collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + use tokio::fs; + + fn fixture_record(session_id: &str) -> SessionRecord { + SessionRecord { + schema: SESSION_RECORD_SCHEMA.to_string(), + session_id: session_id.to_string(), + timestamp: "2026-05-02T08:00:00Z".to_string(), + daemon: "gateway".to_string(), + kind: "fill".to_string(), + model: "qwen3.5:latest".to_string(), + provider: "ollama".to_string(), + prompt: "produce a fill artifact".to_string(), + iterations: 1, + max_iterations: 3, + final_verdict: "accepted".to_string(), + attempts: vec![SessionAttemptRecord { + iteration: 0, + verdict_kind: "accepted".to_string(), + error: None, + span_id: Some("span-0".to_string()), + }], + artifact: Some(serde_json::json!({"fills":[{"candidate_id":"W-1"}]})), + grounded_in_roster: Some(true), + duration_ms: 50, + } + } + + #[tokio::test] + async fn from_path_empty_returns_none() { + assert!(SessionLogger::from_path("").is_none()); + } + + #[tokio::test] + async fn append_writes_jsonl_row_with_schema_field() { + let dir = tempdir(); + let path = dir.join("sessions.jsonl"); + let path_str = path.to_string_lossy().to_string(); + let logger = SessionLogger::from_path(&path_str).unwrap(); + logger.append(fixture_record("trace-a")).await; + + let body = fs::read_to_string(&path).await.unwrap(); + assert!(body.contains("\"schema\":\"session.iterate.v1\"")); + assert!(body.contains("\"session_id\":\"trace-a\"")); + assert!(body.contains("\"grounded_in_roster\":true")); + assert!(body.ends_with('\n')); + } + + #[tokio::test] + async fn append_concurrent_safe() { + let dir = tempdir(); + let path = dir.join("sessions.jsonl"); + let path_str = path.to_string_lossy().to_string(); + let logger = SessionLogger::from_path(&path_str).unwrap(); + + let n = 32; + let mut handles = Vec::with_capacity(n); + for i in 0..n { + let l = logger.clone(); + handles.push(tokio::spawn(async move 
{ + l.append(fixture_record(&format!("trace-{i}"))).await; + })); + } + for h in handles { + h.await.unwrap(); + } + + let body = fs::read_to_string(&path).await.unwrap(); + let lines: Vec<_> = body.lines().filter(|l| !l.is_empty()).collect(); + assert_eq!(lines.len(), n, "expected {n} rows, got {}", lines.len()); + // Every row must round-trip through serde — a torn write + // would surface as a parse error. + for line in lines { + let _: serde_json::Value = serde_json::from_str(line).expect("valid json per row"); + } + } + + fn tempdir() -> PathBuf { + // Per-test unique path so prior runs don't pollute the next. + // The static counter increments across the whole test binary, + // so back-to-back tests in the same module get distinct dirs. + static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + let n = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let p = std::env::temp_dir().join(format!( + "session_log_test_{}_{}_{}", + std::process::id(), + chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0), + n, + )); + std::fs::create_dir_all(&p).unwrap(); + p + } +} diff --git a/crates/shared/src/config.rs b/crates/shared/src/config.rs index 9656bdc..f12898e 100644 --- a/crates/shared/src/config.rs +++ b/crates/shared/src/config.rs @@ -62,6 +62,15 @@ pub struct GatewayConfig { pub host: String, #[serde(default = "default_gateway_port")] pub port: u16, + /// Coordinator session JSONL output path. One row per + /// `/v1/iterate` session, schema=`session.iterate.v1`. Empty = + /// disabled. Cross-runtime parity with the Go side's + /// `[validatord].session_log_path` (added 2026-05-02). Default + /// empty so existing deployments aren't perturbed; production + /// sets `/var/lib/lakehouse/gateway/sessions.jsonl`. See + /// `golangLAKEHOUSE/docs/SESSION_LOG.md` for query examples. 
+ #[serde(default)] + pub session_log_path: String, } #[derive(Debug, Clone, Deserialize)] @@ -190,7 +199,11 @@ impl Config { impl Default for Config { fn default() -> Self { Self { - gateway: GatewayConfig { host: default_host(), port: default_gateway_port() }, + gateway: GatewayConfig { + host: default_host(), + port: default_gateway_port(), + session_log_path: String::new(), + }, storage: StorageConfig { root: default_storage_root(), profile_root: default_profile_root(), diff --git a/lakehouse.toml b/lakehouse.toml index 345ea5d..d0eea54 100644 --- a/lakehouse.toml +++ b/lakehouse.toml @@ -3,6 +3,11 @@ [gateway] host = "0.0.0.0" port = 3100 +# Coordinator session JSONL — one row per /v1/iterate session for +# offline DuckDB analysis. Cross-runtime parity with the Go-side +# [validatord].session_log_path. Empty = disabled. Production: +# session_log_path = "/var/lib/lakehouse/gateway/sessions.jsonl" +session_log_path = "" [storage] root = "./data"