""" Image parser using multimodal LLM. Supports parsing images into structured text content for knowledge base indexing. """ import asyncio import base64 import json import logging from dataclasses import dataclass, field from pathlib import Path from typing import Any from app.services.document.base import ( DocumentParseException, DocumentParser, PageText, ParseResult, ) from app.services.llm.factory import get_llm_config_manager logger = logging.getLogger(__name__) IMAGE_SYSTEM_PROMPT = """你是一个专业的图像内容分析助手。你的任务是分析图片内容,并将其智能拆分为适合知识库检索的独立数据块。 ## 分析要求 1. 仔细分析图片内容,识别其中的文字、图表、数据等信息 2. 根据内容的逻辑结构,智能判断如何拆分为独立的知识条目 3. 每个条目应该是独立、完整、可检索的知识单元 ## 输出格式 请严格按照以下 JSON 格式输出,不要添加任何其他内容: ```json { "image_summary": "图片整体概述(一句话描述图片主题)", "total_chunks": <分块总数>, "chunks": [ { "chunk_index": 0, "content": "该分块的完整内容文字", "chunk_type": "text|table|list|diagram|chart|mixed", "keywords": ["关键词1", "关键词2"] } ] } ``` ## 分块策略 - **单一内容**: 如果图片只有一段完整的文字/信息,可以只输出1个分块 - **多段落内容**: 按段落或逻辑单元拆分,每个段落作为独立分块 - **表格数据**: 将表格内容转换为结构化文字,作为一个分块 - **图表数据**: 描述图表内容和数据,作为一个分块 - **列表内容**: 每个列表项可作为独立分块,或合并为相关的一组 - **混合内容**: 根据内容类型分别处理,确保每个分块主题明确 ## 注意事项 1. 每个分块的 content 必须是完整、可独立理解的文字 2. chunk_type 用于标识内容类型,便于后续处理 3. keywords 提取该分块的核心关键词,便于检索 4. 确保输出的 JSON 格式正确,可以被解析""" IMAGE_USER_PROMPT = "请分析这张图片,按照要求的 JSON 格式输出分块结果。" @dataclass class ImageChunk: """智能分块结果""" chunk_index: int content: str chunk_type: str = "text" keywords: list[str] = field(default_factory=list) @dataclass class ImageParseResult: """图片解析结果(包含智能分块)""" image_summary: str chunks: list[ImageChunk] raw_text: str source_path: str file_size: int metadata: dict[str, Any] = field(default_factory=dict) class ImageParser(DocumentParser): """ Image parser using multimodal LLM. Supports common image formats and extracts text content using vision-capable LLM models (GPT-4V, GPT-4o, etc.). Features: - Intelligent chunking based on content structure - Structured output with keywords and chunk types - Support for various content types (text, table, chart, etc.) """ SUPPORTED_EXTENSIONS = [ ".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif" ] def __init__( self, model: str | None = None, max_tokens: int = 4096, timeout_seconds: int = 120, ): self._model = model self._max_tokens = max_tokens self._timeout_seconds = timeout_seconds def parse(self, file_path: str | Path) -> ParseResult: """ Parse an image file and extract text content using multimodal LLM. Note: This method is synchronous but internally uses async operations. For async contexts, use parse_async() instead. Args: file_path: Path to the image file. Returns: ParseResult with extracted text content. Raises: DocumentParseException: If parsing fails. """ path = Path(file_path) if not path.exists(): raise DocumentParseException( f"Image file not found: {file_path}", file_path=str(path), parser="image", ) file_size = path.stat().st_size extension = path.suffix.lower() if extension not in self.SUPPORTED_EXTENSIONS: raise DocumentParseException( f"Unsupported image format: {extension}", file_path=str(path), parser="image", details={"supported_formats": self.SUPPORTED_EXTENSIONS}, ) try: with open(path, "rb") as f: image_data = f.read() image_base64 = base64.b64encode(image_data).decode("utf-8") mime_type = self._get_mime_type(extension) try: loop = asyncio.get_running_loop() import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit( asyncio.run, self._analyze_image_async(image_base64, mime_type) ) result = future.result() except RuntimeError: result = asyncio.run(self._analyze_image_async(image_base64, mime_type)) logger.info( f"[IMAGE-PARSER] Successfully parsed image: {path.name}, " f"size={file_size}, chunks={len(result.chunks)}" ) return ParseResult( text=result.raw_text, source_path=str(path), file_size=file_size, page_count=1, metadata={ "format": extension, "parser": "image", "mime_type": mime_type, "image_summary": result.image_summary, "chunk_count": len(result.chunks), "chunks": [ { "chunk_index": c.chunk_index, "content": c.content, "chunk_type": c.chunk_type, "keywords": c.keywords, } for c in result.chunks ], }, pages=[PageText(page=1, text=result.raw_text)], ) except Exception as e: logger.error(f"[IMAGE-PARSER] Failed to parse image {path}: {e}") raise DocumentParseException( f"Failed to parse image: {str(e)}", file_path=str(path), parser="image", details={"error": str(e)}, ) async def parse_async(self, file_path: str | Path) -> ParseResult: """ Async version of parse method for use in async contexts. Args: file_path: Path to the image file. Returns: ParseResult with extracted text content. """ path = Path(file_path) if not path.exists(): raise DocumentParseException( f"Image file not found: {file_path}", file_path=str(path), parser="image", ) file_size = path.stat().st_size extension = path.suffix.lower() if extension not in self.SUPPORTED_EXTENSIONS: raise DocumentParseException( f"Unsupported image format: {extension}", file_path=str(path), parser="image", details={"supported_formats": self.SUPPORTED_EXTENSIONS}, ) try: with open(path, "rb") as f: image_data = f.read() image_base64 = base64.b64encode(image_data).decode("utf-8") mime_type = self._get_mime_type(extension) result = await self._analyze_image_async(image_base64, mime_type) logger.info( f"[IMAGE-PARSER] Successfully parsed image (async): {path.name}, " f"size={file_size}, chunks={len(result.chunks)}" ) return ParseResult( text=result.raw_text, source_path=str(path), file_size=file_size, page_count=1, metadata={ "format": extension, "parser": "image", "mime_type": mime_type, "image_summary": result.image_summary, "chunk_count": len(result.chunks), "chunks": [ { "chunk_index": c.chunk_index, "content": c.content, "chunk_type": c.chunk_type, "keywords": c.keywords, } for c in result.chunks ], }, pages=[PageText(page=1, text=result.raw_text)], ) except Exception as e: logger.error(f"[IMAGE-PARSER] Failed to parse image {path}: {e}") raise DocumentParseException( f"Failed to parse image: {str(e)}", file_path=str(path), parser="image", details={"error": str(e)}, ) async def parse_with_chunks(self, file_path: str | Path) -> ImageParseResult: """ Parse image and return structured result with intelligent chunks. Args: file_path: Path to the image file. Returns: ImageParseResult with intelligent chunks. """ path = Path(file_path) if not path.exists(): raise DocumentParseException( f"Image file not found: {file_path}", file_path=str(path), parser="image", ) file_size = path.stat().st_size extension = path.suffix.lower() if extension not in self.SUPPORTED_EXTENSIONS: raise DocumentParseException( f"Unsupported image format: {extension}", file_path=str(path), parser="image", details={"supported_formats": self.SUPPORTED_EXTENSIONS}, ) with open(path, "rb") as f: image_data = f.read() image_base64 = base64.b64encode(image_data).decode("utf-8") mime_type = self._get_mime_type(extension) result = await self._analyze_image_async(image_base64, mime_type) result.source_path = str(path) result.file_size = file_size result.metadata = { "format": extension, "parser": "image", "mime_type": mime_type, } return result async def _analyze_image_async(self, image_base64: str, mime_type: str) -> ImageParseResult: """ Analyze image using multimodal LLM and return structured chunks. Args: image_base64: Base64 encoded image data. mime_type: MIME type of the image. Returns: ImageParseResult with intelligent chunks. """ try: manager = get_llm_config_manager() client = manager.get_kb_processing_client() config = manager.kb_processing_config model = self._model or config.get("model", "gpt-4o-mini") messages = [ { "role": "system", "content": IMAGE_SYSTEM_PROMPT, }, { "role": "user", "content": [ { "type": "text", "text": IMAGE_USER_PROMPT, }, { "type": "image_url", "image_url": { "url": f"data:{mime_type};base64,{image_base64}", }, }, ], }, ] from app.services.llm.base import LLMConfig llm_config = LLMConfig( model=model, max_tokens=self._max_tokens, temperature=0.3, timeout_seconds=self._timeout_seconds, ) response = await client.generate(messages=messages, config=llm_config) if not response.content: raise DocumentParseException( "LLM returned empty response for image analysis", parser="image", ) return self._parse_llm_response(response.content) except Exception as e: logger.error(f"[IMAGE-PARSER] LLM analysis failed: {e}") raise def _parse_llm_response(self, response_content: str) -> ImageParseResult: """ Parse LLM response into structured ImageParseResult. Args: response_content: Raw LLM response content. Returns: ImageParseResult with parsed chunks. """ try: json_str = self._extract_json(response_content) data = json.loads(json_str) image_summary = data.get("image_summary", "") chunks_data = data.get("chunks", []) chunks = [] for chunk_data in chunks_data: chunk = ImageChunk( chunk_index=chunk_data.get("chunk_index", len(chunks)), content=chunk_data.get("content", ""), chunk_type=chunk_data.get("chunk_type", "text"), keywords=chunk_data.get("keywords", []), ) if chunk.content.strip(): chunks.append(chunk) if not chunks: chunks.append(ImageChunk( chunk_index=0, content=response_content, chunk_type="text", keywords=[], )) raw_text = "\n\n".join([c.content for c in chunks]) return ImageParseResult( image_summary=image_summary, chunks=chunks, raw_text=raw_text, source_path="", file_size=0, ) except json.JSONDecodeError as e: logger.warning(f"[IMAGE-PARSER] Failed to parse JSON response: {e}, using fallback") return ImageParseResult( image_summary="图片内容", chunks=[ImageChunk( chunk_index=0, content=response_content, chunk_type="text", keywords=[], )], raw_text=response_content, source_path="", file_size=0, ) def _extract_json(self, content: str) -> str: """ Extract JSON from LLM response content. Args: content: Raw response content that may contain JSON. Returns: Extracted JSON string. """ content = content.strip() if content.startswith("{") and content.endswith("}"): return content json_start = content.find("{") json_end = content.rfind("}") if json_start != -1 and json_end != -1 and json_end > json_start: return content[json_start:json_end + 1] return content def _get_mime_type(self, extension: str) -> str: """Get MIME type for image extension.""" mime_types = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp", ".bmp": "image/bmp", ".tiff": "image/tiff", ".tif": "image/tiff", } return mime_types.get(extension.lower(), "image/jpeg") def get_supported_extensions(self) -> list[str]: """Get list of supported image extensions.""" return ImageParser.SUPPORTED_EXTENSIONS