ai-robot-core/ai-service/app/services/document/excel_parser.py

240 lines
7.7 KiB
Python
Raw Normal View History

"""
Excel and CSV document parser implementations.
[AC-AISVC-35] Excel (.xlsx) parsing using openpyxl; CSV parsing using the
standard-library csv module.
Extracts text content from spreadsheets as " | "-delimited lines.
"""
import logging
from pathlib import Path
from typing import Any
from app.services.document.base import (
DocumentParseException,
DocumentParser,
ParseResult,
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ExcelParser(DocumentParser):
    """
    Parser for Excel documents.

    [AC-AISVC-35] Uses openpyxl for text extraction.

    Every sheet that yields at least one line is rendered as a
    "[Sheet: <name>]" header followed by one line per row, the cell values of
    a row being joined by " | ". Rendered sheets are separated by a blank
    line.
    """

    def __init__(
        self,
        include_empty_cells: bool = False,
        max_rows_per_sheet: int = 10000,
        **kwargs: Any
    ):
        """
        Configure the parser.

        Args:
            include_empty_cells: When True, rows whose cells are all empty are
                emitted as well. Empty cells inside a row are always rendered
                as empty strings, regardless of this flag, so that columns
                stay aligned.
            max_rows_per_sheet: Upper bound on the rows read from each sheet.
            **kwargs: Extra configuration; stored but not interpreted here.
        """
        self._include_empty_cells = include_empty_cells
        self._max_rows_per_sheet = max_rows_per_sheet
        self._extra_config = kwargs
        self._openpyxl = None

    def _get_openpyxl(self):
        """
        Lazy import of openpyxl.

        Returns:
            The imported openpyxl module, cached on the instance.

        Raises:
            DocumentParseException: If openpyxl is not installed.
        """
        if self._openpyxl is None:
            try:
                import openpyxl
            except ImportError as e:
                raise DocumentParseException(
                    "openpyxl not installed. Install with: pip install openpyxl",
                    parser="excel"
                ) from e
            self._openpyxl = openpyxl
        return self._openpyxl

    def _extract_sheet_lines(self, sheet) -> list[str]:
        """Render one worksheet as a list of " | "-joined row strings."""
        lines = []
        for row in sheet.iter_rows(max_row=self._max_rows_per_sheet):
            values = [cell.value for cell in row]
            has_content = any(value is not None for value in values)
            if has_content or self._include_empty_cells:
                # Empty cells become "" so the column positions are preserved.
                lines.append(
                    " | ".join("" if value is None else str(value) for value in values)
                )
        return lines

    def parse(self, file_path: str | Path) -> ParseResult:
        """
        Parse an Excel document and extract text content.

        [AC-AISVC-35] Converts spreadsheet data to structured text.

        Args:
            file_path: Location of the workbook on disk.

        Returns:
            A ParseResult carrying the rendered text, the source path, the
            file size in bytes and metadata ("format", "sheet_count",
            "total_rows"). "total_rows" counts emitted lines only.

        Raises:
            DocumentParseException: If the file does not exist, its extension
                is not supported, openpyxl is missing, or reading fails.
        """
        path = Path(file_path)
        if not path.exists():
            raise DocumentParseException(
                f"File not found: {path}",
                file_path=str(path),
                parser="excel"
            )
        if not self.supports_extension(path.suffix):
            raise DocumentParseException(
                f"Unsupported file extension: {path.suffix}",
                file_path=str(path),
                parser="excel"
            )

        openpyxl = self._get_openpyxl()
        try:
            workbook = openpyxl.load_workbook(path, read_only=True, data_only=True)
            try:
                sheet_count = len(workbook.sheetnames)
                text_parts = []
                total_rows = 0
                for sheet_name in workbook.sheetnames:
                    sheet_lines = self._extract_sheet_lines(workbook[sheet_name])
                    if sheet_lines:
                        text_parts.append(
                            f"[Sheet: {sheet_name}]\n" + "\n".join(sheet_lines)
                        )
                        total_rows += len(sheet_lines)
            finally:
                # A read-only workbook keeps the file open until closed, so
                # release it even when a sheet fails to parse.
                workbook.close()

            full_text = "\n\n".join(text_parts)
            file_size = path.stat().st_size
            logger.info(
                "Parsed Excel: %s, sheets=%s, rows=%s, chars=%s, size=%s",
                path.name, sheet_count, total_rows, len(full_text), file_size,
            )
            return ParseResult(
                text=full_text,
                source_path=str(path),
                file_size=file_size,
                metadata={
                    "format": "xlsx",
                    "sheet_count": sheet_count,
                    "total_rows": total_rows,
                }
            )
        except DocumentParseException:
            raise
        except Exception as e:
            raise DocumentParseException(
                f"Failed to parse Excel document: {e}",
                file_path=str(path),
                parser="excel",
                details={"error": str(e)}
            ) from e

    def get_supported_extensions(self) -> list[str]:
        """Get supported file extensions."""
        # NOTE(review): openpyxl reads the OOXML formats only; a legacy binary
        # ".xls" file is expected to fail in load_workbook and surface as a
        # DocumentParseException. The entry is kept so existing routing is
        # unchanged - confirm whether ".xls" should be advertised at all.
        return [".xlsx", ".xls"]
class CSVParser(DocumentParser):
    """
    Parser for CSV files.

    [AC-AISVC-35] Uses Python's built-in csv module.

    Each CSV record becomes one output line whose fields are joined by " | ".
    """

    # Encoding tried when the configured one cannot decode the file.
    _FALLBACK_ENCODING = "gbk"

    def __init__(self, delimiter: str = ",", encoding: str = "utf-8", **kwargs: Any):
        """
        Configure the parser.

        Args:
            delimiter: Field separator handed to csv.reader.
            encoding: Text encoding tried first when opening the file.
            **kwargs: Extra configuration; stored but not interpreted here.
        """
        self._delimiter = delimiter
        self._encoding = encoding
        self._extra_config = kwargs

    @staticmethod
    def _bom_tolerant(encoding: str) -> str:
        """Return "utf-8-sig" for any UTF-8 alias so a leading BOM is dropped."""
        import codecs

        try:
            if codecs.lookup(encoding).name == "utf-8":
                return "utf-8-sig"
        except (LookupError, TypeError):
            # Unknown or non-string encoding: hand it to open() unchanged so
            # the caller sees the same behaviour/error as before.
            pass
        return encoding

    def _read_lines(self, path: Path, encoding: str) -> list[str]:
        """Decode the whole file with *encoding*; return one line per CSV record."""
        import csv

        with open(path, "r", encoding=encoding, newline="") as f:
            return [" | ".join(row) for row in csv.reader(f, delimiter=self._delimiter)]

    def parse(self, file_path: str | Path) -> ParseResult:
        """
        Parse a CSV file and extract text content.

        [AC-AISVC-35] Converts CSV data to structured text.

        The file is decoded with the configured encoding first; if that raises
        UnicodeDecodeError it is re-read from the start as GBK.

        Args:
            file_path: Location of the CSV file on disk.

        Returns:
            A ParseResult carrying the rendered text, the source path, the
            file size in bytes and metadata ("format", "row_count",
            "delimiter", "encoding").

        Raises:
            DocumentParseException: If the file does not exist or cannot be
                read with either encoding.
        """
        path = Path(file_path)
        if not path.exists():
            raise DocumentParseException(
                f"File not found: {path}",
                file_path=str(path),
                parser="csv"
            )

        try:
            encoding_used = self._encoding
            try:
                lines = self._read_lines(path, self._bom_tolerant(self._encoding))
            except UnicodeDecodeError:
                # The failed attempt may already have decoded some rows before
                # hitting the bad byte. Each attempt builds its own list, so
                # nothing from it leaks into the retry (no duplicated rows).
                encoding_used = self._FALLBACK_ENCODING
                logger.warning(
                    "CSV %s could not be decoded as %s; retrying as %s",
                    path.name, self._encoding, encoding_used,
                )
                try:
                    lines = self._read_lines(path, encoding_used)
                except Exception as e:
                    raise DocumentParseException(
                        f"Failed to parse CSV with encoding fallback: {e}",
                        file_path=str(path),
                        parser="csv",
                        details={"error": str(e)}
                    ) from e

            full_text = "\n".join(lines)
            file_size = path.stat().st_size
            logger.info(
                "Parsed CSV: %s, rows=%s, chars=%s, size=%s",
                path.name, len(lines), len(full_text), file_size,
            )
            return ParseResult(
                text=full_text,
                source_path=str(path),
                file_size=file_size,
                metadata={
                    "format": "csv",
                    "row_count": len(lines),
                    "delimiter": self._delimiter,
                    "encoding": encoding_used,
                }
            )
        except DocumentParseException:
            raise
        except Exception as e:
            raise DocumentParseException(
                f"Failed to parse CSV: {e}",
                file_path=str(path),
                parser="csv",
                details={"error": str(e)}
            ) from e

    def get_supported_extensions(self) -> list[str]:
        """Get supported file extensions."""
        return [".csv"]