// Staffing workers corpus driver — second-of-two corpora that proves
// the multi-corpus matrix indexer end-to-end. Mirrors the candidates
// driver's parquet pattern but handles multi-chunk arrow tables
// (workers_500k.parquet has multiple row groups, candidates fits in
// one).
//
// Source: /home/profit/lakehouse/data/datasets/workers_500k.parquet
// (500000 rows, 18 cols including role + skills + certifications +
// archetype + reliability scores + resume_text).
//
// IDs prefixed "w-" so multi-corpus matrix queries returning workers
// alongside candidates ("c-") stay unambiguous in merged results.
//
// Default -limit 5000 because the goal of this driver is multi-corpus
// reality testing, not the 500K stress test (separate concern, see
// project_golang_lakehouse.md scale framing).
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/corpusingest"
)

const (
	// indexName is the gateway index this driver drops/creates/populates.
	indexName = "workers"
	// dim is the embedding dimension passed through to corpusingest.Config.
	dim = 768
)

// workersSource implements corpusingest.Source over an in-memory
// arrow.Table loaded from workers_500k.parquet. Unlike the candidates
// driver, this MUST handle multi-chunk arrow columns — a 500K-row
// parquet has ≥1 row group, each becoming its own chunk after read.
type workersSource struct {
	// cols holds one chunk-aware accessor per column this driver reads.
	cols struct {
		workerID                                                       *chunkedInt64
		name, role, city, state, skills, certs, archetype, resume, comm *chunkedString
	}
	n   int64 // total rows in the loaded table (set from table.NumRows())
	cur int64 // next global row index Next() will emit; Next returns io.EOF once cur >= n
}

// chunkedString lets per-row access work whether the table came back
// with one chunk or many. At() is random-access by global row index
// (a linear O(chunks) scan per call); the driver itself only ever
// reads forward one row at a time via Next().
type chunkedString struct {
	chunks []*array.String // one entry per arrow chunk, in table order
	sizes  []int64         // cached Len() of each chunk, parallel to chunks
}

// newChunkedString wraps an arrow chunked column of *array.String.
// Returns an error if any chunk has a different concrete array type.
func newChunkedString(col *arrow.Chunked) (*chunkedString, error) {
	cs := &chunkedString{}
	for i, ch := range col.Chunks() {
		s, ok := ch.(*array.String)
		if !ok {
			return nil, fmt.Errorf("chunk %d is %T, want *array.String", i, ch)
		}
		cs.chunks = append(cs.chunks, s)
		cs.sizes = append(cs.sizes, int64(s.Len()))
	}
	return cs, nil
}

// At returns the value at the global row index. O(chunks) per call;
// fine for our scale (≤5000 rows × ~5 chunks). An out-of-range row
// silently yields "" (callers guard the range via workersSource.n).
func (c *chunkedString) At(row int64) string {
	var offset int64
	for i, s := range c.chunks {
		n := c.sizes[i]
		if row < offset+n {
			// row falls inside this chunk; translate to a chunk-local index.
			return s.Value(int(row - offset))
		}
		offset += n
	}
	return ""
}

// chunkedInt64 mirrors chunkedString for int64 columns (worker_id).
type chunkedInt64 struct {
	chunks []*array.Int64
	sizes  []int64
}

// newChunkedInt64 wraps an arrow chunked column of *array.Int64.
// Returns an error if any chunk has a different concrete array type.
func newChunkedInt64(col *arrow.Chunked) (*chunkedInt64, error) {
	ci := &chunkedInt64{}
	for i, ch := range col.Chunks() {
		s, ok := ch.(*array.Int64)
		if !ok {
			return nil, fmt.Errorf("chunk %d is %T, want *array.Int64", i, ch)
		}
		ci.chunks = append(ci.chunks, s)
		ci.sizes = append(ci.sizes, int64(s.Len()))
	}
	return ci, nil
}

// At returns the value at the global row index; same linear-scan
// contract as chunkedString.At. Out-of-range rows yield 0.
func (c *chunkedInt64) At(row int64) int64 {
	var offset int64
	for i, s := range c.chunks {
		n := c.sizes[i]
		if row < offset+n {
			return s.Value(int(row - offset))
		}
		offset += n
	}
	return 0
}

// newWorkersSource opens the parquet at path, reads the ENTIRE table
// into memory, and wires up per-column chunked accessors. On success
// it returns the source plus a cleanup func the caller must invoke
// (releases the arrow table and closes the readers). On any error all
// resources opened so far are closed before returning.
//
// NOTE(review): cleanup calls pf.Close() and then f.Close(); if the
// parquet reader already closes its underlying source, the second
// close is a redundant (errored, ignored) close on *os.File — verify
// against arrow-go's file.Reader.Close semantics.
func newWorkersSource(path string) (*workersSource, func(), error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, nil, fmt.Errorf("open parquet: %w", err)
	}
	pf, err := file.NewParquetReader(f)
	if err != nil {
		f.Close()
		return nil, nil, fmt.Errorf("parquet reader: %w", err)
	}
	fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	if err != nil {
		pf.Close()
		f.Close()
		return nil, nil, fmt.Errorf("arrow reader: %w", err)
	}
	// Whole-table read: every row group is materialized; each row
	// group becomes its own chunk in each column (hence chunked*).
	table, err := fr.ReadTable(context.Background())
	if err != nil {
		pf.Close()
		f.Close()
		return nil, nil, fmt.Errorf("read table: %w", err)
	}
	src := &workersSource{n: table.NumRows()}
	schema := table.Schema()
	// stringCol / int64Col resolve a column by name and wrap it in the
	// matching chunk-aware accessor. Missing columns are hard errors.
	stringCol := func(name string) (*chunkedString, error) {
		idx := schema.FieldIndices(name)
		if len(idx) == 0 {
			return nil, fmt.Errorf("column %q not found", name)
		}
		return newChunkedString(table.Column(idx[0]).Data())
	}
	int64Col := func(name string) (*chunkedInt64, error) {
		idx := schema.FieldIndices(name)
		if len(idx) == 0 {
			return nil, fmt.Errorf("column %q not found", name)
		}
		return newChunkedInt64(table.Column(idx[0]).Data())
	}
	cleanup := func() {
		table.Release()
		pf.Close()
		f.Close()
	}
	wid, err := int64Col("worker_id")
	if err != nil {
		cleanup()
		return nil, nil, err
	}
	src.cols.workerID = wid
	// Load each string column into its struct slot; table-driven so a
	// schema change is a one-line edit here.
	for _, t := range []struct {
		name string
		dst  **chunkedString
	}{
		{"name", &src.cols.name},
		{"role", &src.cols.role},
		{"city", &src.cols.city},
		{"state", &src.cols.state},
		{"skills", &src.cols.skills},
		{"certifications", &src.cols.certs},
		{"archetype", &src.cols.archetype},
		{"resume_text", &src.cols.resume},
		{"communications", &src.cols.comm},
	} {
		col, err := stringCol(t.name)
		if err != nil {
			cleanup()
			return nil, nil, err
		}
		*t.dst = col
	}
	return src, cleanup, nil
}

// Next emits one corpusingest.Row per worker, advancing the cursor,
// and returns io.EOF once all rows are consumed. IDs are prefixed
// "w-" to disambiguate from candidates ("c-") in merged results.
func (s *workersSource) Next() (corpusingest.Row, error) {
	if s.cur >= s.n {
		return corpusingest.Row{}, io.EOF
	}
	i := s.cur
	s.cur++
	workerID := s.cols.workerID.At(i)
	name := s.cols.name.At(i)
	role := s.cols.role.At(i)
	city := s.cols.city.At(i)
	state := s.cols.state.At(i)
	skills := s.cols.skills.At(i)
	certs := s.cols.certs.At(i)
	archetype := s.cols.archetype.At(i)
	resume := s.cols.resume.At(i)
	// Embed text: role first (most semantically dense for staffing
	// queries), then skills + certs, then location, archetype, finally
	// the prose resume. Same ordering rationale as the candidates
	// driver and the original 500K driver.
	var b strings.Builder
	b.WriteString("Worker role: ")
	b.WriteString(role)
	b.WriteString(". Skills: ")
	b.WriteString(skills)
	b.WriteString(". Certifications: ")
	b.WriteString(certs)
	b.WriteString(". Based in ")
	b.WriteString(city)
	b.WriteString(", ")
	b.WriteString(state)
	b.WriteString(". Archetype: ")
	b.WriteString(archetype)
	b.WriteString(". ")
	b.WriteString(resume)
	return corpusingest.Row{
		ID:   fmt.Sprintf("w-%d", workerID),
		Text: b.String(),
		// Metadata carries the structured fields (not resume prose or
		// communications) alongside the embedded text.
		Metadata: map[string]any{
			"worker_id":      workerID,
			"name":           name,
			"role":           role,
			"city":           city,
			"state":          state,
			"skills":         skills,
			"certifications": certs,
			"archetype":      archetype,
		},
	}, nil
}

// main parses flags, opens the workers parquet source, and runs the
// shared corpusingest pipeline against the gateway. Exits non-zero
// (via log.Fatalf) on any setup or ingest error.
func main() {
	var (
		gateway     = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
		parquetPath = flag.String("parquet", "/home/profit/lakehouse/data/datasets/workers_500k.parquet", "workers parquet")
		limit       = flag.Int("limit", 5000, "limit rows (0 = all 500K — usually not what you want here)")
		drop        = flag.Bool("drop", true, "DELETE workers index before populate")
	)
	flag.Parse()
	// Generous timeout: Run makes long-lived embed/add batch calls.
	hc := &http.Client{Timeout: 5 * time.Minute}
	ctx := context.Background()
	src, cleanup, err := newWorkersSource(*parquetPath)
	if err != nil {
		log.Fatalf("open workers source: %v", err)
	}
	// log.Fatalf below skips this defer; acceptable since the process
	// exits and the OS reclaims the file handle and memory anyway.
	defer cleanup()
	stats, err := corpusingest.Run(ctx, corpusingest.Config{
		GatewayURL:   *gateway,
		IndexName:    indexName,
		Dimension:    dim,
		Distance:     "cosine",
		EmbedBatch:   16,
		EmbedWorkers: 8,
		AddBatch:     500,
		Limit:        *limit,
		DropExisting: *drop,
		HTTPClient:   hc,
		LogProgress:  10 * time.Second,
	}, src)
	if err != nil {
		log.Fatalf("ingest: %v", err)
	}
	fmt.Printf("[workers] populate: scanned=%d embedded=%d added=%d wall=%v\n",
		stats.Scanned, stats.Embedded, stats.Added, stats.Wall.Round(time.Millisecond))
}