lakehouse/crates/ingestd/src/my_stream.rs
root 0d037cfac1 Phases 16.2 + L2 + 17 VRAM gate + MySQL + 18 Lance hybrid milestone
Five threads of work landing as one milestone — all individually
verified end-to-end against real data, full release build clean,
46 unit tests pass.

## Phase 16.2 / 16.5 — autotune agent + ingest triggers

`vectord::agent` is a long-running tokio task that watches the trial
journal and autonomously proposes + runs new HNSW configs. Distinct
from `autotune::run_autotune` (synchronous one-shot grid). Triggered
on POST /vectors/agent/enqueue/{idx} or by the periodic wake; ingest
paths now push DatasetAppended events when an index's source dataset
gets re-ingested. Rate-limited (max_trials_per_hour) and cooldown-
gated so it can't saturate Ollama under live load.
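The gating could look roughly like this (a std-only sketch; `Gate`, `may_run`, and the field layout are hypothetical — only `max_trials_per_hour` comes from the config):

```rust
use std::time::{Duration, Instant};

/// Hypothetical sketch of the hourly trial cap plus cooldown gate.
struct Gate {
    max_trials_per_hour: usize,
    cooldown: Duration,
    recent: Vec<Instant>, // timestamps of trials started, oldest first
}

impl Gate {
    /// Record and allow a trial only if both the hourly cap and the
    /// cooldown since the last trial permit it.
    fn may_run(&mut self, now: Instant) -> bool {
        // Drop trials older than one hour, then check both limits.
        self.recent
            .retain(|t| now.duration_since(*t) < Duration::from_secs(3600));
        let cooled = self
            .recent
            .last()
            .map_or(true, |t| now.duration_since(*t) >= self.cooldown);
        if self.recent.len() < self.max_trials_per_hour && cooled {
            self.recent.push(now);
            true
        } else {
            false
        }
    }
}
```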

The proposer is ε-greedy around the current champion: with probability
0.25 it samples a random config from the full bounds; otherwise it
perturbs the champion by a small delta on both axes. Proposals are
deduped against history. Deterministic — the RNG is seeded from
history.len(), so the same journal state proposes the same next
config (helps offline replay debugging).
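A minimal sketch of that proposal loop, with a tiny LCG standing in for the real RNG (the `Config` axes and bounds are illustrative, not the actual HNSW ranges):

```rust
/// Hypothetical two-axis HNSW config; real field names may differ.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Config {
    m: u32,
    ef_construction: u32,
}

/// Deterministic ε-greedy proposal: seed from history.len() so the
/// same journal state always yields the same next config.
fn propose(champion: Config, history: &[Config]) -> Config {
    let mut state = (history.len() as u64)
        .wrapping_mul(6364136223846793005)
        .wrapping_add(1442695040888963407);
    let mut next = || {
        state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
        (state >> 33) as u32
    };
    loop {
        let candidate = if next() % 100 < 25 {
            // ε branch: sample uniformly from the full (illustrative) bounds.
            Config { m: 4 + next() % 61, ef_construction: 50 + next() % 451 }
        } else {
            // Exploit branch: perturb the champion ± a small delta on both axes.
            Config {
                m: (champion.m + next() % 5).saturating_sub(2).clamp(4, 64),
                ef_construction: (champion.ef_construction + next() % 41)
                    .saturating_sub(20)
                    .clamp(50, 500),
            }
        };
        // Dedup against history before returning.
        if !history.contains(&candidate) {
            return candidate;
        }
    }
}
```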

`[agent]` config section in lakehouse.toml; opt-in via enabled=true.

## Federation Layer 2 — runtime bucket lifecycle + per-index scoping

`BucketRegistry.buckets` moved to `std::sync::RwLock<HashMap>` so
buckets can be added/removed after startup. POST /storage/buckets
provisions at runtime; DELETE /storage/buckets/{name} unregisters
(refuses primary/rescue with 403). Local-backend buckets get their
root directory auto-created.
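In sketch form (hypothetical simplified types — the real registry maps names to backend stores, not bare root paths):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Runtime-mutable bucket registry sketch: name → root URI.
struct BucketRegistry {
    buckets: RwLock<HashMap<String, String>>,
}

impl BucketRegistry {
    /// Returns true if the bucket was newly provisioned. A local backend
    /// would also create the root dir here (std::fs::create_dir_all) —
    /// omitted in this sketch.
    fn provision(&self, name: &str, root: &str) -> bool {
        self.buckets
            .write()
            .unwrap()
            .insert(name.into(), root.into())
            .is_none()
    }

    /// Refuses the protected buckets (surfaced as HTTP 403 upstream).
    fn unregister(&self, name: &str) -> Result<(), &'static str> {
        if name == "primary" || name == "rescue" {
            return Err("refusing to remove a protected bucket");
        }
        self.buckets
            .write()
            .unwrap()
            .remove(name)
            .map(|_| ())
            .ok_or("no such bucket")
    }
}
```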

`IndexMeta.bucket` (default "primary" via serde) records each index's
home bucket. `TrialJournal` and `PromotionRegistry` now hold
Arc<BucketRegistry> + IndexRegistry; they resolve target store per-
index via IndexMeta.bucket. PromotionRegistry::list_all scans every
bucket and dedups by index_name. Pre-federation indexes keep working
unchanged — they just default to primary.
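The resolver pattern reduces to a defaulted lookup; a hypothetical std-only sketch:

```rust
use std::collections::HashMap;

/// An index's home bucket defaults to "primary" when unset, so
/// pre-federation metadata keeps resolving without migration.
/// (Hypothetical signature; the real resolver returns a store handle.)
fn resolve_bucket<'a>(
    index_bucket: Option<&str>,
    buckets: &'a HashMap<String, String>,
) -> Option<&'a str> {
    buckets
        .get(index_bucket.unwrap_or("primary"))
        .map(String::as_str)
}
```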

`ModelProfile.bucket: Option<String>` declares per-profile artifact
home. POST /vectors/profile/{id}/activate auto-provisions the
profile's bucket under storage.profile_root if not yet registered.

EvalSets stay primary-only for now — noted gap, low-risk to extend
later with the same resolver pattern.

## Phase 17 — VRAM-aware two-profile gate

Sidecar gains POST /admin/unload (Ollama keep_alive=0 trick — forces
immediate VRAM release), POST /admin/preload (keep_alive=5m with
empty prompt, takes the slot warm), and GET /admin/vram (combines
nvidia-smi snapshot with Ollama /api/ps). Exposed via aibridge as
unload_model / preload_model / vram_snapshot.

`VectorState.active_profile` is the GPU-slot singleton —
Arc<RwLock<Option<ActiveProfileSlot>>>. activate_profile checks for
a previous profile with a different ollama_name and unloads it
before preloading the new one; same-model reactivations skip the
unload (Ollama no-ops). New routes: POST /vectors/profile/{id}/
deactivate (unload + clear slot), GET /vectors/profile/active.
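The swap decision can be sketched like this (simplified hypothetical types; the real slot carries more profile metadata, and the returned name is what drives the sidecar unload call):

```rust
use std::sync::RwLock;

/// GPU-slot singleton sketch.
#[allow(dead_code)]
struct Slot {
    profile_id: String,
    ollama_name: String,
}

struct State {
    active: RwLock<Option<Slot>>,
}

impl State {
    /// Install the new profile in the slot. Returns the model name to
    /// unload when the previous occupant used a different model;
    /// same-model reactivations return None and skip the unload.
    fn activate(&self, profile_id: &str, ollama_name: &str) -> Option<String> {
        let mut slot = self.active.write().unwrap();
        let to_unload = match slot.as_ref() {
            Some(prev) if prev.ollama_name != ollama_name => {
                Some(prev.ollama_name.clone())
            }
            _ => None,
        };
        *slot = Some(Slot {
            profile_id: profile_id.into(),
            ollama_name: ollama_name.into(),
        });
        to_unload
    }
}
```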

Verified live: staffing-recruiter (qwen2.5) → docs-assistant
(mistral) swap freed qwen2.5 from VRAM and loaded mistral. nomic-
embed-text persists across swaps because both profiles use it —
free optimization that fell out of the design. Scoped search
correctly 403s cross-profile in both directions.

## MySQL streaming connector

`crates/ingestd/src/my_stream.rs` mirrors pg_stream.rs for MySQL.
Pure-Rust `mysql_async` driver (default-features=false to avoid C
deps). Same OFFSET pagination, same Parquet-streaming write shape.
Type mapping per ADR-010: int/bigint → Int32/Int64, decimal/float
→ Float64, tinyint(1)/bool → Boolean, everything else → Utf8 with
fallback parsers for date/time/json/uuid via Display.

POST /ingest/mysql parallel to /ingest/db. Same PII auto-detection,
same lineage capture (source_system="mysql"), same agent-trigger
hook. `redact_dsn` generalized — was hardcoded to "postgresql://"
length, now works for any scheme://user:pass@host/path URL (latent
PII leak fix for MySQL DSNs).
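A sketch of what scheme-agnostic redaction looks like (hypothetical implementation — the shipped `redact_dsn` may differ in detail):

```rust
/// Redact the password in any scheme://user:pass@host/path DSN.
/// Illustrative sketch only; handles the no-password and no-auth
/// forms by returning the input unchanged.
fn redact_dsn(dsn: &str) -> String {
    let Some(scheme_end) = dsn.find("://") else {
        return dsn.to_string();
    };
    let rest = &dsn[scheme_end + 3..];
    // rfind tolerates '@' inside the (to-be-redacted) password.
    let Some(at) = rest.rfind('@') else {
        return dsn.to_string();
    };
    match rest[..at].split_once(':') {
        Some((user, _pass)) => {
            format!("{}://{}:***@{}", &dsn[..scheme_end], user, &rest[at + 1..])
        }
        None => dsn.to_string(),
    }
}
```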

Verified live against MariaDB on localhost: 10 rows × 9 columns of
test data round-tripped through datatypes int/varchar/decimal/
tinyint/datetime/text. PII detection auto-flagged name + email.
Aggregation queries through DataFusion match the source values
exactly.

## Phase 18 — Hybrid Parquet+HNSW ⊕ Lance backend (ADR-019)

`vectord-lance` is a new firewall crate. Lance pulls Arrow 57 and
DataFusion 52 — incompatible with the rest of the workspace's
Arrow 55 / DataFusion 47. The firewall isolates that dep tree:
public API uses only std types (Vec<f32>, Vec<String>, Hit, Row,
*Stats), so no Arrow types cross the crate boundary and nothing
propagates to vectord. This is the ADR-019 path that didn't ship until now.
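The boundary can be sketched as a trait over std types, with a brute-force in-memory stand-in to show that callers never touch Arrow (all names here are illustrative except `Hit`):

```rust
/// Only std types cross the firewall: callers never see Arrow 57 /
/// DataFusion 52 types, so the incompatible dep tree stays private.
pub struct Hit {
    pub doc_id: String,
    pub distance: f32,
}

/// Hypothetical trait shape for the crate's public surface.
pub trait VectorBackend {
    fn search(&self, query: &[f32], k: usize) -> Result<Vec<Hit>, String>;
    fn append(&mut self, ids: Vec<String>, vectors: Vec<Vec<f32>>) -> Result<usize, String>;
}

/// Brute-force stand-in used here only to exercise the trait shape.
struct MemBackend {
    rows: Vec<(String, Vec<f32>)>,
}

impl VectorBackend for MemBackend {
    fn search(&self, query: &[f32], k: usize) -> Result<Vec<Hit>, String> {
        let mut hits: Vec<Hit> = self
            .rows
            .iter()
            .map(|(id, v)| Hit {
                doc_id: id.clone(),
                // Squared L2 distance as an IVF_PQ/HNSW stand-in.
                distance: v.iter().zip(query).map(|(a, b)| (a - b) * (a - b)).sum(),
            })
            .collect();
        hits.sort_by(|a, b| a.distance.total_cmp(&b.distance));
        hits.truncate(k);
        Ok(hits)
    }

    fn append(&mut self, ids: Vec<String>, vectors: Vec<Vec<f32>>) -> Result<usize, String> {
        let n = ids.len();
        self.rows.extend(ids.into_iter().zip(vectors));
        Ok(n)
    }
}
```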

`vectord::lance_backend::LanceRegistry` lazy-creates a
LanceVectorStore per index, resolving bucket → URI via the
conventional local-bucket layout. `IndexMeta.vector_backend` and
`ModelProfile.vector_backend` carry the choice (default Parquet so
existing indexes unchanged).
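The lazy per-index resolution might look like this (the `<root>/lance/<index>.lance` path is an assumption standing in for the conventional layout the commit mentions):

```rust
use std::collections::HashMap;

/// Hypothetical lazy registry: index name → Lance dataset URI.
/// The real LanceRegistry holds store handles, not bare URIs.
struct LanceRegistry {
    stores: HashMap<String, String>,
}

impl LanceRegistry {
    /// Get-or-create the dataset URI for an index. Subsequent calls
    /// return the cached entry regardless of the bucket root passed.
    fn uri_for(&mut self, index: &str, bucket_root: &str) -> &String {
        self.stores
            .entry(index.to_string())
            .or_insert_with(|| format!("{}/lance/{}.lance", bucket_root, index))
    }
}
```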

Six routes under /vectors/lance/*:
- migrate/{idx}: convert binary-blob Parquet → Lance FixedSizeList
- index/{idx}: build IVF_PQ
- search/{idx}: vector search (embed via sidecar)
- doc/{idx}/{doc_id}: random row fetch
- append/{idx}: native fragment append
- stats/{idx}: row count + index presence

Verified live on the real resumes_100k_v2 corpus (100K × 768d):
- Migrate: 0.57s
- Build IVF_PQ index: 16.2s (matches ADR-019 bench; 14× faster than
  HNSW's 230s for the same data)
- Search end-to-end (Ollama embed + Lance scan): 23-53ms
- Random doc_id fetch: 5-7ms (filter scan; faster than Parquet's
  ~35ms full-file scan, slower than the bench's 311us positional
  take — would close that gap with a scalar btree on doc_id)
- Append 100 rows: 3.3ms / +320KB on disk vs Parquet's required
  full ~330MB rewrite — the structural win
- Index survives append; both backends coexist cleanly

## Known follow-ups not in this milestone

- ModelProfile.vector_backend doesn't yet auto-route /vectors/profile/
  {id}/search to Lance; callers go through /vectors/lance/* directly
- Scalar btree on doc_id (closes the 5-7ms → ~300us gap)
- vectord-lance built default-features=false → no S3 yet
- IVF_PQ recall not measured (ADR-019 caveat) — needs a Lance-aware
  variant of the eval harness
- Watcher-path ingest doesn't push agent triggers (HTTP paths do)
- EvalSets still primary-only (federation gap)
- No PATCH endpoint to move an existing index between buckets
- The pre-existing storaged::append_log doctest fails to compile
  (malformed `{prefix}/` parses as code fence) — pre-existing bug,
  left for a focused fix

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 20:24:46 -05:00


//! Streaming MySQL ingest.
//!
//! Mirrors `pg_stream` for MySQL sources. Same OFFSET-paginated strategy,
//! same Parquet-streaming shape. Uses `mysql_async` (pure-Rust) so we
//! don't need a C client library at build time.
//!
//! Type mapping follows ADR-010 (default to string on ambiguity):
//! - Booleans (TINYINT(1)) and integer types map to Arrow Int32/Int64
//! - Floating point and decimals → Float64
//! - Everything else (text, varchar, json, date, timestamp) → Utf8
//!
//! What's deliberately not supported (yet):
//! - TLS connections — `minimal` feature is plain TCP only. Upgrade
//!   when a tenant actually needs it.
//! - Keyset pagination — OFFSET scans are O(N²) at multi-million-row
//!   scale; same caveat as `pg_stream`.
//! - BINARY/BLOB columns — currently rendered as base64 or empty string
//!   via Display-fallback.
use arrow::array::{
    ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
};
use arrow::datatypes::{DataType, Field, Schema};
use mysql_async::prelude::*;
use mysql_async::{Conn, Opts, Row, Value};
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
/// Request shape for MySQL streaming ingest.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct MyStreamRequest {
    /// mysql://user:pass@host:port/db
    pub dsn: String,
    pub table: String,
    #[serde(default)]
    pub dataset_name: Option<String>,
    /// Rows per fetch. Default 10_000.
    #[serde(default)]
    pub batch_size: Option<usize>,
    /// Column to ORDER BY for stable pagination. If omitted, the first
    /// column returned by the schema probe is used.
    #[serde(default)]
    pub order_by: Option<String>,
    /// Hard cap on total rows (for sampling / previews).
    #[serde(default)]
    pub limit: Option<usize>,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct MyStreamResult {
    pub table: String,
    pub rows: usize,
    pub batches: usize,
    pub columns: usize,
    pub schema: Vec<String>,
    pub parquet_bytes: u64,
    pub duration_secs: f32,
}
/// Parsed DSN pieces. Kept local (rather than reusing pg's DbConfig) so
/// the MySQL connector doesn't depend on the PG path.
#[derive(Debug, Clone)]
pub struct MyConfig {
    pub host: String,
    pub port: u16,
    pub user: String,
    pub password: String,
    pub database: String,
}

impl MyConfig {
    /// Build a mysql_async `Opts` from the parsed config.
    pub fn to_opts(&self) -> Opts {
        let url = if self.password.is_empty() {
            format!(
                "mysql://{}@{}:{}/{}",
                percent(&self.user),
                percent(&self.host),
                self.port,
                percent(&self.database),
            )
        } else {
            format!(
                "mysql://{}:{}@{}:{}/{}",
                percent(&self.user),
                percent(&self.password),
                percent(&self.host),
                self.port,
                percent(&self.database),
            )
        };
        Opts::from_url(&url).expect("MyConfig.to_opts: rebuilt URL should parse")
    }
}
/// Minimal URL-encoder for the few characters that commonly appear in
/// MySQL passwords. mysql_async's URL parser expects valid URL-encoded
/// components.
fn percent(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '@' | ':' | '/' | '?' | '#' | '[' | ']' | ' ' | '%' => {
                out.push_str(&format!("%{:02X}", c as u32));
            }
            _ => out.push(c),
        }
    }
    out
}
/// Parse a mysql:// DSN.
/// Supports: mysql://[user[:password]@]host[:port][/db]
pub fn parse_dsn(dsn: &str) -> Result<MyConfig, String> {
    let rest = dsn
        .strip_prefix("mysql://")
        .ok_or_else(|| "DSN must start with mysql://".to_string())?;
    let (auth_host, database) = match rest.split_once('/') {
        Some((ah, db)) => (ah, db.split('?').next().unwrap_or(db).to_string()),
        None => (rest, String::new()),
    };
    let (userpass, hostport) = match auth_host.rsplit_once('@') {
        Some((up, hp)) => (Some(up), hp),
        None => (None, auth_host),
    };
    let (user, password) = match userpass {
        Some(up) => match up.split_once(':') {
            Some((u, p)) => (u.to_string(), p.to_string()),
            None => (up.to_string(), String::new()),
        },
        None => ("root".to_string(), String::new()),
    };
    let (host, port) = match hostport.rsplit_once(':') {
        Some((h, p)) => (
            h.to_string(),
            p.parse::<u16>().map_err(|_| format!("invalid port in DSN: {p}"))?,
        ),
        None => (hostport.to_string(), 3306),
    };
    if host.is_empty() {
        return Err("DSN has no host".into());
    }
    if database.is_empty() {
        return Err("DSN has no database path (mysql://... /db)".into());
    }
    Ok(MyConfig { host, port, user, password, database })
}
/// Stream a MySQL table as Parquet bytes.
pub async fn stream_table_to_parquet(
    req: &MyStreamRequest,
) -> Result<(bytes::Bytes, MyStreamResult), String> {
    let t0 = std::time::Instant::now();
    let config = parse_dsn(&req.dsn)?;
    let batch_size = req.batch_size.unwrap_or(10_000).max(1);
    let mut conn = Conn::new(config.to_opts())
        .await
        .map_err(|e| format!("mysql connect: {e}"))?;

    // Probe columns via information_schema — gives real type names that
    // we can map to Arrow dtypes without needing to fetch a row first.
    let columns = probe_columns(&mut conn, &config.database, &req.table).await?;
    if columns.is_empty() {
        return Err(format!("table '{}' not found or has no columns", req.table));
    }
    let arrow_fields: Vec<Field> = columns
        .iter()
        .map(|(name, ty)| Field::new(name, mysql_type_to_arrow(ty), true))
        .collect();
    let schema = Arc::new(Schema::new(arrow_fields));
    let schema_report: Vec<String> = columns
        .iter()
        .map(|(n, t)| format!("{}:{}", n, t))
        .collect();
    let order_col = req.order_by.clone().unwrap_or_else(|| columns[0].0.clone());

    // Backticks are MySQL's identifier quote. Forbid backticks in
    // table/column names to prevent injection — neither pg nor us
    // should be accepting those anyway. (Checked once, before the
    // pagination loop.)
    if req.table.contains('`') || order_col.contains('`') {
        return Err("table or order_by column contains backticks — refused".into());
    }

    let mut buf: Vec<u8> = Vec::with_capacity(1024 * 1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)
        .map_err(|e| format!("ArrowWriter init: {e}"))?;
    let mut total_rows: usize = 0;
    let mut batch_count: usize = 0;
    let row_cap = req.limit.unwrap_or(usize::MAX);
    loop {
        let remaining = row_cap.saturating_sub(total_rows);
        if remaining == 0 {
            break;
        }
        let fetch = remaining.min(batch_size);
        let sql = format!(
            "SELECT * FROM `{}` ORDER BY `{}` LIMIT {} OFFSET {}",
            req.table, order_col, fetch, total_rows,
        );
        let rows: Vec<Row> = conn
            .query(&sql)
            .await
            .map_err(|e| format!("fetch batch at offset {total_rows}: {e}"))?;
        if rows.is_empty() {
            break;
        }
        let n = rows.len();
        let arrays: Vec<ArrayRef> = columns
            .iter()
            .enumerate()
            .map(|(idx, (_, ty))| rows_to_column(&rows, idx, ty))
            .collect::<Result<_, _>>()?;
        let batch = RecordBatch::try_new(schema.clone(), arrays)
            .map_err(|e| format!("RecordBatch: {e}"))?;
        writer.write(&batch).map_err(|e| format!("ArrowWriter::write: {e}"))?;
        total_rows += n;
        batch_count += 1;
        tracing::info!(
            "mysql stream '{}': fetched batch {} ({} rows, total {})",
            req.table, batch_count, n, total_rows,
        );
        if n < fetch {
            break;
        }
    }
    writer.close().map_err(|e| format!("ArrowWriter::close: {e}"))?;
    conn.disconnect().await.ok();

    let result = MyStreamResult {
        table: req.table.clone(),
        rows: total_rows,
        batches: batch_count,
        columns: columns.len(),
        schema: schema_report,
        parquet_bytes: buf.len() as u64,
        duration_secs: t0.elapsed().as_secs_f32(),
    };
    Ok((bytes::Bytes::from(buf), result))
}
async fn probe_columns(
    conn: &mut Conn,
    schema: &str,
    table: &str,
) -> Result<Vec<(String, String)>, String> {
    let sql = format!(
        "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.columns \
         WHERE table_schema = '{}' AND table_name = '{}' \
         ORDER BY ORDINAL_POSITION",
        schema.replace('\'', "''"),
        table.replace('\'', "''"),
    );
    let rows: Vec<(String, String)> = conn
        .query(&sql)
        .await
        .map_err(|e| format!("probe columns: {e}"))?;
    Ok(rows)
}
/// MySQL data type string → Arrow DataType. Conservative: anything we
/// don't recognize becomes Utf8 (ADR-010).
fn mysql_type_to_arrow(ty: &str) -> DataType {
    match ty.to_lowercase().as_str() {
        "tinyint" | "smallint" | "mediumint" | "int" | "integer" => DataType::Int32,
        "bigint" => DataType::Int64,
        "float" | "double" | "decimal" | "numeric" | "real" => DataType::Float64,
        "bit" | "bool" | "boolean" => DataType::Boolean,
        _ => DataType::Utf8,
    }
}
/// Convert a single column slice of MySQL rows into an Arrow array.
fn rows_to_column(
    rows: &[Row],
    idx: usize,
    ty: &str,
) -> Result<ArrayRef, String> {
    let arrow_ty = mysql_type_to_arrow(ty);
    match arrow_ty {
        DataType::Boolean => {
            let v: Vec<Option<bool>> = rows.iter().map(|r| cell_as_bool(r, idx)).collect();
            Ok(Arc::new(BooleanArray::from(v)))
        }
        DataType::Int32 => {
            let v: Vec<Option<i32>> = rows
                .iter()
                .map(|r| cell_as_i64(r, idx).map(|n| n as i32))
                .collect();
            Ok(Arc::new(Int32Array::from(v)))
        }
        DataType::Int64 => {
            let v: Vec<Option<i64>> = rows.iter().map(|r| cell_as_i64(r, idx)).collect();
            Ok(Arc::new(Int64Array::from(v)))
        }
        DataType::Float64 => {
            let v: Vec<Option<f64>> = rows.iter().map(|r| cell_as_f64(r, idx)).collect();
            Ok(Arc::new(Float64Array::from(v)))
        }
        _ => {
            let v: Vec<Option<String>> = rows.iter().map(|r| cell_as_string(r, idx)).collect();
            Ok(Arc::new(StringArray::from(v)))
        }
    }
}
fn cell(r: &Row, idx: usize) -> &Value {
    r.as_ref(idx).unwrap_or(&Value::NULL)
}

fn cell_as_bool(r: &Row, idx: usize) -> Option<bool> {
    match cell(r, idx) {
        Value::NULL => None,
        Value::Int(n) => Some(*n != 0),
        Value::UInt(n) => Some(*n != 0),
        Value::Bytes(b) => std::str::from_utf8(b)
            .ok()
            .and_then(|s| match s.to_ascii_lowercase().as_str() {
                "true" | "1" | "y" | "yes" => Some(true),
                "false" | "0" | "n" | "no" => Some(false),
                _ => None,
            }),
        _ => None,
    }
}

fn cell_as_i64(r: &Row, idx: usize) -> Option<i64> {
    match cell(r, idx) {
        Value::NULL => None,
        Value::Int(n) => Some(*n),
        Value::UInt(n) => i64::try_from(*n).ok(),
        Value::Float(f) => Some(*f as i64),
        Value::Double(f) => Some(*f as i64),
        Value::Bytes(b) => std::str::from_utf8(b).ok().and_then(|s| s.parse().ok()),
        _ => None,
    }
}

fn cell_as_f64(r: &Row, idx: usize) -> Option<f64> {
    match cell(r, idx) {
        Value::NULL => None,
        Value::Int(n) => Some(*n as f64),
        Value::UInt(n) => Some(*n as f64),
        Value::Float(f) => Some(*f as f64),
        Value::Double(f) => Some(*f),
        Value::Bytes(b) => std::str::from_utf8(b).ok().and_then(|s| s.parse().ok()),
        _ => None,
    }
}

fn cell_as_string(r: &Row, idx: usize) -> Option<String> {
    match cell(r, idx) {
        Value::NULL => None,
        Value::Bytes(b) => Some(String::from_utf8_lossy(b).into_owned()),
        Value::Int(n) => Some(n.to_string()),
        Value::UInt(n) => Some(n.to_string()),
        Value::Float(f) => Some(f.to_string()),
        Value::Double(f) => Some(f.to_string()),
        Value::Date(y, mo, d, h, mi, s, _us) => Some(format!(
            "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
            y, mo, d, h, mi, s
        )),
        Value::Time(neg, days, h, mi, s, _us) => {
            let sign = if *neg { "-" } else { "" };
            Some(format!("{}{}d {:02}:{:02}:{:02}", sign, days, h, mi, s))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_dsn_full() {
        let c = parse_dsn("mysql://daisy:secret@db.example.com:3307/my_db").unwrap();
        assert_eq!(c.host, "db.example.com");
        assert_eq!(c.port, 3307);
        assert_eq!(c.user, "daisy");
        assert_eq!(c.password, "secret");
        assert_eq!(c.database, "my_db");
    }

    #[test]
    fn parse_dsn_default_port() {
        let c = parse_dsn("mysql://root@localhost/shop").unwrap();
        assert_eq!(c.port, 3306);
        assert_eq!(c.user, "root");
        assert_eq!(c.password, "");
    }

    #[test]
    fn parse_dsn_no_auth() {
        let c = parse_dsn("mysql://127.0.0.1:3306/analytics").unwrap();
        assert_eq!(c.user, "root");
        assert_eq!(c.host, "127.0.0.1");
    }

    #[test]
    fn parse_dsn_rejects_non_mysql() {
        assert!(parse_dsn("postgresql://host/db").is_err());
    }

    #[test]
    fn parse_dsn_requires_db() {
        assert!(parse_dsn("mysql://localhost:3306").is_err());
    }

    #[test]
    fn type_map_basics() {
        assert_eq!(mysql_type_to_arrow("int"), DataType::Int32);
        assert_eq!(mysql_type_to_arrow("BIGINT"), DataType::Int64);
        assert_eq!(mysql_type_to_arrow("decimal"), DataType::Float64);
        assert_eq!(mysql_type_to_arrow("varchar"), DataType::Utf8);
        assert_eq!(mysql_type_to_arrow("json"), DataType::Utf8);
        assert_eq!(mysql_type_to_arrow("bool"), DataType::Boolean);
    }
}