""" Excel document parser implementation. [AC-AISVC-35] Excel (.xlsx) parsing using openpyxl. Extracts text content from Excel spreadsheets. """ import logging from pathlib import Path from typing import Any from app.services.document.base import ( DocumentParseException, DocumentParser, ParseResult, ) logger = logging.getLogger(__name__) class ExcelParser(DocumentParser): """ Parser for Excel documents. [AC-AISVC-35] Uses openpyxl for text extraction. """ def __init__( self, include_empty_cells: bool = False, max_rows_per_sheet: int = 10000, **kwargs: Any ): self._include_empty_cells = include_empty_cells self._max_rows_per_sheet = max_rows_per_sheet self._extra_config = kwargs self._openpyxl = None def _get_openpyxl(self): """Lazy import of openpyxl.""" if self._openpyxl is None: try: import openpyxl self._openpyxl = openpyxl except ImportError: raise DocumentParseException( "openpyxl not installed. Install with: pip install openpyxl", parser="excel" ) return self._openpyxl def parse(self, file_path: str | Path) -> ParseResult: """ Parse an Excel document and extract text content. [AC-AISVC-35] Converts spreadsheet data to structured text. """ path = Path(file_path) if not path.exists(): raise DocumentParseException( f"File not found: {path}", file_path=str(path), parser="excel" ) if not self.supports_extension(path.suffix): raise DocumentParseException( f"Unsupported file extension: {path.suffix}", file_path=str(path), parser="excel" ) openpyxl = self._get_openpyxl() try: workbook = openpyxl.load_workbook(path, read_only=True, data_only=True) text_parts = [] sheet_count = len(workbook.sheetnames) total_rows = 0 for sheet_name in workbook.sheetnames: sheet = workbook[sheet_name] sheet_text_parts = [] row_count = 0 for row in sheet.iter_rows(max_row=self._max_rows_per_sheet): row_values = [] has_content = False for cell in row: value = cell.value if value is not None: has_content = True row_values.append(str(value)) elif self._include_empty_cells: row_values.append("") else: row_values.append("") if has_content or self._include_empty_cells: sheet_text_parts.append(" | ".join(row_values)) row_count += 1 if sheet_text_parts: text_parts.append(f"[Sheet: {sheet_name}]\n" + "\n".join(sheet_text_parts)) total_rows += row_count workbook.close() full_text = "\n\n".join(text_parts) file_size = path.stat().st_size logger.info( f"Parsed Excel: {path.name}, sheets={sheet_count}, " f"rows={total_rows}, chars={len(full_text)}, size={file_size}" ) return ParseResult( text=full_text, source_path=str(path), file_size=file_size, metadata={ "format": "xlsx", "sheet_count": sheet_count, "total_rows": total_rows, } ) except DocumentParseException: raise except Exception as e: raise DocumentParseException( f"Failed to parse Excel document: {e}", file_path=str(path), parser="excel", details={"error": str(e)} ) def get_supported_extensions(self) -> list[str]: """Get supported file extensions.""" return [".xlsx", ".xls"] class CSVParser(DocumentParser): """ Parser for CSV files. [AC-AISVC-35] Uses Python's built-in csv module. """ def __init__(self, delimiter: str = ",", encoding: str = "utf-8", **kwargs: Any): self._delimiter = delimiter self._encoding = encoding self._extra_config = kwargs def parse(self, file_path: str | Path) -> ParseResult: """ Parse a CSV file and extract text content. [AC-AISVC-35] Converts CSV data to structured text. """ import csv path = Path(file_path) if not path.exists(): raise DocumentParseException( f"File not found: {path}", file_path=str(path), parser="csv" ) try: text_parts = [] row_count = 0 with open(path, "r", encoding=self._encoding, newline="") as f: reader = csv.reader(f, delimiter=self._delimiter) for row in reader: text_parts.append(" | ".join(row)) row_count += 1 full_text = "\n".join(text_parts) file_size = path.stat().st_size logger.info( f"Parsed CSV: {path.name}, rows={row_count}, " f"chars={len(full_text)}, size={file_size}" ) return ParseResult( text=full_text, source_path=str(path), file_size=file_size, metadata={ "format": "csv", "row_count": row_count, "delimiter": self._delimiter, } ) except UnicodeDecodeError: try: with open(path, "r", encoding="gbk", newline="") as f: reader = csv.reader(f, delimiter=self._delimiter) for row in reader: text_parts.append(" | ".join(row)) row_count += 1 full_text = "\n".join(text_parts) file_size = path.stat().st_size return ParseResult( text=full_text, source_path=str(path), file_size=file_size, metadata={ "format": "csv", "row_count": row_count, "delimiter": self._delimiter, "encoding": "gbk", } ) except Exception as e: raise DocumentParseException( f"Failed to parse CSV with encoding fallback: {e}", file_path=str(path), parser="csv", details={"error": str(e)} ) except Exception as e: raise DocumentParseException( f"Failed to parse CSV: {e}", file_path=str(path), parser="csv", details={"error": str(e)} ) def get_supported_extensions(self) -> list[str]: """Get supported file extensions.""" return [".csv"]