From 9d8ecf0bb245950020ad70778bad59926e52f02d Mon Sep 17 00:00:00 2001 From: MerCry Date: Fri, 27 Feb 2026 15:27:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E8=AF=9D=E6=9C=AF?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E5=BC=95=E6=93=8E=20(Phase=2013=20T13.1-T13.?= =?UTF-8?q?6)=20[AC-AISVC-71~AC-AISVC-76]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 ScriptFlow 和 FlowInstance SQLModel 实体 - 实现 ScriptFlowService:流程定义 CRUD、步骤校验 - 实现 FlowEngine 状态机引擎:check_active_flow、start、advance、handle_timeout - 实现话术流程管理 API(POST/GET/PUT /admin/script-flows) - T13.7(单元测试)留待集成阶段 --- ai-service/app/api/admin/__init__.py | 5 +- ai-service/app/api/admin/script_flows.py | 157 +++++++ ai-service/app/main.py | 3 +- ai-service/app/models/entities.py | 293 ++++++++++++ ai-service/app/services/flow/__init__.py | 9 + ai-service/app/services/flow/engine.py | 444 +++++++++++++++++++ ai-service/app/services/flow/flow_service.py | 217 +++++++++ docs/progress/ai-service-progress.md | 2 +- spec/ai-service/tasks.md | 12 +- 9 files changed, 1132 insertions(+), 10 deletions(-) create mode 100644 ai-service/app/api/admin/script_flows.py create mode 100644 ai-service/app/services/flow/__init__.py create mode 100644 ai-service/app/services/flow/engine.py create mode 100644 ai-service/app/services/flow/flow_service.py diff --git a/ai-service/app/api/admin/__init__.py b/ai-service/app/api/admin/__init__.py index d444b7f..9aef820 100644 --- a/ai-service/app/api/admin/__init__.py +++ b/ai-service/app/api/admin/__init__.py @@ -6,12 +6,13 @@ Admin API routes for AI Service management. from app.api.admin.api_key import router as api_key_router from app.api.admin.dashboard import router as dashboard_router from app.api.admin.embedding import router as embedding_router +from app.api.admin.guardrails import router as guardrails_router from app.api.admin.intent_rules import router as intent_rules_router from app.api.admin.kb import router as kb_router from app.api.admin.llm import router as llm_router from app.api.admin.prompt_templates import router as prompt_templates_router from app.api.admin.rag import router as rag_router +from app.api.admin.script_flows import router as script_flows_router from app.api.admin.sessions import router as sessions_router from app.api.admin.tenants import router as tenants_router - -__all__ = ["api_key_router", "dashboard_router", "embedding_router", "intent_rules_router", "kb_router", "llm_router", "prompt_templates_router", "rag_router", "sessions_router", "tenants_router"] +__all__ = ["api_key_router", "dashboard_router", "embedding_router", "guardrails_router", "intent_rules_router", "kb_router", "llm_router", "prompt_templates_router", "rag_router", "script_flows_router", "sessions_router", "tenants_router"] diff --git a/ai-service/app/api/admin/script_flows.py b/ai-service/app/api/admin/script_flows.py new file mode 100644 index 0000000..0c3731f --- /dev/null +++ b/ai-service/app/api/admin/script_flows.py @@ -0,0 +1,157 @@ +""" +Script Flow Management API. +[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73] Script flow CRUD endpoints. +""" + +import logging +import uuid +from typing import Any + +from fastapi import APIRouter, Depends, Header, HTTPException +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.database import get_session +from app.models.entities import ScriptFlowCreate, ScriptFlowUpdate +from app.services.flow.flow_service import ScriptFlowService + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/admin/script-flows", tags=["Script Flows"]) + + +def get_tenant_id(x_tenant_id: str = Header(..., alias="X-Tenant-Id")) -> str: + """Extract tenant ID from header.""" + if not x_tenant_id: + raise HTTPException(status_code=400, detail="X-Tenant-Id header is required") + return x_tenant_id + + +@router.get("") +async def list_flows( + tenant_id: str = Depends(get_tenant_id), + is_enabled: bool | None = None, + session: AsyncSession = Depends(get_session), +) -> dict[str, Any]: + """ + [AC-AISVC-72] List all script flows for a tenant. + """ + logger.info(f"[AC-AISVC-72] Listing script flows for tenant={tenant_id}, is_enabled={is_enabled}") + + service = ScriptFlowService(session) + flows = await service.list_flows(tenant_id, is_enabled) + + data = [] + for f in flows: + linked_rule_count = await service._get_linked_rule_count(tenant_id, f.id) + data.append({ + "id": str(f.id), + "name": f.name, + "description": f.description, + "step_count": len(f.steps), + "is_enabled": f.is_enabled, + "linked_rule_count": linked_rule_count, + "created_at": f.created_at.isoformat(), + "updated_at": f.updated_at.isoformat(), + }) + + return {"data": data} + + +@router.post("", status_code=201) +async def create_flow( + body: ScriptFlowCreate, + tenant_id: str = Depends(get_tenant_id), + session: AsyncSession = Depends(get_session), +) -> dict[str, Any]: + """ + [AC-AISVC-71] Create a new script flow. + """ + logger.info(f"[AC-AISVC-71] Creating script flow for tenant={tenant_id}, name={body.name}") + + service = ScriptFlowService(session) + + try: + flow = await service.create_flow(tenant_id, body) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + return { + "id": str(flow.id), + "name": flow.name, + "description": flow.description, + "step_count": len(flow.steps), + "is_enabled": flow.is_enabled, + "created_at": flow.created_at.isoformat(), + "updated_at": flow.updated_at.isoformat(), + } + + +@router.get("/{flow_id}") +async def get_flow_detail( + flow_id: uuid.UUID, + tenant_id: str = Depends(get_tenant_id), + session: AsyncSession = Depends(get_session), +) -> dict[str, Any]: + """ + [AC-AISVC-73] Get script flow detail with complete step definitions. + """ + logger.info(f"[AC-AISVC-73] Getting flow detail for tenant={tenant_id}, id={flow_id}") + + service = ScriptFlowService(session) + detail = await service.get_flow_detail(tenant_id, flow_id) + + if not detail: + raise HTTPException(status_code=404, detail="Flow not found") + + return detail + + +@router.put("/{flow_id}") +async def update_flow( + flow_id: uuid.UUID, + body: ScriptFlowUpdate, + tenant_id: str = Depends(get_tenant_id), + session: AsyncSession = Depends(get_session), +) -> dict[str, Any]: + """ + [AC-AISVC-73] Update script flow definition. + """ + logger.info(f"[AC-AISVC-73] Updating flow for tenant={tenant_id}, id={flow_id}") + + service = ScriptFlowService(session) + + try: + flow = await service.update_flow(tenant_id, flow_id, body) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + return { + "id": str(flow.id), + "name": flow.name, + "description": flow.description, + "step_count": len(flow.steps), + "is_enabled": flow.is_enabled, + "created_at": flow.created_at.isoformat(), + "updated_at": flow.updated_at.isoformat(), + } + + +@router.delete("/{flow_id}", status_code=204) +async def delete_flow( + flow_id: uuid.UUID, + tenant_id: str = Depends(get_tenant_id), + session: AsyncSession = Depends(get_session), +) -> None: + """ + Delete a script flow. + """ + logger.info(f"Deleting flow for tenant={tenant_id}, id={flow_id}") + + service = ScriptFlowService(session) + success = await service.delete_flow(tenant_id, flow_id) + + if not success: + raise HTTPException(status_code=404, detail="Flow not found") diff --git a/ai-service/app/main.py b/ai-service/app/main.py index b377b26..cc0bc3c 100644 --- a/ai-service/app/main.py +++ b/ai-service/app/main.py @@ -12,7 +12,7 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from app.api import chat_router, health_router -from app.api.admin import api_key_router, dashboard_router, embedding_router, intent_rules_router, kb_router, llm_router, prompt_templates_router, rag_router, sessions_router, tenants_router +from app.api.admin import api_key_router, dashboard_router, embedding_router, guardrails_router, intent_rules_router, kb_router, llm_router, prompt_templates_router, rag_router, script_flows_router, sessions_router, tenants_router from app.api.admin.kb_optimized import router as kb_optimized_router from app.core.config import get_settings from app.core.database import close_db, init_db @@ -136,6 +136,7 @@ app.include_router(kb_optimized_router) app.include_router(llm_router) app.include_router(prompt_templates_router) app.include_router(rag_router) +app.include_router(script_flows_router) app.include_router(sessions_router) app.include_router(tenants_router) diff --git a/ai-service/app/models/entities.py b/ai-service/app/models/entities.py index 7c70f3c..8ac5558 100644 --- a/ai-service/app/models/entities.py +++ b/ai-service/app/models/entities.py @@ -425,3 +425,296 @@ class IntentMatchResult: "matched": self.matched, "response_type": self.rule.response_type, } + + +class ForbiddenWordCategory(str, Enum): + """[AC-AISVC-78] Forbidden word category.""" + COMPETITOR = "competitor" + SENSITIVE = "sensitive" + POLITICAL = "political" + CUSTOM = "custom" + + +class ForbiddenWordStrategy(str, Enum): + """[AC-AISVC-78] Forbidden word replacement strategy.""" + MASK = "mask" + REPLACE = "replace" + BLOCK = "block" + + +class ForbiddenWord(SQLModel, table=True): + """ + [AC-AISVC-78] Forbidden word entity with tenant isolation. + Supports mask/replace/block strategies for output filtering. + """ + + __tablename__ = "forbidden_words" + __table_args__ = ( + Index("ix_forbidden_words_tenant_enabled", "tenant_id", "is_enabled"), + Index("ix_forbidden_words_tenant_id", "tenant_id"), + ) + + id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) + tenant_id: str = Field(..., description="Tenant ID for multi-tenant isolation", index=True) + word: str = Field(..., description="Forbidden word to detect") + category: str = Field(..., description="Category: competitor/sensitive/political/custom") + strategy: str = Field(..., description="Replacement strategy: mask/replace/block") + replacement: str | None = Field(default=None, description="Replacement text for 'replace' strategy") + fallback_reply: str | None = Field(default=None, description="Fallback reply for 'block' strategy") + is_enabled: bool = Field(default=True, description="Whether the word is enabled") + hit_count: int = Field(default=0, ge=0, description="Hit count for statistics") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation time") + updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update time") + + +class ForbiddenWordCreate(SQLModel): + """[AC-AISVC-78] Schema for creating a new forbidden word.""" + + word: str + category: str + strategy: str + replacement: str | None = None + fallback_reply: str | None = None + + +class ForbiddenWordUpdate(SQLModel): + """[AC-AISVC-80] Schema for updating a forbidden word.""" + + word: str | None = None + category: str | None = None + strategy: str | None = None + replacement: str | None = None + fallback_reply: str | None = None + is_enabled: bool | None = None + + +class BehaviorRuleCategory(str, Enum): + """[AC-AISVC-84] Behavior rule category.""" + COMPLIANCE = "compliance" + TONE = "tone" + BOUNDARY = "boundary" + CUSTOM = "custom" + + +class BehaviorRule(SQLModel, table=True): + """ + [AC-AISVC-84] Behavior rule entity with tenant isolation. + These rules are injected into Prompt system instruction as LLM behavior constraints. + """ + + __tablename__ = "behavior_rules" + __table_args__ = ( + Index("ix_behavior_rules_tenant_enabled", "tenant_id", "is_enabled"), + Index("ix_behavior_rules_tenant_id", "tenant_id"), + ) + + id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) + tenant_id: str = Field(..., description="Tenant ID for multi-tenant isolation", index=True) + rule_text: str = Field(..., description="Behavior constraint description, e.g., 'Do not promise specific compensation amounts'") + category: str = Field(..., description="Category: compliance/tone/boundary/custom") + is_enabled: bool = Field(default=True, description="Whether the rule is enabled") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation time") + updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update time") + + +class BehaviorRuleCreate(SQLModel): + """[AC-AISVC-84] Schema for creating a new behavior rule.""" + + rule_text: str + category: str + + +class BehaviorRuleUpdate(SQLModel): + """[AC-AISVC-85] Schema for updating a behavior rule.""" + + rule_text: str | None = None + category: str | None = None + is_enabled: bool | None = None + + +class GuardrailResult: + """ + [AC-AISVC-82] Result of guardrail filtering. + Contains filtered reply and trigger information. + """ + + def __init__( + self, + reply: str, + blocked: bool = False, + triggered_words: list[str] | None = None, + triggered_categories: list[str] | None = None, + ): + self.reply = reply + self.blocked = blocked + self.triggered_words = triggered_words or [] + self.triggered_categories = triggered_categories or [] + + def to_dict(self) -> dict[str, Any]: + return { + "reply": self.reply, + "blocked": self.blocked, + "triggered_words": self.triggered_words, + "triggered_categories": self.triggered_categories, + "guardrail_triggered": len(self.triggered_words) > 0, + } + + +class InputScanResult: + """ + [AC-AISVC-83] Result of input scanning. + Contains flagged status and matched words (for logging only, no blocking). + """ + + def __init__( + self, + flagged: bool = False, + matched_words: list[str] | None = None, + matched_categories: list[str] | None = None, + ): + self.flagged = flagged + self.matched_words = matched_words or [] + self.matched_categories = matched_categories or [] + + def to_dict(self) -> dict[str, Any]: + return { + "input_flagged": self.flagged, + "matched_words": self.matched_words, + "matched_categories": self.matched_categories, + } + + +class FlowInstanceStatus(str, Enum): + """[AC-AISVC-74~AC-AISVC-77] Flow instance status.""" + ACTIVE = "active" + COMPLETED = "completed" + TIMEOUT = "timeout" + CANCELLED = "cancelled" + + +class TimeoutAction(str, Enum): + """[AC-AISVC-71] Timeout action for flow steps.""" + REPEAT = "repeat" + SKIP = "skip" + TRANSFER = "transfer" + + +class ScriptFlow(SQLModel, table=True): + """ + [AC-AISVC-71] Script flow entity with tenant isolation. + Stores flow definition with steps in JSONB format. + """ + + __tablename__ = "script_flows" + __table_args__ = ( + Index("ix_script_flows_tenant_id", "tenant_id"), + Index("ix_script_flows_tenant_enabled", "tenant_id", "is_enabled"), + ) + + id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) + tenant_id: str = Field(..., description="Tenant ID for multi-tenant isolation", index=True) + name: str = Field(..., description="Flow name") + description: str | None = Field(default=None, description="Flow description") + steps: list[dict[str, Any]] = Field( + default=[], + sa_column=Column("steps", JSON, nullable=False), + description="Flow steps list with step_no, content, wait_input, timeout_seconds, timeout_action, next_conditions, default_next" + ) + is_enabled: bool = Field(default=True, description="Whether the flow is enabled") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation time") + updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update time") + + +class FlowInstance(SQLModel, table=True): + """ + [AC-AISVC-74] Flow instance entity for runtime state. + Tracks active flow execution per session. + """ + + __tablename__ = "flow_instances" + __table_args__ = ( + Index("ix_flow_instances_tenant_session", "tenant_id", "session_id"), + Index("ix_flow_instances_tenant_status", "tenant_id", "status"), + ) + + id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) + tenant_id: str = Field(..., description="Tenant ID for multi-tenant isolation", index=True) + session_id: str = Field(..., description="Session ID for conversation tracking", index=True) + flow_id: uuid.UUID = Field(..., description="Foreign key to script_flows.id", foreign_key="script_flows.id", index=True) + current_step: int = Field(default=1, ge=1, description="Current step number (1-indexed)") + status: str = Field(default=FlowInstanceStatus.ACTIVE.value, description="Instance status: active/completed/timeout/cancelled") + context: dict[str, Any] | None = Field( + default=None, + sa_column=Column("context", JSON, nullable=True), + description="Flow execution context, stores user inputs etc." + ) + started_at: datetime = Field(default_factory=datetime.utcnow, description="Instance start time") + updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update time") + completed_at: datetime | None = Field(default=None, description="Completion time (nullable)") + + +class FlowStep(SQLModel): + """[AC-AISVC-71] Schema for a single flow step.""" + + step_no: int = Field(..., ge=1, description="Step number (1-indexed)") + content: str = Field(..., description="Script content for this step") + wait_input: bool = Field(default=True, description="Whether to wait for user input") + timeout_seconds: int = Field(default=120, ge=1, description="Timeout in seconds") + timeout_action: str = Field(default=TimeoutAction.REPEAT.value, description="Action on timeout: repeat/skip/transfer") + next_conditions: list[dict[str, Any]] | None = Field( + default=None, + description="Conditions for next step: [{'keywords': [...], 'goto_step': N}, {'pattern': '...', 'goto_step': N}]" + ) + default_next: int | None = Field(default=None, description="Default next step if no condition matches") + + +class ScriptFlowCreate(SQLModel): + """[AC-AISVC-71] Schema for creating a new script flow.""" + + name: str + description: str | None = None + steps: list[dict[str, Any]] + is_enabled: bool = True + + +class ScriptFlowUpdate(SQLModel): + """[AC-AISVC-73] Schema for updating a script flow.""" + + name: str | None = None + description: str | None = None + steps: list[dict[str, Any]] | None = None + is_enabled: bool | None = None + + +class FlowAdvanceResult: + """ + [AC-AISVC-75] Result of flow step advancement. + """ + + def __init__( + self, + completed: bool, + message: str | None = None, + current_step: int | None = None, + total_steps: int | None = None, + timeout_action: str | None = None, + ): + self.completed = completed + self.message = message + self.current_step = current_step + self.total_steps = total_steps + self.timeout_action = timeout_action + + def to_dict(self) -> dict[str, Any]: + result = { + "completed": self.completed, + } + if self.message is not None: + result["message"] = self.message + if self.current_step is not None: + result["current_step"] = self.current_step + if self.total_steps is not None: + result["total_steps"] = self.total_steps + if self.timeout_action is not None: + result["timeout_action"] = self.timeout_action + return result diff --git a/ai-service/app/services/flow/__init__.py b/ai-service/app/services/flow/__init__.py new file mode 100644 index 0000000..e3acdcf --- /dev/null +++ b/ai-service/app/services/flow/__init__.py @@ -0,0 +1,9 @@ +""" +Flow services for AI Service. +[AC-AISVC-71~AC-AISVC-77] Script flow management and execution engine. +""" + +from app.services.flow.flow_service import ScriptFlowService +from app.services.flow.engine import FlowEngine + +__all__ = ["ScriptFlowService", "FlowEngine"] diff --git a/ai-service/app/services/flow/engine.py b/ai-service/app/services/flow/engine.py new file mode 100644 index 0000000..46bb356 --- /dev/null +++ b/ai-service/app/services/flow/engine.py @@ -0,0 +1,444 @@ +""" +Flow Engine for AI Service. +[AC-AISVC-74~AC-AISVC-77] State machine engine for script flow execution. +""" + +import logging +import re +import uuid +from datetime import datetime +from typing import Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlmodel import col + +from app.models.entities import ( + ScriptFlow, + FlowInstance, + FlowInstanceStatus, + FlowAdvanceResult, + TimeoutAction, +) + +logger = logging.getLogger(__name__) + + +class FlowEngine: + """ + [AC-AISVC-74~AC-AISVC-77] State machine engine for script flow execution. + + State Machine: + - IDLE: No active flow + - ACTIVE: Flow is being executed + - COMPLETED: Flow finished successfully + - TIMEOUT: Flow timed out + - CANCELLED: Flow was cancelled + + Core Methods: + - check_active_flow: Check if session has active flow + - start: Start a new flow instance + - advance: Advance flow based on user input + - handle_timeout: Handle timeout for current step + """ + + def __init__(self, session: AsyncSession): + self._session = session + + async def check_active_flow( + self, + tenant_id: str, + session_id: str, + ) -> FlowInstance | None: + """ + [AC-AISVC-75] Check if session has an active flow instance. + + Args: + tenant_id: Tenant ID for isolation + session_id: Session ID to check + + Returns: + Active FlowInstance or None + """ + stmt = select(FlowInstance).where( + FlowInstance.tenant_id == tenant_id, + FlowInstance.session_id == session_id, + FlowInstance.status == FlowInstanceStatus.ACTIVE.value, + ) + result = await self._session.execute(stmt) + return result.scalar_one_or_none() + + async def start( + self, + tenant_id: str, + session_id: str, + flow_id: uuid.UUID, + ) -> tuple[FlowInstance | None, str | None]: + """ + [AC-AISVC-74] Start a new flow instance and return first step content. + + Args: + tenant_id: Tenant ID for isolation + session_id: Session ID for the conversation + flow_id: ID of the flow to start + + Returns: + Tuple of (FlowInstance, first_step_content) or (None, error_message) + """ + active = await self.check_active_flow(tenant_id, session_id) + if active: + logger.warning( + f"[AC-AISVC-74] Session already has active flow: " + f"tenant={tenant_id}, session={session_id}" + ) + return None, "Session already has an active flow" + + flow = await self._get_flow(tenant_id, flow_id) + if not flow: + logger.warning( + f"[AC-AISVC-74] Flow not found: tenant={tenant_id}, flow_id={flow_id}" + ) + return None, "Flow not found" + + if not flow.is_enabled: + logger.warning( + f"[AC-AISVC-74] Flow is disabled: tenant={tenant_id}, flow_id={flow_id}" + ) + return None, "Flow is disabled" + + if not flow.steps: + logger.warning( + f"[AC-AISVC-74] Flow has no steps: tenant={tenant_id}, flow_id={flow_id}" + ) + return None, "Flow has no steps" + + instance = FlowInstance( + tenant_id=tenant_id, + session_id=session_id, + flow_id=flow_id, + current_step=1, + status=FlowInstanceStatus.ACTIVE.value, + context={"inputs": []}, + ) + self._session.add(instance) + await self._session.flush() + + first_step = flow.steps[0] + first_content = first_step.get("content", "") + + logger.info( + f"[AC-AISVC-74] Started flow instance: tenant={tenant_id}, " + f"session={session_id}, flow_id={flow_id}, step=1/{len(flow.steps)}" + ) + + return instance, first_content + + async def advance( + self, + tenant_id: str, + session_id: str, + user_input: str, + ) -> FlowAdvanceResult: + """ + [AC-AISVC-75, AC-AISVC-76] Advance flow based on user input. + + Args: + tenant_id: Tenant ID for isolation + session_id: Session ID for the conversation + user_input: User's input message + + Returns: + FlowAdvanceResult with completion status and next message + """ + instance = await self.check_active_flow(tenant_id, session_id) + if not instance: + logger.debug( + f"[AC-AISVC-75] No active flow for session: tenant={tenant_id}, session={session_id}" + ) + return FlowAdvanceResult(completed=True, message=None) + + flow = await self._get_flow_by_id(instance.flow_id) + if not flow: + await self._cancel_instance(instance, "Flow definition not found") + return FlowAdvanceResult(completed=True, message=None) + + current_step_idx = instance.current_step - 1 + if current_step_idx >= len(flow.steps): + await self._complete_instance(instance) + return FlowAdvanceResult(completed=True, message=None) + + current_step = flow.steps[current_step_idx] + + self._record_input(instance, user_input) + + next_step_no = self._match_next_step(current_step, user_input) + + if next_step_no is None: + default_next = current_step.get("default_next") + if default_next: + next_step_no = default_next + else: + logger.debug( + f"[AC-AISVC-75] No condition matched, repeating step: " + f"tenant={tenant_id}, session={session_id}, step={instance.current_step}" + ) + return FlowAdvanceResult( + completed=False, + message=current_step.get("content", ""), + current_step=instance.current_step, + total_steps=len(flow.steps), + ) + + if next_step_no > len(flow.steps): + await self._complete_instance(instance) + logger.info( + f"[AC-AISVC-76] Flow completed: tenant={tenant_id}, " + f"session={session_id}, flow_id={instance.flow_id}" + ) + return FlowAdvanceResult(completed=True, message=None) + + instance.current_step = next_step_no + instance.updated_at = datetime.utcnow() + await self._session.flush() + + next_step = flow.steps[next_step_no - 1] + next_content = next_step.get("content", "") + + logger.info( + f"[AC-AISVC-75] Advanced flow: tenant={tenant_id}, " + f"session={session_id}, step={next_step_no}/{len(flow.steps)}" + ) + + return FlowAdvanceResult( + completed=False, + message=next_content, + current_step=next_step_no, + total_steps=len(flow.steps), + ) + + async def handle_timeout( + self, + tenant_id: str, + session_id: str, + ) -> FlowAdvanceResult: + """ + [AC-AISVC-77] Handle timeout for current step. + + Args: + tenant_id: Tenant ID for isolation + session_id: Session ID for the conversation + + Returns: + FlowAdvanceResult based on timeout_action configuration + """ + instance = await self.check_active_flow(tenant_id, session_id) + if not instance: + return FlowAdvanceResult(completed=True, message=None) + + flow = await self._get_flow_by_id(instance.flow_id) + if not flow: + await self._cancel_instance(instance, "Flow definition not found") + return FlowAdvanceResult(completed=True, message=None) + + current_step_idx = instance.current_step - 1 + if current_step_idx >= len(flow.steps): + await self._complete_instance(instance) + return FlowAdvanceResult(completed=True, message=None) + + current_step = flow.steps[current_step_idx] + timeout_action = current_step.get("timeout_action", TimeoutAction.REPEAT.value) + + logger.info( + f"[AC-AISVC-77] Handling timeout: tenant={tenant_id}, " + f"session={session_id}, step={instance.current_step}, action={timeout_action}" + ) + + if timeout_action == TimeoutAction.REPEAT.value: + return FlowAdvanceResult( + completed=False, + message=current_step.get("content", ""), + current_step=instance.current_step, + total_steps=len(flow.steps), + timeout_action=timeout_action, + ) + + elif timeout_action == TimeoutAction.SKIP.value: + default_next = current_step.get("default_next") + if default_next and default_next <= len(flow.steps): + instance.current_step = default_next + instance.updated_at = datetime.utcnow() + await self._session.flush() + + next_step = flow.steps[default_next - 1] + return FlowAdvanceResult( + completed=False, + message=next_step.get("content", ""), + current_step=default_next, + total_steps=len(flow.steps), + timeout_action=timeout_action, + ) + else: + await self._complete_instance(instance) + return FlowAdvanceResult(completed=True, message=None) + + elif timeout_action == TimeoutAction.TRANSFER.value: + instance.status = FlowInstanceStatus.TIMEOUT.value + instance.completed_at = datetime.utcnow() + instance.updated_at = datetime.utcnow() + await self._session.flush() + + return FlowAdvanceResult( + completed=True, + message="抱歉,等待超时,正在为您转接人工客服...", + timeout_action=timeout_action, + ) + + return FlowAdvanceResult( + completed=False, + message=current_step.get("content", ""), + timeout_action=timeout_action, + ) + + async def cancel_flow( + self, + tenant_id: str, + session_id: str, + reason: str = "User cancelled", + ) -> bool: + """Cancel an active flow instance.""" + instance = await self.check_active_flow(tenant_id, session_id) + if not instance: + return False + + await self._cancel_instance(instance, reason) + return True + + async def get_flow_status( + self, + tenant_id: str, + session_id: str, + ) -> dict[str, Any] | None: + """Get the current flow status for a session.""" + stmt = select(FlowInstance).where( + FlowInstance.tenant_id == tenant_id, + FlowInstance.session_id == session_id, + ).order_by(col(FlowInstance.created_at).desc()) + result = await self._session.execute(stmt) + instance = result.scalar_one_or_none() + + if not instance: + return None + + flow = await self._get_flow_by_id(instance.flow_id) + + return { + "instance_id": str(instance.id), + "flow_id": str(instance.flow_id), + "flow_name": flow.name if flow else None, + "current_step": instance.current_step, + "total_steps": len(flow.steps) if flow else 0, + "status": instance.status, + "started_at": instance.started_at.isoformat(), + "updated_at": instance.updated_at.isoformat(), + "completed_at": instance.completed_at.isoformat() if instance.completed_at else None, + } + + async def _get_flow( + self, + tenant_id: str, + flow_id: uuid.UUID, + ) -> ScriptFlow | None: + """Get flow by ID with tenant isolation.""" + stmt = select(ScriptFlow).where( + ScriptFlow.tenant_id == tenant_id, + ScriptFlow.id == flow_id, + ) + result = await self._session.execute(stmt) + return result.scalar_one_or_none() + + async def _get_flow_by_id( + self, + flow_id: uuid.UUID, + ) -> ScriptFlow | None: + """Get flow by ID without tenant check (for internal use).""" + stmt = select(ScriptFlow).where(ScriptFlow.id == flow_id) + result = await self._session.execute(stmt) + return result.scalar_one_or_none() + + def _match_next_step( + self, + step: dict[str, Any], + user_input: str, + ) -> int | None: + """ + Match user input against next_conditions. + + Args: + step: Current step definition + user_input: User's input message + + Returns: + goto_step number if matched, None otherwise + """ + next_conditions = step.get("next_conditions", []) + if not next_conditions: + return None + + user_input_lower = user_input.lower() + + for condition in next_conditions: + keywords = condition.get("keywords", []) + for keyword in keywords: + if keyword.lower() in user_input_lower: + return condition.get("goto_step") + + pattern = condition.get("pattern") + if pattern: + try: + if re.search(pattern, user_input, re.IGNORECASE): + return condition.get("goto_step") + except re.error: + logger.warning(f"Invalid regex pattern: {pattern}") + + return None + + def _record_input( + self, + instance: FlowInstance, + user_input: str, + ) -> None: + """Record user input in flow context.""" + if instance.context is None: + instance.context = {"inputs": []} + + inputs = instance.context.get("inputs", []) + inputs.append({ + "step": instance.current_step, + "input": user_input, + "timestamp": datetime.utcnow().isoformat(), + }) + instance.context["inputs"] = inputs + + async def _complete_instance( + self, + instance: FlowInstance, + ) -> None: + """Mark instance as completed.""" + instance.status = FlowInstanceStatus.COMPLETED.value + instance.completed_at = datetime.utcnow() + instance.updated_at = datetime.utcnow() + await self._session.flush() + + async def _cancel_instance( + self, + instance: FlowInstance, + reason: str = "", + ) -> None: + """Mark instance as cancelled.""" + instance.status = FlowInstanceStatus.CANCELLED.value + instance.completed_at = datetime.utcnow() + instance.updated_at = datetime.utcnow() + if instance.context is None: + instance.context = {} + instance.context["cancel_reason"] = reason + await self._session.flush() diff --git a/ai-service/app/services/flow/flow_service.py b/ai-service/app/services/flow/flow_service.py new file mode 100644 index 0000000..7fed5af --- /dev/null +++ b/ai-service/app/services/flow/flow_service.py @@ -0,0 +1,217 @@ +""" +Script Flow Service for AI Service. +[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73] Flow definition CRUD operations. +""" + +import logging +import uuid +from datetime import datetime +from typing import Any, Sequence + +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession +from sqlmodel import col + +from app.models.entities import ( + ScriptFlow, + ScriptFlowCreate, + ScriptFlowUpdate, + FlowInstanceStatus, +) + +logger = logging.getLogger(__name__) + + +class ScriptFlowService: + """ + [AC-AISVC-71~AC-AISVC-73] Service for managing script flow definitions. + + Features: + - Flow CRUD with tenant isolation + - Step validation + - Linked intent rule count tracking + """ + + def __init__(self, session: AsyncSession): + self._session = session + + async def create_flow( + self, + tenant_id: str, + create_data: ScriptFlowCreate, + ) -> ScriptFlow: + """ + [AC-AISVC-71] Create a new script flow with steps. + """ + self._validate_steps(create_data.steps) + + flow = ScriptFlow( + tenant_id=tenant_id, + name=create_data.name, + description=create_data.description, + steps=create_data.steps, + is_enabled=create_data.is_enabled, + ) + self._session.add(flow) + await self._session.flush() + + logger.info( + f"[AC-AISVC-71] Created script flow: tenant={tenant_id}, " + f"id={flow.id}, name={flow.name}, steps={len(flow.steps)}" + ) + return flow + + async def list_flows( + self, + tenant_id: str, + is_enabled: bool | None = None, + ) -> Sequence[ScriptFlow]: + """ + [AC-AISVC-72] List flows for a tenant, optionally filtered by enabled status. + """ + stmt = select(ScriptFlow).where( + ScriptFlow.tenant_id == tenant_id + ) + + if is_enabled is not None: + stmt = stmt.where(ScriptFlow.is_enabled == is_enabled) + + stmt = stmt.order_by(col(ScriptFlow.created_at).desc()) + result = await self._session.execute(stmt) + return result.scalars().all() + + async def get_flow( + self, + tenant_id: str, + flow_id: uuid.UUID, + ) -> ScriptFlow | None: + """ + Get flow by ID with tenant isolation. + """ + stmt = select(ScriptFlow).where( + ScriptFlow.tenant_id == tenant_id, + ScriptFlow.id == flow_id, + ) + result = await self._session.execute(stmt) + return result.scalar_one_or_none() + + async def get_flow_detail( + self, + tenant_id: str, + flow_id: uuid.UUID, + ) -> dict[str, Any] | None: + """ + [AC-AISVC-73] Get flow detail with complete step definitions. + """ + flow = await self.get_flow(tenant_id, flow_id) + if not flow: + return None + + linked_rule_count = await self._get_linked_rule_count(tenant_id, flow_id) + + return { + "id": str(flow.id), + "name": flow.name, + "description": flow.description, + "steps": flow.steps, + "is_enabled": flow.is_enabled, + "step_count": len(flow.steps), + "linked_rule_count": linked_rule_count, + "created_at": flow.created_at.isoformat(), + "updated_at": flow.updated_at.isoformat(), + } + + async def update_flow( + self, + tenant_id: str, + flow_id: uuid.UUID, + update_data: ScriptFlowUpdate, + ) -> ScriptFlow | None: + """ + [AC-AISVC-73] Update flow definition. + """ + flow = await self.get_flow(tenant_id, flow_id) + if not flow: + return None + + if update_data.name is not None: + flow.name = update_data.name + if update_data.description is not None: + flow.description = update_data.description + if update_data.steps is not None: + self._validate_steps(update_data.steps) + flow.steps = update_data.steps + if update_data.is_enabled is not None: + flow.is_enabled = update_data.is_enabled + flow.updated_at = datetime.utcnow() + + await self._session.flush() + + logger.info( + f"[AC-AISVC-73] Updated script flow: tenant={tenant_id}, id={flow_id}" + ) + return flow + + async def delete_flow( + self, + tenant_id: str, + flow_id: uuid.UUID, + ) -> bool: + """Delete a flow definition.""" + flow = await self.get_flow(tenant_id, flow_id) + if not flow: + return False + + await self._session.delete(flow) + await self._session.flush() + + logger.info( + f"Deleted script flow: tenant={tenant_id}, id={flow_id}" + ) + return True + + async def get_step_count( + self, + tenant_id: str, + flow_id: uuid.UUID, + ) -> int: + """Get the number of steps in a flow.""" + flow = await self.get_flow(tenant_id, flow_id) + return len(flow.steps) if flow else 0 + + async def _get_linked_rule_count( + self, + tenant_id: str, + flow_id: uuid.UUID, + ) -> int: + """Get count of intent rules linked to this flow.""" + from app.models.entities import IntentRule + + stmt = select(func.count()).select_from(IntentRule).where( + IntentRule.tenant_id == tenant_id, + IntentRule.flow_id == flow_id, + ) + result = await self._session.execute(stmt) + return result.scalar() or 0 + + def _validate_steps(self, steps: list[dict[str, Any]]) -> None: + """Validate step definitions.""" + if not steps: + raise ValueError("Flow must have at least one step") + + step_nos = set() + for step in steps: + step_no = step.get("step_no") + if step_no is None: + raise ValueError("Each step must have a step_no") + if step_no in step_nos: + raise ValueError(f"Duplicate step_no: {step_no}") + step_nos.add(step_no) + + if not step.get("content"): + raise ValueError(f"Step {step_no} must have content") + + next_conditions = step.get("next_conditions", []) + for cond in next_conditions: + if cond.get("goto_step") is None: + raise ValueError(f"next_condition in step {step_no} must have goto_step") diff --git a/docs/progress/ai-service-progress.md b/docs/progress/ai-service-progress.md index b099465..260af8f 100644 --- a/docs/progress/ai-service-progress.md +++ b/docs/progress/ai-service-progress.md @@ -47,7 +47,7 @@ Phase 11 多知识库管理核心功能已完成 (T11.1-T11.5),T11.6(Optimiz - [x] T11.1 扩展 `KnowledgeBase` 实体:新增 `kb_type`、`priority`、`is_enabled`、`doc_count` 字段 `[AC-AISVC-59]` ✅ - [x] T11.2 实现知识库 CRUD 服务:创建时初始化 Qdrant Collection,删除时清理 Collection `[AC-AISVC-59, AC-AISVC-61, AC-AISVC-62]` ✅ -- [x] T11.3 实现知识库管理 API:`POST/GET/PUT/DELETE /admin/kb/knowledge-bases` `[AC-AISVC-59~AC-AISVC-62]` ✅ +- [x] T11.3 实现知识库管理 API:`POST/GET/PUT/DELETE /admin/kb/knowledge-bases` `[AC-AISVC-59, AC-AISVC-60, AC-AISVC-61, AC-AISVC-62]` ✅ - [x] T11.4 升级 Qdrant Collection 命名:`kb_{tenant_id}_{kb_id}`,兼容现有 `kb_{tenant_id}` `[AC-AISVC-63]` ✅ - [x] T11.5 修改文档上传流程:支持指定 `kbId` 参数,索引到对应 Collection `[AC-AISVC-63]` ✅ diff --git a/spec/ai-service/tasks.md b/spec/ai-service/tasks.md index f669d45..7fdd726 100644 --- a/spec/ai-service/tasks.md +++ b/spec/ai-service/tasks.md @@ -210,12 +210,12 @@ last_updated: "2026-02-27" > 目标:实现固定话术步骤的状态机引擎,支持多轮引导对话。 -- [ ] T13.1 定义 `ScriptFlow` 和 `FlowInstance` SQLModel 实体,创建数据库表 `[AC-AISVC-71, AC-AISVC-74]` -- [ ] T13.2 实现 `ScriptFlowService`:流程定义 CRUD `[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73]` -- [ ] T13.3 实现 `FlowEngine.check_active_flow()`:检查会话是否有活跃流程实例 `[AC-AISVC-75]` -- [ ] T13.4 实现 `FlowEngine.start()`:创建流程实例,返回第一步话术 `[AC-AISVC-74]` -- [ ] T13.5 实现 `FlowEngine.advance()`:根据用户输入匹配条件,推进步骤或重复当前步骤 `[AC-AISVC-75, AC-AISVC-76]` -- [ ] T13.6 实现话术流程管理 API:`POST/GET/PUT /admin/script-flows` `[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73]` +- [x] T13.1 定义 `ScriptFlow` 和 `FlowInstance` SQLModel 实体,创建数据库表 `[AC-AISVC-71, AC-AISVC-74]` ✅ +- [x] T13.2 实现 `ScriptFlowService`:流程定义 CRUD `[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73]` ✅ +- [x] T13.3 实现 `FlowEngine.check_active_flow()`:检查会话是否有活跃流程实例 `[AC-AISVC-75]` ✅ +- [x] T13.4 实现 `FlowEngine.start()`:创建流程实例,返回第一步话术 `[AC-AISVC-74]` ✅ +- [x] T13.5 实现 `FlowEngine.advance()`:根据用户输入匹配条件,推进步骤或重复当前步骤 `[AC-AISVC-75, AC-AISVC-76]` ✅ +- [x] T13.6 实现话术流程管理 API:`POST/GET/PUT /admin/script-flows` `[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73]` ✅ - [ ] T13.7 编写话术流程引擎单元测试 `[AC-AISVC-71~AC-AISVC-77]` ---