use dioxus::prelude::*; use serde::{Deserialize, Serialize}; fn api_base() -> String { if let Some(window) = web_sys::window() { if let Ok(hostname) = window.location().hostname() { return format!("http://{}:3100", hostname); } } "http://localhost:3100".to_string() } fn main() { dioxus::launch(App); } // --- API Types --- #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] struct Dataset { id: String, name: String, schema_fingerprint: String, objects: Vec, created_at: String, updated_at: String, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] struct ObjRef { bucket: String, key: String, size_bytes: u64, created_at: String, } #[derive(Debug, Clone, Deserialize, PartialEq)] struct QueryResponse { columns: Vec, rows: serde_json::Value, row_count: usize, } #[derive(Debug, Clone, Deserialize, PartialEq)] struct ColumnInfo { name: String, data_type: String, } #[derive(Debug, Clone, Deserialize, PartialEq)] struct GenerateResponse { text: String, model: String, tokens_evaluated: Option, tokens_generated: Option, } #[derive(Debug, Clone, Deserialize, PartialEq)] struct EmbedResponse { embeddings: Vec>, model: String, dimensions: usize, } // --- API Calls --- async fn fetch_datasets() -> Result, String> { let resp = reqwest::get(&format!("{}/catalog/datasets", api_base())) .await.map_err(|e| e.to_string())?; resp.json().await.map_err(|e| e.to_string()) } async fn run_sql(sql: &str) -> Result { let client = reqwest::Client::new(); let resp = client.post(&format!("{}/query/sql", api_base())) .json(&serde_json::json!({"sql": sql})) .send().await.map_err(|e| e.to_string())?; if !resp.status().is_success() { return Err(resp.text().await.unwrap_or_default()); } resp.json().await.map_err(|e| e.to_string()) } async fn ai_generate(prompt: &str, max_tokens: u32) -> Result { let client = reqwest::Client::new(); let resp = client.post(&format!("{}/ai/generate", api_base())) .json(&serde_json::json!({"prompt": prompt, "max_tokens": max_tokens, "temperature": 0.2})) .send().await.map_err(|e| e.to_string())?; if !resp.status().is_success() { return Err(resp.text().await.unwrap_or_default()); } resp.json().await.map_err(|e| e.to_string()) } async fn ai_embed(texts: Vec) -> Result { let client = reqwest::Client::new(); let resp = client.post(&format!("{}/ai/embed", api_base())) .json(&serde_json::json!({"texts": texts})) .send().await.map_err(|e| e.to_string())?; if !resp.status().is_success() { return Err(resp.text().await.unwrap_or_default()); } resp.json().await.map_err(|e| e.to_string()) } async fn fetch_health(path: &str) -> Result { let resp = reqwest::get(&format!("{}{}", api_base(), path)) .await.map_err(|e| e.to_string())?; resp.text().await.map_err(|e| e.to_string()) } /// Get schema context for datasets (used for AI SQL generation). /// Limits to core tables to keep prompt size reasonable. async fn get_schema_context(datasets: &[Dataset]) -> String { let core_tables = ["candidates", "clients", "job_orders", "placements", "timesheets", "call_log", "email_log"]; let mut ctx = String::from("DATABASE SCHEMA:\n\n"); for ds in datasets.iter().filter(|d| core_tables.contains(&d.name.as_str())) { let desc = run_sql(&format!("DESCRIBE {}", ds.name)).await; match desc { Ok(resp) => { ctx.push_str(&format!("TABLE: {}\n Columns:\n", ds.name)); if let Some(rows) = resp.rows.as_array() { for row in rows { let col = row.get("column_name").and_then(|v| v.as_str()).unwrap_or("?"); let dt = row.get("data_type").and_then(|v| v.as_str()).unwrap_or("?"); ctx.push_str(&format!(" {}.{} ({})\n", ds.name, col, dt)); } } ctx.push('\n'); } Err(_) => { ctx.push_str(&format!("TABLE: {} (schema unavailable)\n\n", ds.name)); } } } // Add relationship hints so the model knows how to JOIN ctx.push_str("RELATIONSHIPS:\n"); ctx.push_str(" candidates.candidate_id = placements.candidate_id\n"); ctx.push_str(" candidates.candidate_id = timesheets.candidate_id\n"); ctx.push_str(" candidates.candidate_id = call_log.candidate_id\n"); ctx.push_str(" candidates.candidate_id = email_log.candidate_id\n"); ctx.push_str(" clients.client_id = job_orders.client_id\n"); ctx.push_str(" clients.client_id = placements.client_id\n"); ctx.push_str(" clients.client_id = timesheets.client_id\n"); ctx.push_str(" job_orders.job_order_id = placements.job_order_id\n"); ctx.push_str(" placements.placement_id = timesheets.placement_id\n"); ctx.push_str("\nNOTE: 'vertical' is only in candidates, clients, and job_orders. To get vertical for timesheets or placements, JOIN to those tables.\n"); ctx } // --- Tabs --- #[derive(Clone, PartialEq)] enum Tab { Dashboard, Ask, Explore, Sql, Ingest, Status, } // --- App --- #[component] fn App() -> Element { let mut active_tab = use_signal(|| Tab::Dashboard); let mut datasets = use_signal(Vec::::new); let mut ds_loading = use_signal(|| true); use_effect(move || { spawn(async move { ds_loading.set(true); if let Ok(ds) = fetch_datasets().await { datasets.set(ds); } ds_loading.set(false); }); }); let refresh = move |_| { spawn(async move { ds_loading.set(true); if let Ok(ds) = fetch_datasets().await { datasets.set(ds); } ds_loading.set(false); }); }; rsx! { div { class: "app", div { class: "header", h1 { "LAKEHOUSE" } div { class: "tabs", button { class: if *active_tab.read() == Tab::Dashboard { "tab active" } else { "tab" }, onclick: move |_| active_tab.set(Tab::Dashboard), "Dashboard" } button { class: if *active_tab.read() == Tab::Ask { "tab active" } else { "tab" }, onclick: move |_| active_tab.set(Tab::Ask), "Ask" } button { class: if *active_tab.read() == Tab::Explore { "tab active" } else { "tab" }, onclick: move |_| active_tab.set(Tab::Explore), "Explore" } button { class: if *active_tab.read() == Tab::Sql { "tab active" } else { "tab" }, onclick: move |_| active_tab.set(Tab::Sql), "SQL" } button { class: if *active_tab.read() == Tab::Ingest { "tab active" } else { "tab" }, onclick: move |_| active_tab.set(Tab::Ingest), "Ingest" } button { class: if *active_tab.read() == Tab::Status { "tab active" } else { "tab" }, onclick: move |_| active_tab.set(Tab::Status), "System" } } div { class: "header-right", span { class: "ds-count", {if *ds_loading.read() { "...".to_string() } else { format!("{} datasets", datasets.read().len()) }} } button { class: "refresh-btn", onclick: refresh, "↻" } } } div { class: "content-full", match *active_tab.read() { Tab::Dashboard => rsx! { DashboardPanel {} }, Tab::Ask => rsx! { AskPanel { datasets: datasets.read().clone() } }, Tab::Explore => rsx! { ExplorePanel { datasets: datasets.read().clone() } }, Tab::Sql => rsx! { SqlPanel {} }, Tab::Ingest => rsx! { IngestPanel {} }, Tab::Status => rsx! { StatusPanel {} }, } } } } } // === ASK — Natural language → SQL → Results === #[component] fn AskPanel(datasets: Vec) -> Element { let mut question = use_signal(|| String::new()); let mut generated_sql = use_signal(|| None::); let mut result = use_signal(|| None::>); let mut thinking = use_signal(|| false); let mut step = use_signal(|| String::new()); let datasets_clone = datasets.clone(); let ask = move |_| { let q = question.read().clone(); if q.trim().is_empty() { return; } let ds = datasets_clone.clone(); spawn(async move { thinking.set(true); generated_sql.set(None); result.set(None); // Step 1: Get schema context step.set("reading schemas...".into()); let schema_ctx = get_schema_context(&ds).await; // Step 2: Generate SQL step.set("writing SQL...".into()); let prompt = format!( "You are a SQL assistant. You write Apache DataFusion SQL (PostgreSQL-compatible).\n\n\ CRITICAL: You MUST only use column names that appear in the schema below. Do NOT invent or guess column names.\n\n\ {schema_ctx}\n\ User question: {q}\n\n\ Rules:\n\ - ONLY use table and column names listed above\n\ - If unsure about a column name, pick the closest match from the schema\n\ - Output ONLY the SQL query. No markdown, no explanation, no backticks." ); match ai_generate(&prompt, 512).await { Ok(resp) => { let sql = clean_sql(&resp.text); generated_sql.set(Some(sql.clone())); // Step 3: Execute step.set("running query...".into()); let query_result = run_sql(&sql).await; // Step 3b: If schema error, retry with the error as feedback if let Err(ref err) = query_result { if err.contains("error") { step.set("fixing SQL...".into()); let retry_prompt = format!( "The SQL you wrote had an error:\n{err}\n\n\ {schema_ctx}\n\ Original question: {q}\n\n\ Write a CORRECTED SQL query using ONLY the columns listed in the schema. Output ONLY SQL." ); if let Ok(retry_resp) = ai_generate(&retry_prompt, 512).await { let retry_sql = clean_sql(&retry_resp.text); generated_sql.set(Some(retry_sql.clone())); step.set("running corrected query...".into()); let retry_result = run_sql(&retry_sql).await; result.set(Some(retry_result)); } else { result.set(Some(query_result)); } } else { result.set(Some(query_result)); } } else { result.set(Some(query_result)); } } Err(e) => { result.set(Some(Err(format!("AI error: {e}")))); } } step.set(String::new()); thinking.set(false); }); }; rsx! { div { class: "panel ask-panel", div { class: "ask-hero", h2 { "Ask your data anything" } p { class: "subtitle", "Natural language → SQL → Results. Powered by local AI." } } div { class: "ask-input-row", input { class: "ask-input", value: "{question}", placeholder: "e.g. Which department has the highest average salary?", oninput: move |e| question.set(e.value()), onkeydown: move |e| { if e.key() == Key::Enter { let q = question.read().clone(); if q.trim().is_empty() { return; } let ds = datasets.clone(); spawn(async move { thinking.set(true); generated_sql.set(None); result.set(None); step.set("reading schemas...".into()); let schema_ctx = get_schema_context(&ds).await; step.set("writing SQL...".into()); let prompt = format!( "You are a SQL assistant. You write Apache DataFusion SQL (PostgreSQL-compatible).\n\n\ CRITICAL: You MUST only use column names that appear in the schema below. Do NOT invent or guess column names.\n\n\ {schema_ctx}\n\ User question: {q}\n\n\ Rules:\n\ - ONLY use table and column names listed above\n\ - If unsure about a column name, pick the closest match from the schema\n\ - Output ONLY the SQL query. No markdown, no explanation, no backticks." ); match ai_generate(&prompt, 512).await { Ok(resp) => { let sql = clean_sql(&resp.text); generated_sql.set(Some(sql.clone())); step.set("running query...".into()); let query_result = run_sql(&sql).await; if let Err(ref err) = query_result { if err.contains("error") { step.set("fixing SQL...".into()); let retry_prompt = format!( "The SQL you wrote had an error:\n{err}\n\n{schema_ctx}\n\nOriginal question: {q}\n\nWrite a CORRECTED SQL query using ONLY the columns listed. Output ONLY SQL." ); if let Ok(rr) = ai_generate(&retry_prompt, 512).await { let rsql = clean_sql(&rr.text); generated_sql.set(Some(rsql.clone())); step.set("running corrected query...".into()); result.set(Some(run_sql(&rsql).await)); } else { result.set(Some(query_result)); } } else { result.set(Some(query_result)); } } else { result.set(Some(query_result)); } } Err(e) => { result.set(Some(Err(format!("AI error: {e}")))); } } step.set(String::new()); thinking.set(false); }); } }, } button { class: "btn btn-ask", disabled: *thinking.read(), onclick: ask, {if *thinking.read() { step.read().clone() } else { "Ask".to_string() }} } } div { class: "ask-examples", "Try: " button { class: "example-btn", onclick: move |_| question.set("Which department has the highest average salary?".into()), "highest avg salary by dept" } button { class: "example-btn", onclick: move |_| question.set("Show me the top 3 most expensive products".into()), "top 3 expensive products" } button { class: "example-btn", onclick: move |_| question.set("How many events per action type?".into()), "events by action" } button { class: "example-btn", onclick: move |_| question.set("List all employees who earn more than 90000".into()), "employees > 90k" } } if let Some(sql) = generated_sql.read().as_ref() { div { class: "generated-sql", span { class: "sql-label", "Generated SQL" } pre { class: "sql-code", "{sql}" } } } match result.read().as_ref() { None => rsx! {}, Some(Err(e)) => rsx! { div { class: "error", "{e}" } }, Some(Ok(resp)) => rsx! { ResultsTable { response: resp.clone() } }, } } } } // === EXPLORE — Dataset overview + AI summary === #[component] fn ExplorePanel(datasets: Vec) -> Element { let mut selected = use_signal(|| None::); let mut schema_info = use_signal(|| None::>); let mut preview = use_signal(|| None::>); let mut summary = use_signal(|| None::>); let mut loading = use_signal(|| false); let mut select_ds = move |name: String| { selected.set(Some(name.clone())); spawn(async move { loading.set(true); schema_info.set(None); preview.set(None); summary.set(None); // Get schema let desc = run_sql(&format!("DESCRIBE {name}")).await; schema_info.set(Some(desc)); // Get preview let prev = run_sql(&format!("SELECT * FROM {name} LIMIT 5")).await; // Generate AI summary if let Ok(ref data) = prev { let data_json = serde_json::to_string(&data.rows).unwrap_or_default(); let cols: Vec = data.columns.iter() .map(|c| format!("{} ({})", c.name, c.data_type)) .collect(); let prompt = format!( "You are a data analyst. Describe this dataset in 2-3 sentences. Be specific about what the data contains and what insights it could provide.\n\n\ Table: {name}\n\ Columns: {}\n\ Sample data (first 5 rows): {data_json}\n\n\ Description:", cols.join(", ") ); let summ = ai_generate(&prompt, 256).await.map(|r| r.text); summary.set(Some(summ)); } preview.set(Some(prev)); loading.set(false); }); }; rsx! { div { class: "panel explore-panel", div { class: "explore-grid", // Dataset cards div { class: "ds-cards", h3 { "Datasets" } if datasets.is_empty() { div { class: "empty", "No datasets registered" } } for ds in datasets.iter() { { let name = ds.name.clone(); let name2 = ds.name.clone(); let is_active = selected.read().as_ref() == Some(&ds.name); let obj_count = ds.objects.len(); let total_bytes: u64 = ds.objects.iter().map(|o| o.size_bytes).sum(); rsx! { div { class: if is_active { "ds-card active" } else { "ds-card" }, onclick: move |_| select_ds(name.clone()), div { class: "ds-card-name", "{name2}" } div { class: "ds-card-meta", "{obj_count} file(s) · {total_bytes} bytes" } } } } } } // Detail view div { class: "ds-detail", if *loading.read() { div { class: "loading", "analyzing dataset..." } } else if selected.read().is_none() { div { class: "empty", "select a dataset to explore" } } else { if let Some(ref name) = *selected.read() { h3 { "{name}" } } // AI Summary if let Some(result) = summary.read().as_ref() { div { class: "summary-box", span { class: "summary-label", "AI Summary" } match result { Ok(text) => rsx! { p { class: "summary-text", "{text}" } }, Err(e) => rsx! { p { class: "error-inline", "{e}" } }, } } } // Schema if let Some(Ok(schema)) = schema_info.read().as_ref() { div { class: "schema-box", span { class: "section-label", "Schema" } ResultsTable { response: schema.clone() } } } // Preview if let Some(Ok(prev)) = preview.read().as_ref() { div { class: "preview-box", span { class: "section-label", "Preview (5 rows)" } ResultsTable { response: prev.clone() } } } } } } } } } // === SQL — Raw SQL editor === #[component] fn SqlPanel() -> Element { let mut query_text = use_signal(|| String::from("SELECT * FROM employees LIMIT 10")); let mut result = use_signal(|| None::>); let mut loading = use_signal(|| false); let run = move |_| { let sql = query_text.read().clone(); if sql.trim().is_empty() { return; } spawn(async move { loading.set(true); result.set(Some(run_sql(&sql).await)); loading.set(false); }); }; rsx! { div { class: "panel", div { class: "sql-editor", textarea { class: "sql-textarea", value: "{query_text}", placeholder: "SELECT * FROM dataset LIMIT 100", oninput: move |e| query_text.set(e.value()), onkeydown: move |e| { if e.key() == Key::Enter && e.modifiers().ctrl() { let sql = query_text.read().clone(); if !sql.trim().is_empty() { spawn(async move { loading.set(true); result.set(Some(run_sql(&sql).await)); loading.set(false); }); } } }, } div { class: "sql-actions", button { class: "btn", disabled: *loading.read(), onclick: run, if *loading.read() { "running..." } else { "Run (Ctrl+Enter)" } } } } div { class: "results-area", match result.read().as_ref() { None => rsx! { div { class: "empty", "run a query to see results" } }, Some(Err(e)) => rsx! { div { class: "error", "{e}" } }, Some(Ok(resp)) => rsx! { ResultsTable { response: resp.clone() } }, } } } } } // === DASHBOARD — System overview === #[component] fn DashboardPanel() -> Element { let mut stats = use_signal(|| None::); let mut loading = use_signal(|| false); let do_load = move || { spawn(async move { loading.set(true); let client = reqwest::Client::new(); let datasets = fetch_datasets().await.ok().map(|d| d.len()).unwrap_or(0); let row_count = run_sql("SELECT (SELECT COUNT(*) FROM candidates) + (SELECT COUNT(*) FROM clients) + (SELECT COUNT(*) FROM job_orders) + (SELECT COUNT(*) FROM placements) + (SELECT COUNT(*) FROM timesheets) + (SELECT COUNT(*) FROM call_log) + (SELECT COUNT(*) FROM email_log) as total").await.ok().and_then(|r| r.rows.as_array()?.first()?.get("total")?.as_i64()).unwrap_or(0); let cache: Option = match client.get(&format!("{}/query/cache/stats", api_base())).send().await { Ok(r) => r.json().await.ok(), Err(_) => None, }; let indexes: Vec = match client.get(&format!("{}/vectors/indexes", api_base())).send().await { Ok(r) => r.json().await.unwrap_or_default(), Err(_) => vec![], }; let tools: Vec = match client.get(&format!("{}/tools", api_base())).send().await { Ok(r) => r.json().await.unwrap_or_default(), Err(_) => vec![], }; let jobs: Vec = match client.get(&format!("{}/vectors/jobs", api_base())).send().await { Ok(r) => r.json().await.unwrap_or_default(), Err(_) => vec![], }; let hnsw: Vec = match client.get(&format!("{}/vectors/hnsw/list", api_base())).send().await { Ok(r) => r.json().await.unwrap_or_default(), Err(_) => vec![], }; let journal: Option = match client.get(&format!("{}/journal/stats", api_base())).send().await { Ok(r) => r.json().await.ok(), Err(_) => None, }; stats.set(Some(serde_json::json!({ "datasets": datasets, "total_rows": row_count, "cache": cache, "vector_indexes": indexes.len(), "vector_total_chunks": indexes.iter().filter_map(|i| i.get("chunk_count")?.as_i64()).sum::(), "hnsw_loaded": hnsw.len(), "tools": tools.len(), "jobs_total": jobs.len(), "jobs_running": jobs.iter().filter(|j| j.get("status").and_then(|s| s.as_str()) == Some("running")).count(), "journal": journal, }))); loading.set(false); }); }; // Auto-load on mount // Auto-load on mount use_effect(move || { do_load(); }); rsx! { div { class: "panel", div { class: "dashboard-hero", h2 { "Lakehouse" } p { class: "subtitle", "Rust-first data platform — SQL + AI + RAG over object storage" } } if let Some(s) = stats.read().as_ref() { div { class: "stat-grid", div { class: "stat-card", div { class: "stat-value", "{s[\"datasets\"]}" } div { class: "stat-label", "Datasets" } } div { class: "stat-card", div { class: "stat-value", {format!("{:.1}M", s["total_rows"].as_i64().unwrap_or(0) as f64 / 1_000_000.0)} } div { class: "stat-label", "Total Rows" } } div { class: "stat-card", div { class: "stat-value", {format!("{}K", s["vector_total_chunks"].as_i64().unwrap_or(0) / 1000)} } div { class: "stat-label", "Embeddings" } } div { class: "stat-card accent", div { class: "stat-value", "{s[\"hnsw_loaded\"]}" } div { class: "stat-label", "HNSW Indexes (27ms)" } } div { class: "stat-card", div { class: "stat-value", "{s[\"tools\"]}" } div { class: "stat-label", "Governed Tools" } } div { class: "stat-card", div { class: "stat-value", { s.get("cache") .and_then(|c| c.get("datasets")) .and_then(|d| d.as_i64()) .map(|n| format!("{n}")) .unwrap_or("0".into()) } } div { class: "stat-label", "Cached Datasets" } } } div { class: "architecture-section", h3 { "Architecture" } div { class: "arch-grid", div { class: "arch-card", div { class: "arch-title", "Ingest" } div { class: "arch-items", "CSV, JSON, PDF, Text, PostgreSQL, File Watcher" } } div { class: "arch-card", div { class: "arch-title", "Storage" } div { class: "arch-items", "Parquet on Object Storage, Delta Writes, Compaction" } } div { class: "arch-card", div { class: "arch-title", "Query" } div { class: "arch-items", "DataFusion SQL, MemCache (9.8x), Hot/Cold" } } div { class: "arch-card", div { class: "arch-title", "AI" } div { class: "arch-items", "Ollama (local), Embed, Generate, RAG, HNSW" } } div { class: "arch-card", div { class: "arch-title", "Governance" } div { class: "arch-items", "Event Journal, PII Detection, Tool Registry, Access Control" } } div { class: "arch-card", div { class: "arch-title", "Agents" } div { class: "arch-items", "Workspaces, Handoff, Shortlists, Activity Logs" } } } } div { class: "phases-section", h3 { "Build Progression" } div { class: "phase-list", {rsx! { PhaseItem { num: "0-5", name: "Foundation", detail: "Storage, Catalog, DataFusion, AI, UI, gRPC" } PhaseItem { num: "6", name: "Ingest Pipeline", detail: "CSV/JSON/PDF/Text auto-schema" } PhaseItem { num: "7", name: "Vector + RAG", detail: "Embed, Search, LLM Answers" } PhaseItem { num: "8", name: "Hot Cache", detail: "9.8x speedup, Delta Writes" } PhaseItem { num: "8.5", name: "Agent Workspaces", detail: "Per-contract, Instant Handoff" } PhaseItem { num: "9", name: "Event Journal", detail: "Append-only Mutation History" } PhaseItem { num: "10", name: "Rich Catalog", detail: "PII Detection, Lineage" } PhaseItem { num: "11", name: "Embedding Versioning", detail: "Model-proof Vectors" } PhaseItem { num: "12", name: "Tool Registry", detail: "6 Governed Actions + Audit" } PhaseItem { num: "13", name: "Access Control", detail: "Role-based, Field-level" } PhaseItem { num: "14", name: "Schema Evolution", detail: "Diff Detection, AI Migration" } PhaseItem { num: "15", name: "HNSW Index", detail: "100K Search in 27ms" } PhaseItem { num: "16", name: "File Watcher", detail: "Auto-ingest from Inbox" } PhaseItem { num: "17", name: "DB Connector", detail: "PostgreSQL Import" } }} } } } else if *loading.read() { div { class: "loading", "loading dashboard..." } } button { class: "btn", onclick: move |_| do_load(), "Refresh" } } } } #[component] fn PhaseItem(num: String, name: String, detail: String) -> Element { rsx! { div { class: "phase-item", span { class: "phase-num", "{num}" } span { class: "phase-name", "{name}" } span { class: "phase-detail", "{detail}" } } } } // === INGEST — Data on-ramp === #[component] fn IngestPanel() -> Element { let mut pg_host = use_signal(|| "127.0.0.1".to_string()); let mut pg_db = use_signal(|| "knowledge_base".to_string()); let mut pg_tables = use_signal(|| None::>); let mut pg_result = use_signal(|| None::>); let mut pg_loading = use_signal(|| false); let list_tables = move |_| { let host = pg_host.read().clone(); let db = pg_db.read().clone(); spawn(async move { pg_loading.set(true); let client = reqwest::Client::new(); let resp = client.post(&format!("{}/ingest/postgres/tables", api_base())) .json(&serde_json::json!({"host": host, "port": 5432, "database": db, "user": "postgres", "password": ""})) .send().await; match resp { Ok(r) => { if let Ok(tables) = r.json::>().await { pg_tables.set(Some(tables)); } } Err(e) => pg_tables.set(None), } pg_loading.set(false); }); }; let mut import_table = move |table: String| { let host = pg_host.read().clone(); let db = pg_db.read().clone(); spawn(async move { pg_result.set(None); let client = reqwest::Client::new(); let resp = client.post(&format!("{}/ingest/postgres/import", api_base())) .json(&serde_json::json!({"host": host, "port": 5432, "database": db, "user": "postgres", "password": "", "table": table})) .send().await; match resp { Ok(r) => { match r.json::().await { Ok(v) => pg_result.set(Some(Ok(v))), Err(e) => pg_result.set(Some(Err(e.to_string()))), } } Err(e) => pg_result.set(Some(Err(e.to_string()))), } }); }; rsx! { div { class: "panel", h2 { "Data Ingest" } p { class: "subtitle", "Bring data in from files, databases, or the auto-watch inbox" } div { class: "panel-section", h3 { "File Upload" } p { class: "hint", "POST a file to /ingest/file — or drop it in ./inbox/ for auto-ingest" } div { class: "arch-card", div { class: "arch-title", "Supported Formats" } div { class: "arch-items", "CSV (auto-schema) | JSON (nested flattening) | PDF (text extraction) | Text/SMS" } } div { class: "arch-card", div { class: "arch-title", "Auto-Watch Inbox" } div { class: "arch-items", "Drop files in ./inbox/ → auto-detected → Parquet → queryable in <15s" } } } div { class: "panel-section", h3 { "PostgreSQL Import" } div { class: "form-row", label { "Host" } input { value: "{pg_host}", oninput: move |e| pg_host.set(e.value()) } } div { class: "form-row", label { "Database" } input { value: "{pg_db}", oninput: move |e| pg_db.set(e.value()) } } button { class: "btn", disabled: *pg_loading.read(), onclick: list_tables, "List Tables" } if let Some(tables) = pg_tables.read().as_ref() { div { class: "table-list", for table in tables.iter() { { let t = table.clone(); let t2 = table.clone(); rsx! { div { class: "table-item", span { "{t}" } button { class: "btn btn-sm", onclick: move |_| import_table(t2.clone()), "Import" } } } } } } } if let Some(result) = pg_result.read().as_ref() { match result { Ok(v) => rsx! { div { class: "result-box", pre { {serde_json::to_string_pretty(v).unwrap_or_default()} } } }, Err(e) => rsx! { div { class: "error", "{e}" } }, } } } } } } // === STATUS — System health === #[component] fn StatusPanel() -> Element { let mut results = use_signal(Vec::<(String, String, Result)>::new); let mut checking = use_signal(|| false); let run_checks = move |_| { spawn(async move { checking.set(true); results.set(vec![]); let mut checks = vec![]; let r = fetch_health("/health").await; checks.push(("Gateway".into(), "HTTP ingress".into(), r)); let r = fetch_health("/storage/health").await; checks.push(("Storage".into(), "Object store (Parquet files)".into(), r)); let r = fetch_health("/catalog/health").await; checks.push(("Catalog".into(), "Dataset registry".into(), r)); let r = fetch_health("/query/health").await; checks.push(("Query Engine".into(), "DataFusion SQL".into(), r)); let r = fetch_health("/ai/health").await; checks.push(("AI Bridge".into(), "Ollama sidecar".into(), r)); let r = fetch_datasets().await.map(|ds| format!("{} datasets registered", ds.len())); checks.push(("Catalog Data".into(), "Dataset count".into(), r)); let r = run_sql("SELECT 1 + 1 as result").await .map(|q| format!("1+1 = {} ({} row)", q.rows.as_array().and_then(|a| a.first()).and_then(|r| r.get("result")).map(|v| v.to_string()).unwrap_or("?".into()), q.row_count)); checks.push(("SQL Execution".into(), "DataFusion compute".into(), r)); let r = ai_embed(vec!["health check".into()]).await .map(|e| format!("{}d vector from {}", e.dimensions, e.model)); checks.push(("Embeddings".into(), "nomic-embed-text via Ollama".into(), r)); let r = ai_generate("Say OK", 8).await .map(|g| format!("\"{}\" via {}", g.text.trim(), g.model)); checks.push(("Generation".into(), "LLM via Ollama".into(), r)); results.set(checks); checking.set(false); }); }; rsx! { div { class: "panel", div { class: "status-hero", h2 { "System Status" } p { class: "subtitle", "Verify every layer: Rust gateway → object storage → DataFusion → Ollama" } button { class: "btn btn-ask", disabled: *checking.read(), onclick: run_checks, if *checking.read() { "checking..." } else { "Run All Checks" } } } if !results.read().is_empty() { div { class: "check-grid", for (name, desc, result) in results.read().iter() { { let name = name.clone(); let desc = desc.clone(); let (class, icon, text) = match result { Ok(msg) => ("check-card pass", "✓", msg.clone()), Err(e) => ("check-card fail", "✗", e.clone()), }; rsx! { div { class: "{class}", div { class: "check-icon", "{icon}" } div { class: "check-info", div { class: "check-name", "{name}" } div { class: "check-desc", "{desc}" } div { class: "check-msg", "{text}" } } } } } } } } } } } // === Results Table === #[component] fn ResultsTable(response: QueryResponse) -> Element { let rows = response.rows.as_array(); rsx! { div { class: "results-info", "{response.row_count} row(s) · {response.columns.len()} column(s)" } if response.row_count == 0 { div { class: "empty-sm", "no rows returned" } } else if let Some(rows) = rows { if rows.len() > 500 { div { class: "results-info", "Showing first 500 of {response.row_count} rows (use SQL tab with LIMIT for larger)" } } div { class: "table-wrap", table { thead { tr { for col in response.columns.iter() { th { title: "{col.data_type}", "{col.name}" } } } } tbody { for row in rows.iter().take(500) { tr { for col in response.columns.iter() { td { {format_cell(row.get(&col.name))} } } } } } } } } } } /// Clean AI-generated SQL: extract only the SQL query, strip everything else. fn clean_sql(raw: &str) -> String { let s = raw.trim(); // Strategy 1: If there's a ```sql...``` block, extract just that if let Some(start) = s.find("```sql") { let after = &s[start + 6..]; if let Some(end) = after.find("```") { return after[..end].trim().to_string(); } } if let Some(start) = s.find("```") { let after = &s[start + 3..]; if let Some(end) = after.find("```") { let inner = after[..end].trim(); // Skip leading "sql" keyword let inner = inner.strip_prefix("sql").map(|s| s.trim_start()).unwrap_or(inner); return inner.to_string(); } } // Strategy 2: Find the first SELECT/WITH/INSERT/UPDATE/DELETE statement let upper = s.to_uppercase(); for keyword in &["SELECT", "WITH", "INSERT", "UPDATE", "DELETE"] { if let Some(pos) = upper.find(keyword) { let sql_part = &s[pos..]; // Take up to the first semicolon (or end) let end = sql_part.find(';').map(|p| p + 1).unwrap_or(sql_part.len()); return sql_part[..end].trim().to_string(); } } // Strategy 3: Strip leading "sql" and clean up let mut result = s.to_string(); let lines: Vec<&str> = result.lines().collect(); if let Some(first) = lines.first() { if first.trim().eq_ignore_ascii_case("sql") { result = lines[1..].join("\n").trim().to_string(); } } result } fn format_cell(val: Option<&serde_json::Value>) -> String { match val { None | Some(serde_json::Value::Null) => "—".to_string(), Some(serde_json::Value::String(s)) => s.clone(), Some(serde_json::Value::Number(n)) => n.to_string(), Some(serde_json::Value::Bool(b)) => b.to_string(), Some(other) => other.to_string(), } }