Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.
WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.
WHAT WAS PROVEN
- Vector retrieval across multi-corpora matrix (chicago_permits + entity
briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory:
* UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
* REVISE: chains versions, parent.superseded_at + superseded_by stamped
* RETIRE: marks specific trace retired with reason, excluded from retrieval
* HISTORY: walks chain root→tip, cycle-safe
KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces
Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
96 lines
3.9 KiB
Bash
Executable File
96 lines
3.9 KiB
Bash
Executable File
#!/bin/bash
|
|
# One-shot dump of all testing data into the `raw` MinIO bucket.
|
|
# Persistent test corpus so we don't re-extract every run.
|
|
#
|
|
# Layout:
|
|
# raw/
|
|
# staffing/ — workers_500k.parquet, resumes.parquet
|
|
# entities/ — entities.jsonl, sec_company_tickers.json
|
|
# llm_team/ — *.jsonl extracts from knowledge_base PG tables
|
|
# chicago/ — permits_<date>.json (last 30 days)
|
|
# MANIFEST.json — documents what's here + when
|
|
|
|
set -euo pipefail
|
|
|
|
REPO=/home/profit/lakehouse
|
|
BUCKET=raw
|
|
ALIAS=local
|
|
STAGE=$(mktemp -d /tmp/raw_dump.XXXXX)
|
|
trap 'rm -rf "$STAGE"' EXIT
|
|
DATE=$(date -u +%Y-%m-%d)
|
|
|
|
log() { echo "[dump $(date -u +%H:%M:%S)] $*"; }
|
|
|
|
log "creating bucket ${ALIAS}/${BUCKET} (idempotent)"
|
|
mc mb --ignore-existing ${ALIAS}/${BUCKET}
|
|
|
|
# ─── 1. STAFFING ───
|
|
log "staffing/ — workers_500k.parquet (323 MB) + resumes.parquet"
|
|
mc cp -q ${REPO}/data/datasets/workers_500k.parquet ${ALIAS}/${BUCKET}/staffing/workers_500k.parquet
|
|
mc cp -q ${REPO}/data/datasets/resumes.parquet ${ALIAS}/${BUCKET}/staffing/resumes.parquet
|
|
|
|
# ─── 2. ENTITIES + SEC + GEO ───
|
|
log "entities/ — contractor entities cache + SEC tickers + svep + tif districts"
|
|
mc cp -q ${REPO}/data/_entity_cache/entities.jsonl ${ALIAS}/${BUCKET}/entities/entities.jsonl
|
|
mc cp -q ${REPO}/data/_entity_cache/sec_company_tickers.json ${ALIAS}/${BUCKET}/sec/company_tickers.json
|
|
mc cp -q ${REPO}/data/_entity_cache/svep_log.json ${ALIAS}/${BUCKET}/entities/svep_log.json
|
|
mc cp -q ${REPO}/data/_entity_cache/tif_districts.geojson ${ALIAS}/${BUCKET}/chicago/tif_districts.geojson
|
|
|
|
# ─── 3. LLM TEAM HISTORY (Postgres → JSONL → S3) ───
|
|
log "llm_team/ — extracting from knowledge_base PG tables"
|
|
LLM_TABLES=(team_runs pipeline_runs lab_experiments lab_trials meta_pipelines meta_runs conversations response_cache memory_entries adaptive_runs)
|
|
for tbl in "${LLM_TABLES[@]}"; do
|
|
out=${STAGE}/${tbl}.jsonl
|
|
rows=$(sudo -u postgres psql -d knowledge_base -At -c "SELECT COUNT(*) FROM ${tbl};" 2>/dev/null || echo 0)
|
|
if [ "$rows" -eq 0 ]; then
|
|
log " · ${tbl}: 0 rows, skipping"
|
|
continue
|
|
fi
|
|
sudo -u postgres psql -d knowledge_base -At -c "COPY (SELECT row_to_json(t) FROM ${tbl} t) TO STDOUT;" > "$out" 2>/dev/null
|
|
size=$(du -h "$out" | awk '{print $1}')
|
|
log " · ${tbl}: ${rows} rows (${size})"
|
|
mc cp -q "$out" ${ALIAS}/${BUCKET}/llm_team/${tbl}.jsonl
|
|
done
|
|
|
|
# ─── 4. CHICAGO PERMITS (last 30 days, paginated) ───
|
|
log "chicago/ — pulling last 30 days of permits from data.cityofchicago.org"
|
|
since=$(date -u -d '30 days ago' +%Y-%m-%d)
|
|
out=${STAGE}/permits_${DATE}.json
|
|
url="https://data.cityofchicago.org/resource/ydr8-5enu.json?\$where=issue_date%3E='${since}'&\$limit=10000&\$order=issue_date%20DESC"
|
|
if curl -sf --max-time 60 "$url" -o "$out"; then
|
|
count=$(python3 -c "import json; print(len(json.load(open('${out}'))))")
|
|
size=$(du -h "$out" | awk '{print $1}')
|
|
log " · permits since ${since}: ${count} records (${size})"
|
|
mc cp -q "$out" ${ALIAS}/${BUCKET}/chicago/permits_${DATE}.json
|
|
else
|
|
log " · WARN: chicago permits fetch failed; skipping"
|
|
fi
|
|
|
|
# ─── 5. MANIFEST ───
|
|
log "writing MANIFEST.json"
|
|
manifest=${STAGE}/MANIFEST.json
|
|
python3 - <<PY
|
|
import json, subprocess, datetime
|
|
out = subprocess.check_output(['mc','ls','-r','--json','${ALIAS}/${BUCKET}'], text=True)
|
|
items = []
|
|
for line in out.strip().split('\n'):
|
|
if not line: continue
|
|
o = json.loads(line)
|
|
items.append({'key': o.get('key',''), 'size': o.get('size',0)})
|
|
total_size = sum(i['size'] for i in items)
|
|
manifest = {
|
|
'bucket': '${BUCKET}',
|
|
'created_at': datetime.datetime.utcnow().isoformat() + 'Z',
|
|
'total_objects': len(items),
|
|
'total_size_bytes': total_size,
|
|
'total_size_human': f'{total_size / (1024*1024):.1f} MB',
|
|
'items': items,
|
|
}
|
|
with open('${manifest}','w') as f:
|
|
json.dump(manifest, f, indent=2)
|
|
PY
|
|
mc cp -q "$manifest" ${ALIAS}/${BUCKET}/MANIFEST.json
|
|
|
|
log "DONE. Bucket contents:"
|
|
mc ls -r ${ALIAS}/${BUCKET} | head -30
|