//! Schema evolution: detect changes between ingests and suggest migration rules.
//!
//! When a source changes format (columns renamed, added, removed, type changed),
//! the system detects the diff and can auto-map using AI or heuristic matching.

use arrow::datatypes::{DataType, Schema, SchemaRef};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;

/// A detected change between two schema versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaChange {
    /// Kind of change that was detected.
    pub change_type: ChangeType,
    /// Column the change refers to: the old name for `Removed` and `Renamed`,
    /// the (shared) name for `TypeChanged`, and the new name for `Added`.
    pub column: String,
    /// Human-readable description of the change.
    pub detail: String,
}

/// The kind of difference found between an old and a new schema.
///
/// Serialized in `snake_case` (e.g. `"type_changed"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ChangeType {
    /// New column in the incoming schema.
    Added,
    /// Column in the old schema that is not in the new one.
    Removed,
    /// Same name, different type.
    TypeChanged,
    /// Likely renamed (fuzzy match).
    Renamed,
}

/// A migration rule — how to map old schema to new.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationRule {
    /// Column in the old schema; empty when the rule introduces a new column.
    pub source_column: String,
    /// Column in the new schema; empty when the rule drops a column.
    pub target_column: String,
    /// One of "direct", "cast", "concat", "split", "drop", "new_column".
    pub transform: String,
    /// Confidence in the rule, 0.0 - 1.0.
    pub confidence: f32,
    /// `true` when the rule was produced automatically rather than by a person.
    pub auto_generated: bool,
}

/// Compare two schemas and detect changes.
pub fn diff_schemas(old: &SchemaRef, new: &SchemaRef) -> Vec { let mut changes = Vec::new(); let old_fields: HashMap<&str, &DataType> = old.fields().iter() .map(|f| (f.name().as_str(), f.data_type())) .collect(); let new_fields: HashMap<&str, &DataType> = new.fields().iter() .map(|f| (f.name().as_str(), f.data_type())) .collect(); // Check for removed columns for (name, _dt) in &old_fields { if !new_fields.contains_key(name) { // Check if it was renamed (fuzzy match) let possible_rename = find_similar_column(name, &new_fields, &old_fields); if let Some((new_name, confidence)) = possible_rename { changes.push(SchemaChange { change_type: ChangeType::Renamed, column: name.to_string(), detail: format!("likely renamed to '{}' (confidence: {:.0}%)", new_name, confidence * 100.0), }); } else { changes.push(SchemaChange { change_type: ChangeType::Removed, column: name.to_string(), detail: "column no longer present in incoming data".into(), }); } } } // Check for added or type-changed columns for (name, new_dt) in &new_fields { match old_fields.get(name) { None => { // Skip if we already flagged this as a rename target let is_rename_target = changes.iter().any(|c| { c.change_type == ChangeType::Renamed && c.detail.contains(name) }); if !is_rename_target { changes.push(SchemaChange { change_type: ChangeType::Added, column: name.to_string(), detail: format!("new column (type: {})", new_dt), }); } } Some(old_dt) => { if old_dt != new_dt { changes.push(SchemaChange { change_type: ChangeType::TypeChanged, column: name.to_string(), detail: format!("{} → {}", old_dt, new_dt), }); } } } } changes } /// Generate migration rules from detected changes. 
pub fn generate_rules(changes: &[SchemaChange]) -> Vec { changes.iter().filter_map(|change| { match change.change_type { ChangeType::Renamed => { // Extract the new name from the detail string let new_name = change.detail .split('\'') .nth(1) .unwrap_or(&change.column); Some(MigrationRule { source_column: change.column.clone(), target_column: new_name.to_string(), transform: "direct".into(), confidence: 0.8, auto_generated: true, }) } ChangeType::TypeChanged => { Some(MigrationRule { source_column: change.column.clone(), target_column: change.column.clone(), transform: "cast".into(), confidence: 0.9, auto_generated: true, }) } ChangeType::Added => { Some(MigrationRule { source_column: String::new(), target_column: change.column.clone(), transform: "new_column".into(), confidence: 1.0, auto_generated: true, }) } ChangeType::Removed => { Some(MigrationRule { source_column: change.column.clone(), target_column: String::new(), transform: "drop".into(), confidence: 0.7, auto_generated: true, }) } } }).collect() } /// Build an AI prompt to suggest migration rules for complex cases. pub fn build_migration_prompt( old_schema: &SchemaRef, new_schema: &SchemaRef, changes: &[SchemaChange], ) -> String { let old_cols: Vec = old_schema.fields().iter() .map(|f| format!(" {} ({})", f.name(), f.data_type())) .collect(); let new_cols: Vec = new_schema.fields().iter() .map(|f| format!(" {} ({})", f.name(), f.data_type())) .collect(); let change_list: Vec = changes.iter() .map(|c| format!(" [{:?}] {} — {}", c.change_type, c.column, c.detail)) .collect(); format!( "You are a data migration expert. 
A data source has changed its schema.\n\n\ OLD SCHEMA:\n{}\n\n\ NEW SCHEMA:\n{}\n\n\ DETECTED CHANGES:\n{}\n\n\ For each change, suggest a migration rule as JSON:\n\ {{\"source_column\": \"old_name\", \"target_column\": \"new_name\", \"transform\": \"direct|cast|concat|split|drop\", \"confidence\": 0.0-1.0}}\n\n\ Output ONLY a JSON array of rules, nothing else.", old_cols.join("\n"), new_cols.join("\n"), change_list.join("\n"), ) } /// Fuzzy match: find a column in new schema that's similar to a removed old column. fn find_similar_column<'a>( old_name: &str, new_fields: &HashMap<&'a str, &DataType>, old_fields: &HashMap<&str, &DataType>, ) -> Option<(String, f32)> { let old_lower = old_name.to_lowercase(); let old_parts: Vec<&str> = old_lower.split('_').collect(); let mut best_match: Option<(String, f32)> = None; for (new_name, _) in new_fields { // Skip columns that already exist in old schema (not renamed) if old_fields.contains_key(new_name) { continue; } let new_lower = new_name.to_lowercase(); let new_parts: Vec<&str> = new_lower.split('_').collect(); // Exact substring match if old_lower.contains(&new_lower) || new_lower.contains(&old_lower) { let confidence = 0.85; if best_match.as_ref().map_or(true, |(_, c)| confidence > *c) { best_match = Some((new_name.to_string(), confidence)); } continue; } // Shared word parts (e.g., "first_name" → "fname", "full_name") let shared: usize = old_parts.iter() .filter(|p| new_parts.iter().any(|np| np.contains(*p) || p.contains(np))) .count(); if shared > 0 { let confidence = shared as f32 / old_parts.len().max(new_parts.len()) as f32 * 0.75; if confidence >= 0.3 && best_match.as_ref().map_or(true, |(_, c)| confidence > *c) { best_match = Some((new_name.to_string(), confidence)); } } } best_match } #[cfg(test)] mod tests { use super::*; use arrow::datatypes::Field; fn schema(fields: Vec<(&str, DataType)>) -> SchemaRef { Arc::new(Schema::new( fields.into_iter().map(|(n, t)| Field::new(n, t, true)).collect::>() )) } #[test] 
fn detect_added_column() { let old = schema(vec![("id", DataType::Int64), ("name", DataType::Utf8)]); let new = schema(vec![("id", DataType::Int64), ("name", DataType::Utf8), ("email", DataType::Utf8)]); let changes = diff_schemas(&old, &new); assert_eq!(changes.len(), 1); assert_eq!(changes[0].change_type, ChangeType::Added); assert_eq!(changes[0].column, "email"); } #[test] fn detect_removed_column() { let old = schema(vec![("id", DataType::Int64), ("name", DataType::Utf8), ("age", DataType::Int64)]); let new = schema(vec![("id", DataType::Int64), ("name", DataType::Utf8)]); let changes = diff_schemas(&old, &new); assert!(changes.iter().any(|c| c.column == "age" && c.change_type == ChangeType::Removed)); } #[test] fn detect_type_change() { let old = schema(vec![("id", DataType::Int64), ("score", DataType::Int64)]); let new = schema(vec![("id", DataType::Int64), ("score", DataType::Float64)]); let changes = diff_schemas(&old, &new); assert_eq!(changes.len(), 1); assert_eq!(changes[0].change_type, ChangeType::TypeChanged); } #[test] fn detect_rename() { let old = schema(vec![("first_name", DataType::Utf8), ("last_name", DataType::Utf8)]); let new = schema(vec![("full_name", DataType::Utf8)]); let changes = diff_schemas(&old, &new); // Should detect first_name → full_name as a potential rename assert!(changes.iter().any(|c| c.change_type == ChangeType::Renamed)); } #[test] fn generate_rules_from_changes() { let old = schema(vec![("id", DataType::Int64), ("name", DataType::Utf8)]); let new = schema(vec![("id", DataType::Int64), ("name", DataType::Utf8), ("email", DataType::Utf8)]); let changes = diff_schemas(&old, &new); let rules = generate_rules(&changes); assert_eq!(rules.len(), 1); assert_eq!(rules[0].transform, "new_column"); } }