diff --git a/data/_catalog/manifests/0efdef2d-52bc-4154-82d4-056bb3a1915d.json b/data/_catalog/manifests/0efdef2d-52bc-4154-82d4-056bb3a1915d.json
new file mode 100644
index 0000000..e75242f
--- /dev/null
+++ b/data/_catalog/manifests/0efdef2d-52bc-4154-82d4-056bb3a1915d.json
@@ -0,0 +1,159 @@
+{
+  "id": "0efdef2d-52bc-4154-82d4-056bb3a1915d",
+  "name": "ethereal_workers",
+  "schema_fingerprint": "9a1286ffada5390b459b217f56263e5ebffec1520ea29bab0be2efc6d5381adc",
+  "objects": [
+    {
+      "bucket": "primary",
+      "key": "datasets/ethereal_workers.parquet",
+      "size_bytes": 6991716,
+      "created_at": "2026-04-17T03:23:29.399966984Z"
+    }
+  ],
+  "created_at": "2026-04-17T03:23:29.399967772Z",
+  "updated_at": "2026-04-17T03:23:29.400227081Z",
+  "description": "",
+  "owner": "",
+  "sensitivity": "pii",
+  "columns": [
+    {
+      "name": "worker_id",
+      "data_type": "Int64",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "name",
+      "data_type": "Utf8",
+      "sensitivity": "pii",
+      "description": "",
+      "is_pii": true
+    },
+    {
+      "name": "role",
+      "data_type": "Utf8",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "email",
+      "data_type": "Utf8",
+      "sensitivity": "pii",
+      "description": "",
+      "is_pii": true
+    },
+    {
+      "name": "phone",
+      "data_type": "Int64",
+      "sensitivity": "pii",
+      "description": "",
+      "is_pii": true
+    },
+    {
+      "name": "city",
+      "data_type": "Utf8",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "state",
+      "data_type": "Utf8",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "zip",
+      "data_type": "Int64",
+      "sensitivity": "pii",
+      "description": "",
+      "is_pii": true
+    },
+    {
+      "name": "skills",
+      "data_type": "Utf8",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "certifications",
+      "data_type": "Utf8",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "archetype",
+      "data_type": "Utf8",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "reliability",
+      "data_type": "Float64",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "responsiveness",
+      "data_type": "Float64",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "engagement",
+      "data_type": "Float64",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "compliance",
+      "data_type": "Float64",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "availability",
+      "data_type": "Float64",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "communications",
+      "data_type": "Utf8",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    },
+    {
+      "name": "resume_text",
+      "data_type": "Utf8",
+      "sensitivity": null,
+      "description": "",
+      "is_pii": false
+    }
+  ],
+  "lineage": {
+    "source_system": "csv",
+    "source_file": "ethereal_workers.csv",
+    "ingest_job": "ingest-1776396209399",
+    "ingest_timestamp": "2026-04-17T03:23:29.399966984Z",
+    "parent_datasets": []
+  },
+  "freshness": null,
+  "tags": [],
+  "row_count": 10000,
+  "last_embedded_at": null,
+  "embedding_stale_since": null,
+  "embedding_refresh_policy": null
+}
\ No newline at end of file
diff --git a/data/datasets/ethereal_workers.parquet b/data/datasets/ethereal_workers.parquet
new file mode 100644
index 0000000..0244a57
Binary files /dev/null and b/data/datasets/ethereal_workers.parquet differ
diff --git a/scripts/serve_imagegen.py b/scripts/serve_imagegen.py
new file mode 100644
index 0000000..fc18ba0
--- /dev/null
+++ b/scripts/serve_imagegen.py
@@ -0,0 +1,433 @@
+#!/usr/bin/env python3
+"""Image generation service — proxies to the ComfyUI API on :8188.
+
+Serves on :3600. Submits a workflow to ComfyUI, polls for completion, and returns the image.
+Falls back to direct diffusers if ComfyUI is unavailable.
+
+Features:
+  - Disk cache — the same prompt returns its cached image instantly
+  - Negative prompt for quality (no faces, hands, text)
+  - DreamShaper XL Turbo for high-quality editorial illustrations
+"""
+
+import base64
+import hashlib
+import io
+import json
+import os
+import random
+import time
+import urllib.request
+import urllib.error
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from pathlib import Path
+
+PORT = int(os.environ.get("IMAGEGEN_PORT", "3600"))
+COMFYUI_URL = os.environ.get("COMFYUI_URL", "http://localhost:8188")
+CACHE_DIR = Path(os.environ.get("IMAGEGEN_CACHE", "./data/_imagecache"))
+CACHE_DIR.mkdir(parents=True, exist_ok=True)
+WORKFLOW_PATH = "/opt/ComfyUI/workflows/editorial_hero.json"
+
+
+def _cache_key(prompt, width, height, steps):
+    # Seed is deliberately excluded: a repeated prompt should hit the cache.
+    return hashlib.sha256(f"{prompt}|{width}|{height}|{steps}".encode()).hexdigest()[:24]
+
+
+def _cache_get(key):
+    fp = CACHE_DIR / f"{key}.webp"
+    return base64.b64encode(fp.read_bytes()).decode() if fp.exists() else None
+
+
+def _cache_put(key, img_bytes):
+    (CACHE_DIR / f"{key}.webp").write_bytes(img_bytes)
+
+
+def _comfyui_generate(prompt, width=1024, height=512, steps=8, seed=None):
+    """Submit a workflow to ComfyUI and wait for the result."""
+    # Load the workflow template
+    with open(WORKFLOW_PATH) as f:
+        workflow = json.load(f)
+
+    # Customize the template nodes
+    if seed is None:
+        seed = random.randint(0, 2**32 - 1)
+    workflow["3"]["inputs"]["seed"] = seed
+    workflow["3"]["inputs"]["steps"] = steps
+    workflow["5"]["inputs"]["width"] = width
+    workflow["5"]["inputs"]["height"] = height
+    workflow["6"]["inputs"]["text"] = prompt
+
+    # Submit to ComfyUI
+    payload = json.dumps({"prompt": workflow}).encode()
+    req = urllib.request.Request(
+        f"{COMFYUI_URL}/prompt",
+        data=payload,
+        headers={"Content-Type": "application/json"},
+    )
+    resp = urllib.request.urlopen(req, timeout=10)
+    result = json.loads(resp.read())
+    prompt_id = result["prompt_id"]
+
+    # Poll for completion
+    for _ in range(120):  # up to ~60 seconds at 0.5s per poll
+        time.sleep(0.5)
+        try:
+            status_req = urllib.request.Request(f"{COMFYUI_URL}/history/{prompt_id}")
+            status_resp = urllib.request.urlopen(status_req, timeout=5)
+            history = json.loads(status_resp.read())
+            if prompt_id in history:
+                outputs = history[prompt_id].get("outputs", {})
+                # Find the SaveImage node output
+                for node_id, node_out in outputs.items():
+                    images = node_out.get("images", [])
+                    if images:
+                        img_info = images[0]
+                        # Fetch the rendered image
+                        img_url = (
+                            f"{COMFYUI_URL}/view?filename={img_info['filename']}"
+                            f"&subfolder={img_info.get('subfolder', '')}"
+                            f"&type={img_info.get('type', 'output')}"
+                        )
+                        img_resp = urllib.request.urlopen(img_url, timeout=10)
+                        img_data = img_resp.read()
+                        # Convert to webp
+                        from PIL import Image
+                        img = Image.open(io.BytesIO(img_data))
+                        buf = io.BytesIO()
+                        img.save(buf, format="WEBP", quality=90)
+                        return buf.getvalue(), seed
+                return None, seed  # completed but no images
+        except Exception:
+            continue
+    return None, seed  # timeout
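+
+
+# Usage sketch (illustrative, not part of the service): a minimal client for the
+# /generate endpoint defined below. Assumes the service is already running on :3600;
+# the request and response field names mirror ImageHandler._generate().
+#
+#   import base64, json, urllib.request
+#   payload = {"prompt": "editorial hero illustration", "width": 1280, "height": 720}
+#   req = urllib.request.Request("http://localhost:3600/generate",
+#                                data=json.dumps(payload).encode(),
+#                                headers={"Content-Type": "application/json"})
+#   out = json.loads(urllib.request.urlopen(req, timeout=180).read())
+#   open("hero.webp", "wb").write(base64.b64decode(out["image"]))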
+
+
+def _diffusers_fallback(prompt, width, height, steps, seed):
+    """Fallback: use raw diffusers SDXL Turbo if ComfyUI is down."""
+    import torch
+    from diffusers import AutoPipelineForText2Image
+
+    pipe = AutoPipelineForText2Image.from_pretrained(
+        "stabilityai/sdxl-turbo", torch_dtype=torch.float16, variant="fp16"
+    ).to("cuda")
+    pipe.enable_attention_slicing()
+
+    if seed is None:
+        seed = random.randint(0, 2**32 - 1)
+    gen = torch.Generator("cuda").manual_seed(seed)
+    result = pipe(prompt=prompt, num_inference_steps=steps, guidance_scale=0.0,
+                  width=width, height=height, generator=gen)
+    buf = io.BytesIO()
+    result.images[0].save(buf, format="WEBP", quality=90)
+    # Release the pipeline so other GPU jobs (ComfyUI, Blender) can reclaim VRAM
+    del pipe
+    torch.cuda.empty_cache()
+    return buf.getvalue(), seed
+
+
+class ImageHandler(BaseHTTPRequestHandler):
+    def log_message(self, fmt, *args):
+        pass  # quiet
+
+    def _json(self, code, data):
+        self.send_response(code)
+        self.send_header("Content-Type", "application/json")
+        self.send_header("Access-Control-Allow-Origin", "*")
+        self.end_headers()
+        self.wfile.write(json.dumps(data).encode())
+
+    def do_OPTIONS(self):
+        self.send_response(200)
+        self.send_header("Access-Control-Allow-Origin", "*")
+        self.send_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
+        self.send_header("Access-Control-Allow-Headers", "Content-Type")
+        self.end_headers()
+
+    def do_GET(self):
+        if self.path == "/health":
+            comfy_ok = False
+            try:
+                r = urllib.request.urlopen(f"{COMFYUI_URL}/system_stats", timeout=3)
+                comfy_ok = r.status == 200
+            except Exception:
+                pass
+            cached = len(list(CACHE_DIR.glob("*.webp")))
+            self._json(200, {"status": "ok", "comfyui": comfy_ok, "cached_images": cached})
+        elif self.path == "/cache/stats":
+            files = list(CACHE_DIR.glob("*.webp"))
+            self._json(200, {"count": len(files),
+                             "total_mb": round(sum(f.stat().st_size for f in files) / 1024**2, 1)})
+        else:
+            self._json(404, {"error": "not found"})
+
+    def do_POST(self):
+        if self.path == "/generate":
+            self._generate()
+        elif self.path == "/blender":
+            self._blender_render()
+        elif self.path == "/img-to-3d":
+            self._img_to_3d()
+        elif self.path == "/scene-glb":
+            self._scene_glb()
+        else:
+            self._json(404, {"error": "not found"})
+
+    def _generate(self):
+        try:
+            length = int(self.headers.get("Content-Length", 0))
+            body = json.loads(self.rfile.read(length)) if length else {}
+        except Exception:
+            self._json(400, {"error": "invalid JSON"})
+            return
+
+        prompt = body.get("prompt", "").strip()
+        if not prompt:
+            self._json(400, {"error": "prompt required"})
+            return
+
+        # Clamp dimensions and step count to sane bounds
+        width = min(max(int(body.get("width", 1280)), 256), 1920)
+        height = min(max(int(body.get("height", 720)), 256), 1080)
+        steps = min(max(int(body.get("steps", 50)), 1), 80)
+        seed = body.get("seed")
+
+        # Cache check (keyed on prompt/size/steps; see _cache_key)
+        key = _cache_key(prompt, width, height, steps)
+        cached = _cache_get(key)
+        if cached:
+            self._json(200, {"image": cached, "format": "webp", "width": width, "height": height,
+                             "cached": True, "prompt": prompt[:200]})
+            return
+
+        t0 = time.time()
+        img_bytes = None
+        backend = None
+
+        # Try ComfyUI first
+        try:
+            comfy_check = urllib.request.urlopen(f"{COMFYUI_URL}/system_stats", timeout=3)
+            if comfy_check.status == 200:
+                img_bytes, seed = _comfyui_generate(prompt, width, height, steps, seed)
+                backend = "comfyui"
+        except Exception:
+            pass
+
+        # Fall back to diffusers
+        if not img_bytes:
+            try:
+                img_bytes, seed = _diffusers_fallback(prompt, width, height, steps, seed)
+                backend = "diffusers"
+            except Exception as e:
+                self._json(500, {"error": str(e)[:300]})
+                return
+
+        if not img_bytes:
+            self._json(500, {"error": "generation failed"})
+            return
+
+        elapsed_ms = int((time.time() - t0) * 1000)
+        img_b64 = base64.b64encode(img_bytes).decode()
+        _cache_put(key, img_bytes)
+
+        self._json(200, {
+            "image": img_b64, "format": "webp", "width": width, "height": height,
+            "steps": steps, "seed": seed, "time_ms": elapsed_ms,
+            "backend": backend, "prompt": prompt[:200], "cached": False,
+        })
+        print(f"[IMAGEGEN] {backend} {width}x{height} in {elapsed_ms}ms")
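+
+    # Design note: /generate prefers ComfyUI (persistent model server, no per-request
+    # load cost) and only spins up a throwaway diffusers pipeline when the daemon is
+    # unreachable. A deploy script might gate traffic on the /health probe defined in
+    # do_GET above (illustrative sketch, not part of this file):
+    #
+    #   import json, time, urllib.request
+    #   def wait_healthy(url="http://localhost:3600/health", tries=30):
+    #       for _ in range(tries):
+    #           try:
+    #               if json.loads(urllib.request.urlopen(url, timeout=3).read())["comfyui"]:
+    #                   return True
+    #           except Exception:
+    #               pass
+    #           time.sleep(2)
+    #       return False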
"image": img_b64, "format": "webp", "width": width, "height": height, + "steps": steps, "seed": seed, "time_ms": elapsed_ms, + "backend": backend, "prompt": prompt[:200], "cached": False, + }) + print(f"[IMAGEGEN] {backend} {width}x{height} in {elapsed_ms}ms") + + + def _blender_render(self): + """Render a 3D hero banner via Blender Cycles GPU.""" + try: + length = int(self.headers.get("Content-Length", 0)) + body = json.loads(self.rfile.read(length)) if length else {} + except: + self._json(400, {"error": "invalid JSON"}); return + + seed = body.get("seed", random.randint(0, 99999)) + + # Cache check + key = f"blender-{seed}" + cached = _cache_get(key) + if cached: + self._json(200, {"image": cached, "format": "webp", "backend": "blender-cached", + "cached": True, "seed": seed}); return + + t0 = time.time() + output_png = f"/tmp/blender_render_{seed}.png" + script = "/opt/ComfyUI/blender_scripts/hero_cycles.py" + + try: + import subprocess + result = subprocess.run( + ["blender", "--background", "--python", script, "--", str(seed), output_png], + capture_output=True, text=True, timeout=300 + ) + + if not os.path.exists(output_png): + self._json(500, {"error": "Blender render failed: " + result.stderr[-300:] if result.stderr else "no output"}); return + + # Convert to webp + from PIL import Image + img = Image.open(output_png) + buf = io.BytesIO() + img.save(buf, format="WEBP", quality=92) + img_bytes = buf.getvalue() + os.remove(output_png) + + elapsed_ms = int((time.time() - t0) * 1000) + img_b64 = base64.b64encode(img_bytes).decode() + _cache_put(key, img_bytes) + + self._json(200, { + "image": img_b64, "format": "webp", "width": 1280, "height": 320, + "seed": seed, "time_ms": elapsed_ms, "backend": "blender-cycles", + "cached": False, + }) + print(f"[BLENDER] Rendered seed={seed} in {elapsed_ms}ms") + + except Exception as e: + self._json(500, {"error": str(e)[:300]}) + + def _img_to_3d(self): + """Full pipeline: AI image → 3D displacement → Blender render.""" + try: + length = int(self.headers.get("Content-Length", 0)) + body = json.loads(self.rfile.read(length)) if length else {} + except: + self._json(400, {"error": "invalid JSON"}); return + + prompt = body.get("prompt", "abstract flowing golden energy, fractal patterns, dark background, sharp detail").strip() + seed = body.get("seed", random.randint(0, 99999)) + + key = f"img3d-{hashlib.sha256(prompt.encode()).hexdigest()[:12]}-{seed % 4}" + cached = _cache_get(key) + if cached: + self._json(200, {"image": cached, "format": "webp", "backend": "img3d-cached", "cached": True}); return + + t0 = time.time() + try: + import subprocess + + # Step 1: Generate source image via ComfyUI directly + src_path = f"/tmp/img3d_src_{seed}.png" + try: + img_bytes_src, _ = _comfyui_generate(prompt, 512, 512, 25, seed) + if not img_bytes_src: + # Fallback to diffusers + img_bytes_src, _ = _diffusers_fallback(prompt, 512, 512, 8, seed) + if not img_bytes_src: + self._json(500, {"error": "Failed to generate source image"}); return + from PIL import Image + img_src = Image.open(io.BytesIO(img_bytes_src)) + img_src.save(src_path, "PNG") + except Exception as e: + self._json(500, {"error": f"Source image failed: {e}"}); return + + # Step 2: TripoSR — convert image to 3D mesh + mesh_path = f"/tmp/triposr_mesh_{seed}.obj" + out_path = f"/tmp/img3d_out_{seed}.png" + try: + # Free VRAM for TripoSR + subprocess.run(["systemctl", "stop", "comfyui"], capture_output=True, timeout=10) + time.sleep(3) + + triposr_script = f""" +import torch, sys +sys.path.insert(0, 
+
+    def _scene_glb(self):
+        """Generate a 3D scene and export it as GLB for the Three.js viewer."""
+        try:
+            length = int(self.headers.get("Content-Length", 0))
+            body = json.loads(self.rfile.read(length)) if length else {}
+        except Exception:
+            self._json(400, {"error": "invalid JSON"})
+            return
+
+        seed = body.get("seed", random.randint(0, 99999))
+        key = f"glb-{seed}"
+
+        # Check the cache
+        glb_cache = CACHE_DIR / f"{key}.glb"
+        if glb_cache.exists():
+            glb_b64 = base64.b64encode(glb_cache.read_bytes()).decode()
+            self._json(200, {"glb": glb_b64, "seed": seed, "cached": True})
+            return
+
+        t0 = time.time()
+        glb_path = f"/tmp/scene_{seed}.glb"
+        try:
+            import subprocess
+            result = subprocess.run(
+                ["blender", "--background", "--python", "/opt/ComfyUI/blender_scripts/export_glb.py",
+                 "--", str(seed), glb_path],
+                capture_output=True, text=True, timeout=120)
+
+            if not os.path.exists(glb_path):
+                detail = result.stderr[-200:] if result.stderr else "no output"
+                self._json(500, {"error": "GLB export failed: " + detail})
+                return
+
+            glb_bytes = open(glb_path, "rb").read()
+            os.remove(glb_path)
+            glb_cache.write_bytes(glb_bytes)
+            glb_b64 = base64.b64encode(glb_bytes).decode()
+            elapsed = int((time.time() - t0) * 1000)
+
+            self._json(200, {"glb": glb_b64, "seed": seed, "time_ms": elapsed, "cached": False})
+            print(f"[GLB] seed={seed} in {elapsed}ms size={len(glb_bytes)//1024}KB")
+
+        except Exception as e:
+            self._json(500, {"error": str(e)[:300]})
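+
+    # Client-side sketch (illustrative): /scene-glb returns the GLB base64-encoded
+    # inside JSON, so a viewer decodes it before handing it to THREE.GLTFLoader. A
+    # quick check from Python, assuming the service is running on :3600:
+    #
+    #   import base64, json, urllib.request
+    #   req = urllib.request.Request("http://localhost:3600/scene-glb",
+    #                                data=json.dumps({"seed": 7}).encode(),
+    #                                headers={"Content-Type": "application/json"})
+    #   out = json.loads(urllib.request.urlopen(req, timeout=180).read())
+    #   open("scene.glb", "wb").write(base64.b64decode(out["glb"]))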
+
+
+if __name__ == "__main__":
+    print(f"[IMAGEGEN] Starting on port {PORT}")
+    print(f"[IMAGEGEN] ComfyUI backend: {COMFYUI_URL}")
+    print(f"[IMAGEGEN] Cache: {CACHE_DIR}")
+    HTTPServer(("0.0.0.0", PORT), ImageHandler).serve_forever()
diff --git a/scripts/serve_lab.py b/scripts/serve_lab.py
new file mode 100644
index 0000000..1b9dae0
--- /dev/null
+++ b/scripts/serve_lab.py
@@ -0,0 +1,369 @@
+#!/usr/bin/env python3
+"""Pipeline Lab server — serves the notebook UI on :3500, proxies API calls to the sidecar on :3200."""
+
+import http.server
+import json
+import urllib.request
+import urllib.error
+
+PORT = 3500
+SIDECAR = "http://localhost:3200"
+
+
+class LabHandler(http.server.BaseHTTPRequestHandler):
+    def log_message(self, fmt, *args):
+        pass  # quiet
+
+    def do_GET(self):
+        if self.path in ("", "/"):
+            self._serve_ui()
+        elif self.path.startswith("/lab/"):
+            self._proxy("GET")
+        else:
+            self.send_error(404)
+
+    def do_POST(self):
+        if self.path.startswith("/lab/"):
+            self._proxy("POST")
+        else:
+            self.send_error(404)
+
+    def do_DELETE(self):
+        if self.path.startswith("/lab/"):
+            self._proxy("DELETE")
+        else:
+            self.send_error(404)
+
+    def _proxy(self, method):
+        """Proxy the request to the sidecar."""
+        url = SIDECAR + self.path
+        body = None
+        if method in ("POST", "PUT"):
+            length = int(self.headers.get("Content-Length", 0))
+            body = self.rfile.read(length) if length else None
+
+        req = urllib.request.Request(url, data=body, method=method)
+        req.add_header("Content-Type", self.headers.get("Content-Type", "application/json"))
+
+        try:
+            with urllib.request.urlopen(req, timeout=120) as resp:
+                data = resp.read()
+                self.send_response(resp.status)
+                self.send_header("Content-Type", resp.headers.get("Content-Type", "application/json"))
+                self.send_header("Access-Control-Allow-Origin", "*")
+                self.end_headers()
+                self.wfile.write(data)
+        except urllib.error.HTTPError as e:
+            # Pass the sidecar's error status and body through unchanged
+            self.send_response(e.code)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(e.read())
+        except Exception as e:
+            self.send_response(502)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps({"error": str(e)}).encode())
+
+    def _serve_ui(self):
+        self.send_response(200)
+        self.send_header("Content-Type", "text/html; charset=utf-8")
+        self.end_headers()
+        self.wfile.write(HTML.encode())
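+
+
+# Routing sketch (illustrative): everything under /lab/ is forwarded verbatim to the
+# sidecar, so the browser only ever talks to :3500. Example round trip, assuming the
+# sidecar exposes a /lab/pipelines listing (that endpoint name is an assumption, not
+# defined in this file):
+#
+#   import json, urllib.request
+#   pipelines = json.loads(urllib.request.urlopen(
+#       "http://localhost:3500/lab/pipelines", timeout=10).read())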
+
+
+HTML = r"""<!doctype html>
+<!--
+  Notebook UI template (the original markup, styles, and scripts were garbled in
+  extraction; only the recoverable text content is preserved here):
+    title:    Pipeline Lab — Lakehouse
+    header:   Pipeline Lab // Lakehouse
+    tagline:  Embedding-based screening vs LLM classification — iterative experimentation
+    counters: Exemplars: 0 | Categories: 0 | Pipelines: 0 | Sidecar: ...
+-->
+"""
+
+
+if __name__ == "__main__":
+    print(f"Pipeline Lab running on http://0.0.0.0:{PORT}")
+    server = http.server.HTTPServer(("0.0.0.0", PORT), LabHandler)
+    server.serve_forever()