""" Decomposition Template Service. [AC-IDSMETA-21, AC-IDSMETA-22] 拆解模板服务,支持文本拆解为结构化数据。 """ import json import logging import time import uuid from datetime import datetime from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlmodel import col from app.models.entities import ( DecompositionTemplate, DecompositionTemplateCreate, DecompositionTemplateStatus, DecompositionTemplateUpdate, DecompositionRequest, DecompositionResult, ) logger = logging.getLogger(__name__) class DecompositionTemplateService: """ [AC-IDSMETA-22] 拆解模板服务 管理拆解模板,支持版本控制和最近生效版本查询 """ def __init__(self, session: AsyncSession, llm_client=None): self._session = session self._llm_client = llm_client async def list_templates( self, tenant_id: str, status: str | None = None, ) -> list[DecompositionTemplate]: """ [AC-IDSMETA-22] 列出租户所有拆解模板 Args: tenant_id: 租户 ID status: 按状态过滤(draft/published/archived) Returns: DecompositionTemplate 列表 """ stmt = select(DecompositionTemplate).where( DecompositionTemplate.tenant_id == tenant_id, ) if status: stmt = stmt.where(DecompositionTemplate.status == status) stmt = stmt.order_by(col(DecompositionTemplate.created_at).desc()) result = await self._session.execute(stmt) return list(result.scalars().all()) async def get_template( self, tenant_id: str, template_id: str, ) -> DecompositionTemplate | None: """ 获取单个模板 Args: tenant_id: 租户 ID template_id: 模板 ID Returns: DecompositionTemplate 或 None """ stmt = select(DecompositionTemplate).where( DecompositionTemplate.tenant_id == tenant_id, DecompositionTemplate.id == uuid.UUID(template_id), ) result = await self._session.execute(stmt) return result.scalar_one_or_none() async def get_latest_published_template( self, tenant_id: str, ) -> DecompositionTemplate | None: """ [AC-IDSMETA-22] 获取最近生效的发布版本模板 Args: tenant_id: 租户 ID Returns: 状态为 published 的最新模板 """ stmt = select(DecompositionTemplate).where( DecompositionTemplate.tenant_id == tenant_id, DecompositionTemplate.status == DecompositionTemplateStatus.PUBLISHED.value, ).order_by(col(DecompositionTemplate.updated_at).desc()).limit(1) result = await self._session.execute(stmt) return result.scalar_one_or_none() async def create_template( self, tenant_id: str, template_create: DecompositionTemplateCreate, ) -> DecompositionTemplate: """ [AC-IDSMETA-22] 创建拆解模板 Args: tenant_id: 租户 ID template_create: 创建数据 Returns: 创建的 DecompositionTemplate """ template = DecompositionTemplate( tenant_id=tenant_id, name=template_create.name, description=template_create.description, template_schema=template_create.template_schema, extraction_hints=template_create.extraction_hints, example_input=template_create.example_input, example_output=template_create.example_output, version=1, status=DecompositionTemplateStatus.DRAFT.value, ) self._session.add(template) await self._session.flush() logger.info( f"[AC-IDSMETA-22] Created decomposition template: tenant={tenant_id}, " f"id={template.id}, name={template.name}" ) return template async def update_template( self, tenant_id: str, template_id: str, template_update: DecompositionTemplateUpdate, ) -> DecompositionTemplate | None: """ [AC-IDSMETA-22] 更新拆解模板 Args: tenant_id: 租户 ID template_id: 模板 ID template_update: 更新数据 Returns: 更新后的 DecompositionTemplate 或 None """ template = await self.get_template(tenant_id, template_id) if not template: return None if template_update.name is not None: template.name = template_update.name if template_update.description is not None: template.description = template_update.description if template_update.template_schema is not None: template.template_schema = template_update.template_schema if template_update.extraction_hints is not None: template.extraction_hints = template_update.extraction_hints if template_update.example_input is not None: template.example_input = template_update.example_input if template_update.example_output is not None: template.example_output = template_update.example_output if template_update.status is not None: old_status = template.status template.status = template_update.status logger.info( f"[AC-IDSMETA-22] Template status changed: tenant={tenant_id}, " f"id={template_id}, {old_status} -> {template.status}" ) template.version += 1 template.updated_at = datetime.utcnow() await self._session.flush() logger.info( f"[AC-IDSMETA-22] Updated decomposition template: tenant={tenant_id}, " f"id={template_id}, version={template.version}" ) return template async def decompose_text( self, tenant_id: str, request: DecompositionRequest, ) -> DecompositionResult: """ [AC-IDSMETA-21] 将待录入文本拆解为固定模板输出 Args: tenant_id: 租户 ID request: 拆解请求 Returns: DecompositionResult 包含拆解后的结构化数据 """ start_time = time.time() logger.info( f"[AC-IDSMETA-21] Starting text decomposition: tenant={tenant_id}, " f"template_id={request.template_id}, text_length={len(request.text)}" ) # Get template if request.template_id: template = await self.get_template(tenant_id, request.template_id) else: template = await self.get_latest_published_template(tenant_id) if not template: logger.warning(f"[AC-IDSMETA-21] No template found for tenant={tenant_id}") return DecompositionResult( success=False, error="No decomposition template found", latency_ms=int((time.time() - start_time) * 1000), ) if template.status != DecompositionTemplateStatus.PUBLISHED.value: logger.warning( f"[AC-IDSMETA-21] Template not published: id={template.id}, " f"status={template.status}" ) return DecompositionResult( success=False, error=f"Template status is '{template.status}', not published", template_id=str(template.id), latency_ms=int((time.time() - start_time) * 1000), ) # Build prompt for LLM prompt = self._build_extraction_prompt(template, request.text, request.hints) # Call LLM to extract structured data try: if not self._llm_client: logger.warning("[AC-IDSMETA-21] No LLM client configured") return DecompositionResult( success=False, error="LLM client not configured", template_id=str(template.id), template_version=template.version, latency_ms=int((time.time() - start_time) * 1000), ) llm_response = await self._call_llm(prompt) # Parse LLM response as JSON try: # Try to extract JSON from response json_str = self._extract_json_from_response(llm_response) data = json.loads(json_str) except json.JSONDecodeError as e: logger.warning(f"[AC-IDSMETA-21] Failed to parse LLM response as JSON: {e}") return DecompositionResult( success=False, error=f"Failed to parse LLM response: {str(e)}", template_id=str(template.id), template_version=template.version, latency_ms=int((time.time() - start_time) * 1000), ) latency_ms = int((time.time() - start_time) * 1000) logger.info( f"[AC-IDSMETA-21] Text decomposition complete: tenant={tenant_id}, " f"template_id={template.id}, version={template.version}, " f"latency_ms={latency_ms}" ) return DecompositionResult( success=True, data=data, template_id=str(template.id), template_version=template.version, confidence=0.9, # TODO: Calculate actual confidence latency_ms=latency_ms, ) except Exception as e: logger.error(f"[AC-IDSMETA-21] LLM call failed: {e}", exc_info=True) return DecompositionResult( success=False, error=f"LLM call failed: {str(e)}", template_id=str(template.id), template_version=template.version, latency_ms=int((time.time() - start_time) * 1000), ) def _build_extraction_prompt( self, template: DecompositionTemplate, text: str, hints: dict[str, Any] | None = None, ) -> str: """ 构建 LLM 提取提示 Args: template: 拆解模板 text: 待拆解文本 hints: 额外提示 Returns: LLM 提示字符串 """ schema_desc = json.dumps(template.template_schema, ensure_ascii=False, indent=2) prompt_parts = [ "你是一个数据提取助手。请根据以下模板结构,从给定的文本中提取结构化数据。", "", "## 输出模板结构", "```json", schema_desc, "```", "", ] if template.extraction_hints: hints_desc = json.dumps(template.extraction_hints, ensure_ascii=False, indent=2) prompt_parts.extend([ "## 提取提示", "```json", hints_desc, "```", "", ]) if hints: extra_hints = json.dumps(hints, ensure_ascii=False, indent=2) prompt_parts.extend([ "## 额外提示", "```json", extra_hints, "```", "", ]) if template.example_input and template.example_output: prompt_parts.extend([ "## 示例", f"输入: {template.example_input}", f"输出: ```json", json.dumps(template.example_output, ensure_ascii=False, indent=2), "```", "", ]) prompt_parts.extend([ "## 待提取文本", text, "", "## 输出要求", "请直接输出 JSON 格式的提取结果,不要包含任何解释或额外文本。", "如果某个字段无法从文本中提取,请使用 null 作为值。", ]) return "\n".join(prompt_parts) async def _call_llm(self, prompt: str) -> str: """ 调用 LLM 获取响应 Args: prompt: 提示字符串 Returns: LLM 响应字符串 """ if hasattr(self._llm_client, 'chat'): response = await self._llm_client.chat( messages=[{"role": "user", "content": prompt}], temperature=0.1, ) return response.content if hasattr(response, 'content') else str(response) else: raise NotImplementedError("LLM client does not support chat method") def _extract_json_from_response(self, response: str) -> str: """ 从 LLM 响应中提取 JSON 字符串 Args: response: LLM 响应字符串 Returns: JSON 字符串 """ import re # Try to find JSON in code blocks json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', response) if json_match: return json_match.group(1).strip() # Try to find JSON object directly json_match = re.search(r'\{[\s\S]*\}', response) if json_match: return json_match.group(0) return response.strip()