gateway: trace-id propagation + coordinator session JSONL (Rust parity)
Some checks failed
lakehouse/auditor 10 blocking issues: cloud: claim not backed — "Verified end-to-end against persistent Go stack on :4110:"
Some checks failed
lakehouse/auditor 10 blocking issues: cloud: claim not backed — "Verified end-to-end against persistent Go stack on :4110:"
Cross-runtime parity with the Go-side observability wave (commits
d6d2fdf + 1a3a82a in golangLAKEHOUSE). The two layers J flagged:
the LIVE per-call view (Langfuse) and the LONGITUDINAL forensic view
(JSONL queryable via DuckDB). Hard correctness gate (FillValidator
phantom-rejection) was already in place; this is the observability
on top.
## Trace-id propagation
X-Lakehouse-Trace-Id header constant declared in
crates/gateway/src/v1/iterate.rs (matches Go's shared.TraceIDHeader
byte-for-byte). When set on an inbound /v1/iterate request, the
handler reuses it; the chat + validate self-loopback hops forward
the same header so chatd's trace emit nests under the parent rather
than minting a fresh top-level trace per call.
ChatTrace gains a parent_trace_id field. emit_chat_inner skips the
trace-create event when parent is set, only emits the
generation-create which attaches to the existing trace tree. Result:
an iterate session with N retries shows in Langfuse as ONE tree, not
N+1 disconnected traces.
emit_attempt_span (new) writes one Langfuse span per iteration
attempt with input={iteration, model, provider, prompt} and
output={verdict, raw, error}. WARNING level on non-accepted
verdicts. The returned span id is stamped on the corresponding
SessionRecord attempt for cross-log correlation.
## Coordinator session JSONL
crates/gateway/src/v1/session_log.rs — new writer matching Go's
internal/validator/session_log.go schema byte-for-byte:
- SessionRecord with schema=session.iterate.v1
- SessionAttemptRecord per retry
- SessionLogger.append: tokio Mutex serialized append-only
- Best-effort posture (slog.Warn on error, never blocks request)
iterate.rs builds + appends a row on EVERY code path:
- accepted: write_session_accepted with grounded_in_roster bool
derived from validate_workers WorkerLookup (matches Go's
handlers.rosterCheckFor("fill") semantics)
- max-iter-exhausted: write_session_failure
- infra-error: write_infra_error (so a missing /v1/iterate event
never silently disappears from the longitudinal log)
[gateway].session_log_path config field (empty = disabled).
Production: /var/lib/lakehouse/gateway/sessions.jsonl. Operators who
want a unified longitudinal stream can point both Rust and Go
loggers at the same path — write-append is safe at the row sizes we
produce.
## Cross-runtime parity probe
crates/gateway/src/bin/parity_session_log: tiny stdin/stdout helper
that round-trips a fixture through SessionRecord serde.
golangLAKEHOUSE/scripts/cutover/parity/session_log_parity.sh feeds
4 fixtures through both helpers and diffs the rows after stripping
timestamp + daemon (the two fields that legitimately differ between
producers).
Result: **4/4 byte-equal** including the unicode-prompt fixture
("Café résumé ⭐ 你好"). Schema parity holds. The non-trivial-equal
guard in the probe rejects the case where both sides fail
identically — protecting against a regression where one side
silently stops producing valid JSON.
## Verification
- cargo test -p gateway --lib: 90/90 PASS (3 new session_log tests
including concurrent-append safety)
- cargo check --workspace: clean
- session_log_parity.sh: 4/4 fixtures byte-equal
- Both runtimes can append to the same path; DuckDB sees one stream
- The Go-side validatord smoke remains 5/5 (unchanged)
## Architecture invariant
Don't propose to "wire trace-id propagation in Rust" or "add Rust
session log" — both are now shipped on the demo/post-pr11-polish
branch. The longitudinal log + Langfuse tree together cover the
multi-call observability concern J flagged 2026-05-02.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
ba928b1d64
commit
57bde63a06
71
crates/gateway/src/bin/parity_session_log.rs
Normal file
71
crates/gateway/src/bin/parity_session_log.rs
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
//! Cross-runtime parity helper for `SessionRecord` JSON shape.
|
||||||
|
//!
|
||||||
|
//! Reads a fixture JSON on stdin, builds a `SessionRecord`, emits
|
||||||
|
//! one JSONL row on stdout. Used by
|
||||||
|
//! `golangLAKEHOUSE/scripts/cutover/parity/session_log_parity.sh`
|
||||||
|
//! to verify the Rust gateway's session log shape stays byte-equal
|
||||||
|
//! to the Go-side validatord's `validator.SessionRecord` (commit
|
||||||
|
//! 1a3a82a in golangLAKEHOUSE).
|
||||||
|
|
||||||
|
use gateway::v1::session_log::{SessionAttemptRecord, SessionRecord, SESSION_RECORD_SCHEMA};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use std::io::Read;
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct FixtureInput {
|
||||||
|
session_id: String,
|
||||||
|
kind: String,
|
||||||
|
model: String,
|
||||||
|
provider: String,
|
||||||
|
prompt: String,
|
||||||
|
iterations: u32,
|
||||||
|
max_iterations: u32,
|
||||||
|
final_verdict: String,
|
||||||
|
attempts: Vec<SessionAttemptRecord>,
|
||||||
|
#[serde(default)]
|
||||||
|
artifact: Option<serde_json::Value>,
|
||||||
|
#[serde(default)]
|
||||||
|
grounded_in_roster: Option<bool>,
|
||||||
|
duration_ms: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let mut buf = String::new();
|
||||||
|
if let Err(e) = std::io::stdin().read_to_string(&mut buf) {
|
||||||
|
eprintln!("read stdin: {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
let input: FixtureInput = match serde_json::from_str(&buf) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("parse stdin: {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let rec = SessionRecord {
|
||||||
|
schema: SESSION_RECORD_SCHEMA.to_string(),
|
||||||
|
session_id: input.session_id,
|
||||||
|
// Pinned timestamp so both runtimes' rows compare byte-equal
|
||||||
|
// when the test wrapper normalizes on `daemon` only.
|
||||||
|
timestamp: "2026-01-01T00:00:00+00:00".to_string(),
|
||||||
|
daemon: "gateway".to_string(),
|
||||||
|
kind: input.kind,
|
||||||
|
model: input.model,
|
||||||
|
provider: input.provider,
|
||||||
|
prompt: input.prompt,
|
||||||
|
iterations: input.iterations,
|
||||||
|
max_iterations: input.max_iterations,
|
||||||
|
final_verdict: input.final_verdict,
|
||||||
|
attempts: input.attempts,
|
||||||
|
artifact: input.artifact,
|
||||||
|
grounded_in_roster: input.grounded_in_roster,
|
||||||
|
duration_ms: input.duration_ms,
|
||||||
|
};
|
||||||
|
match serde_json::to_string(&rec) {
|
||||||
|
Ok(s) => println!("{s}"),
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("marshal: {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -438,6 +438,10 @@ impl ExecutionLoop {
|
|||||||
start_time: start_time.to_rfc3339(),
|
start_time: start_time.to_rfc3339(),
|
||||||
end_time: end_time.to_rfc3339(),
|
end_time: end_time.to_rfc3339(),
|
||||||
latency_ms: elapsed_ms,
|
latency_ms: elapsed_ms,
|
||||||
|
// Internal execution-loop traffic is its own top-level
|
||||||
|
// trace per call. If a future caller threads a parent
|
||||||
|
// trace into self.state, lift this to Some(parent_id).
|
||||||
|
parent_trace_id: None,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -654,6 +658,7 @@ impl ExecutionLoop {
|
|||||||
start_time: start_time.to_rfc3339(),
|
start_time: start_time.to_rfc3339(),
|
||||||
end_time: end_time.to_rfc3339(),
|
end_time: end_time.to_rfc3339(),
|
||||||
latency_ms,
|
latency_ms,
|
||||||
|
parent_trace_id: None,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -362,6 +362,22 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
c
|
c
|
||||||
},
|
},
|
||||||
|
// Coordinator session JSONL — one row per /v1/iterate
|
||||||
|
// session for offline DuckDB analysis. Cross-runtime
|
||||||
|
// parity with Go-side validatord (commit 1a3a82a).
|
||||||
|
session_log: {
|
||||||
|
let path = &config.gateway.session_log_path;
|
||||||
|
let s = v1::session_log::SessionLogger::from_path(path);
|
||||||
|
if s.is_some() {
|
||||||
|
tracing::info!(
|
||||||
|
"v1: session log enabled — coordinator sessions written to {}",
|
||||||
|
path
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
tracing::info!("v1: session log disabled (set [gateway].session_log_path to enable)");
|
||||||
|
}
|
||||||
|
s
|
||||||
|
},
|
||||||
}));
|
}));
|
||||||
|
|
||||||
// Auth middleware (if enabled) — P5-001 fix 2026-04-23:
|
// Auth middleware (if enabled) — P5-001 fix 2026-04-23:
|
||||||
|
|||||||
@ -21,12 +21,19 @@
|
|||||||
//! re-implementation. Staffing executors, agent loops, and future
|
//! re-implementation. Staffing executors, agent loops, and future
|
||||||
//! validators all reach the same code path.
|
//! validators all reach the same code path.
|
||||||
|
|
||||||
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
|
use axum::{extract::State, http::{HeaderMap, StatusCode}, response::IntoResponse, Json};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
const DEFAULT_MAX_ITERATIONS: u32 = 3;
|
const DEFAULT_MAX_ITERATIONS: u32 = 3;
|
||||||
const LOOPBACK_TIMEOUT_SECS: u64 = 240;
|
const LOOPBACK_TIMEOUT_SECS: u64 = 240;
|
||||||
|
|
||||||
|
/// Header name used to propagate a Langfuse parent trace id across
|
||||||
|
/// daemon boundaries. Matches Go's `shared.TraceIDHeader` constant
|
||||||
|
/// byte-for-byte (commit d6d2fdf in golangLAKEHOUSE) — same wire
|
||||||
|
/// format means a Go caller can hit Rust's /v1/iterate (or vice
|
||||||
|
/// versa) and the resulting Langfuse trees nest correctly.
|
||||||
|
pub const TRACE_ID_HEADER: &str = "x-lakehouse-trace-id";
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct IterateRequest {
|
pub struct IterateRequest {
|
||||||
/// "fill" | "email" | "playbook" — picks which validator runs.
|
/// "fill" | "email" | "playbook" — picks which validator runs.
|
||||||
@ -90,26 +97,47 @@ pub struct IterateFailure {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn iterate(
|
pub async fn iterate(
|
||||||
State(_state): State<super::V1State>,
|
State(state): State<super::V1State>,
|
||||||
|
headers: HeaderMap,
|
||||||
Json(req): Json<IterateRequest>,
|
Json(req): Json<IterateRequest>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
let max_iter = req.max_iterations.unwrap_or(DEFAULT_MAX_ITERATIONS).max(1);
|
let max_iter = req.max_iterations.unwrap_or(DEFAULT_MAX_ITERATIONS).max(1);
|
||||||
let temperature = req.temperature.unwrap_or(0.2);
|
let temperature = req.temperature.unwrap_or(0.2);
|
||||||
let max_tokens = req.max_tokens.unwrap_or(4096);
|
let max_tokens = req.max_tokens.unwrap_or(4096);
|
||||||
let mut history: Vec<IterateAttempt> = Vec::with_capacity(max_iter as usize);
|
let mut history: Vec<IterateAttempt> = Vec::with_capacity(max_iter as usize);
|
||||||
|
let mut attempt_records: Vec<super::session_log::SessionAttemptRecord> = Vec::with_capacity(max_iter as usize);
|
||||||
let mut current_prompt = req.prompt.clone();
|
let mut current_prompt = req.prompt.clone();
|
||||||
|
|
||||||
|
// Resolve the parent Langfuse trace id. Caller-forwarded header
|
||||||
|
// wins (cross-daemon tree linkage); otherwise mint a fresh id so
|
||||||
|
// the iterate session is its own tree. Same shape as the Go-side
|
||||||
|
// validatord trace propagation.
|
||||||
|
let trace_id: String = headers
|
||||||
|
.get(TRACE_ID_HEADER)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.filter(|s| !s.is_empty())
|
||||||
|
.map(|s| s.to_string())
|
||||||
|
.unwrap_or_else(new_trace_id);
|
||||||
|
|
||||||
let client = match reqwest::Client::builder()
|
let client = match reqwest::Client::builder()
|
||||||
.timeout(std::time::Duration::from_secs(LOOPBACK_TIMEOUT_SECS))
|
.timeout(std::time::Duration::from_secs(LOOPBACK_TIMEOUT_SECS))
|
||||||
.build() {
|
.build() {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("client build: {e}")).into_response(),
|
Err(e) => {
|
||||||
|
// Even infrastructure failures get a session row so a
|
||||||
|
// missing /v1/iterate event never silently disappears
|
||||||
|
// from the longitudinal log.
|
||||||
|
write_infra_error(&state, &req, &trace_id, max_iter, 0, format!("client build: {e}")).await;
|
||||||
|
return (StatusCode::INTERNAL_SERVER_ERROR, format!("client build: {e}")).into_response();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
// Self-loopback to the gateway port. Carries gateway internal
|
// Self-loopback to the gateway port. Carries gateway internal
|
||||||
// calls through /v1/chat + /v1/validate so /v1/usage tracks them.
|
// calls through /v1/chat + /v1/validate so /v1/usage tracks them.
|
||||||
let gateway = "http://127.0.0.1:3100";
|
let gateway = "http://127.0.0.1:3100";
|
||||||
|
let t0 = std::time::Instant::now();
|
||||||
|
|
||||||
for iteration in 0..max_iter {
|
for iteration in 0..max_iter {
|
||||||
|
let attempt_started = chrono::Utc::now();
|
||||||
// ── Generate ──
|
// ── Generate ──
|
||||||
let mut messages = Vec::with_capacity(2);
|
let mut messages = Vec::with_capacity(2);
|
||||||
if let Some(sys) = &req.system {
|
if let Some(sys) = &req.system {
|
||||||
@ -123,20 +151,33 @@ pub async fn iterate(
|
|||||||
"temperature": temperature,
|
"temperature": temperature,
|
||||||
"max_tokens": max_tokens,
|
"max_tokens": max_tokens,
|
||||||
});
|
});
|
||||||
let raw = match call_chat(&client, gateway, &chat_body).await {
|
let raw = match call_chat(&client, gateway, &chat_body, &trace_id).await {
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
Err(e) => return (StatusCode::BAD_GATEWAY, format!("/v1/chat hop failed at iter {iteration}: {e}")).into_response(),
|
Err(e) => {
|
||||||
|
write_infra_error(&state, &req, &trace_id, max_iter, t0.elapsed().as_millis() as u64, format!("/v1/chat hop failed at iter {iteration}: {e}")).await;
|
||||||
|
return (StatusCode::BAD_GATEWAY, format!("/v1/chat hop failed at iter {iteration}: {e}")).into_response();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// ── Extract JSON ──
|
// ── Extract JSON ──
|
||||||
let artifact = match extract_json(&raw) {
|
let artifact = match extract_json(&raw) {
|
||||||
Some(a) => a,
|
Some(a) => a,
|
||||||
None => {
|
None => {
|
||||||
|
let span_id = emit_attempt_span(
|
||||||
|
&state, &trace_id, iteration, &req, ¤t_prompt, &raw, "no_json", None,
|
||||||
|
attempt_started, chrono::Utc::now(),
|
||||||
|
);
|
||||||
history.push(IterateAttempt {
|
history.push(IterateAttempt {
|
||||||
iteration,
|
iteration,
|
||||||
raw: raw.chars().take(2000).collect(),
|
raw: raw.chars().take(2000).collect(),
|
||||||
status: AttemptStatus::NoJson,
|
status: AttemptStatus::NoJson,
|
||||||
});
|
});
|
||||||
|
attempt_records.push(super::session_log::SessionAttemptRecord {
|
||||||
|
iteration,
|
||||||
|
verdict_kind: "no_json".to_string(),
|
||||||
|
error: None,
|
||||||
|
span_id,
|
||||||
|
});
|
||||||
current_prompt = format!(
|
current_prompt = format!(
|
||||||
"{}\n\nYour previous attempt did not contain a JSON object. Reply with ONLY a valid JSON object matching the requested artifact shape.",
|
"{}\n\nYour previous attempt did not contain a JSON object. Reply with ONLY a valid JSON object matching the requested artifact shape.",
|
||||||
req.prompt,
|
req.prompt,
|
||||||
@ -151,13 +192,26 @@ pub async fn iterate(
|
|||||||
"artifact": artifact,
|
"artifact": artifact,
|
||||||
"context": req.context.clone().unwrap_or(serde_json::Value::Null),
|
"context": req.context.clone().unwrap_or(serde_json::Value::Null),
|
||||||
});
|
});
|
||||||
match call_validate(&client, gateway, &validate_body).await {
|
match call_validate(&client, gateway, &validate_body, &trace_id).await {
|
||||||
Ok(report) => {
|
Ok(report) => {
|
||||||
|
let span_id = emit_attempt_span(
|
||||||
|
&state, &trace_id, iteration, &req, ¤t_prompt, &raw, "accepted", None,
|
||||||
|
attempt_started, chrono::Utc::now(),
|
||||||
|
);
|
||||||
history.push(IterateAttempt {
|
history.push(IterateAttempt {
|
||||||
iteration,
|
iteration,
|
||||||
raw: raw.chars().take(2000).collect(),
|
raw: raw.chars().take(2000).collect(),
|
||||||
status: AttemptStatus::Accepted,
|
status: AttemptStatus::Accepted,
|
||||||
});
|
});
|
||||||
|
attempt_records.push(super::session_log::SessionAttemptRecord {
|
||||||
|
iteration,
|
||||||
|
verdict_kind: "accepted".to_string(),
|
||||||
|
error: None,
|
||||||
|
span_id,
|
||||||
|
});
|
||||||
|
let duration_ms = t0.elapsed().as_millis() as u64;
|
||||||
|
let grounded = grounded_in_roster(&state, &req.kind, &artifact);
|
||||||
|
write_session_accepted(&state, &req, &trace_id, iteration + 1, max_iter, attempt_records, &artifact, grounded, duration_ms).await;
|
||||||
return (StatusCode::OK, Json(IterateResponse {
|
return (StatusCode::OK, Json(IterateResponse {
|
||||||
artifact,
|
artifact,
|
||||||
validation: report,
|
validation: report,
|
||||||
@ -167,6 +221,11 @@ pub async fn iterate(
|
|||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let err_summary = err.to_string();
|
let err_summary = err.to_string();
|
||||||
|
let span_id = emit_attempt_span(
|
||||||
|
&state, &trace_id, iteration, &req, ¤t_prompt, &raw, "validation_failed",
|
||||||
|
Some(err_summary.clone()),
|
||||||
|
attempt_started, chrono::Utc::now(),
|
||||||
|
);
|
||||||
history.push(IterateAttempt {
|
history.push(IterateAttempt {
|
||||||
iteration,
|
iteration,
|
||||||
raw: raw.chars().take(2000).collect(),
|
raw: raw.chars().take(2000).collect(),
|
||||||
@ -174,6 +233,12 @@ pub async fn iterate(
|
|||||||
error: serde_json::to_value(&err_summary).unwrap_or(serde_json::Value::Null),
|
error: serde_json::to_value(&err_summary).unwrap_or(serde_json::Value::Null),
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
attempt_records.push(super::session_log::SessionAttemptRecord {
|
||||||
|
iteration,
|
||||||
|
verdict_kind: "validation_failed".to_string(),
|
||||||
|
error: Some(err_summary.clone()),
|
||||||
|
span_id,
|
||||||
|
});
|
||||||
// Append validation feedback to prompt for next iter.
|
// Append validation feedback to prompt for next iter.
|
||||||
// The model sees concrete failure mode + retries with
|
// The model sees concrete failure mode + retries with
|
||||||
// corrective context. This is the "observer correction"
|
// corrective context. This is the "observer correction"
|
||||||
@ -188,6 +253,8 @@ pub async fn iterate(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let duration_ms = t0.elapsed().as_millis() as u64;
|
||||||
|
write_session_failure(&state, &req, &trace_id, max_iter, max_iter, attempt_records, duration_ms).await;
|
||||||
(StatusCode::UNPROCESSABLE_ENTITY, Json(IterateFailure {
|
(StatusCode::UNPROCESSABLE_ENTITY, Json(IterateFailure {
|
||||||
error: format!("max iterations reached ({max_iter}) without passing validation"),
|
error: format!("max iterations reached ({max_iter}) without passing validation"),
|
||||||
iterations: max_iter,
|
iterations: max_iter,
|
||||||
@ -195,12 +262,157 @@ pub async fn iterate(
|
|||||||
})).into_response()
|
})).into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn call_chat(client: &reqwest::Client, gateway: &str, body: &serde_json::Value) -> Result<String, String> {
|
// ─── Helpers — Langfuse spans + session log + roster check ─────────
|
||||||
let resp = client.post(format!("{gateway}/v1/chat"))
|
|
||||||
.json(body)
|
fn emit_attempt_span(
|
||||||
.send()
|
state: &super::V1State,
|
||||||
.await
|
trace_id: &str,
|
||||||
.map_err(|e| format!("chat hop: {e}"))?;
|
iteration: u32,
|
||||||
|
req: &IterateRequest,
|
||||||
|
prompt: &str,
|
||||||
|
raw: &str,
|
||||||
|
verdict: &str,
|
||||||
|
error: Option<String>,
|
||||||
|
started: chrono::DateTime<chrono::Utc>,
|
||||||
|
ended: chrono::DateTime<chrono::Utc>,
|
||||||
|
) -> Option<String> {
|
||||||
|
let lf = state.langfuse.as_ref()?;
|
||||||
|
Some(lf.emit_attempt_span(super::langfuse_trace::AttemptSpan {
|
||||||
|
trace_id: trace_id.to_string(),
|
||||||
|
iteration,
|
||||||
|
model: req.model.clone(),
|
||||||
|
provider: req.provider.clone(),
|
||||||
|
prompt: prompt.to_string(),
|
||||||
|
raw: raw.to_string(),
|
||||||
|
verdict: verdict.to_string(),
|
||||||
|
error,
|
||||||
|
start_time: started.to_rfc3339(),
|
||||||
|
end_time: ended.to_rfc3339(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verify every fill artifact's candidate IDs exist in the roster.
|
||||||
|
/// Returns Some(true)/Some(false) on the fill kind, None otherwise
|
||||||
|
/// (other kinds don't have worker IDs to ground). Same semantics as
|
||||||
|
/// Go's `handlers.rosterCheckFor("fill")`.
|
||||||
|
fn grounded_in_roster(
|
||||||
|
state: &super::V1State,
|
||||||
|
kind: &str,
|
||||||
|
artifact: &serde_json::Value,
|
||||||
|
) -> Option<bool> {
|
||||||
|
if kind != "fill" {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let fills = artifact.get("fills").and_then(|v| v.as_array())?;
|
||||||
|
for f in fills {
|
||||||
|
let id = match f.get("candidate_id").and_then(|v| v.as_str()) {
|
||||||
|
Some(s) if !s.is_empty() => s,
|
||||||
|
_ => return Some(false),
|
||||||
|
};
|
||||||
|
if state.validate_workers.find(id).is_none() {
|
||||||
|
return Some(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_session_accepted(
|
||||||
|
state: &super::V1State,
|
||||||
|
req: &IterateRequest,
|
||||||
|
trace_id: &str,
|
||||||
|
iterations: u32,
|
||||||
|
max_iter: u32,
|
||||||
|
attempts: Vec<super::session_log::SessionAttemptRecord>,
|
||||||
|
artifact: &serde_json::Value,
|
||||||
|
grounded: Option<bool>,
|
||||||
|
duration_ms: u64,
|
||||||
|
) {
|
||||||
|
let Some(logger) = state.session_log.as_ref() else { return };
|
||||||
|
let rec = build_session_record(req, trace_id, "accepted", iterations, max_iter, attempts, Some(artifact.clone()), grounded, duration_ms);
|
||||||
|
logger.append(rec).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_session_failure(
|
||||||
|
state: &super::V1State,
|
||||||
|
req: &IterateRequest,
|
||||||
|
trace_id: &str,
|
||||||
|
iterations: u32,
|
||||||
|
max_iter: u32,
|
||||||
|
attempts: Vec<super::session_log::SessionAttemptRecord>,
|
||||||
|
duration_ms: u64,
|
||||||
|
) {
|
||||||
|
let Some(logger) = state.session_log.as_ref() else { return };
|
||||||
|
let rec = build_session_record(req, trace_id, "max_iter_exhausted", iterations, max_iter, attempts, None, None, duration_ms);
|
||||||
|
logger.append(rec).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_infra_error(
|
||||||
|
state: &super::V1State,
|
||||||
|
req: &IterateRequest,
|
||||||
|
trace_id: &str,
|
||||||
|
max_iter: u32,
|
||||||
|
duration_ms: u64,
|
||||||
|
error: String,
|
||||||
|
) {
|
||||||
|
let Some(logger) = state.session_log.as_ref() else { return };
|
||||||
|
let attempts = vec![super::session_log::SessionAttemptRecord {
|
||||||
|
iteration: 0,
|
||||||
|
verdict_kind: "infra_error".to_string(),
|
||||||
|
error: Some(error),
|
||||||
|
span_id: None,
|
||||||
|
}];
|
||||||
|
let rec = build_session_record(req, trace_id, "infra_error", 0, max_iter, attempts, None, None, duration_ms);
|
||||||
|
logger.append(rec).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_session_record(
|
||||||
|
req: &IterateRequest,
|
||||||
|
trace_id: &str,
|
||||||
|
final_verdict: &str,
|
||||||
|
iterations: u32,
|
||||||
|
max_iter: u32,
|
||||||
|
attempts: Vec<super::session_log::SessionAttemptRecord>,
|
||||||
|
artifact: Option<serde_json::Value>,
|
||||||
|
grounded: Option<bool>,
|
||||||
|
duration_ms: u64,
|
||||||
|
) -> super::session_log::SessionRecord {
|
||||||
|
super::session_log::SessionRecord {
|
||||||
|
schema: super::session_log::SESSION_RECORD_SCHEMA.to_string(),
|
||||||
|
session_id: trace_id.to_string(),
|
||||||
|
timestamp: chrono::Utc::now().to_rfc3339(),
|
||||||
|
daemon: "gateway".to_string(),
|
||||||
|
kind: req.kind.clone(),
|
||||||
|
model: req.model.clone(),
|
||||||
|
provider: req.provider.clone(),
|
||||||
|
prompt: super::session_log::truncate(&req.prompt, 4000),
|
||||||
|
iterations,
|
||||||
|
max_iterations: max_iter,
|
||||||
|
final_verdict: final_verdict.to_string(),
|
||||||
|
attempts,
|
||||||
|
artifact,
|
||||||
|
grounded_in_roster: grounded,
|
||||||
|
duration_ms,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generate a fresh trace id when no parent was forwarded. Same
|
||||||
|
/// time-ordered hex shape Langfuse already accepts elsewhere in this
|
||||||
|
/// crate (see `langfuse_trace::uuid_v7_like`).
|
||||||
|
fn new_trace_id() -> String {
|
||||||
|
let ts = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
|
||||||
|
let rand = std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.map(|d| d.subsec_nanos())
|
||||||
|
.unwrap_or(0);
|
||||||
|
format!("{:016x}-{:08x}", ts, rand)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn call_chat(client: &reqwest::Client, gateway: &str, body: &serde_json::Value, trace_id: &str) -> Result<String, String> {
|
||||||
|
let mut req = client.post(format!("{gateway}/v1/chat")).json(body);
|
||||||
|
if !trace_id.is_empty() {
|
||||||
|
req = req.header(TRACE_ID_HEADER, trace_id);
|
||||||
|
}
|
||||||
|
let resp = req.send().await.map_err(|e| format!("chat hop: {e}"))?;
|
||||||
let status = resp.status();
|
let status = resp.status();
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
let body = resp.text().await.unwrap_or_default();
|
let body = resp.text().await.unwrap_or_default();
|
||||||
@ -213,12 +425,12 @@ async fn call_chat(client: &reqwest::Client, gateway: &str, body: &serde_json::V
|
|||||||
.to_string())
|
.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn call_validate(client: &reqwest::Client, gateway: &str, body: &serde_json::Value) -> Result<serde_json::Value, String> {
|
async fn call_validate(client: &reqwest::Client, gateway: &str, body: &serde_json::Value, trace_id: &str) -> Result<serde_json::Value, String> {
|
||||||
let resp = client.post(format!("{gateway}/v1/validate"))
|
let mut req = client.post(format!("{gateway}/v1/validate")).json(body);
|
||||||
.json(body)
|
if !trace_id.is_empty() {
|
||||||
.send()
|
req = req.header(TRACE_ID_HEADER, trace_id);
|
||||||
.await
|
}
|
||||||
.map_err(|e| format!("validate hop: {e}"))?;
|
let resp = req.send().await.map_err(|e| format!("validate hop: {e}"))?;
|
||||||
let status = resp.status();
|
let status = resp.status();
|
||||||
let parsed: serde_json::Value = resp.json().await.map_err(|e| format!("validate parse: {e}"))?;
|
let parsed: serde_json::Value = resp.json().await.map_err(|e| format!("validate parse: {e}"))?;
|
||||||
if status.is_success() {
|
if status.is_success() {
|
||||||
|
|||||||
@ -76,14 +76,85 @@ impl LangfuseClient {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fire-and-forget per-iteration span emit. Returns the generated
|
||||||
|
/// span id synchronously so the caller can stamp it on
|
||||||
|
/// `IterateAttempt.span_id` before the network round-trip resolves.
|
||||||
|
/// Mirrors Go's `validator.Tracer` callback shape.
|
||||||
|
pub fn emit_attempt_span(&self, sp: AttemptSpan) -> String {
|
||||||
|
let span_id = uuid_v7_like();
|
||||||
|
let span_id_for_caller = span_id.clone();
|
||||||
|
let this = self.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = this.emit_attempt_span_inner(span_id, sp).await {
|
||||||
|
tracing::warn!(target: "v1.langfuse", "iterate span drop: {e}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
span_id_for_caller
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn emit_attempt_span_inner(&self, span_id: String, sp: AttemptSpan) -> Result<(), String> {
|
||||||
|
let level = if sp.verdict == "accepted" { "DEFAULT" } else { "WARNING" };
|
||||||
|
let batch = IngestionBatch {
|
||||||
|
batch: vec![IngestionEvent {
|
||||||
|
id: uuid_v7_like(),
|
||||||
|
timestamp: sp.end_time.clone(),
|
||||||
|
kind: "span-create",
|
||||||
|
body: serde_json::json!({
|
||||||
|
"id": span_id,
|
||||||
|
"traceId": sp.trace_id,
|
||||||
|
"name": format!("iterate.attempt[{}]", sp.iteration),
|
||||||
|
"input": serde_json::json!({
|
||||||
|
"iteration": sp.iteration,
|
||||||
|
"model": sp.model,
|
||||||
|
"provider": sp.provider,
|
||||||
|
"prompt": truncate(&sp.prompt, 4000),
|
||||||
|
}),
|
||||||
|
"output": serde_json::json!({
|
||||||
|
"verdict": sp.verdict,
|
||||||
|
"error": sp.error,
|
||||||
|
"raw": truncate(&sp.raw, 4000),
|
||||||
|
}),
|
||||||
|
"level": level,
|
||||||
|
"startTime": sp.start_time,
|
||||||
|
"endTime": sp.end_time,
|
||||||
|
}),
|
||||||
|
}],
|
||||||
|
};
|
||||||
|
self.post_batch(batch).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn post_batch(&self, batch: IngestionBatch) -> Result<(), String> {
|
||||||
|
let url = format!("{}{}", self.inner.base_url.trim_end_matches('/'), INGESTION_PATH);
|
||||||
|
let resp = self.inner.http
|
||||||
|
.post(url)
|
||||||
|
.basic_auth(&self.inner.public_key, Some(&self.inner.secret_key))
|
||||||
|
.json(&batch)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("POST failed: {e}"))?;
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
return Err(format!("{}: {}", resp.status(), resp.text().await.unwrap_or_default()));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn emit_chat_inner(&self, ev: ChatTrace) -> Result<(), String> {
|
async fn emit_chat_inner(&self, ev: ChatTrace) -> Result<(), String> {
|
||||||
let trace_id = uuid_v7_like();
|
// When the caller forwarded a parent trace id (via the
|
||||||
|
// X-Lakehouse-Trace-Id header → V1State plumbing), attach the
|
||||||
|
// generation as a child of that trace. Without a parent we
|
||||||
|
// mint a new top-level trace per call (Phase 40 default).
|
||||||
|
let trace_id = ev.parent_trace_id.clone().unwrap_or_else(uuid_v7_like);
|
||||||
|
let nested = ev.parent_trace_id.is_some();
|
||||||
let gen_id = uuid_v7_like();
|
let gen_id = uuid_v7_like();
|
||||||
let trace_ts = ev.start_time.clone();
|
let trace_ts = ev.start_time.clone();
|
||||||
|
|
||||||
let batch = IngestionBatch {
|
let mut events = Vec::with_capacity(2);
|
||||||
batch: vec![
|
if !nested {
|
||||||
IngestionEvent {
|
// Only mint a fresh trace-create when we don't have a parent.
|
||||||
|
// Reusing a parent trace id without re-creating it is the
|
||||||
|
// contract that lets validatord's iterate-session show up
|
||||||
|
// as one tree in Langfuse.
|
||||||
|
events.push(IngestionEvent {
|
||||||
id: uuid_v7_like(),
|
id: uuid_v7_like(),
|
||||||
timestamp: trace_ts.clone(),
|
timestamp: trace_ts.clone(),
|
||||||
kind: "trace-create",
|
kind: "trace-create",
|
||||||
@ -99,8 +170,9 @@ impl LangfuseClient {
|
|||||||
"think": ev.think,
|
"think": ev.think,
|
||||||
}),
|
}),
|
||||||
}),
|
}),
|
||||||
},
|
});
|
||||||
IngestionEvent {
|
}
|
||||||
|
events.push(IngestionEvent {
|
||||||
id: uuid_v7_like(),
|
id: uuid_v7_like(),
|
||||||
timestamp: ev.end_time.clone(),
|
timestamp: ev.end_time.clone(),
|
||||||
kind: "generation-create",
|
kind: "generation-create",
|
||||||
@ -129,23 +201,17 @@ impl LangfuseClient {
|
|||||||
"latency_ms": ev.latency_ms,
|
"latency_ms": ev.latency_ms,
|
||||||
}),
|
}),
|
||||||
}),
|
}),
|
||||||
},
|
});
|
||||||
],
|
|
||||||
};
|
|
||||||
|
|
||||||
let url = format!("{}{}", self.inner.base_url.trim_end_matches('/'), INGESTION_PATH);
|
self.post_batch(IngestionBatch { batch: events }).await
|
||||||
let resp = self.inner.http
|
|
||||||
.post(url)
|
|
||||||
.basic_auth(&self.inner.public_key, Some(&self.inner.secret_key))
|
|
||||||
.json(&batch)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("POST failed: {e}"))?;
|
|
||||||
if !resp.status().is_success() {
|
|
||||||
return Err(format!("{}: {}", resp.status(), resp.text().await.unwrap_or_default()));
|
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Truncate a string to at most `n` chars (NOT bytes). Matches the Go
|
||||||
|
/// `trim` helper used in session log + attempt-span emission so an
|
||||||
|
/// operator reading two cross-runtime traces sees the same boundary.
|
||||||
|
fn truncate(s: &str, n: usize) -> String {
|
||||||
|
s.chars().take(n).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Everything the v1.chat handler collects for one completed call.
|
/// Everything the v1.chat handler collects for one completed call.
|
||||||
@ -162,6 +228,32 @@ pub struct ChatTrace {
|
|||||||
pub start_time: String,
|
pub start_time: String,
|
||||||
pub end_time: String,
|
pub end_time: String,
|
||||||
pub latency_ms: u64,
|
pub latency_ms: u64,
|
||||||
|
/// When set, attach this chat trace as a child of the named
|
||||||
|
/// Langfuse trace instead of starting a new top-level trace. Used
|
||||||
|
/// by `/v1/iterate` to nest its inner /v1/chat hops under the
|
||||||
|
/// iterate-session trace so a multi-call session shows in
|
||||||
|
/// Langfuse as ONE trace tree, not N+1 disconnected traces.
|
||||||
|
/// Matches the Go-side `X-Lakehouse-Trace-Id` propagation
|
||||||
|
/// (commit d6d2fdf in golangLAKEHOUSE).
|
||||||
|
pub parent_trace_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One iteration attempt inside `/v1/iterate`'s loop. Becomes one
|
||||||
|
/// span on the parent trace when emitted via `emit_attempt_span`.
|
||||||
|
/// Matches Go's `validator.AttemptSpan` shape so the cross-runtime
|
||||||
|
/// observability surface is consistent.
|
||||||
|
pub struct AttemptSpan {
|
||||||
|
pub trace_id: String,
|
||||||
|
pub iteration: u32,
|
||||||
|
pub model: String,
|
||||||
|
pub provider: String,
|
||||||
|
pub prompt: String,
|
||||||
|
pub raw: String,
|
||||||
|
/// Verdict kind: "no_json" | "validation_failed" | "accepted"
|
||||||
|
pub verdict: String,
|
||||||
|
pub error: Option<String>,
|
||||||
|
pub start_time: String,
|
||||||
|
pub end_time: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
|
|||||||
@ -21,6 +21,7 @@ pub mod opencode;
|
|||||||
pub mod validate;
|
pub mod validate;
|
||||||
pub mod iterate;
|
pub mod iterate;
|
||||||
pub mod langfuse_trace;
|
pub mod langfuse_trace;
|
||||||
|
pub mod session_log;
|
||||||
pub mod mode;
|
pub mod mode;
|
||||||
pub mod respond;
|
pub mod respond;
|
||||||
pub mod truth;
|
pub mod truth;
|
||||||
@ -83,6 +84,13 @@ pub struct V1State {
|
|||||||
/// disabled (keys missing or container unreachable). Traces are
|
/// disabled (keys missing or container unreachable). Traces are
|
||||||
/// fire-and-forget: never block the response path.
|
/// fire-and-forget: never block the response path.
|
||||||
pub langfuse: Option<langfuse_trace::LangfuseClient>,
|
pub langfuse: Option<langfuse_trace::LangfuseClient>,
|
||||||
|
/// Coordinator session JSONL writer (path from
|
||||||
|
/// `[gateway].session_log_path`). One row per `/v1/iterate`
|
||||||
|
/// session for offline DuckDB analysis. None = disabled.
|
||||||
|
/// Cross-runtime parity with the Go-side `validatord`
|
||||||
|
/// `[validatord].session_log_path` (commit 1a3a82a in
|
||||||
|
/// golangLAKEHOUSE).
|
||||||
|
pub session_log: Option<session_log::SessionLogger>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Clone, Serialize)]
|
#[derive(Default, Clone, Serialize)]
|
||||||
@ -361,6 +369,7 @@ mod resolve_provider_tests {
|
|||||||
|
|
||||||
async fn chat(
|
async fn chat(
|
||||||
State(state): State<V1State>,
|
State(state): State<V1State>,
|
||||||
|
headers: axum::http::HeaderMap,
|
||||||
Json(req): Json<ChatRequest>,
|
Json(req): Json<ChatRequest>,
|
||||||
) -> Result<Json<ChatResponse>, (StatusCode, String)> {
|
) -> Result<Json<ChatResponse>, (StatusCode, String)> {
|
||||||
if req.messages.is_empty() {
|
if req.messages.is_empty() {
|
||||||
@ -490,6 +499,17 @@ async fn chat(
|
|||||||
let output = resp.choices.first()
|
let output = resp.choices.first()
|
||||||
.map(|c| c.message.text())
|
.map(|c| c.message.text())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
// Cross-runtime trace linkage. When a caller (validatord on
|
||||||
|
// Go side, /v1/iterate on Rust side) forwards a parent trace
|
||||||
|
// id via X-Lakehouse-Trace-Id, attach this generation to that
|
||||||
|
// trace so the iterate session and its inner chat hops show
|
||||||
|
// up as ONE trace tree in Langfuse. Header name matches the
|
||||||
|
// Go-side `shared.TraceIDHeader` constant byte-for-byte.
|
||||||
|
let parent_trace_id = headers
|
||||||
|
.get(crate::v1::iterate::TRACE_ID_HEADER)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.map(|s| s.to_string())
|
||||||
|
.filter(|s| !s.is_empty());
|
||||||
lf.emit_chat(langfuse_trace::ChatTrace {
|
lf.emit_chat(langfuse_trace::ChatTrace {
|
||||||
provider: used_provider.clone(),
|
provider: used_provider.clone(),
|
||||||
model: resp.model.clone(),
|
model: resp.model.clone(),
|
||||||
@ -503,6 +523,7 @@ async fn chat(
|
|||||||
start_time: start_time.to_rfc3339(),
|
start_time: start_time.to_rfc3339(),
|
||||||
end_time: end_time.to_rfc3339(),
|
end_time: end_time.to_rfc3339(),
|
||||||
latency_ms,
|
latency_ms,
|
||||||
|
parent_trace_id,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
235
crates/gateway/src/v1/session_log.rs
Normal file
235
crates/gateway/src/v1/session_log.rs
Normal file
@ -0,0 +1,235 @@
|
|||||||
|
//! Coordinator session JSONL writer — Rust parity with the Go-side
|
||||||
|
//! `internal/validator/session_log.go` (commit 1a3a82a in
|
||||||
|
//! golangLAKEHOUSE). Same schema, same field names, same producer
|
||||||
|
//! semantics, so a unified longitudinal log can pull from either
|
||||||
|
//! runtime via DuckDB.
|
||||||
|
//!
|
||||||
|
//! Schema: `session.iterate.v1`. One row per `/v1/iterate` session.
|
||||||
|
//! Append-only. Best-effort posture: errors warn and the iterate
|
||||||
|
//! response always ships.
|
||||||
|
//!
|
||||||
|
//! See `golangLAKEHOUSE/docs/SESSION_LOG.md` for the full schema
|
||||||
|
//! reference + DuckDB query examples. This module produces rows
|
||||||
|
//! with `daemon: "gateway"`; the Go side produces `daemon:
|
||||||
|
//! "validatord"`. Operators who want a unified stream can point both
|
||||||
|
//! to the same path (the OS write-append is atomic for the row sizes
|
||||||
|
//! we produce) or query both files together via duckdb's `read_json`
|
||||||
|
//! glob support.
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
pub const SESSION_RECORD_SCHEMA: &str = "session.iterate.v1";
|
||||||
|
|
||||||
|
/// One row in coordinator_sessions.jsonl. Field names are the on-wire
|
||||||
|
/// names — must stay byte-equal to the Go side's
|
||||||
|
/// `validator.SessionRecord` (proven by the cross-runtime parity
|
||||||
|
/// probe at golangLAKEHOUSE/scripts/cutover/parity/).
|
||||||
|
// Deserialize is supported so the parity helper binary can round-trip
|
||||||
|
// fixture inputs through serde without hand-rolling a parser. Production
|
||||||
|
// emit path uses Serialize only; SessionRecord rows are written by the
|
||||||
|
// gateway and consumed by DuckDB / external tooling, never re-read by us.
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct SessionRecord {
|
||||||
|
pub schema: String,
|
||||||
|
pub session_id: String,
|
||||||
|
pub timestamp: String,
|
||||||
|
pub daemon: String,
|
||||||
|
pub kind: String,
|
||||||
|
pub model: String,
|
||||||
|
pub provider: String,
|
||||||
|
pub prompt: String,
|
||||||
|
pub iterations: u32,
|
||||||
|
pub max_iterations: u32,
|
||||||
|
pub final_verdict: String, // "accepted" | "max_iter_exhausted" | "infra_error"
|
||||||
|
pub attempts: Vec<SessionAttemptRecord>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub artifact: Option<serde_json::Value>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub grounded_in_roster: Option<bool>,
|
||||||
|
pub duration_ms: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct SessionAttemptRecord {
|
||||||
|
pub iteration: u32,
|
||||||
|
pub verdict_kind: String, // "no_json" | "validation_failed" | "accepted" | "infra_error"
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub error: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub span_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Append-only writer. Cloneable handle — internal state is Arc'd so
|
||||||
|
/// V1State can keep its own clone and per-request clones are cheap.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct SessionLogger {
|
||||||
|
inner: Arc<Inner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Inner {
|
||||||
|
path: String,
|
||||||
|
/// tokio::Mutex (not std) because we hold it across the async
|
||||||
|
/// fs write. Contention is low (one row per /v1/iterate session).
|
||||||
|
mu: Mutex<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionLogger {
|
||||||
|
/// Construct a logger writing to `path`. Empty path → None
|
||||||
|
/// (skip the wiring in the iterate handler entirely).
|
||||||
|
pub fn from_path(path: &str) -> Option<Self> {
|
||||||
|
if path.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(Self {
|
||||||
|
inner: Arc::new(Inner {
|
||||||
|
path: path.to_string(),
|
||||||
|
mu: Mutex::new(()),
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Append one record. Best-effort: failures land in `tracing::warn!`
|
||||||
|
/// and the caller sees Ok(()) — observability is a witness, never
|
||||||
|
/// a gate. Returns Err only on impossible cases the type system
|
||||||
|
/// can't rule out (here: serde_json::to_string failing on a
|
||||||
|
/// well-formed struct, which shouldn't happen).
|
||||||
|
pub async fn append(&self, rec: SessionRecord) {
|
||||||
|
let body = match serde_json::to_string(&rec) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(target: "v1.session_log", "marshal: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let _guard = self.inner.mu.lock().await;
|
||||||
|
if let Err(e) = self.write(&body).await {
|
||||||
|
tracing::warn!(target: "v1.session_log", "write {}: {e}", self.inner.path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write(&self, body: &str) -> std::io::Result<()> {
|
||||||
|
use tokio::fs::OpenOptions;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
// Lazy mkdir on first write so a not-yet-mounted volume at
|
||||||
|
// startup doesn't kill the daemon.
|
||||||
|
if let Some(parent) = std::path::Path::new(&self.inner.path).parent() {
|
||||||
|
if !parent.as_os_str().is_empty() {
|
||||||
|
tokio::fs::create_dir_all(parent).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let mut f = OpenOptions::new()
|
||||||
|
.append(true)
|
||||||
|
.create(true)
|
||||||
|
.open(&self.inner.path)
|
||||||
|
.await?;
|
||||||
|
f.write_all(body.as_bytes()).await?;
|
||||||
|
f.write_all(b"\n").await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Best-effort UTF-8 char truncation. Matches Go's `trim` helper so
|
||||||
|
/// rows produced by either runtime cap fields at the same boundaries.
|
||||||
|
pub fn truncate(s: &str, n: usize) -> String {
|
||||||
|
s.chars().take(n).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use tokio::fs;
|
||||||
|
|
||||||
|
fn fixture_record(session_id: &str) -> SessionRecord {
|
||||||
|
SessionRecord {
|
||||||
|
schema: SESSION_RECORD_SCHEMA.to_string(),
|
||||||
|
session_id: session_id.to_string(),
|
||||||
|
timestamp: "2026-05-02T08:00:00Z".to_string(),
|
||||||
|
daemon: "gateway".to_string(),
|
||||||
|
kind: "fill".to_string(),
|
||||||
|
model: "qwen3.5:latest".to_string(),
|
||||||
|
provider: "ollama".to_string(),
|
||||||
|
prompt: "produce a fill artifact".to_string(),
|
||||||
|
iterations: 1,
|
||||||
|
max_iterations: 3,
|
||||||
|
final_verdict: "accepted".to_string(),
|
||||||
|
attempts: vec![SessionAttemptRecord {
|
||||||
|
iteration: 0,
|
||||||
|
verdict_kind: "accepted".to_string(),
|
||||||
|
error: None,
|
||||||
|
span_id: Some("span-0".to_string()),
|
||||||
|
}],
|
||||||
|
artifact: Some(serde_json::json!({"fills":[{"candidate_id":"W-1"}]})),
|
||||||
|
grounded_in_roster: Some(true),
|
||||||
|
duration_ms: 50,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn from_path_empty_returns_none() {
|
||||||
|
assert!(SessionLogger::from_path("").is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn append_writes_jsonl_row_with_schema_field() {
|
||||||
|
let dir = tempdir();
|
||||||
|
let path = dir.join("sessions.jsonl");
|
||||||
|
let path_str = path.to_string_lossy().to_string();
|
||||||
|
let logger = SessionLogger::from_path(&path_str).unwrap();
|
||||||
|
logger.append(fixture_record("trace-a")).await;
|
||||||
|
|
||||||
|
let body = fs::read_to_string(&path).await.unwrap();
|
||||||
|
assert!(body.contains("\"schema\":\"session.iterate.v1\""));
|
||||||
|
assert!(body.contains("\"session_id\":\"trace-a\""));
|
||||||
|
assert!(body.contains("\"grounded_in_roster\":true"));
|
||||||
|
assert!(body.ends_with('\n'));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn append_concurrent_safe() {
|
||||||
|
let dir = tempdir();
|
||||||
|
let path = dir.join("sessions.jsonl");
|
||||||
|
let path_str = path.to_string_lossy().to_string();
|
||||||
|
let logger = SessionLogger::from_path(&path_str).unwrap();
|
||||||
|
|
||||||
|
let n = 32;
|
||||||
|
let mut handles = Vec::with_capacity(n);
|
||||||
|
for i in 0..n {
|
||||||
|
let l = logger.clone();
|
||||||
|
handles.push(tokio::spawn(async move {
|
||||||
|
l.append(fixture_record(&format!("trace-{i}"))).await;
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
for h in handles {
|
||||||
|
h.await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let body = fs::read_to_string(&path).await.unwrap();
|
||||||
|
let lines: Vec<_> = body.lines().filter(|l| !l.is_empty()).collect();
|
||||||
|
assert_eq!(lines.len(), n, "expected {n} rows, got {}", lines.len());
|
||||||
|
// Every row must round-trip through serde — a torn write
|
||||||
|
// would surface as a parse error.
|
||||||
|
for line in lines {
|
||||||
|
let _: serde_json::Value = serde_json::from_str(line).expect("valid json per row");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tempdir() -> PathBuf {
|
||||||
|
// Per-test unique path so prior runs don't pollute the next.
|
||||||
|
// The static counter increments across the whole test binary,
|
||||||
|
// so back-to-back tests in the same module get distinct dirs.
|
||||||
|
static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
|
||||||
|
let n = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
let p = std::env::temp_dir().join(format!(
|
||||||
|
"session_log_test_{}_{}_{}",
|
||||||
|
std::process::id(),
|
||||||
|
chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
|
||||||
|
n,
|
||||||
|
));
|
||||||
|
std::fs::create_dir_all(&p).unwrap();
|
||||||
|
p
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -62,6 +62,15 @@ pub struct GatewayConfig {
|
|||||||
pub host: String,
|
pub host: String,
|
||||||
#[serde(default = "default_gateway_port")]
|
#[serde(default = "default_gateway_port")]
|
||||||
pub port: u16,
|
pub port: u16,
|
||||||
|
/// Coordinator session JSONL output path. One row per
|
||||||
|
/// `/v1/iterate` session, schema=`session.iterate.v1`. Empty =
|
||||||
|
/// disabled. Cross-runtime parity with the Go side's
|
||||||
|
/// `[validatord].session_log_path` (added 2026-05-02). Default
|
||||||
|
/// empty so existing deployments aren't perturbed; production
|
||||||
|
/// sets `/var/lib/lakehouse/gateway/sessions.jsonl`. See
|
||||||
|
/// `golangLAKEHOUSE/docs/SESSION_LOG.md` for query examples.
|
||||||
|
#[serde(default)]
|
||||||
|
pub session_log_path: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
@ -190,7 +199,11 @@ impl Config {
|
|||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
gateway: GatewayConfig { host: default_host(), port: default_gateway_port() },
|
gateway: GatewayConfig {
|
||||||
|
host: default_host(),
|
||||||
|
port: default_gateway_port(),
|
||||||
|
session_log_path: String::new(),
|
||||||
|
},
|
||||||
storage: StorageConfig {
|
storage: StorageConfig {
|
||||||
root: default_storage_root(),
|
root: default_storage_root(),
|
||||||
profile_root: default_profile_root(),
|
profile_root: default_profile_root(),
|
||||||
|
|||||||
@ -3,6 +3,11 @@
|
|||||||
[gateway]
|
[gateway]
|
||||||
host = "0.0.0.0"
|
host = "0.0.0.0"
|
||||||
port = 3100
|
port = 3100
|
||||||
|
# Coordinator session JSONL — one row per /v1/iterate session for
|
||||||
|
# offline DuckDB analysis. Cross-runtime parity with the Go-side
|
||||||
|
# [validatord].session_log_path. Empty = disabled. Production:
|
||||||
|
# session_log_path = "/var/lib/lakehouse/gateway/sessions.jsonl"
|
||||||
|
session_log_path = ""
|
||||||
|
|
||||||
[storage]
|
[storage]
|
||||||
root = "./data"
|
root = "./data"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user