use arrow::array::{ArrayRef, Int32Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use std::sync::Arc; /// Extract text from a PDF file. /// /// Two-tier approach: /// 1. **Text extraction (lopdf)** — fast, works on digital/native PDFs. /// Returns one row per page: (source_file, page_number, text_content). /// 2. **OCR fallback (Tesseract)** — if text extraction yields zero /// pages (common with scanned documents, image-only PDFs, or PDFs /// exported from scanners without an OCR layer), we extract embedded /// images from the PDF and run Tesseract on each. Falls back /// gracefully if Tesseract isn't installed (returns an error /// directing the operator to install it). /// /// Schema is identical for both paths so downstream consumers (chunker, /// vectord, queryd) don't need to know how the text was produced. pub fn parse_pdf(content: &[u8], source_filename: &str) -> Result<(Arc, Vec), String> { let doc = lopdf::Document::load_mem(content) .map_err(|e| format!("PDF load error: {e}"))?; let pages = doc.get_pages(); let mut page_numbers: Vec = Vec::new(); let mut page_texts: Vec = Vec::new(); let mut sources: Vec = Vec::new(); for (&page_num, _) in pages.iter() { let text = doc.extract_text(&[page_num]).unwrap_or_default(); let text = text.trim().to_string(); if !text.is_empty() { page_numbers.push(page_num as i32); page_texts.push(text); sources.push(source_filename.to_string()); } } // Tier 2: OCR fallback for scanned pages. if page_numbers.is_empty() { tracing::info!( "no extractable text in '{}' — attempting OCR on {} pages", source_filename, pages.len(), ); let ocr_results = ocr_pdf_images(&doc, source_filename)?; if ocr_results.is_empty() { return Err( "PDF contains no extractable text and OCR produced no output. \ Ensure tesseract is installed (`apt install tesseract-ocr tesseract-ocr-eng`)." .into(), ); } for (pg, text) in ocr_results { page_numbers.push(pg); page_texts.push(text); sources.push(source_filename.to_string()); } tracing::info!( "OCR recovered {} pages from '{}'", page_numbers.len(), source_filename, ); } else { tracing::info!( "extracted {} pages with text from PDF '{}'", page_numbers.len(), source_filename, ); } let schema = Arc::new(Schema::new(vec![ Field::new("source_file", DataType::Utf8, false), Field::new("page_number", DataType::Int32, false), Field::new("text_content", DataType::Utf8, false), ])); let arrays: Vec = vec![ Arc::new(StringArray::from(sources)), Arc::new(Int32Array::from(page_numbers)), Arc::new(StringArray::from(page_texts)), ]; let batch = RecordBatch::try_new(schema.clone(), arrays) .map_err(|e| format!("RecordBatch error: {e}"))?; Ok((schema, vec![batch])) } /// Extract embedded images from the PDF, write each to a temp file, /// run Tesseract, collect the OCR'd text. Returns (page_number, text) /// pairs for pages that yielded non-empty output. /// /// Image extraction strategy: walk every page's resources looking for /// XObject streams with /Subtype /Image. Decode the raw stream bytes /// (lopdf handles FlateDecode/DCTDecode) and write them as-is to a /// temp file with the appropriate extension. Tesseract handles JPEG, /// PNG, TIFF, and BMP natively. fn ocr_pdf_images( doc: &lopdf::Document, source_filename: &str, ) -> Result, String> { if !tesseract_available() { return Err( "tesseract not found on PATH — install with `apt install tesseract-ocr tesseract-ocr-eng`" .into(), ); } let pages = doc.get_pages(); let mut results: Vec<(i32, String)> = Vec::new(); for (&page_num, &page_id) in &pages { let images = extract_page_images(doc, page_id); if images.is_empty() { continue; } // OCR each image on this page and concatenate the text. let mut page_text = String::new(); for (idx, img_bytes) in images.iter().enumerate() { match ocr_image_bytes(img_bytes, page_num, idx) { Ok(text) if !text.trim().is_empty() => { if !page_text.is_empty() { page_text.push('\n'); } page_text.push_str(text.trim()); } Ok(_) => {} // empty OCR — skip Err(e) => { tracing::warn!( "OCR failed on '{}' page {} image {}: {e}", source_filename, page_num, idx, ); } } } if !page_text.is_empty() { results.push((page_num as i32, page_text)); } } Ok(results) } /// Extract raw image bytes from a PDF page's XObject resources. /// Handles the common case: page /Resources → /XObject dict → streams /// with /Subtype /Image. fn extract_page_images( doc: &lopdf::Document, page_id: lopdf::ObjectId, ) -> Vec> { let mut images = Vec::new(); let page_obj = match doc.get_object(page_id) { Ok(o) => o, Err(_) => return images, }; let page_dict = match page_obj.as_dict() { Ok(d) => d, Err(_) => return images, }; // Resolve /Resources → /XObject let resources = page_dict .get(b"Resources") .ok() .and_then(|r| doc.dereference(r).ok()) .map(|(_, o)| o.clone()); let xobjects = resources .as_ref() .and_then(|r| r.as_dict().ok()) .and_then(|d| d.get(b"XObject").ok()) .and_then(|x| doc.dereference(x).ok()) .map(|(_, o)| o.clone()); let xobj_dict = match xobjects.as_ref().and_then(|x| x.as_dict().ok()) { Some(d) => d, None => return images, }; for (_name, obj_ref) in xobj_dict.iter() { let obj = match doc.dereference(obj_ref) { Ok((_, o)) => o.clone(), Err(_) => continue, }; let stream = match obj.as_stream() { Ok(s) => s, Err(_) => continue, }; // Check /Subtype == /Image let is_image = stream.dict.get(b"Subtype") .ok() .and_then(|s| s.as_name().ok()) .map(|n| n == b"Image") .unwrap_or(false); if !is_image { continue; } // Get decoded content — lopdf handles FlateDecode/DCTDecode. let content = match stream.decompressed_content() { Ok(c) => c, Err(_) => { // Try raw content as fallback (some DCT streams are stored raw). stream.content.clone() } }; if !content.is_empty() { images.push(content); } } images } /// Run Tesseract on raw image bytes. Writes to a temp file, shells out, /// reads the output. Returns the OCR'd text. fn ocr_image_bytes( img_bytes: &[u8], page_num: u32, img_idx: usize, ) -> Result { use std::io::Write; use std::process::Command; let tmp_dir = std::env::temp_dir(); // Detect image format from magic bytes to pick the right extension. let ext = if img_bytes.starts_with(&[0xFF, 0xD8]) { "jpg" } else if img_bytes.starts_with(&[0x89, 0x50, 0x4E, 0x47]) { "png" } else if img_bytes.starts_with(b"II") || img_bytes.starts_with(b"MM") { "tiff" } else { // Tesseract can often handle raw pixel data with the right options, // but writing as PNG is safer. For truly unknown formats, try PNG // wrapping; if Tesseract can't read it, it'll error cleanly. "png" }; let img_path = tmp_dir.join(format!("lh_ocr_p{page_num}_i{img_idx}.{ext}")); let out_base = tmp_dir.join(format!("lh_ocr_p{page_num}_i{img_idx}_out")); let out_path = tmp_dir.join(format!("lh_ocr_p{page_num}_i{img_idx}_out.txt")); // Write image bytes to temp file. let mut f = std::fs::File::create(&img_path) .map_err(|e| format!("create temp image: {e}"))?; f.write_all(img_bytes) .map_err(|e| format!("write temp image: {e}"))?; drop(f); // Run Tesseract. let status = Command::new("tesseract") .arg(&img_path) .arg(&out_base) .arg("--oem").arg("3") // LSTM + legacy combined .arg("--psm").arg("6") // assume uniform block of text .arg("-l").arg("eng") .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) .status() .map_err(|e| format!("tesseract exec: {e}"))?; // Cleanup input regardless of outcome. let _ = std::fs::remove_file(&img_path); if !status.success() { let _ = std::fs::remove_file(&out_path); return Err(format!("tesseract exited with status {status}")); } let text = std::fs::read_to_string(&out_path).unwrap_or_default(); let _ = std::fs::remove_file(&out_path); Ok(text) } /// Quick check: is `tesseract` on PATH? fn tesseract_available() -> bool { std::process::Command::new("tesseract") .arg("--version") .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) .status() .map(|s| s.success()) .unwrap_or(false) }