Meta-Pipeline: self-improving multi-mode chains on real system data
Engine: - Chains modes in sequence: extract → research → validate → debate → synthesize - Each stage feeds its output to the next as input - Runs same pipeline with different model sets (one model per iteration) - Auto-scores final output using judge model (1-10) - Keeps best result across all iterations - All stage results + final outputs saved to meta_runs table 4 preset pipelines: 1. Security Deep Dive — security logs through 5-stage analysis 2. Run History Insights — team run data through 4-stage extraction 3. Threat Intel Enrichment — profiled IPs through 5-stage analysis 4. Cross-Report Synthesis — past self-reports through 4-stage debate Database: - meta_pipelines: name, source, stages, status, best_score, iterations - meta_runs: per-iteration stage results, final output, score, models API: - POST /api/meta-pipeline — create pipeline from preset - POST /api/meta-pipeline/:id/start — run in background - POST /api/meta-pipeline/:id/stop — halt execution - GET /api/meta-pipelines — list all with live status - GET /api/meta-pipeline/:id — full detail with all iteration results UI (Lab page): - Magenta-bordered Meta-Pipeline card with 4 clickable presets - Click preset → creates + auto-starts pipeline - Pipeline list with live status dots, progress, scores - Click pipeline → drill-down with per-iteration results - Each stage expandable (click to show output) - Best output highlighted in green border - Auto-refreshes every 5 seconds during runs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
804898b658
commit
4dc561af12
440
llm_team_ui.py
440
llm_team_ui.py
@ -3385,8 +3385,13 @@ LAB_HTML = r"""
|
||||
<h3>Your Experiments <button class="btn btn-p" style="margin-left:auto" onclick="showCreate()">+ New</button></h3>
|
||||
<div id="exp-list"><div class="empty">Loading...</div></div>
|
||||
</div>
|
||||
<div class="card" style="border-color:rgba(217,70,239,0.3)">
|
||||
<h3 style="color:#d946ef">Meta-Pipeline <span style="font-size:9px;color:var(--text2);font-weight:400;text-transform:none;letter-spacing:0">chain modes on real data, compare models, self-improve</span></h3>
|
||||
<div style="display:grid;grid-template-columns:repeat(auto-fit,minmax(200px,1fr));gap:8px;margin-bottom:12px" id="meta-presets"></div>
|
||||
<div id="meta-pipelines"></div>
|
||||
</div>
|
||||
<div class="card" style="border-color:rgba(74,222,128,0.2)">
|
||||
<h3 style="color:var(--green)">Self-Analysis <span style="font-size:9px;color:var(--text2);font-weight:400;text-transform:none;letter-spacing:0">AI reports from your own system data</span></h3>
|
||||
<h3 style="color:var(--green)">Self-Analysis <span style="font-size:9px;color:var(--text2);font-weight:400;text-transform:none;letter-spacing:0">quick reports from system data</span></h3>
|
||||
<div style="display:grid;gap:8px" id="self-reports"></div>
|
||||
<div id="past-reports" style="margin-top:12px"></div>
|
||||
</div>
|
||||
@ -3943,6 +3948,173 @@ async function viewPastReport(id) {
|
||||
} catch(e) {}
|
||||
}
|
||||
|
||||
// ─── META-PIPELINE UI ───
|
||||
// The four preset pipelines offered on the Lab page.
// `source` selects the server-side data gatherer (_gather_data_source),
// `stages` is the ordered chain of analysis modes, `desc` is UI copy.
var META_PRESETS = [
  {name:'Security Deep Dive', source:'security_logs', stages:['extract','research','validate','debate','synthesize'], desc:'Extract attack patterns → research context → validate claims → challenge findings → final brief'},
  {name:'Run History Insights', source:'team_runs', stages:['extract','research','validate','synthesize'], desc:'Extract patterns from 96 runs → research optimization opportunities → validate → actionable brief'},
  {name:'Threat Intel Enrichment', source:'threat_intel', stages:['extract','research','validate','consensus','synthesize'], desc:'Analyze profiled IPs → research threat actors → validate attributions → converge → intel report'},
  {name:'Cross-Report Synthesis', source:'self_reports', stages:['extract','debate','consensus','synthesize'], desc:'Extract findings from past reports → debate conflicts → converge → unified intelligence brief'}
];
|
||||
|
||||
// Render the preset cards into the #meta-presets grid. Each card is
// clickable and creates (then auto-starts) a pipeline from that preset.
function renderMetaPresets() {
  var grid = document.getElementById('meta-presets');
  if (!grid) return;
  grid.textContent = '';
  for (var i = 0; i < META_PRESETS.length; i++) {
    (function(preset) {
      var tile = document.createElement('div');
      tile.style.cssText = 'background:rgba(0,0,0,0.25);border:2px solid rgba(217,70,239,0.15);border-radius:2px;padding:12px;cursor:pointer;transition:border-color 0.15s';
      tile.onmouseenter = function(){tile.style.borderColor='#d946ef'};
      tile.onmouseleave = function(){tile.style.borderColor='rgba(217,70,239,0.15)'};
      tile.onclick = function(){createMetaPipeline(preset)};
      var heading = document.createElement('div');
      heading.style.cssText = 'font-weight:700;font-size:12px;margin-bottom:4px;color:#d946ef';
      heading.textContent = preset.name;
      var chain = document.createElement('div');
      chain.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:9px;color:var(--text2);margin-bottom:4px';
      chain.textContent = preset.stages.join(' → ');
      var blurb = document.createElement('div');
      blurb.style.cssText = 'font-size:10px;color:var(--text2);line-height:1.4';
      blurb.textContent = preset.desc;
      tile.appendChild(heading);
      tile.appendChild(chain);
      tile.appendChild(blurb);
      grid.appendChild(tile);
    })(META_PRESETS[i]);
  }
}
|
||||
|
||||
// Create a pipeline from a preset, then immediately start it server-side.
// preset: one entry of META_PRESETS ({name, source, stages, desc}).
async function createMetaPipeline(preset) {
  try {
    var r = await fetch('/api/meta-pipeline', {method:'POST', headers:{'Content-Type':'application/json'},
      body:JSON.stringify({name:preset.name, data_source:preset.source, stages:preset.stages})});
    var d = await r.json();
    if (d.id) {
      toast('Pipeline created — starting...', true);
      await fetch('/api/meta-pipeline/'+d.id+'/start', {method:'POST'});
      loadMetaPipelines();
    } else {
      // Fix: a server response without an id (error / auth failure) used to
      // be silently ignored, leaving the user with no feedback at all.
      toast('Error: '+(d.error || 'could not create pipeline'), false);
    }
  } catch(e) { toast('Error: '+e.message, false); }
}
|
||||
|
||||
// Render the "Pipeline Runs" list into #meta-pipelines.
// Called once on page load and re-polled every 5s; network/JSON failures are
// swallowed deliberately (best-effort refresh). The list is rebuilt from
// scratch on every call.
async function loadMetaPipelines() {
  var el = document.getElementById('meta-pipelines');
  if (!el) return;
  try {
    var r = await fetch('/api/meta-pipelines');
    var d = await r.json();
    var pipes = d.pipelines || [];
    // Nothing to show: clear the container (removes the section title too).
    if (!pipes.length) { el.textContent = ''; return; }
    el.textContent = '';
    var title = document.createElement('div');
    title.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:8px;text-transform:uppercase;letter-spacing:2px;color:var(--text2);margin-bottom:8px;padding-top:8px;border-top:1px solid var(--border)';
    title.textContent = 'Pipeline Runs';
    el.appendChild(title);
    pipes.forEach(function(p) {
      var row = document.createElement('div');
      row.style.cssText = 'display:flex;align-items:center;gap:8px;padding:8px;background:rgba(0,0,0,0.2);border:2px solid var(--border);border-radius:2px;margin-bottom:4px;cursor:pointer;transition:border-color 0.15s;font-size:11px';
      // Border tint encodes status: magenta while running, green when done.
      if (p.status === 'running') row.style.borderColor = 'rgba(217,70,239,0.3)';
      if (p.status === 'completed') row.style.borderColor = 'rgba(74,222,128,0.2)';
      row.onmouseenter = function(){row.style.borderColor='var(--accent)'};
      // On mouse-out restore the status-dependent tint, not a fixed color.
      row.onmouseleave = function(){row.style.borderColor = p.status==='running'?'rgba(217,70,239,0.3)':p.status==='completed'?'rgba(74,222,128,0.2)':'var(--border)'};
      // Status dot (pulses while the pipeline is running)
      var dot = document.createElement('div');
      var dotColor = p.status==='running'?'#d946ef':p.status==='completed'?'var(--green)':'var(--text2)';
      dot.style.cssText = 'width:8px;height:8px;border-radius:50%;background:'+dotColor+';flex-shrink:0';
      if (p.status === 'running') dot.style.animation = 'pulse 2s infinite';
      row.appendChild(dot);
      var name = document.createElement('span');
      name.style.cssText = 'font-weight:700;min-width:140px';
      name.textContent = p.name; row.appendChild(name);
      var stagesEl = document.createElement('span');
      stagesEl.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:9px;color:var(--text2);flex:1';
      stagesEl.textContent = (p.stages||[]).join('→');
      row.appendChild(stagesEl);
      // Live progress text (server-side in-memory status), only while running.
      if (p.live_status && p.status === 'running') {
        var prog = document.createElement('span');
        prog.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:9px;color:#d946ef';
        prog.textContent = p.live_status.substep || '';
        row.appendChild(prog);
      }
      // Best judge score so far; hidden until an iteration has been scored.
      if (p.best_score > 0) {
        var score = document.createElement('span');
        score.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:10px;font-weight:700;color:var(--green)';
        score.textContent = p.best_score.toFixed(1)+'/10';
        row.appendChild(score);
      }
      var iters = document.createElement('span');
      iters.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:9px;color:var(--text2)';
      iters.textContent = (p.iterations||0)+' iters';
      row.appendChild(iters);
      // Click toggles the per-pipeline drill-down panel.
      row.onclick = function(){viewMetaPipeline(p.id)};
      el.appendChild(row);
    });
  } catch(e) {}
}
|
||||
|
||||
// Toggle the drill-down panel for one pipeline: per-iteration stage results
// (each expandable on click) plus the best-scoring final output.
async function viewMetaPipeline(pid) {
  var existing = document.getElementById('meta-detail-'+pid);
  if (existing) { existing.remove(); return; }  // second click closes the panel
  try {
    var r = await fetch('/api/meta-pipeline/'+pid);
    var d = await r.json();
    if (d.error) return;
    var panel = document.createElement('div');
    panel.id = 'meta-detail-'+pid;
    panel.style.cssText = 'background:rgba(0,0,0,0.3);border:2px solid #d946ef;border-radius:2px;padding:16px;margin:8px 0;max-height:600px;overflow-y:auto';
    var title = document.createElement('div');
    title.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:10px;color:#d946ef;text-transform:uppercase;letter-spacing:1px;margin-bottom:10px;display:flex;justify-content:space-between';
    title.textContent = d.name + ' — ' + (d.runs||[]).length + ' iterations';
    var closeBtn = document.createElement('span');
    closeBtn.style.cssText = 'cursor:pointer;opacity:0.6';
    closeBtn.textContent = '✕';
    closeBtn.onclick = function(e){e.stopPropagation();panel.remove()};
    title.appendChild(closeBtn);
    panel.appendChild(title);
    // Show each iteration
    (d.runs||[]).forEach(function(run) {
      var iterDiv = document.createElement('div');
      iterDiv.style.cssText = 'margin-bottom:12px;padding-bottom:12px;border-bottom:1px solid var(--border)';
      var hdr = document.createElement('div');
      hdr.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:10px;display:flex;gap:8px;align-items:center;margin-bottom:6px';
      // Fix: build the header via textContent instead of innerHTML — model
      // names/scores came from the server and were interpolated into markup
      // unescaped. Also guard null score / model_config, which crashed the
      // old .toFixed(1) and .models accesses.
      var score = (typeof run.score === 'number') ? run.score : 0;
      var iterLbl = document.createElement('span');
      iterLbl.style.cssText = 'color:#d946ef;font-weight:700';
      iterLbl.textContent = 'Iteration '+run.iteration;
      var scoreLbl = document.createElement('span');
      scoreLbl.style.cssText = 'color:'+(score>=7?'var(--green)':score>=5?'var(--accent)':'var(--red)')+';font-weight:700';
      scoreLbl.textContent = 'Score: '+score.toFixed(1)+'/10';
      var modelsLbl = document.createElement('span');
      modelsLbl.style.cssText = 'color:var(--text2)';
      modelsLbl.textContent = 'Models: '+(((run.model_config||{}).models)||[]).join(', ');
      hdr.appendChild(iterLbl); hdr.appendChild(scoreLbl); hdr.appendChild(modelsLbl);
      iterDiv.appendChild(hdr);
      // Stage results — header row toggles the output body below it.
      (run.stage_results||[]).forEach(function(sr) {
        var stageEl = document.createElement('div');
        stageEl.style.cssText = 'margin:4px 0;cursor:pointer';
        var stageHead = document.createElement('div');
        stageHead.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:9px;color:'+(sr.error?'var(--red)':'var(--text2)')+';display:flex;gap:6px';
        stageHead.textContent = '▶ '+sr.stage+' ('+sr.model+') — '+(sr.chars||0)+' chars';
        var stageBody = document.createElement('div');
        stageBody.style.cssText = 'display:none;background:rgba(0,0,0,0.2);border:1px solid var(--border);border-radius:2px;padding:8px;margin-top:4px;font-size:11px;line-height:1.5;white-space:pre-wrap;max-height:300px;overflow-y:auto;color:var(--text)';
        stageBody.textContent = sr.output||'';
        stageHead.onclick = function(){
          if (stageBody.style.display==='none'){stageBody.style.display='block';stageHead.textContent='▼ '+sr.stage+' ('+sr.model+') — '+(sr.chars||0)+' chars'}
          else{stageBody.style.display='none';stageHead.textContent='▶ '+sr.stage+' ('+sr.model+') — '+(sr.chars||0)+' chars'}
        };
        stageEl.appendChild(stageHead);stageEl.appendChild(stageBody);iterDiv.appendChild(stageEl);
      });
      panel.appendChild(iterDiv);
    });
    // Best output (green-bordered), present once the pipeline has completed.
    if (d.results && d.results.best_output) {
      var bestDiv = document.createElement('div');
      bestDiv.style.cssText = 'border:2px solid var(--green);border-radius:2px;padding:12px;background:rgba(74,222,128,0.03)';
      var bestTitle = document.createElement('div');
      bestTitle.style.cssText = 'font-family:JetBrains Mono,monospace;font-size:9px;color:var(--green);text-transform:uppercase;letter-spacing:1px;margin-bottom:6px;font-weight:700';
      // Fix: guard null best_score before .toFixed(1).
      bestTitle.textContent = 'Best Output (score: '+(d.best_score||0).toFixed(1)+'/10, models: '+(d.results.best_models||[]).join(', ')+')';
      var bestBody = document.createElement('div');
      bestBody.style.cssText = 'font-size:12px;line-height:1.6;white-space:pre-wrap;max-height:400px;overflow-y:auto';
      bestBody.textContent = d.results.best_output;
      bestDiv.appendChild(bestTitle);bestDiv.appendChild(bestBody);panel.appendChild(bestDiv);
    }
    document.getElementById('meta-pipelines').appendChild(panel);
  } catch(e) {}
}
|
||||
|
||||
// Boot the meta-pipeline widgets on page load.
renderMetaPresets();
loadMetaPipelines();
// Poll every 5s while the container exists, so live status/progress updates
// appear without a manual refresh.
setInterval(function(){if(document.getElementById('meta-pipelines'))loadMetaPipelines()},5000);

// Existing self-analysis widgets.
renderSelfReports();
loadPastReports();
|
||||
|
||||
@ -5806,6 +5978,272 @@ def get_self_report(rid):
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
|
||||
# ─── META-PIPELINE ENGINE ──────────────────────────────────────
|
||||
|
||||
# Worker bookkeeping — module-level and process-local (lost on restart).
_meta_threads = {}  # pipeline_id -> threading.Thread running _run_meta_pipeline
_meta_status = {}  # pipeline_id -> {stage, substep, progress} live progress shown in the UI
|
||||
|
||||
def _gather_data_source(source):
|
||||
"""Pull data from a system source for pipeline input."""
|
||||
if source == "team_runs":
|
||||
with get_db() as conn:
|
||||
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
||||
cur.execute("SELECT mode, prompt, responses, models_used, created_at FROM team_runs WHERE archived=false ORDER BY created_at DESC LIMIT 20")
|
||||
runs = cur.fetchall()
|
||||
parts = []
|
||||
for r in runs:
|
||||
resps = r.get("responses") or []
|
||||
parts.append(f"[{r['mode']}] Prompt: {r['prompt'][:150]}")
|
||||
for resp in resps[:3]:
|
||||
parts.append(f" {resp.get('role','?')} ({resp.get('model','?')}): {resp.get('text','')[:200]}")
|
||||
return "TEAM RUN HISTORY (last 20 runs):\n\n" + "\n".join(parts)
|
||||
|
||||
elif source == "security_logs":
|
||||
try:
|
||||
with open("/var/log/llm-team-security.log") as f:
|
||||
lines = [l.strip() for l in f.readlines() if "192.168" not in l][-60:]
|
||||
return "SECURITY LOG (last 60 external entries):\n\n" + "\n".join(lines)
|
||||
except Exception:
|
||||
return "No security log data."
|
||||
|
||||
elif source == "threat_intel":
|
||||
with get_db() as conn:
|
||||
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
||||
cur.execute("SELECT ip, threat_level, classification, summary, country, isp, is_proxy, open_ports, blocklist_count FROM threat_intel ORDER BY enriched_at DESC LIMIT 20")
|
||||
rows = cur.fetchall()
|
||||
parts = [f"{r['ip']} [{r.get('threat_level','?')}] {r.get('classification','?')} — {r.get('country','?')}/{r.get('isp','?')} proxy={r.get('is_proxy')} ports={r.get('open_ports',[])} blocklists={r.get('blocklist_count',0)} — {r.get('summary','')[:100]}" for r in rows]
|
||||
return "THREAT INTEL DATABASE (" + str(len(rows)) + " profiled IPs):\n\n" + "\n".join(parts)
|
||||
|
||||
elif source == "self_reports":
|
||||
with get_db() as conn:
|
||||
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
||||
cur.execute("SELECT report_type, model, report, created_at FROM self_reports ORDER BY created_at DESC LIMIT 5")
|
||||
rows = cur.fetchall()
|
||||
parts = [f"[{r['report_type']}] ({r['model']}, {r['created_at']}):\n{r['report'][:500]}" for r in rows]
|
||||
return "SELF-ANALYSIS REPORTS (last 5):\n\n" + "\n\n".join(parts)
|
||||
|
||||
return "No data source: " + source
|
||||
|
||||
|
||||
# Prompt template per stage name. The literal token "{input}" is substituted
# (via str.replace, not str.format) with the previous stage's output — or the
# raw gathered data for the first stage. Unknown stage names fall back to a
# generic "Analyze this data" template at the call site.
STAGE_PROMPTS = {
    "extract": "Extract all key facts, entities, patterns, and data points from this data. Return a structured list of findings. Be thorough — capture everything meaningful.\n\nDATA:\n{input}",
    "research": "You are a research analyst. Given these extracted findings, generate deeper research questions and investigate each one. Expand on patterns, identify gaps, and add context.\n\nFINDINGS:\n{input}",
    "validate": "You are a fact-checker. Review these findings and research notes. For each claim, mark as VERIFIED, UNCERTAIN, or FLAGGED. Be rigorous — challenge assumptions.\n\nRESEARCH:\n{input}",
    "debate": "You are a critical analyst. Challenge these findings. What are the weak points? What alternative explanations exist? What's being overlooked? Play devil's advocate.\n\nVALIDATED FINDINGS:\n{input}",
    "consensus": "You are synthesizing multiple analytical perspectives. Merge the validated findings with the critical challenges. Identify what's strongly supported vs. what needs more investigation.\n\nFINDINGS + CRITIQUES:\n{input}",
    "synthesize": "Produce a final, actionable intelligence brief from all the analysis. Include: Executive Summary, Key Findings (ranked by confidence), Actionable Recommendations, and Open Questions.\n\nALL ANALYSIS:\n{input}",
}
|
||||
|
||||
|
||||
def _run_meta_pipeline(pipeline_id):
    """Execute a meta-pipeline: chain stages, try model variants, score results.

    Runs in a background daemon thread (see start_meta_pipeline). For each
    configured model set:
      1. chain the stages, feeding each stage's output to the next as input;
      2. have the set's first model judge the final output (1-10);
      3. persist the iteration to meta_runs and keep the best-scoring output.
    Live progress is published through the module-level _meta_status dict.
    """
    # Function-local imports keep the block self-contained; re was previously
    # imported inside the iteration loop.
    import re
    import time

    try:
        with get_db() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute("SELECT * FROM meta_pipelines WHERE id = %s", (pipeline_id,))
                pipe = cur.fetchone()
        if not pipe:
            return

        stages = pipe["stages"] or ["extract", "research", "validate", "synthesize"]
        data_source = pipe["data_source"]
        config = pipe["config"] or {}
        model_sets = config.get("model_sets", [["qwen2.5:latest"], ["mistral:latest"], ["gemma2:latest"]])
        max_iterations = config.get("max_iterations", len(model_sets))

        _meta_status[pipeline_id] = {"stage": 0, "substep": "Gathering data...", "progress": 0, "iteration": 0}

        # Gather source data once; every iteration starts from the same input.
        source_data = _gather_data_source(data_source)

        best_score = 0.0
        best_output = ""
        best_models = []
        completed_iterations = 0  # actual count persisted at the end (fix: was the planned count)

        for iteration, models in enumerate(model_sets[:max_iterations]):
            # Cooperative stop: stop_meta_pipeline flips status to 'stopped';
            # re-read it between iterations and bail out if it changed.
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT status FROM meta_pipelines WHERE id = %s", (pipeline_id,))
                    row = cur.fetchone()
            if not row or row[0] != "running":
                break

            _meta_status[pipeline_id]["iteration"] = iteration + 1
            stage_results = []
            current_input = source_data
            iter_started = time.time()

            for si, stage_name in enumerate(stages):
                # Round-robin through the set's models, one per stage.
                model = models[si % len(models)]

                _meta_status[pipeline_id].update({
                    "stage": si + 1,
                    "substep": f"Stage {si+1}/{len(stages)}: {stage_name} ({model})",
                    "progress": int(((iteration * len(stages) + si) / (max_iterations * len(stages))) * 100)
                })

                prompt_template = STAGE_PROMPTS.get(stage_name, "Analyze this data:\n\n{input}")
                # Cap input to prevent context overflow.
                capped_input = current_input[:6000]
                prompt = prompt_template.replace("{input}", capped_input)

                try:
                    result = query_model(model, prompt)
                    stage_results.append({
                        "stage": stage_name, "model": model,
                        "output": result, "chars": len(result)
                    })
                    # Chain: this stage's output becomes next stage's input.
                    current_input = result
                except Exception as e:
                    # Record the failure but keep chaining with the previous
                    # input, so one flaky model doesn't abort the iteration.
                    stage_results.append({
                        "stage": stage_name, "model": model,
                        "output": f"Error: {e}", "chars": 0, "error": True
                    })

            # Score the final output, using the set's first model as judge.
            final_output = current_input
            judge_model = models[0]
            try:
                score_prompt = (
                    f"Score this analysis on a scale of 1-10 for: completeness, accuracy, actionability, and clarity. "
                    f"Return ONLY a JSON object: {{\"score\": N, \"reason\": \"brief\"}}\n\n{final_output[:3000]}"
                )
                score_raw = query_model(judge_model, score_prompt)
                score_match = re.search(r'"score"\s*:\s*(\d+\.?\d*)', score_raw)
                # Neutral 5.0 when the judge returns nothing parsable.
                score = float(score_match.group(1)) if score_match else 5.0
            except Exception:
                score = 5.0

            # Fix: was hard-coded to 0 — record real wall-clock duration.
            duration_ms = int((time.time() - iter_started) * 1000)

            # Save this iteration.
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        "INSERT INTO meta_runs (pipeline_id, iteration, stage_results, final_output, score, model_config, duration_ms) VALUES (%s,%s,%s,%s,%s,%s,%s)",
                        (pipeline_id, iteration + 1, json.dumps(stage_results), final_output[:10000], score, json.dumps({"models": models}), duration_ms)
                    )
                conn.commit()

            completed_iterations += 1
            if score > best_score:
                best_score = score
                best_output = final_output
                best_models = models

            _meta_status[pipeline_id]["substep"] = f"Iteration {iteration+1} scored {score:.1f}/10"

        # Finalize. Fix: only promote 'running' -> 'completed'. The old code
        # unconditionally wrote status='completed', clobbering a concurrent
        # stop request ('stopped') issued mid-run.
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE meta_pipelines SET status = CASE WHEN status='running' THEN 'completed' ELSE status END, "
                    "best_score=%s, iterations=%s, results=%s, updated_at=NOW() WHERE id=%s",
                    (best_score, completed_iterations, json.dumps({"best_models": best_models, "best_output": best_output[:5000]}), pipeline_id)
                )
            conn.commit()
        _meta_status[pipeline_id] = {"stage": len(stages), "substep": "Complete", "progress": 100, "iteration": completed_iterations}

    except Exception as e:
        # Mark the pipeline errored so the UI stops showing it as running.
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute("UPDATE meta_pipelines SET status='error', updated_at=NOW() WHERE id=%s", (pipeline_id,))
            conn.commit()
        _meta_status[pipeline_id] = {"substep": f"Error: {e}", "progress": 0}
|
||||
|
||||
|
||||
@app.route("/api/meta-pipeline", methods=["POST"])
@admin_required
def create_meta_pipeline():
    """Create a meta_pipelines row from the request body; return {"id": pid}.

    Body: {name, data_source, stages, models?}. When no model sets are given,
    probes the local Ollama instance and uses up to four installed models
    (>1 GB each), one single-model set per iteration.
    """
    d = request.json or {}
    name = d.get("name", "Untitled Pipeline")
    data_source = d.get("data_source", "team_runs")
    stages = d.get("stages", ["extract", "research", "validate", "synthesize"])
    models = d.get("models", [])
    if not models:
        cfg = load_config()
        base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434")
        try:
            resp = requests.get(f"{base}/api/tags", timeout=5)
            # Fix: m.get("size", 0) — a tags entry without a "size" field used
            # to raise KeyError and 500 the whole request.
            all_m = [m["name"] for m in resp.json().get("models", []) if m.get("size", 0) > 1e9]
            models = [[m] for m in all_m[:4]]
        except Exception:
            models = [["qwen2.5:latest"], ["mistral:latest"]]
    if not models:
        # Fix: Ollama answered but listed no model >1 GB — fall back rather
        # than creating a pipeline with max_iterations=0 that can never run.
        models = [["qwen2.5:latest"], ["mistral:latest"]]

    config = {"model_sets": models, "max_iterations": len(models)}
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO meta_pipelines (name, data_source, stages, total_stages, config) VALUES (%s,%s,%s,%s,%s) RETURNING id",
                (name, data_source, json.dumps(stages), len(stages), json.dumps(config))
            )
            pid = cur.fetchone()[0]
        conn.commit()
    return jsonify({"id": pid})
|
||||
|
||||
|
||||
@app.route("/api/meta-pipeline/<int:pid>/start", methods=["POST"])
@admin_required
def start_meta_pipeline(pid):
    """Mark the pipeline 'running' and launch the worker thread in background."""
    # Fix: guard against double-start. A second POST while the worker was
    # alive used to spawn a competing thread writing to the same rows.
    existing = _meta_threads.get(pid)
    if existing and existing.is_alive():
        return jsonify({"ok": False, "error": "already running"}), 409
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("UPDATE meta_pipelines SET status='running' WHERE id=%s", (pid,))
        conn.commit()
    # Daemon thread: dies with the process; progress lives in _meta_status.
    t = threading.Thread(target=_run_meta_pipeline, args=(pid,), daemon=True)
    _meta_threads[pid] = t
    t.start()
    return jsonify({"ok": True})
|
||||
|
||||
|
||||
@app.route("/api/meta-pipeline/<int:pid>/stop", methods=["POST"])
@admin_required
def stop_meta_pipeline(pid):
    """Request a halt: flip status to 'stopped'.

    The worker thread polls the status between iterations and exits
    cooperatively; this endpoint does not kill the thread itself.
    """
    with get_db() as conn:
        cur = conn.cursor()
        try:
            cur.execute("UPDATE meta_pipelines SET status='stopped' WHERE id=%s", (pid,))
            conn.commit()
        finally:
            cur.close()
    return jsonify({"ok": True})
|
||||
|
||||
|
||||
@app.route("/api/meta-pipelines")
@admin_required
def list_meta_pipelines():
    """List the 20 most recent pipelines with live in-memory status attached."""
    with get_db() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT id, name, data_source, stages, status, best_score, iterations, total_stages, created_at, updated_at FROM meta_pipelines ORDER BY created_at DESC LIMIT 20")
            rows = cur.fetchall()
    for r in rows:
        # Fix: guard NULL timestamps — .isoformat() on None raised and broke
        # the listing for every client.
        r["created_at"] = r["created_at"].isoformat() if r["created_at"] else None
        r["updated_at"] = r["updated_at"].isoformat() if r["updated_at"] else None
        # live_status only exists while a worker is running in this process.
        r["live_status"] = _meta_status.get(r["id"])
    return jsonify({"pipelines": rows})
|
||||
|
||||
|
||||
@app.route("/api/meta-pipeline/<int:pid>")
@admin_required
def get_meta_pipeline(pid):
    """Full pipeline detail: the row, all iteration runs, and live status."""
    with get_db() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM meta_pipelines WHERE id=%s", (pid,))
            pipe = cur.fetchone()
            if not pipe:
                return jsonify({"error": "not found"}), 404
            cur.execute("SELECT * FROM meta_runs WHERE pipeline_id=%s ORDER BY iteration", (pid,))
            runs = cur.fetchall()
    # Fix: guard NULL timestamps — .isoformat() on None raised a 500.
    pipe["created_at"] = pipe["created_at"].isoformat() if pipe["created_at"] else None
    pipe["updated_at"] = pipe["updated_at"].isoformat() if pipe["updated_at"] else None
    for r in runs:
        r["created_at"] = r["created_at"].isoformat() if r["created_at"] else None
    pipe["runs"] = runs
    pipe["live_status"] = _meta_status.get(pid)
    return jsonify(pipe)
|
||||
|
||||
|
||||
@app.route("/api/pipelines")
|
||||
@login_required
|
||||
def get_pipelines():
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user