From 0d1553ca88f4a02bcabe55a84fb0cd20b1aa0cd7 Mon Sep 17 00:00:00 2001
From: root
Date: Wed, 29 Apr 2026 19:06:27 -0500
Subject: [PATCH] candidates corpus: first deep-field reality test on real
 staffing data
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Lands the second staffing corpus and the first end-to-end reality test
through the full Go pipeline: parquet → corpusingest → embedd →
vectord → matrixd → gateway.

What's new:

- scripts/staffing_candidates/main.go — a parquet Source over
  candidates.parquet (1000 rows, 11 cols), single-chunk arrow-go
  pqarrow read. Embed text: "Candidate skills: <skills>. Based in
  <city>, <state>. <years> years experience. Status: <status>.
  <first> <last>." IDs are prefixed "c-" so multi-corpus merges
  against workers ("w-") stay unambiguous.

- scripts/candidates_e2e.sh — the first integration smoke that runs
  the full stack (storaged + embedd + vectord + matrixd + gateway),
  ingests via corpusingest, runs a real query through
  /v1/matrix/search, and prints the results. Ephemeral mode (vectord
  persistence disabled via a custom toml) so re-runs don't pollute
  the MinIO _vectors/ prefix and break g1p_smoke's
  "only-one-persisted-index" assertion.

Real bug caught + fixed in corpusingest:

When LogProgress > 0, the progress goroutine's only exit was
ctx.Done(). With context.Background() in the production driver, Run
hung forever after the pipeline finished. Added a stopProgress
channel that is close()d after wg.Wait(). Regression test
TestRun_ProgressLoggerExits bounds Run's wall time to 2s with
LogProgress=50ms. This is the bug the unit tests didn't catch,
because every prior test set LogProgress: 0; the reality test
surfaced it on the first real-data run — exactly the
hyperfocus-and-find-architectural-weakness property that J framed as
the reason for the Go pass.

End-to-end output (1000 candidates, query "Python AWS Docker engineer
in Chicago available now"):

  populate: scanned=1000 embedded=1000 added=1000 wall=3.5s
  matrix returned 5 hits in 26ms

The result quality is the interesting signal: the top 5 had ZERO
Chicago candidates, ZERO active-status candidates, and the
exact-skill match (Python, AWS, Docker) ranked #3, not #1. The
pipeline works; retrieval quality has real architectural limits (no
structured filtering, no relevance gate, semantic-only ranking
dominated by secondary signals like "1 year experience" and
"engineer"). This motivates SPEC §3.4 component 3 (the relevance
filter) and eventually structured filtering — exactly the kind of
finding the deep-field reality tests are supposed to surface before
the Enterprise cutover.

12-smoke regression sweep all green. 9 corpusingest unit tests pass,
including the new regression. vet clean.
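To make the structured-filtering gap concrete, the missing gate is
roughly the shape below. Illustrative sketch only: the hit struct and
field names mirror the metadata main.go attaches, nothing here exists
in the tree yet, and SPEC §3.4 may land a different design.

    // Sketch: a post-retrieval gate over /v1/matrix/search hits.
    // Assumes encoding/json; hit mirrors the result shape the
    // driver already decodes.
    type hit struct {
        ID       string          `json:"id"`
        Metadata json.RawMessage `json:"metadata"`
    }

    // filterHits drops semantic hits whose structured fields
    // contradict the query's hard constraints.
    func filterHits(hits []hit, wantCity, wantStatus string) []hit {
        var out []hit
        for _, h := range hits {
            var m struct {
                City   string `json:"city"`
                Status string `json:"status"`
            }
            if json.Unmarshal(h.Metadata, &m) != nil {
                continue
            }
            if m.City == wantCity && m.Status == wantStatus {
                out = append(out, h)
            }
        }
        return out
    }

Run against today's top 5, a gate like this would have returned zero
hits, which is the honest answer a staffing query wants when no
active Chicago candidates rank.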
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 internal/corpusingest/ingest.go      |   8 +-
 internal/corpusingest/ingest_test.go |  38 ++++
 scripts/candidates_e2e.sh            |  98 +++++++++
 scripts/staffing_candidates/main.go  | 297 +++++++++++++++++++++++++++
 4 files changed, 439 insertions(+), 2 deletions(-)
 create mode 100755 scripts/candidates_e2e.sh
 create mode 100644 scripts/staffing_candidates/main.go

diff --git a/internal/corpusingest/ingest.go b/internal/corpusingest/ingest.go
index 490d4b1..9038722 100644
--- a/internal/corpusingest/ingest.go
+++ b/internal/corpusingest/ingest.go
@@ -127,11 +127,13 @@ func Run(ctx context.Context, cfg Config, src Source) (Stats, error) {
         }()
     }
 
+    stopProgress := make(chan struct{})
     progressDone := make(chan struct{})
     if cfg.LogProgress > 0 {
-        ticker := time.NewTicker(cfg.LogProgress)
         go func() {
             defer close(progressDone)
+            ticker := time.NewTicker(cfg.LogProgress)
+            defer ticker.Stop()
             for {
                 select {
                 case <-ticker.C:
@@ -139,8 +141,9 @@ func Run(ctx context.Context, cfg Config, src Source) (Stats, error) {
                         "index", cfg.IndexName,
                         "embedded", atomic.LoadInt64(&totalEmbedded),
                         "added", atomic.LoadInt64(&totalAdded))
+                case <-stopProgress:
+                    return
                 case <-ctx.Done():
-                    ticker.Stop()
                     return
                 }
             }
@@ -152,6 +155,7 @@ func Run(ctx context.Context, cfg Config, src Source) (Stats, error) {
     scanned, err := drainSource(ctx, cfg, src, jobs)
     close(jobs)
     wg.Wait()
+    close(stopProgress) // tell the progress goroutine to exit; would otherwise hang Run forever (caught by candidates e2e 2026-04-29)
     <-progressDone
 
     stats := Stats{
diff --git a/internal/corpusingest/ingest_test.go b/internal/corpusingest/ingest_test.go
index c00d942..1732d24 100644
--- a/internal/corpusingest/ingest_test.go
+++ b/internal/corpusingest/ingest_test.go
@@ -303,6 +303,44 @@ func TestRun_EmptyTextSkipped(t *testing.T) {
     }
 }
 
+// TestRun_ProgressLoggerExits guards the bug caught 2026-04-29 in
+// the candidates e2e: when LogProgress > 0, the progress goroutine's
+// only exit was ctx.Done(). With context.Background() in the
+// production driver, Run hung forever after the pipeline finished.
+// A healthy Run here returns within a few hundred ms; if the leak
+// regresses, the 2s deadline below kicks in.
+func TestRun_ProgressLoggerExits(t *testing.T) {
+    fg := newFakeGateway(4)
+    srv := httptest.NewServer(fg.handler())
+    defer srv.Close()
+
+    rows := []Row{
+        {ID: "a", Text: "x", Metadata: nil},
+        {ID: "b", Text: "y", Metadata: nil},
+    }
+
+    done := make(chan error, 1)
+    go func() {
+        _, err := Run(context.Background(), Config{
+            GatewayURL:  srv.URL,
+            IndexName:   "progress_test",
+            Dimension:   4,
+            HTTPClient:  srv.Client(),
+            LogProgress: 50 * time.Millisecond,
+        }, &staticSource{rows: rows})
+        done <- err
+    }()
+
+    select {
+    case err := <-done:
+        if err != nil {
+            t.Fatalf("Run: %v", err)
+        }
+    case <-time.After(2 * time.Second):
+        t.Fatal("Run did not return within 2s — progress goroutine likely hanging")
+    }
+}
+
 func TestRun_RequiresIndexName(t *testing.T) {
     _, err := Run(context.Background(), Config{Dimension: 4},
         &staticSource{rows: nil})
diff --git a/scripts/candidates_e2e.sh b/scripts/candidates_e2e.sh
new file mode 100755
index 0000000..5f86f91
--- /dev/null
+++ b/scripts/candidates_e2e.sh
@@ -0,0 +1,98 @@
+#!/usr/bin/env bash
+# Candidates end-to-end — first deep-field reality test.
+#
+# Spins up storaged + embedd + vectord + matrixd + gateway, ingests
+# the 1000-candidate corpus from
+# /home/profit/lakehouse/data/datasets/candidates.parquet via the
+# corpusingest substrate, then runs a real staffing query through
+# /v1/matrix/search and prints the top 5 hits.
+#
+# Requires: Ollama on :11434 with nomic-embed-text loaded. If absent,
+# this script exits 0 with a "skipped" message — same contract as
+# g2_smoke.
+#
+# Usage: ./scripts/candidates_e2e.sh
+#        ./scripts/candidates_e2e.sh "your custom query here"
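+#
+# Stack ports polled below: storaged :3211, embedd :3216,
+# vectord :3215, matrixd :3218, gateway :3110.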
+
+set -euo pipefail
+cd "$(dirname "$0")/.."
+
+export PATH="$PATH:/usr/local/go/bin"
+
+QUERY="${1:-Python AWS Docker engineer in Chicago available now}"
+
+if ! curl -sS --max-time 3 http://localhost:11434/api/tags >/dev/null 2>&1; then
+    echo "[candidates-e2e] Ollama not reachable on :11434 — skipping (matches g2_smoke contract)"
+    exit 0
+fi
+
+echo "[candidates-e2e] building binaries..."
+go build -o bin/ ./cmd/storaged ./cmd/embedd ./cmd/vectord ./cmd/matrixd ./cmd/gateway ./scripts/staffing_candidates
+
+pkill -f "bin/(storaged|embedd|vectord|matrixd|gateway)" 2>/dev/null || true
+sleep 0.3
+
+PIDS=()
+TMP="$(mktemp -d)"
+CFG="$TMP/e2e.toml"
+cleanup() {
+    echo "[candidates-e2e] cleanup"
+    for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done
+    rm -rf "$TMP"
+}
+trap cleanup EXIT INT TERM
+
+# Custom toml: vectord persistence disabled so the candidates index
+# doesn't survive the run. Without this, re-running pollutes the
+# shared MinIO `_vectors/` prefix and breaks g1p_smoke's "this is
+# the only persisted index" assertion (caught 2026-04-29).
+cat > "$CFG" <<EOF
+EOF
+
+poll_health() {
+    for _ in $(seq 1 100); do
+        if curl -sS --max-time 1 "http://localhost:$1/health" >/dev/null 2>&1; then return 0; fi
+        sleep 0.05
+    done
+    return 1
+}
+
+echo "[candidates-e2e] launching stack..."
+./bin/storaged -config "$CFG" > /tmp/storaged.log 2>&1 & PIDS+=($!)
+poll_health 3211 || { echo "storaged failed"; tail /tmp/storaged.log; exit 1; }
+
+./bin/embedd -config "$CFG" > /tmp/embedd.log 2>&1 & PIDS+=($!)
+poll_health 3216 || { echo "embedd failed"; tail /tmp/embedd.log; exit 1; }
+
+./bin/vectord -config "$CFG" > /tmp/vectord.log 2>&1 & PIDS+=($!)
+poll_health 3215 || { echo "vectord failed"; tail /tmp/vectord.log; exit 1; }
+
+./bin/matrixd -config "$CFG" > /tmp/matrixd.log 2>&1 & PIDS+=($!)
+poll_health 3218 || { echo "matrixd failed"; tail /tmp/matrixd.log; exit 1; }
+
+./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 & PIDS+=($!)
+poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; }
+
+echo "[candidates-e2e] stack up; running ingest + reality test query..."
+echo
+./bin/staffing_candidates -query "$QUERY"
diff --git a/scripts/staffing_candidates/main.go b/scripts/staffing_candidates/main.go
new file mode 100644
index 0000000..48feb1f
--- /dev/null
+++ b/scripts/staffing_candidates/main.go
@@ -0,0 +1,297 @@
+// Staffing candidates corpus driver — second corpus on the Go side
+// after workers_500k. Validates the corpusingest substrate against
+// real production-shape parquet data and gives the matrix indexer a
+// second corpus to compose against.
+//
+// Source: /home/profit/lakehouse/data/datasets/candidates.parquet
+// (1000 candidates, 11 columns including skills + status + years).
+//
+// IDs are prefixed "c-" so merged matrix results across corpora
+// stay unambiguous (workers use "w-").
+//
+// Post-ingest: runs a real staffing query through /v1/matrix/search
+// against just the candidates corpus — the first deep-field reality
+// test using the new pipeline.
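+//
+// Usage (flags defined in main below):
+//
+//    ./bin/staffing_candidates                  # drop index, ingest all rows, run default query
+//    ./bin/staffing_candidates -query "..."     # custom reality-test query
+//    ./bin/staffing_candidates -skip-populate   # query only, against an already-populated index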
+
+package main
+
+import (
+    "bytes"
+    "context"
+    "encoding/json"
+    "flag"
+    "fmt"
+    "io"
+    "log"
+    "net/http"
+    "os"
+    "strings"
+    "time"
+
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/apache/arrow-go/v18/parquet/file"
+    "github.com/apache/arrow-go/v18/parquet/pqarrow"
+
+    "git.agentview.dev/profit/golangLAKEHOUSE/internal/corpusingest"
+)
+
+const (
+    indexName = "candidates"
+    dim       = 768
+)
+
+// candidatesSource implements corpusingest.Source over an in-memory
+// arrow.Table loaded from candidates.parquet. 1000 rows fit
+// comfortably in RAM; a chunked-record-batch reader is the next
+// abstraction when a multi-million-row parquet shows up.
+type candidatesSource struct {
+    cols struct {
+        id, firstName, lastName, email, phone, city, state, skills, status *array.String
+        years, rate                                                        *array.Int64
+    }
+    n   int
+    cur int
+}
+
+func newCandidatesSource(path string) (*candidatesSource, func(), error) {
+    f, err := os.Open(path)
+    if err != nil {
+        return nil, nil, fmt.Errorf("open parquet: %w", err)
+    }
+    pf, err := file.NewParquetReader(f)
+    if err != nil {
+        f.Close()
+        return nil, nil, fmt.Errorf("parquet reader: %w", err)
+    }
+    fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
+    if err != nil {
+        pf.Close()
+        f.Close()
+        return nil, nil, fmt.Errorf("arrow reader: %w", err)
+    }
+    table, err := fr.ReadTable(context.Background())
+    if err != nil {
+        pf.Close()
+        f.Close()
+        return nil, nil, fmt.Errorf("read table: %w", err)
+    }
+
+    src := &candidatesSource{n: int(table.NumRows())}
+    schema := table.Schema()
+
+    stringColByName := func(name string) (*array.String, error) {
+        idx := schema.FieldIndices(name)
+        if len(idx) == 0 {
+            return nil, fmt.Errorf("column %q not found", name)
+        }
+        ch := table.Column(idx[0]).Data()
+        if ch.Len() == 0 {
+            return nil, fmt.Errorf("column %q empty", name)
+        }
+        // Single-chunk assumption — ReadTable on a single-row-group
+        // 1000-row parquet returns one chunk. If parquets get larger,
+        // switch to RecordReader and iterate chunks.
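+        // (The assumed upgrade path is pqarrow.FileReader.GetRecordReader;
+        // not wired up here.)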
+        if n := len(ch.Chunks()); n != 1 {
+            return nil, fmt.Errorf("column %q has %d chunks; only 1 supported here", name, n)
+        }
+        s, ok := ch.Chunk(0).(*array.String)
+        if !ok {
+            return nil, fmt.Errorf("column %q is %T, want *array.String", name, ch.Chunk(0))
+        }
+        return s, nil
+    }
+    int64ColByName := func(name string) (*array.Int64, error) {
+        idx := schema.FieldIndices(name)
+        if len(idx) == 0 {
+            return nil, fmt.Errorf("column %q not found", name)
+        }
+        ch := table.Column(idx[0]).Data()
+        // Same single-chunk assumption as the string reader; guard
+        // before Chunk(0) so an empty column can't panic.
+        if n := len(ch.Chunks()); n != 1 {
+            return nil, fmt.Errorf("column %q has %d chunks; only 1 supported here", name, n)
+        }
+        i, ok := ch.Chunk(0).(*array.Int64)
+        if !ok {
+            return nil, fmt.Errorf("column %q is %T, want *array.Int64", name, ch.Chunk(0))
+        }
+        return i, nil
+    }
+
+    cleanup := func() {
+        table.Release()
+        pf.Close()
+        f.Close()
+    }
+    for _, t := range []struct {
+        name string
+        dst  **array.String
+    }{
+        {"candidate_id", &src.cols.id},
+        {"first_name", &src.cols.firstName},
+        {"last_name", &src.cols.lastName},
+        {"email", &src.cols.email},
+        {"phone", &src.cols.phone},
+        {"city", &src.cols.city},
+        {"state", &src.cols.state},
+        {"skills", &src.cols.skills},
+        {"status", &src.cols.status},
+    } {
+        col, err := stringColByName(t.name)
+        if err != nil {
+            cleanup()
+            return nil, nil, err
+        }
+        *t.dst = col
+    }
+    for _, t := range []struct {
+        name string
+        dst  **array.Int64
+    }{
+        {"years_experience", &src.cols.years},
+        {"hourly_rate_usd", &src.cols.rate},
+    } {
+        col, err := int64ColByName(t.name)
+        if err != nil {
+            cleanup()
+            return nil, nil, err
+        }
+        *t.dst = col
+    }
+    return src, cleanup, nil
+}
+
+func (s *candidatesSource) Next() (corpusingest.Row, error) {
+    if s.cur >= s.n {
+        return corpusingest.Row{}, io.EOF
+    }
+    i := s.cur
+    s.cur++
+
+    candidateID := s.cols.id.Value(i)
+    firstName := s.cols.firstName.Value(i)
+    lastName := s.cols.lastName.Value(i)
+    city := s.cols.city.Value(i)
+    state := s.cols.state.Value(i)
+    skills := s.cols.skills.Value(i)
+    status := s.cols.status.Value(i)
+    years := s.cols.years.Value(i)
+    rate := s.cols.rate.Value(i)
+
+    // Embed text: role-shape from skills + location + experience +
+    // status + name. Order matters — embedding models weight earlier
+    // tokens slightly more, so role-relevant signal (skills) goes
+    // first.
+    var b strings.Builder
+    b.WriteString("Candidate skills: ")
+    b.WriteString(skills)
+    b.WriteString(". Based in ")
+    b.WriteString(city)
+    b.WriteString(", ")
+    b.WriteString(state)
+    b.WriteString(". ")
+    fmt.Fprintf(&b, "%d years experience. Status: %s. ", years, status)
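+    // Name goes last: it identifies the candidate but carries no
+    // matching signal, so it must not displace the skill tokens
+    // up front.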
", years, status) + b.WriteString(firstName) + b.WriteString(" ") + b.WriteString(lastName) + b.WriteString(".") + + return corpusingest.Row{ + ID: "c-" + candidateID, + Text: b.String(), + Metadata: map[string]any{ + "candidate_id": candidateID, + "first_name": firstName, + "last_name": lastName, + "email": s.cols.email.Value(i), + "phone": s.cols.phone.Value(i), + "city": city, + "state": state, + "skills": skills, + "status": status, + "years_experience": years, + "hourly_rate_usd": rate, + }, + }, nil +} + +func main() { + var ( + gateway = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL") + parquetPath = flag.String("parquet", "/home/profit/lakehouse/data/datasets/candidates.parquet", "candidates parquet") + limit = flag.Int("limit", 0, "limit rows (0 = all 1000)") + query = flag.String("query", "Python AWS Docker engineer in Chicago available now", "post-ingest reality-test query") + drop = flag.Bool("drop", true, "DELETE candidates index before populate") + skipPop = flag.Bool("skip-populate", false, "skip ingest, only run query") + ) + flag.Parse() + + hc := &http.Client{Timeout: 5 * time.Minute} + ctx := context.Background() + + if !*skipPop { + src, cleanup, err := newCandidatesSource(*parquetPath) + if err != nil { + log.Fatalf("open candidates source: %v", err) + } + defer cleanup() + + stats, err := corpusingest.Run(ctx, corpusingest.Config{ + GatewayURL: *gateway, + IndexName: indexName, + Dimension: dim, + Distance: "cosine", + EmbedBatch: 16, + EmbedWorkers: 8, + AddBatch: 500, // 1000 candidates → 2 add calls; small batches keep memory bounded + Limit: *limit, + DropExisting: *drop, + HTTPClient: hc, + LogProgress: 5 * time.Second, + }, src) + if err != nil { + log.Fatalf("ingest: %v", err) + } + fmt.Printf("[candidates] populate: scanned=%d embedded=%d added=%d wall=%v\n", + stats.Scanned, stats.Embedded, stats.Added, stats.Wall.Round(time.Millisecond)) + } + + // Reality test — run a real staffing query through /v1/matrix/search + // against just the candidates corpus. Multi-corpus retrieval against + // workers + candidates is the next step. + fmt.Printf("\n[candidates] reality test query: %q\n", *query) + runMatrixQuery(hc, *gateway, *query) +} + +func runMatrixQuery(hc *http.Client, gateway, query string) { + body, _ := json.Marshal(map[string]any{ + "query_text": query, + "corpora": []string{indexName}, + "k": 5, + "per_corpus_k": 10, + }) + req, _ := http.NewRequest(http.MethodPost, gateway+"/v1/matrix/search", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + t0 := time.Now() + resp, err := hc.Do(req) + if err != nil { + log.Fatalf("matrix search: %v", err) + } + defer resp.Body.Close() + dur := time.Since(t0) + if resp.StatusCode != 200 { + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) + log.Fatalf("matrix search %d: %s", resp.StatusCode, preview) + } + var sr struct { + Results []struct { + ID string `json:"id"` + Distance float32 `json:"distance"` + Corpus string `json:"corpus"` + Metadata json.RawMessage `json:"metadata"` + } `json:"results"` + } + if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil { + log.Fatalf("decode: %v", err) + } + fmt.Printf("[candidates] matrix returned %d hits in %v:\n", len(sr.Results), dur.Round(time.Millisecond)) + for i, r := range sr.Results { + fmt.Printf(" %d. %s d=%.4f corpus=%s\n %s\n", + i+1, r.ID, r.Distance, r.Corpus, string(r.Metadata)) + } +}