From e9de8089699bb2bcfc22f9f36050e52b00cfa1d4 Mon Sep 17 00:00:00 2001 From: MerCry Date: Wed, 11 Mar 2026 18:57:27 +0800 Subject: [PATCH] =?UTF-8?q?[AC-KB-ENHANCE]=20feat(kb):=20=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=20KB=20=E5=90=91=E9=87=8F=E6=97=A5=E5=BF=97=E5=92=8C?= =?UTF-8?q?=E5=85=83=E6=95=B0=E6=8D=AE=E8=BF=87=E6=BB=A4=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 KB 向量日志配置项 kb_vector_log_enabled 和 kb_vector_log_path - 新增 KB 向量日志记录器支持滚动日志文件 - 增强 Qdrant 元数据过滤支持操作符格式 ($eq, $in) - 支持 MatchAny 实现多值匹配 - 新增图片文件索引支持 --- ai-service/app/api/admin/kb.py | 136 ++++++++++++++++++++++++++- ai-service/app/core/config.py | 3 + ai-service/app/core/qdrant_client.py | 44 +++++++-- 3 files changed, 174 insertions(+), 9 deletions(-) diff --git a/ai-service/app/api/admin/kb.py b/ai-service/app/api/admin/kb.py index 4943fcb..ec1a00e 100644 --- a/ai-service/app/api/admin/kb.py +++ b/ai-service/app/api/admin/kb.py @@ -10,6 +10,7 @@ import json import hashlib from dataclasses import dataclass from typing import Annotated, Any, Optional +from logging.handlers import RotatingFileHandler import tiktoken from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, Query, UploadFile @@ -38,6 +39,20 @@ from app.services.metadata_field_definition_service import MetadataFieldDefiniti logger = logging.getLogger(__name__) +settings = get_settings() +kb_vector_logger = logging.getLogger("kb_vector_payload") +if settings.kb_vector_log_enabled and not kb_vector_logger.handlers: + handler = RotatingFileHandler( + filename=settings.kb_vector_log_path, + maxBytes=10 * 1024 * 1024, + backupCount=5, + encoding="utf-8", + ) + handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s")) + kb_vector_logger.addHandler(handler) + kb_vector_logger.setLevel(logging.INFO) + kb_vector_logger.propagate = False + router = APIRouter(prefix="/admin/kb", tags=["KB Management"]) @@ -661,6 +676,7 @@ async def _index_document( 
logger.info(f"[INDEX] File extension: {file_ext}, content size: {len(content)} bytes") text_extensions = {".txt", ".md", ".markdown", ".rst", ".log", ".json", ".xml", ".yaml", ".yml"} + image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif"} if file_ext in text_extensions or not file_ext: logger.info("[INDEX] Treating as text file, trying multiple encodings") @@ -676,6 +692,44 @@ async def _index_document( if text is None: text = content.decode("utf-8", errors="replace") logger.warning("[INDEX] Failed to decode with known encodings, using utf-8 with replacement") + elif file_ext in image_extensions: + logger.info("[INDEX] Image file detected, will parse with multimodal LLM") + await kb_service.update_job_status( + tenant_id, job_id, IndexJobStatus.PROCESSING.value, progress=15 + ) + await session.commit() + + with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp_file: + tmp_file.write(content) + tmp_path = tmp_file.name + + logger.info(f"[INDEX] Temp file created: {tmp_path}") + + try: + from app.services.document.image_parser import ImageParser + logger.info(f"[INDEX] Starting image parsing for {file_ext}...") + image_parser = ImageParser() + image_result = await image_parser.parse_with_chunks(tmp_path) + text = image_result.raw_text + parse_result = type('ParseResult', (), { + 'text': text, + 'metadata': image_result.metadata, + 'pages': None, + 'image_chunks': image_result.chunks, + 'image_summary': image_result.image_summary, + })() + logger.info( + f"[INDEX] Parsed image SUCCESS: {filename}, " + f"chars={len(text)}, chunks={len(image_result.chunks)}, " + f"summary={image_result.image_summary[:50] if image_result.image_summary else 'N/A'}..." 
+ ) + except Exception as e: + logger.error(f"[INDEX] Image parsing error: {type(e).__name__}: {e}") + text = "" + parse_result = None + finally: + Path(tmp_path).unlink(missing_ok=True) + logger.info("[INDEX] Temp file cleaned up") else: logger.info("[INDEX] Binary file detected, will parse with document parser") await kb_service.update_job_status( @@ -723,13 +777,64 @@ async def _index_document( ) await session.commit() + from app.services.metadata_auto_inference_service import MetadataAutoInferenceService + + inference_service = MetadataAutoInferenceService(session) + + image_base64_for_inference = None + mime_type_for_inference = None + if file_ext in image_extensions: + import base64 + image_base64_for_inference = base64.b64encode(content).decode("utf-8") + mime_type_map = { + ".jpg": "image/jpeg", ".jpeg": "image/jpeg", + ".png": "image/png", ".gif": "image/gif", + ".webp": "image/webp", ".bmp": "image/bmp", + ".tiff": "image/tiff", ".tif": "image/tiff", + } + mime_type_for_inference = mime_type_map.get(file_ext, "image/jpeg") + + logger.info("[INDEX] Starting metadata auto-inference...") + inference_result = await inference_service.infer_metadata( + tenant_id=tenant_id, + content=text or "", + scope="kb_document", + existing_metadata=metadata, + image_base64=image_base64_for_inference, + mime_type=mime_type_for_inference, + ) + + if inference_result.success: + metadata = inference_result.inferred_metadata + logger.info( + f"[INDEX] Metadata inference SUCCESS: " + f"inferred_fields={list(inference_result.inferred_metadata.keys())}, " + f"confidence_scores={inference_result.confidence_scores}" + ) + else: + logger.warning( + f"[INDEX] Metadata inference FAILED: {inference_result.error_message}, " + f"using existing metadata" + ) + logger.info("[INDEX] Getting embedding provider...") embedding_provider = await get_embedding_provider() logger.info(f"[INDEX] Embedding provider: {type(embedding_provider).__name__}") all_chunks: list[TextChunk] = [] - if 
parse_result and parse_result.pages: + if parse_result and hasattr(parse_result, 'image_chunks') and parse_result.image_chunks: + logger.info(f"[INDEX] Image with {len(parse_result.image_chunks)} intelligent chunks from LLM") + for img_chunk in parse_result.image_chunks: + all_chunks.append(TextChunk( + text=img_chunk.content, + start_token=img_chunk.chunk_index, + end_token=img_chunk.chunk_index + 1, + page=None, + source=filename, + )) + logger.info(f"[INDEX] Total chunks from image: {len(all_chunks)}") + elif parse_result and parse_result.pages: logger.info(f"[INDEX] PDF with {len(parse_result.pages)} pages, using line-based chunking with page metadata") for page in parse_result.pages: page_chunks = chunk_text_by_lines( @@ -807,6 +912,35 @@ async def _index_document( await session.commit() if points: + if settings.kb_vector_log_enabled: + vector_payloads = [] + for point in points: + if use_multi_vector: + payload = { + "id": point.get("id"), + "vector": point.get("vector"), + "payload": point.get("payload"), + } + else: + payload = { + "id": point.id, + "vector": point.vector, + "payload": point.payload, + } + vector_payloads.append(payload) + + kb_vector_logger.info(json.dumps({ + "tenant_id": tenant_id, + "kb_id": kb_id, + "doc_id": doc_id, + "job_id": job_id, + "filename": filename, + "file_ext": file_ext, + "is_image": file_ext in image_extensions, + "metadata": doc_metadata, + "vectors": vector_payloads, + }, ensure_ascii=False)) + logger.info(f"[INDEX] Upserting {len(points)} vectors to Qdrant for kb_id={kb_id}...") if use_multi_vector: await qdrant.upsert_multi_vector(tenant_id, points, kb_id=kb_id) diff --git a/ai-service/app/core/config.py b/ai-service/app/core/config.py index f72cf81..1acd36d 100644 --- a/ai-service/app/core/config.py +++ b/ai-service/app/core/config.py @@ -23,6 +23,9 @@ class Settings(BaseSettings): log_level: str = "INFO" + kb_vector_log_enabled: bool = False + kb_vector_log_path: str = "logs/kb_vector_payload.log" + llm_provider: 
str = "openai" llm_api_key: str = "" llm_base_url: str = "https://api.openai.com/v1" diff --git a/ai-service/app/core/qdrant_client.py b/ai-service/app/core/qdrant_client.py index cb6711f..ec336f1 100644 --- a/ai-service/app/core/qdrant_client.py +++ b/ai-service/app/core/qdrant_client.py @@ -492,23 +492,51 @@ class QdrantClient: 构建 Qdrant 过滤条件。 Args: - metadata_filter: 元数据过滤条件,如 {"grade": "三年级", "subject": "语文"} + metadata_filter: 元数据过滤条件,支持两种格式: + - 简单值格式: {"grade": "三年级", "subject": "语文"} + - 操作符格式: {"grade": {"$eq": "三年级"}, "kb_scene": {"$eq": "open_consult"}} Returns: Qdrant Filter 对象 """ - from qdrant_client.models import FieldCondition, Filter, MatchValue + from qdrant_client.models import FieldCondition, Filter, MatchValue, MatchAny must_conditions = [] for key, value in metadata_filter.items(): - # 支持嵌套 metadata 字段,如 metadata.grade field_path = f"metadata.{key}" - condition = FieldCondition( - key=field_path, - match=MatchValue(value=value), - ) - must_conditions.append(condition) + + if isinstance(value, dict): + op = list(value.keys())[0] if value else None + actual_value = value.get(op) if op else None + + if op == "$eq" and actual_value is not None: + condition = FieldCondition( + key=field_path, + match=MatchValue(value=actual_value), + ) + must_conditions.append(condition) + elif op == "$in" and isinstance(actual_value, list): + condition = FieldCondition( + key=field_path, + match=MatchAny(any=actual_value), + ) + must_conditions.append(condition) + else: + logger.warning( + f"[AC-AISVC-16] Unsupported filter operator: {op}, using as direct value" + ) + condition = FieldCondition( + key=field_path, + match=MatchValue(value=value), + ) + must_conditions.append(condition) + else: + condition = FieldCondition( + key=field_path, + match=MatchValue(value=value), + ) + must_conditions.append(condition) return Filter(must=must_conditions) if must_conditions else None