gateway: trace-id propagation + coordinator session JSONL (Rust parity)
Some checks failed
lakehouse/auditor 10 blocking issues: cloud: claim not backed — "Verified end-to-end against persistent Go stack on :4110:"
Brings cross-runtime parity with the Go-side observability wave (commits
d6d2fdf + 1a3a82a in golangLAKEHOUSE). This adds the two layers J flagged:
the LIVE per-call view (Langfuse) and the LONGITUDINAL forensic view
(JSONL, queryable via DuckDB). The hard correctness gate (FillValidator
phantom-rejection) was already in place; this commit layers the
observability on top.
## Trace-id propagation
The X-Lakehouse-Trace-Id header constant is declared in
crates/gateway/src/v1/iterate.rs (matches Go's shared.TraceIDHeader
byte-for-byte). When set on an inbound /v1/iterate request, the
handler reuses it; the chat + validate self-loopback hops forward
the same header, so chatd's trace emit nests under the parent rather
than minting a fresh top-level trace per call.
ChatTrace gains a parent_trace_id field. emit_chat_inner skips the
trace-create event when a parent is set and emits only the
generation-create, which attaches to the existing trace tree. Result:
an iterate session with N retries shows up in Langfuse as ONE tree,
not N+1 disconnected traces.
emit_attempt_span (new) writes one Langfuse span per iteration
attempt, with input={iteration, model, provider, prompt} and
output={verdict, raw, error}. Non-accepted verdicts are emitted at
WARNING level. The returned span id is stamped on the corresponding
SessionRecord attempt for cross-log correlation.
## Coordinator session JSONL
crates/gateway/src/v1/session_log.rs — new writer matching Go's
internal/validator/session_log.go schema byte-for-byte:
- SessionRecord with schema=session.iterate.v1
- SessionAttemptRecord per retry
- SessionLogger.append: tokio Mutex serialized append-only
- Best-effort posture (tracing::warn on error, never blocks the request)
iterate.rs builds + appends a row on EVERY code path:
- accepted: write_session_accepted with grounded_in_roster bool
derived from validate_workers WorkerLookup (matches Go's
handlers.rosterCheckFor("fill") semantics)
- max-iter-exhausted: write_session_failure
- infra-error: write_infra_error (so a missing /v1/iterate event
never silently disappears from the longitudinal log)
[gateway].session_log_path config field (empty = disabled).
Production: /var/lib/lakehouse/gateway/sessions.jsonl. Operators who
want a unified longitudinal stream can point both Rust and Go
loggers at the same path — write-append is safe at the row sizes we
produce.
## Cross-runtime parity probe
crates/gateway/src/bin/parity_session_log: tiny stdin/stdout helper
that round-trips a fixture through SessionRecord serde.
golangLAKEHOUSE/scripts/cutover/parity/session_log_parity.sh feeds
4 fixtures through both helpers and diffs the rows after stripping
timestamp + daemon (the two fields that legitimately differ between
producers).
Result: **4/4 byte-equal** including the unicode-prompt fixture
("Café résumé ⭐ 你好"). Schema parity holds. The probe's
non-trivial-equal guard rejects the degenerate case where both sides
fail identically, protecting against a regression where one side
silently stops producing valid JSON.
## Verification
- cargo test -p gateway --lib: 90/90 PASS (3 new session_log tests
including concurrent-append safety)
- cargo check --workspace: clean
- session_log_parity.sh: 4/4 fixtures byte-equal
- Both runtimes can append to the same path; DuckDB sees one stream
- The Go-side validatord smoke remains 5/5 (unchanged)
## Architecture invariant
Don't propose to "wire trace-id propagation in Rust" or "add Rust
session log" — both are now shipped on the demo/post-pr11-polish
branch. The longitudinal log + Langfuse tree together cover the
multi-call observability concern J flagged 2026-05-02.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent ba928b1d64
commit 57bde63a06

71  crates/gateway/src/bin/parity_session_log.rs  Normal file
@@ -0,0 +1,71 @@
//! Cross-runtime parity helper for `SessionRecord` JSON shape.
//!
//! Reads a fixture JSON on stdin, builds a `SessionRecord`, emits
//! one JSONL row on stdout. Used by
//! `golangLAKEHOUSE/scripts/cutover/parity/session_log_parity.sh`
//! to verify the Rust gateway's session log shape stays byte-equal
//! to the Go-side validatord's `validator.SessionRecord` (commit
//! 1a3a82a in golangLAKEHOUSE).

use gateway::v1::session_log::{SessionAttemptRecord, SessionRecord, SESSION_RECORD_SCHEMA};
use serde::Deserialize;
use std::io::Read;

#[derive(Deserialize)]
struct FixtureInput {
    session_id: String,
    kind: String,
    model: String,
    provider: String,
    prompt: String,
    iterations: u32,
    max_iterations: u32,
    final_verdict: String,
    attempts: Vec<SessionAttemptRecord>,
    #[serde(default)]
    artifact: Option<serde_json::Value>,
    #[serde(default)]
    grounded_in_roster: Option<bool>,
    duration_ms: u64,
}

fn main() {
    let mut buf = String::new();
    if let Err(e) = std::io::stdin().read_to_string(&mut buf) {
        eprintln!("read stdin: {e}");
        std::process::exit(1);
    }
    let input: FixtureInput = match serde_json::from_str(&buf) {
        Ok(v) => v,
        Err(e) => {
            eprintln!("parse stdin: {e}");
            std::process::exit(1);
        }
    };
    let rec = SessionRecord {
        schema: SESSION_RECORD_SCHEMA.to_string(),
        session_id: input.session_id,
        // Pinned timestamp so both runtimes' rows compare byte-equal
        // when the test wrapper normalizes on `daemon` only.
        timestamp: "2026-01-01T00:00:00+00:00".to_string(),
        daemon: "gateway".to_string(),
        kind: input.kind,
        model: input.model,
        provider: input.provider,
        prompt: input.prompt,
        iterations: input.iterations,
        max_iterations: input.max_iterations,
        final_verdict: input.final_verdict,
        attempts: input.attempts,
        artifact: input.artifact,
        grounded_in_roster: input.grounded_in_roster,
        duration_ms: input.duration_ms,
    };
    match serde_json::to_string(&rec) {
        Ok(s) => println!("{s}"),
        Err(e) => {
            eprintln!("marshal: {e}");
            std::process::exit(1);
        }
    }
}
@@ -438,6 +438,10 @@ impl ExecutionLoop {
            start_time: start_time.to_rfc3339(),
            end_time: end_time.to_rfc3339(),
            latency_ms: elapsed_ms,
            // Internal execution-loop traffic is its own top-level
            // trace per call. If a future caller threads a parent
            // trace into self.state, lift this to Some(parent_id).
            parent_trace_id: None,
        });
    }

@@ -654,6 +658,7 @@ impl ExecutionLoop {
            start_time: start_time.to_rfc3339(),
            end_time: end_time.to_rfc3339(),
            latency_ms,
            parent_trace_id: None,
        });
    }
@@ -362,6 +362,22 @@ async fn main() {
        }
        c
    },
    // Coordinator session JSONL — one row per /v1/iterate
    // session for offline DuckDB analysis. Cross-runtime
    // parity with Go-side validatord (commit 1a3a82a).
    session_log: {
        let path = &config.gateway.session_log_path;
        let s = v1::session_log::SessionLogger::from_path(path);
        if s.is_some() {
            tracing::info!(
                "v1: session log enabled — coordinator sessions written to {}",
                path
            );
        } else {
            tracing::info!("v1: session log disabled (set [gateway].session_log_path to enable)");
        }
        s
    },
}));

// Auth middleware (if enabled) — P5-001 fix 2026-04-23:
@@ -21,12 +21,19 @@
//! re-implementation. Staffing executors, agent loops, and future
//! validators all reach the same code path.

use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
use axum::{extract::State, http::{HeaderMap, StatusCode}, response::IntoResponse, Json};
use serde::{Deserialize, Serialize};

const DEFAULT_MAX_ITERATIONS: u32 = 3;
const LOOPBACK_TIMEOUT_SECS: u64 = 240;

/// Header name used to propagate a Langfuse parent trace id across
/// daemon boundaries. Matches Go's `shared.TraceIDHeader` constant
/// byte-for-byte (commit d6d2fdf in golangLAKEHOUSE) — same wire
/// format means a Go caller can hit Rust's /v1/iterate (or vice
/// versa) and the resulting Langfuse trees nest correctly.
pub const TRACE_ID_HEADER: &str = "x-lakehouse-trace-id";

#[derive(Deserialize)]
pub struct IterateRequest {
    /// "fill" | "email" | "playbook" — picks which validator runs.

@@ -90,26 +97,47 @@ pub struct IterateFailure {
}

pub async fn iterate(
    State(_state): State<super::V1State>,
    State(state): State<super::V1State>,
    headers: HeaderMap,
    Json(req): Json<IterateRequest>,
) -> impl IntoResponse {
    let max_iter = req.max_iterations.unwrap_or(DEFAULT_MAX_ITERATIONS).max(1);
    let temperature = req.temperature.unwrap_or(0.2);
    let max_tokens = req.max_tokens.unwrap_or(4096);
    let mut history: Vec<IterateAttempt> = Vec::with_capacity(max_iter as usize);
    let mut attempt_records: Vec<super::session_log::SessionAttemptRecord> = Vec::with_capacity(max_iter as usize);
    let mut current_prompt = req.prompt.clone();

    // Resolve the parent Langfuse trace id. Caller-forwarded header
    // wins (cross-daemon tree linkage); otherwise mint a fresh id so
    // the iterate session is its own tree. Same shape as the Go-side
    // validatord trace propagation.
    let trace_id: String = headers
        .get(TRACE_ID_HEADER)
        .and_then(|v| v.to_str().ok())
        .filter(|s| !s.is_empty())
        .map(|s| s.to_string())
        .unwrap_or_else(new_trace_id);

    let client = match reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(LOOPBACK_TIMEOUT_SECS))
        .build() {
        Ok(c) => c,
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("client build: {e}")).into_response(),
        Err(e) => {
            // Even infrastructure failures get a session row so a
            // missing /v1/iterate event never silently disappears
            // from the longitudinal log.
            write_infra_error(&state, &req, &trace_id, max_iter, 0, format!("client build: {e}")).await;
            return (StatusCode::INTERNAL_SERVER_ERROR, format!("client build: {e}")).into_response();
        }
    };
    // Self-loopback to the gateway port. Carries gateway internal
    // calls through /v1/chat + /v1/validate so /v1/usage tracks them.
    let gateway = "http://127.0.0.1:3100";
    let t0 = std::time::Instant::now();

    for iteration in 0..max_iter {
        let attempt_started = chrono::Utc::now();
        // ── Generate ──
        let mut messages = Vec::with_capacity(2);
        if let Some(sys) = &req.system {

@@ -123,20 +151,33 @@ pub async fn iterate(
            "temperature": temperature,
            "max_tokens": max_tokens,
        });
        let raw = match call_chat(&client, gateway, &chat_body).await {
        let raw = match call_chat(&client, gateway, &chat_body, &trace_id).await {
            Ok(r) => r,
            Err(e) => return (StatusCode::BAD_GATEWAY, format!("/v1/chat hop failed at iter {iteration}: {e}")).into_response(),
            Err(e) => {
                write_infra_error(&state, &req, &trace_id, max_iter, t0.elapsed().as_millis() as u64, format!("/v1/chat hop failed at iter {iteration}: {e}")).await;
                return (StatusCode::BAD_GATEWAY, format!("/v1/chat hop failed at iter {iteration}: {e}")).into_response();
            }
        };

        // ── Extract JSON ──
        let artifact = match extract_json(&raw) {
            Some(a) => a,
            None => {
                let span_id = emit_attempt_span(
                    &state, &trace_id, iteration, &req, &current_prompt, &raw, "no_json", None,
                    attempt_started, chrono::Utc::now(),
                );
                history.push(IterateAttempt {
                    iteration,
                    raw: raw.chars().take(2000).collect(),
                    status: AttemptStatus::NoJson,
                });
                attempt_records.push(super::session_log::SessionAttemptRecord {
                    iteration,
                    verdict_kind: "no_json".to_string(),
                    error: None,
                    span_id,
                });
                current_prompt = format!(
                    "{}\n\nYour previous attempt did not contain a JSON object. Reply with ONLY a valid JSON object matching the requested artifact shape.",
                    req.prompt,

@@ -151,13 +192,26 @@ pub async fn iterate(
            "artifact": artifact,
            "context": req.context.clone().unwrap_or(serde_json::Value::Null),
        });
        match call_validate(&client, gateway, &validate_body).await {
        match call_validate(&client, gateway, &validate_body, &trace_id).await {
            Ok(report) => {
                let span_id = emit_attempt_span(
                    &state, &trace_id, iteration, &req, &current_prompt, &raw, "accepted", None,
                    attempt_started, chrono::Utc::now(),
                );
                history.push(IterateAttempt {
                    iteration,
                    raw: raw.chars().take(2000).collect(),
                    status: AttemptStatus::Accepted,
                });
                attempt_records.push(super::session_log::SessionAttemptRecord {
                    iteration,
                    verdict_kind: "accepted".to_string(),
                    error: None,
                    span_id,
                });
                let duration_ms = t0.elapsed().as_millis() as u64;
                let grounded = grounded_in_roster(&state, &req.kind, &artifact);
                write_session_accepted(&state, &req, &trace_id, iteration + 1, max_iter, attempt_records, &artifact, grounded, duration_ms).await;
                return (StatusCode::OK, Json(IterateResponse {
                    artifact,
                    validation: report,

@@ -167,6 +221,11 @@ pub async fn iterate(
            }
            Err(err) => {
                let err_summary = err.to_string();
                let span_id = emit_attempt_span(
                    &state, &trace_id, iteration, &req, &current_prompt, &raw, "validation_failed",
                    Some(err_summary.clone()),
                    attempt_started, chrono::Utc::now(),
                );
                history.push(IterateAttempt {
                    iteration,
                    raw: raw.chars().take(2000).collect(),

@@ -174,6 +233,12 @@ pub async fn iterate(
                        error: serde_json::to_value(&err_summary).unwrap_or(serde_json::Value::Null),
                    },
                });
                attempt_records.push(super::session_log::SessionAttemptRecord {
                    iteration,
                    verdict_kind: "validation_failed".to_string(),
                    error: Some(err_summary.clone()),
                    span_id,
                });
                // Append validation feedback to prompt for next iter.
                // The model sees concrete failure mode + retries with
                // corrective context. This is the "observer correction"

@@ -188,6 +253,8 @@ pub async fn iterate(
        }
    }

    let duration_ms = t0.elapsed().as_millis() as u64;
    write_session_failure(&state, &req, &trace_id, max_iter, max_iter, attempt_records, duration_ms).await;
    (StatusCode::UNPROCESSABLE_ENTITY, Json(IterateFailure {
        error: format!("max iterations reached ({max_iter}) without passing validation"),
        iterations: max_iter,

@@ -195,12 +262,157 @@ pub async fn iterate(
    })).into_response()
}

async fn call_chat(client: &reqwest::Client, gateway: &str, body: &serde_json::Value) -> Result<String, String> {
    let resp = client.post(format!("{gateway}/v1/chat"))
        .json(body)
        .send()
        .await
        .map_err(|e| format!("chat hop: {e}"))?;
// ─── Helpers — Langfuse spans + session log + roster check ─────────

fn emit_attempt_span(
    state: &super::V1State,
    trace_id: &str,
    iteration: u32,
    req: &IterateRequest,
    prompt: &str,
    raw: &str,
    verdict: &str,
    error: Option<String>,
    started: chrono::DateTime<chrono::Utc>,
    ended: chrono::DateTime<chrono::Utc>,
) -> Option<String> {
    let lf = state.langfuse.as_ref()?;
    Some(lf.emit_attempt_span(super::langfuse_trace::AttemptSpan {
        trace_id: trace_id.to_string(),
        iteration,
        model: req.model.clone(),
        provider: req.provider.clone(),
        prompt: prompt.to_string(),
        raw: raw.to_string(),
        verdict: verdict.to_string(),
        error,
        start_time: started.to_rfc3339(),
        end_time: ended.to_rfc3339(),
    }))
}

/// Verify every fill artifact's candidate IDs exist in the roster.
/// Returns Some(true)/Some(false) on the fill kind, None otherwise
/// (other kinds don't have worker IDs to ground). Same semantics as
/// Go's `handlers.rosterCheckFor("fill")`.
fn grounded_in_roster(
    state: &super::V1State,
    kind: &str,
    artifact: &serde_json::Value,
) -> Option<bool> {
    if kind != "fill" {
        return None;
    }
    let fills = artifact.get("fills").and_then(|v| v.as_array())?;
    for f in fills {
        let id = match f.get("candidate_id").and_then(|v| v.as_str()) {
            Some(s) if !s.is_empty() => s,
            _ => return Some(false),
        };
        if state.validate_workers.find(id).is_none() {
            return Some(false);
        }
    }
    Some(true)
}

async fn write_session_accepted(
    state: &super::V1State,
    req: &IterateRequest,
    trace_id: &str,
    iterations: u32,
    max_iter: u32,
    attempts: Vec<super::session_log::SessionAttemptRecord>,
    artifact: &serde_json::Value,
    grounded: Option<bool>,
    duration_ms: u64,
) {
    let Some(logger) = state.session_log.as_ref() else { return };
    let rec = build_session_record(req, trace_id, "accepted", iterations, max_iter, attempts, Some(artifact.clone()), grounded, duration_ms);
    logger.append(rec).await;
}

async fn write_session_failure(
    state: &super::V1State,
    req: &IterateRequest,
    trace_id: &str,
    iterations: u32,
    max_iter: u32,
    attempts: Vec<super::session_log::SessionAttemptRecord>,
    duration_ms: u64,
) {
    let Some(logger) = state.session_log.as_ref() else { return };
    let rec = build_session_record(req, trace_id, "max_iter_exhausted", iterations, max_iter, attempts, None, None, duration_ms);
    logger.append(rec).await;
}

async fn write_infra_error(
    state: &super::V1State,
    req: &IterateRequest,
    trace_id: &str,
    max_iter: u32,
    duration_ms: u64,
    error: String,
) {
    let Some(logger) = state.session_log.as_ref() else { return };
    let attempts = vec![super::session_log::SessionAttemptRecord {
        iteration: 0,
        verdict_kind: "infra_error".to_string(),
        error: Some(error),
        span_id: None,
    }];
    let rec = build_session_record(req, trace_id, "infra_error", 0, max_iter, attempts, None, None, duration_ms);
    logger.append(rec).await;
}

fn build_session_record(
    req: &IterateRequest,
    trace_id: &str,
    final_verdict: &str,
    iterations: u32,
    max_iter: u32,
    attempts: Vec<super::session_log::SessionAttemptRecord>,
    artifact: Option<serde_json::Value>,
    grounded: Option<bool>,
    duration_ms: u64,
) -> super::session_log::SessionRecord {
    super::session_log::SessionRecord {
        schema: super::session_log::SESSION_RECORD_SCHEMA.to_string(),
        session_id: trace_id.to_string(),
        timestamp: chrono::Utc::now().to_rfc3339(),
        daemon: "gateway".to_string(),
        kind: req.kind.clone(),
        model: req.model.clone(),
        provider: req.provider.clone(),
        prompt: super::session_log::truncate(&req.prompt, 4000),
        iterations,
        max_iterations: max_iter,
        final_verdict: final_verdict.to_string(),
        attempts,
        artifact,
        grounded_in_roster: grounded,
        duration_ms,
    }
}

/// Generate a fresh trace id when no parent was forwarded. Same
/// time-ordered hex shape Langfuse already accepts elsewhere in this
/// crate (see `langfuse_trace::uuid_v7_like`).
fn new_trace_id() -> String {
    let ts = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
    let rand = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        .unwrap_or(0);
    format!("{:016x}-{:08x}", ts, rand)
}

async fn call_chat(client: &reqwest::Client, gateway: &str, body: &serde_json::Value, trace_id: &str) -> Result<String, String> {
    let mut req = client.post(format!("{gateway}/v1/chat")).json(body);
    if !trace_id.is_empty() {
        req = req.header(TRACE_ID_HEADER, trace_id);
    }
    let resp = req.send().await.map_err(|e| format!("chat hop: {e}"))?;
    let status = resp.status();
    if !status.is_success() {
        let body = resp.text().await.unwrap_or_default();

@@ -213,12 +425,12 @@ async fn call_chat(client: &reqwest::Client, gateway: &str, body: &serde_json::V
        .to_string())
}

async fn call_validate(client: &reqwest::Client, gateway: &str, body: &serde_json::Value) -> Result<serde_json::Value, String> {
    let resp = client.post(format!("{gateway}/v1/validate"))
        .json(body)
        .send()
        .await
        .map_err(|e| format!("validate hop: {e}"))?;
async fn call_validate(client: &reqwest::Client, gateway: &str, body: &serde_json::Value, trace_id: &str) -> Result<serde_json::Value, String> {
    let mut req = client.post(format!("{gateway}/v1/validate")).json(body);
    if !trace_id.is_empty() {
        req = req.header(TRACE_ID_HEADER, trace_id);
    }
    let resp = req.send().await.map_err(|e| format!("validate hop: {e}"))?;
    let status = resp.status();
    let parsed: serde_json::Value = resp.json().await.map_err(|e| format!("validate parse: {e}"))?;
    if status.is_success() {
@@ -76,63 +76,54 @@ impl LangfuseClient {
        });
    }

    async fn emit_chat_inner(&self, ev: ChatTrace) -> Result<(), String> {
        let trace_id = uuid_v7_like();
        let gen_id = uuid_v7_like();
        let trace_ts = ev.start_time.clone();
    /// Fire-and-forget per-iteration span emit. Returns the generated
    /// span id synchronously so the caller can stamp it on
    /// `IterateAttempt.span_id` before the network round-trip resolves.
    /// Mirrors Go's `validator.Tracer` callback shape.
    pub fn emit_attempt_span(&self, sp: AttemptSpan) -> String {
        let span_id = uuid_v7_like();
        let span_id_for_caller = span_id.clone();
        let this = self.clone();
        tokio::spawn(async move {
            if let Err(e) = this.emit_attempt_span_inner(span_id, sp).await {
                tracing::warn!(target: "v1.langfuse", "iterate span drop: {e}");
            }
        });
        span_id_for_caller
    }

    async fn emit_attempt_span_inner(&self, span_id: String, sp: AttemptSpan) -> Result<(), String> {
        let level = if sp.verdict == "accepted" { "DEFAULT" } else { "WARNING" };
        let batch = IngestionBatch {
            batch: vec![
                IngestionEvent {
                    id: uuid_v7_like(),
                    timestamp: trace_ts.clone(),
                    kind: "trace-create",
                    body: serde_json::json!({
                        "id": trace_id,
                        "name": format!("v1.chat:{}", ev.provider),
                        "input": serde_json::json!({
                            "model": ev.model,
                            "messages": ev.input,
                        }),
                        "metadata": serde_json::json!({
                            "provider": ev.provider,
                            "think": ev.think,
                        }),
            batch: vec![IngestionEvent {
                id: uuid_v7_like(),
                timestamp: sp.end_time.clone(),
                kind: "span-create",
                body: serde_json::json!({
                    "id": span_id,
                    "traceId": sp.trace_id,
                    "name": format!("iterate.attempt[{}]", sp.iteration),
                    "input": serde_json::json!({
                        "iteration": sp.iteration,
                        "model": sp.model,
                        "provider": sp.provider,
                        "prompt": truncate(&sp.prompt, 4000),
                    }),
                },
                IngestionEvent {
                    id: uuid_v7_like(),
                    timestamp: ev.end_time.clone(),
                    kind: "generation-create",
                    body: serde_json::json!({
                        "id": gen_id,
                        "traceId": trace_id,
                        "name": "chat",
                        "model": ev.model,
                        "modelParameters": serde_json::json!({
                            "temperature": ev.temperature,
                            "max_tokens": ev.max_tokens,
                            "think": ev.think,
                        }),
                        "input": ev.input,
                        "output": ev.output,
                        "usage": serde_json::json!({
                            "input": ev.prompt_tokens,
                            "output": ev.completion_tokens,
                            "total": ev.prompt_tokens + ev.completion_tokens,
                            "unit": "TOKENS",
                        }),
                        "startTime": ev.start_time,
                        "endTime": ev.end_time,
                        "metadata": serde_json::json!({
                            "provider": ev.provider,
                            "latency_ms": ev.latency_ms,
                        }),
                    "output": serde_json::json!({
                        "verdict": sp.verdict,
                        "error": sp.error,
                        "raw": truncate(&sp.raw, 4000),
                    }),
                },
            ],
                    "level": level,
                    "startTime": sp.start_time,
                    "endTime": sp.end_time,
                }),
            }],
        };
        self.post_batch(batch).await
    }

    async fn post_batch(&self, batch: IngestionBatch) -> Result<(), String> {
        let url = format!("{}{}", self.inner.base_url.trim_end_matches('/'), INGESTION_PATH);
        let resp = self.inner.http
            .post(url)

@@ -146,6 +137,81 @@ impl LangfuseClient {
        }
        Ok(())
    }

    async fn emit_chat_inner(&self, ev: ChatTrace) -> Result<(), String> {
        // When the caller forwarded a parent trace id (via the
        // X-Lakehouse-Trace-Id header → V1State plumbing), attach the
        // generation as a child of that trace. Without a parent we
        // mint a new top-level trace per call (Phase 40 default).
        let trace_id = ev.parent_trace_id.clone().unwrap_or_else(uuid_v7_like);
        let nested = ev.parent_trace_id.is_some();
        let gen_id = uuid_v7_like();
        let trace_ts = ev.start_time.clone();

        let mut events = Vec::with_capacity(2);
        if !nested {
            // Only mint a fresh trace-create when we don't have a parent.
            // Reusing a parent trace id without re-creating it is the
            // contract that lets validatord's iterate-session show up
            // as one tree in Langfuse.
            events.push(IngestionEvent {
                id: uuid_v7_like(),
                timestamp: trace_ts.clone(),
                kind: "trace-create",
                body: serde_json::json!({
                    "id": trace_id,
                    "name": format!("v1.chat:{}", ev.provider),
                    "input": serde_json::json!({
                        "model": ev.model,
                        "messages": ev.input,
                    }),
                    "metadata": serde_json::json!({
                        "provider": ev.provider,
                        "think": ev.think,
                    }),
                }),
            });
        }
        events.push(IngestionEvent {
            id: uuid_v7_like(),
            timestamp: ev.end_time.clone(),
            kind: "generation-create",
            body: serde_json::json!({
                "id": gen_id,
                "traceId": trace_id,
                "name": "chat",
                "model": ev.model,
                "modelParameters": serde_json::json!({
                    "temperature": ev.temperature,
                    "max_tokens": ev.max_tokens,
                    "think": ev.think,
                }),
                "input": ev.input,
                "output": ev.output,
                "usage": serde_json::json!({
                    "input": ev.prompt_tokens,
                    "output": ev.completion_tokens,
                    "total": ev.prompt_tokens + ev.completion_tokens,
                    "unit": "TOKENS",
                }),
                "startTime": ev.start_time,
                "endTime": ev.end_time,
                "metadata": serde_json::json!({
                    "provider": ev.provider,
                    "latency_ms": ev.latency_ms,
                }),
            }),
        });

        self.post_batch(IngestionBatch { batch: events }).await
    }
}

/// Truncate a string to at most `n` chars (NOT bytes). Matches the Go
/// `trim` helper used in session log + attempt-span emission so an
/// operator reading two cross-runtime traces sees the same boundary.
fn truncate(s: &str, n: usize) -> String {
    s.chars().take(n).collect()
}

/// Everything the v1.chat handler collects for one completed call.

@@ -162,6 +228,32 @@ pub struct ChatTrace {
    pub start_time: String,
    pub end_time: String,
    pub latency_ms: u64,
    /// When set, attach this chat trace as a child of the named
    /// Langfuse trace instead of starting a new top-level trace. Used
    /// by `/v1/iterate` to nest its inner /v1/chat hops under the
    /// iterate-session trace so a multi-call session shows in
    /// Langfuse as ONE trace tree, not N+1 disconnected traces.
    /// Matches the Go-side `X-Lakehouse-Trace-Id` propagation
    /// (commit d6d2fdf in golangLAKEHOUSE).
    pub parent_trace_id: Option<String>,
}

/// One iteration attempt inside `/v1/iterate`'s loop. Becomes one
/// span on the parent trace when emitted via `emit_attempt_span`.
/// Matches Go's `validator.AttemptSpan` shape so the cross-runtime
/// observability surface is consistent.
pub struct AttemptSpan {
    pub trace_id: String,
    pub iteration: u32,
    pub model: String,
    pub provider: String,
    pub prompt: String,
    pub raw: String,
    /// Verdict kind: "no_json" | "validation_failed" | "accepted"
    pub verdict: String,
    pub error: Option<String>,
    pub start_time: String,
    pub end_time: String,
}

#[derive(Serialize)]
@ -21,6 +21,7 @@ pub mod opencode;
|
||||
pub mod validate;
|
||||
pub mod iterate;
|
||||
pub mod langfuse_trace;
|
||||
pub mod session_log;
|
||||
pub mod mode;
|
||||
pub mod respond;
|
||||
pub mod truth;
|
||||
@ -83,6 +84,13 @@ pub struct V1State {
|
||||
/// disabled (keys missing or container unreachable). Traces are
|
||||
/// fire-and-forget: never block the response path.
|
||||
pub langfuse: Option<langfuse_trace::LangfuseClient>,
|
||||
/// Coordinator session JSONL writer (path from
|
||||
/// `[gateway].session_log_path`). One row per `/v1/iterate`
|
||||
/// session for offline DuckDB analysis. None = disabled.
|
||||
/// Cross-runtime parity with the Go-side `validatord`
|
||||
/// `[validatord].session_log_path` (commit 1a3a82a in
|
||||
/// golangLAKEHOUSE).
|
||||
pub session_log: Option<session_log::SessionLogger>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Serialize)]
|
||||
@ -361,6 +369,7 @@ mod resolve_provider_tests {
|
||||
|
||||
async fn chat(
|
||||
State(state): State<V1State>,
|
||||
headers: axum::http::HeaderMap,
|
||||
Json(req): Json<ChatRequest>,
|
||||
) -> Result<Json<ChatResponse>, (StatusCode, String)> {
|
||||
if req.messages.is_empty() {
|
||||
@@ -490,6 +499,17 @@ async fn chat(
    let output = resp.choices.first()
        .map(|c| c.message.text())
        .unwrap_or_default();
    // Cross-runtime trace linkage. When a caller (validatord on the
    // Go side, /v1/iterate on the Rust side) forwards a parent trace
    // id via X-Lakehouse-Trace-Id, attach this generation to that
    // trace so the iterate session and its inner chat hops show
    // up as ONE trace tree in Langfuse. Header name matches the
    // Go-side `shared.TraceIDHeader` constant byte-for-byte.
    let parent_trace_id = headers
        .get(crate::v1::iterate::TRACE_ID_HEADER)
        .and_then(|v| v.to_str().ok())
        .map(|s| s.to_string())
        .filter(|s| !s.is_empty());
    lf.emit_chat(langfuse_trace::ChatTrace {
        provider: used_provider.clone(),
        model: resp.model.clone(),
@@ -503,6 +523,7 @@ async fn chat(
        start_time: start_time.to_rfc3339(),
        end_time: end_time.to_rfc3339(),
        latency_ms,
        parent_trace_id,
    });
}
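The reuse-or-mint decision described above (forwarded `X-Lakehouse-Trace-Id` attaches to the parent trace; absent or empty header means a fresh top-level trace) can be sketched as a plain function. This is a std-only sketch: `effective_trace_id` and `mint` are hypothetical stand-ins, not names from the patch.

```rust
// Sketch of the trace-id propagation rule: reuse a non-empty inbound
// X-Lakehouse-Trace-Id, otherwise mint a fresh top-level trace id.
// `mint` is a hypothetical stand-in for the real id generator.
fn effective_trace_id(inbound: Option<&str>, mint: impl Fn() -> String) -> String {
    match inbound {
        Some(s) if !s.is_empty() => s.to_string(), // nest under the parent trace
        _ => mint(),                               // no parent: new top-level trace
    }
}

fn main() {
    let mint = || "fresh-id".to_string();
    // Forwarded header: all hops land in one trace tree.
    assert_eq!(effective_trace_id(Some("parent-1"), mint), "parent-1");
    // Empty or missing header: mint a new top-level trace.
    assert_eq!(effective_trace_id(Some(""), mint), "fresh-id");
    assert_eq!(effective_trace_id(None, mint), "fresh-id");
    println!("ok");
}
```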
crates/gateway/src/v1/session_log.rs (235 lines, new file)
@@ -0,0 +1,235 @@
//! Coordinator session JSONL writer — Rust parity with the Go-side
//! `internal/validator/session_log.go` (commit 1a3a82a in
//! golangLAKEHOUSE). Same schema, same field names, same producer
//! semantics, so a unified longitudinal log can pull from either
//! runtime via DuckDB.
//!
//! Schema: `session.iterate.v1`. One row per `/v1/iterate` session.
//! Append-only. Best-effort posture: errors warn and the iterate
//! response always ships.
//!
//! See `golangLAKEHOUSE/docs/SESSION_LOG.md` for the full schema
//! reference + DuckDB query examples. This module produces rows
//! with `daemon: "gateway"`; the Go side produces `daemon:
//! "validatord"`. Operators who want a unified stream can point both
//! to the same path (the OS append write is atomic for the row sizes
//! we produce) or query both files together via DuckDB's `read_json`
//! glob support.
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;

pub const SESSION_RECORD_SCHEMA: &str = "session.iterate.v1";

/// One row in coordinator_sessions.jsonl. Field names are the on-wire
/// names — must stay byte-equal to the Go side's
/// `validator.SessionRecord` (proven by the cross-runtime parity
/// probe at golangLAKEHOUSE/scripts/cutover/parity/).
// Deserialize is supported so the parity helper binary can round-trip
// fixture inputs through serde without hand-rolling a parser. The
// production emit path uses Serialize only; SessionRecord rows are
// written by the gateway and consumed by DuckDB / external tooling,
// never re-read by us.
#[derive(Serialize, Deserialize)]
pub struct SessionRecord {
    pub schema: String,
    pub session_id: String,
    pub timestamp: String,
    pub daemon: String,
    pub kind: String,
    pub model: String,
    pub provider: String,
    pub prompt: String,
    pub iterations: u32,
    pub max_iterations: u32,
    pub final_verdict: String, // "accepted" | "max_iter_exhausted" | "infra_error"
    pub attempts: Vec<SessionAttemptRecord>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub artifact: Option<serde_json::Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub grounded_in_roster: Option<bool>,
    pub duration_ms: u64,
}

#[derive(Serialize, Deserialize)]
pub struct SessionAttemptRecord {
    pub iteration: u32,
    pub verdict_kind: String, // "no_json" | "validation_failed" | "accepted" | "infra_error"
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<String>,
}
/// Append-only writer. Cloneable handle — internal state is Arc'd so
/// V1State can keep its own clone and per-request clones are cheap.
#[derive(Clone)]
pub struct SessionLogger {
    inner: Arc<Inner>,
}

struct Inner {
    path: String,
    /// tokio::Mutex (not std) because we hold it across the async
    /// fs write. Contention is low (one row per /v1/iterate session).
    mu: Mutex<()>,
}

impl SessionLogger {
    /// Construct a logger writing to `path`. Empty path → None
    /// (skip the wiring in the iterate handler entirely).
    pub fn from_path(path: &str) -> Option<Self> {
        if path.is_empty() {
            return None;
        }
        Some(Self {
            inner: Arc::new(Inner {
                path: path.to_string(),
                mu: Mutex::new(()),
            }),
        })
    }

    /// Append one record. Best-effort: failures land in `tracing::warn!`
    /// and the call returns normally — observability is a witness, never
    /// a gate. That covers the only in-process failure mode too:
    /// serde_json::to_string failing on a well-formed struct, which
    /// shouldn't happen in practice.
    pub async fn append(&self, rec: SessionRecord) {
        let body = match serde_json::to_string(&rec) {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!(target: "v1.session_log", "marshal: {e}");
                return;
            }
        };
        let _guard = self.inner.mu.lock().await;
        if let Err(e) = self.write(&body).await {
            tracing::warn!(target: "v1.session_log", "write {}: {e}", self.inner.path);
        }
    }

    async fn write(&self, body: &str) -> std::io::Result<()> {
        use tokio::fs::OpenOptions;
        use tokio::io::AsyncWriteExt;

        // Lazy mkdir on first write so a not-yet-mounted volume at
        // startup doesn't kill the daemon.
        if let Some(parent) = std::path::Path::new(&self.inner.path).parent() {
            if !parent.as_os_str().is_empty() {
                tokio::fs::create_dir_all(parent).await?;
            }
        }
        let mut f = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&self.inner.path)
            .await?;
        f.write_all(body.as_bytes()).await?;
        f.write_all(b"\n").await?;
        Ok(())
    }
}
/// Best-effort UTF-8 char truncation. Matches Go's `trim` helper so
/// rows produced by either runtime cap fields at the same boundaries.
pub fn truncate(s: &str, n: usize) -> String {
    s.chars().take(n).collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use tokio::fs;

    fn fixture_record(session_id: &str) -> SessionRecord {
        SessionRecord {
            schema: SESSION_RECORD_SCHEMA.to_string(),
            session_id: session_id.to_string(),
            timestamp: "2026-05-02T08:00:00Z".to_string(),
            daemon: "gateway".to_string(),
            kind: "fill".to_string(),
            model: "qwen3.5:latest".to_string(),
            provider: "ollama".to_string(),
            prompt: "produce a fill artifact".to_string(),
            iterations: 1,
            max_iterations: 3,
            final_verdict: "accepted".to_string(),
            attempts: vec![SessionAttemptRecord {
                iteration: 0,
                verdict_kind: "accepted".to_string(),
                error: None,
                span_id: Some("span-0".to_string()),
            }],
            artifact: Some(serde_json::json!({"fills":[{"candidate_id":"W-1"}]})),
            grounded_in_roster: Some(true),
            duration_ms: 50,
        }
    }

    #[tokio::test]
    async fn from_path_empty_returns_none() {
        assert!(SessionLogger::from_path("").is_none());
    }

    #[tokio::test]
    async fn append_writes_jsonl_row_with_schema_field() {
        let dir = tempdir();
        let path = dir.join("sessions.jsonl");
        let path_str = path.to_string_lossy().to_string();
        let logger = SessionLogger::from_path(&path_str).unwrap();
        logger.append(fixture_record("trace-a")).await;

        let body = fs::read_to_string(&path).await.unwrap();
        assert!(body.contains("\"schema\":\"session.iterate.v1\""));
        assert!(body.contains("\"session_id\":\"trace-a\""));
        assert!(body.contains("\"grounded_in_roster\":true"));
        assert!(body.ends_with('\n'));
    }

    #[tokio::test]
    async fn append_concurrent_safe() {
        let dir = tempdir();
        let path = dir.join("sessions.jsonl");
        let path_str = path.to_string_lossy().to_string();
        let logger = SessionLogger::from_path(&path_str).unwrap();

        let n = 32;
        let mut handles = Vec::with_capacity(n);
        for i in 0..n {
            let l = logger.clone();
            handles.push(tokio::spawn(async move {
                l.append(fixture_record(&format!("trace-{i}"))).await;
            }));
        }
        for h in handles {
            h.await.unwrap();
        }

        let body = fs::read_to_string(&path).await.unwrap();
        let lines: Vec<_> = body.lines().filter(|l| !l.is_empty()).collect();
        assert_eq!(lines.len(), n, "expected {n} rows, got {}", lines.len());
        // Every row must round-trip through serde — a torn write
        // would surface as a parse error.
        for line in lines {
            let _: serde_json::Value = serde_json::from_str(line).expect("valid json per row");
        }
    }

    fn tempdir() -> PathBuf {
        // Per-test unique path so prior runs don't pollute the next.
        // The static counter increments across the whole test binary,
        // so back-to-back tests in the same module get distinct dirs.
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let p = std::env::temp_dir().join(format!(
            "session_log_test_{}_{}_{}",
            std::process::id(),
            chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
            n,
        ));
        std::fs::create_dir_all(&p).unwrap();
        p
    }
}
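The char-based `truncate` above is worth contrasting with naive byte slicing, which panics when the cut falls inside a multi-byte UTF-8 sequence. A std-only sketch (the function body mirrors the module's helper; everything else is illustrative):

```rust
// Char-based truncation never splits a multi-byte UTF-8 sequence,
// unlike byte slicing, which panics off a char boundary.
fn truncate(s: &str, n: usize) -> String {
    s.chars().take(n).collect()
}

fn main() {
    let s = "héllo"; // 'é' is 2 bytes: 6 bytes total, 5 chars
    assert_eq!(s.len(), 6);
    assert_eq!(s.chars().count(), 5);
    assert_eq!(truncate(s, 3), "hél"); // counts chars, not bytes
    assert_eq!(truncate(s, 10), "héllo"); // n past the end is a no-op
    // Byte slicing would panic here: &s[..2] falls inside 'é'.
    println!("ok");
}
```

Both runtimes capping at the same *char* count is what keeps the parity probe's byte-equality check honest on non-ASCII prompts.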

@@ -62,6 +62,15 @@ pub struct GatewayConfig {
    pub host: String,
    #[serde(default = "default_gateway_port")]
    pub port: u16,
    /// Coordinator session JSONL output path. One row per
    /// `/v1/iterate` session, schema=`session.iterate.v1`. Empty =
    /// disabled. Cross-runtime parity with the Go side's
    /// `[validatord].session_log_path` (added 2026-05-02). Defaults
    /// to empty so existing deployments aren't perturbed; production
    /// sets `/var/lib/lakehouse/gateway/sessions.jsonl`. See
    /// `golangLAKEHOUSE/docs/SESSION_LOG.md` for query examples.
    #[serde(default)]
    pub session_log_path: String,
}

#[derive(Debug, Clone, Deserialize)]
@@ -190,7 +199,11 @@ impl Config {
impl Default for Config {
    fn default() -> Self {
        Self {
            gateway: GatewayConfig { host: default_host(), port: default_gateway_port() },
            gateway: GatewayConfig {
                host: default_host(),
                port: default_gateway_port(),
                session_log_path: String::new(),
            },
            storage: StorageConfig {
                root: default_storage_root(),
                profile_root: default_profile_root(),
@@ -3,6 +3,11 @@
[gateway]
host = "0.0.0.0"
port = 3100
# Coordinator session JSONL — one row per /v1/iterate session for
# offline DuckDB analysis. Cross-runtime parity with the Go-side
# [validatord].session_log_path. Empty = disabled. Production:
# session_log_path = "/var/lib/lakehouse/gateway/sessions.jsonl"
session_log_path = ""

[storage]
root = "./data"