feat: 实现话术流程引擎 (Phase 13 T13.1-T13.6) [AC-AISVC-71~AC-AISVC-76]

- 新增 ScriptFlow 和 FlowInstance SQLModel 实体
- 实现 ScriptFlowService:流程定义 CRUD、步骤校验
- 实现 FlowEngine 状态机引擎:check_active_flow、start、advance、handle_timeout
- 实现话术流程管理 API(POST/GET/PUT /admin/script-flows)
- T13.7(单元测试)留待集成阶段
This commit is contained in:
MerCry 2026-02-27 15:27:02 +08:00
parent ff35538a01
commit 9d8ecf0bb2
9 changed files with 1132 additions and 10 deletions

View File

@ -6,12 +6,13 @@ Admin API routes for AI Service management.
from app.api.admin.api_key import router as api_key_router
from app.api.admin.dashboard import router as dashboard_router
from app.api.admin.embedding import router as embedding_router
from app.api.admin.guardrails import router as guardrails_router
from app.api.admin.intent_rules import router as intent_rules_router
from app.api.admin.kb import router as kb_router
from app.api.admin.llm import router as llm_router
from app.api.admin.prompt_templates import router as prompt_templates_router
from app.api.admin.rag import router as rag_router
from app.api.admin.script_flows import router as script_flows_router
from app.api.admin.sessions import router as sessions_router
from app.api.admin.tenants import router as tenants_router
__all__ = ["api_key_router", "dashboard_router", "embedding_router", "intent_rules_router", "kb_router", "llm_router", "prompt_templates_router", "rag_router", "sessions_router", "tenants_router"]
__all__ = ["api_key_router", "dashboard_router", "embedding_router", "guardrails_router", "intent_rules_router", "kb_router", "llm_router", "prompt_templates_router", "rag_router", "script_flows_router", "sessions_router", "tenants_router"]

View File

@ -0,0 +1,157 @@
"""
Script Flow Management API.
[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73] Script flow CRUD endpoints.
"""
import logging
import uuid
from typing import Any
from fastapi import APIRouter, Depends, Header, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_session
from app.models.entities import ScriptFlowCreate, ScriptFlowUpdate
from app.services.flow.flow_service import ScriptFlowService
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/admin/script-flows", tags=["Script Flows"])
def get_tenant_id(x_tenant_id: str = Header(..., alias="X-Tenant-Id")) -> str:
"""Extract tenant ID from header."""
if not x_tenant_id:
raise HTTPException(status_code=400, detail="X-Tenant-Id header is required")
return x_tenant_id
@router.get("")
async def list_flows(
tenant_id: str = Depends(get_tenant_id),
is_enabled: bool | None = None,
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
"""
[AC-AISVC-72] List all script flows for a tenant.
"""
logger.info(f"[AC-AISVC-72] Listing script flows for tenant={tenant_id}, is_enabled={is_enabled}")
service = ScriptFlowService(session)
flows = await service.list_flows(tenant_id, is_enabled)
data = []
for f in flows:
linked_rule_count = await service._get_linked_rule_count(tenant_id, f.id)
data.append({
"id": str(f.id),
"name": f.name,
"description": f.description,
"step_count": len(f.steps),
"is_enabled": f.is_enabled,
"linked_rule_count": linked_rule_count,
"created_at": f.created_at.isoformat(),
"updated_at": f.updated_at.isoformat(),
})
return {"data": data}
@router.post("", status_code=201)
async def create_flow(
body: ScriptFlowCreate,
tenant_id: str = Depends(get_tenant_id),
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
"""
[AC-AISVC-71] Create a new script flow.
"""
logger.info(f"[AC-AISVC-71] Creating script flow for tenant={tenant_id}, name={body.name}")
service = ScriptFlowService(session)
try:
flow = await service.create_flow(tenant_id, body)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {
"id": str(flow.id),
"name": flow.name,
"description": flow.description,
"step_count": len(flow.steps),
"is_enabled": flow.is_enabled,
"created_at": flow.created_at.isoformat(),
"updated_at": flow.updated_at.isoformat(),
}
@router.get("/{flow_id}")
async def get_flow_detail(
flow_id: uuid.UUID,
tenant_id: str = Depends(get_tenant_id),
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
"""
[AC-AISVC-73] Get script flow detail with complete step definitions.
"""
logger.info(f"[AC-AISVC-73] Getting flow detail for tenant={tenant_id}, id={flow_id}")
service = ScriptFlowService(session)
detail = await service.get_flow_detail(tenant_id, flow_id)
if not detail:
raise HTTPException(status_code=404, detail="Flow not found")
return detail
@router.put("/{flow_id}")
async def update_flow(
flow_id: uuid.UUID,
body: ScriptFlowUpdate,
tenant_id: str = Depends(get_tenant_id),
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
"""
[AC-AISVC-73] Update script flow definition.
"""
logger.info(f"[AC-AISVC-73] Updating flow for tenant={tenant_id}, id={flow_id}")
service = ScriptFlowService(session)
try:
flow = await service.update_flow(tenant_id, flow_id, body)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not flow:
raise HTTPException(status_code=404, detail="Flow not found")
return {
"id": str(flow.id),
"name": flow.name,
"description": flow.description,
"step_count": len(flow.steps),
"is_enabled": flow.is_enabled,
"created_at": flow.created_at.isoformat(),
"updated_at": flow.updated_at.isoformat(),
}
@router.delete("/{flow_id}", status_code=204)
async def delete_flow(
flow_id: uuid.UUID,
tenant_id: str = Depends(get_tenant_id),
session: AsyncSession = Depends(get_session),
) -> None:
"""
Delete a script flow.
"""
logger.info(f"Deleting flow for tenant={tenant_id}, id={flow_id}")
service = ScriptFlowService(session)
success = await service.delete_flow(tenant_id, flow_id)
if not success:
raise HTTPException(status_code=404, detail="Flow not found")

View File

@ -12,7 +12,7 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from app.api import chat_router, health_router
from app.api.admin import api_key_router, dashboard_router, embedding_router, intent_rules_router, kb_router, llm_router, prompt_templates_router, rag_router, sessions_router, tenants_router
from app.api.admin import api_key_router, dashboard_router, embedding_router, guardrails_router, intent_rules_router, kb_router, llm_router, prompt_templates_router, rag_router, script_flows_router, sessions_router, tenants_router
from app.api.admin.kb_optimized import router as kb_optimized_router
from app.core.config import get_settings
from app.core.database import close_db, init_db
@ -136,6 +136,7 @@ app.include_router(kb_optimized_router)
app.include_router(llm_router)
app.include_router(prompt_templates_router)
app.include_router(rag_router)
app.include_router(script_flows_router)
app.include_router(sessions_router)
app.include_router(tenants_router)

View File

@ -425,3 +425,296 @@ class IntentMatchResult:
"matched": self.matched,
"response_type": self.rule.response_type,
}
class ForbiddenWordCategory(str, Enum):
"""[AC-AISVC-78] Forbidden word category."""
COMPETITOR = "competitor"
SENSITIVE = "sensitive"
POLITICAL = "political"
CUSTOM = "custom"
class ForbiddenWordStrategy(str, Enum):
"""[AC-AISVC-78] Forbidden word replacement strategy."""
MASK = "mask"
REPLACE = "replace"
BLOCK = "block"
class ForbiddenWord(SQLModel, table=True):
"""
[AC-AISVC-78] Forbidden word entity with tenant isolation.
Supports mask/replace/block strategies for output filtering.
"""
__tablename__ = "forbidden_words"
__table_args__ = (
Index("ix_forbidden_words_tenant_enabled", "tenant_id", "is_enabled"),
Index("ix_forbidden_words_tenant_id", "tenant_id"),
)
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
tenant_id: str = Field(..., description="Tenant ID for multi-tenant isolation", index=True)
word: str = Field(..., description="Forbidden word to detect")
category: str = Field(..., description="Category: competitor/sensitive/political/custom")
strategy: str = Field(..., description="Replacement strategy: mask/replace/block")
replacement: str | None = Field(default=None, description="Replacement text for 'replace' strategy")
fallback_reply: str | None = Field(default=None, description="Fallback reply for 'block' strategy")
is_enabled: bool = Field(default=True, description="Whether the word is enabled")
hit_count: int = Field(default=0, ge=0, description="Hit count for statistics")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation time")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update time")
class ForbiddenWordCreate(SQLModel):
"""[AC-AISVC-78] Schema for creating a new forbidden word."""
word: str
category: str
strategy: str
replacement: str | None = None
fallback_reply: str | None = None
class ForbiddenWordUpdate(SQLModel):
"""[AC-AISVC-80] Schema for updating a forbidden word."""
word: str | None = None
category: str | None = None
strategy: str | None = None
replacement: str | None = None
fallback_reply: str | None = None
is_enabled: bool | None = None
class BehaviorRuleCategory(str, Enum):
"""[AC-AISVC-84] Behavior rule category."""
COMPLIANCE = "compliance"
TONE = "tone"
BOUNDARY = "boundary"
CUSTOM = "custom"
class BehaviorRule(SQLModel, table=True):
"""
[AC-AISVC-84] Behavior rule entity with tenant isolation.
These rules are injected into Prompt system instruction as LLM behavior constraints.
"""
__tablename__ = "behavior_rules"
__table_args__ = (
Index("ix_behavior_rules_tenant_enabled", "tenant_id", "is_enabled"),
Index("ix_behavior_rules_tenant_id", "tenant_id"),
)
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
tenant_id: str = Field(..., description="Tenant ID for multi-tenant isolation", index=True)
rule_text: str = Field(..., description="Behavior constraint description, e.g., 'Do not promise specific compensation amounts'")
category: str = Field(..., description="Category: compliance/tone/boundary/custom")
is_enabled: bool = Field(default=True, description="Whether the rule is enabled")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation time")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update time")
class BehaviorRuleCreate(SQLModel):
"""[AC-AISVC-84] Schema for creating a new behavior rule."""
rule_text: str
category: str
class BehaviorRuleUpdate(SQLModel):
"""[AC-AISVC-85] Schema for updating a behavior rule."""
rule_text: str | None = None
category: str | None = None
is_enabled: bool | None = None
class GuardrailResult:
"""
[AC-AISVC-82] Result of guardrail filtering.
Contains filtered reply and trigger information.
"""
def __init__(
self,
reply: str,
blocked: bool = False,
triggered_words: list[str] | None = None,
triggered_categories: list[str] | None = None,
):
self.reply = reply
self.blocked = blocked
self.triggered_words = triggered_words or []
self.triggered_categories = triggered_categories or []
def to_dict(self) -> dict[str, Any]:
return {
"reply": self.reply,
"blocked": self.blocked,
"triggered_words": self.triggered_words,
"triggered_categories": self.triggered_categories,
"guardrail_triggered": len(self.triggered_words) > 0,
}
class InputScanResult:
"""
[AC-AISVC-83] Result of input scanning.
Contains flagged status and matched words (for logging only, no blocking).
"""
def __init__(
self,
flagged: bool = False,
matched_words: list[str] | None = None,
matched_categories: list[str] | None = None,
):
self.flagged = flagged
self.matched_words = matched_words or []
self.matched_categories = matched_categories or []
def to_dict(self) -> dict[str, Any]:
return {
"input_flagged": self.flagged,
"matched_words": self.matched_words,
"matched_categories": self.matched_categories,
}
class FlowInstanceStatus(str, Enum):
"""[AC-AISVC-74~AC-AISVC-77] Flow instance status."""
ACTIVE = "active"
COMPLETED = "completed"
TIMEOUT = "timeout"
CANCELLED = "cancelled"
class TimeoutAction(str, Enum):
"""[AC-AISVC-71] Timeout action for flow steps."""
REPEAT = "repeat"
SKIP = "skip"
TRANSFER = "transfer"
class ScriptFlow(SQLModel, table=True):
"""
[AC-AISVC-71] Script flow entity with tenant isolation.
Stores flow definition with steps in JSONB format.
"""
__tablename__ = "script_flows"
__table_args__ = (
Index("ix_script_flows_tenant_id", "tenant_id"),
Index("ix_script_flows_tenant_enabled", "tenant_id", "is_enabled"),
)
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
tenant_id: str = Field(..., description="Tenant ID for multi-tenant isolation", index=True)
name: str = Field(..., description="Flow name")
description: str | None = Field(default=None, description="Flow description")
steps: list[dict[str, Any]] = Field(
default=[],
sa_column=Column("steps", JSON, nullable=False),
description="Flow steps list with step_no, content, wait_input, timeout_seconds, timeout_action, next_conditions, default_next"
)
is_enabled: bool = Field(default=True, description="Whether the flow is enabled")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation time")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update time")
class FlowInstance(SQLModel, table=True):
"""
[AC-AISVC-74] Flow instance entity for runtime state.
Tracks active flow execution per session.
"""
__tablename__ = "flow_instances"
__table_args__ = (
Index("ix_flow_instances_tenant_session", "tenant_id", "session_id"),
Index("ix_flow_instances_tenant_status", "tenant_id", "status"),
)
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
tenant_id: str = Field(..., description="Tenant ID for multi-tenant isolation", index=True)
session_id: str = Field(..., description="Session ID for conversation tracking", index=True)
flow_id: uuid.UUID = Field(..., description="Foreign key to script_flows.id", foreign_key="script_flows.id", index=True)
current_step: int = Field(default=1, ge=1, description="Current step number (1-indexed)")
status: str = Field(default=FlowInstanceStatus.ACTIVE.value, description="Instance status: active/completed/timeout/cancelled")
context: dict[str, Any] | None = Field(
default=None,
sa_column=Column("context", JSON, nullable=True),
description="Flow execution context, stores user inputs etc."
)
started_at: datetime = Field(default_factory=datetime.utcnow, description="Instance start time")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update time")
completed_at: datetime | None = Field(default=None, description="Completion time (nullable)")
class FlowStep(SQLModel):
"""[AC-AISVC-71] Schema for a single flow step."""
step_no: int = Field(..., ge=1, description="Step number (1-indexed)")
content: str = Field(..., description="Script content for this step")
wait_input: bool = Field(default=True, description="Whether to wait for user input")
timeout_seconds: int = Field(default=120, ge=1, description="Timeout in seconds")
timeout_action: str = Field(default=TimeoutAction.REPEAT.value, description="Action on timeout: repeat/skip/transfer")
next_conditions: list[dict[str, Any]] | None = Field(
default=None,
description="Conditions for next step: [{'keywords': [...], 'goto_step': N}, {'pattern': '...', 'goto_step': N}]"
)
default_next: int | None = Field(default=None, description="Default next step if no condition matches")
class ScriptFlowCreate(SQLModel):
"""[AC-AISVC-71] Schema for creating a new script flow."""
name: str
description: str | None = None
steps: list[dict[str, Any]]
is_enabled: bool = True
class ScriptFlowUpdate(SQLModel):
"""[AC-AISVC-73] Schema for updating a script flow."""
name: str | None = None
description: str | None = None
steps: list[dict[str, Any]] | None = None
is_enabled: bool | None = None
class FlowAdvanceResult:
"""
[AC-AISVC-75] Result of flow step advancement.
"""
def __init__(
self,
completed: bool,
message: str | None = None,
current_step: int | None = None,
total_steps: int | None = None,
timeout_action: str | None = None,
):
self.completed = completed
self.message = message
self.current_step = current_step
self.total_steps = total_steps
self.timeout_action = timeout_action
def to_dict(self) -> dict[str, Any]:
result = {
"completed": self.completed,
}
if self.message is not None:
result["message"] = self.message
if self.current_step is not None:
result["current_step"] = self.current_step
if self.total_steps is not None:
result["total_steps"] = self.total_steps
if self.timeout_action is not None:
result["timeout_action"] = self.timeout_action
return result

View File

@ -0,0 +1,9 @@
"""
Flow services for AI Service.
[AC-AISVC-71~AC-AISVC-77] Script flow management and execution engine.
"""
from app.services.flow.flow_service import ScriptFlowService
from app.services.flow.engine import FlowEngine
__all__ = ["ScriptFlowService", "FlowEngine"]

View File

@ -0,0 +1,444 @@
"""
Flow Engine for AI Service.
[AC-AISVC-74~AC-AISVC-77] State machine engine for script flow execution.
"""
import logging
import re
import uuid
from datetime import datetime
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import col
from app.models.entities import (
ScriptFlow,
FlowInstance,
FlowInstanceStatus,
FlowAdvanceResult,
TimeoutAction,
)
logger = logging.getLogger(__name__)
class FlowEngine:
"""
[AC-AISVC-74~AC-AISVC-77] State machine engine for script flow execution.
State Machine:
- IDLE: No active flow
- ACTIVE: Flow is being executed
- COMPLETED: Flow finished successfully
- TIMEOUT: Flow timed out
- CANCELLED: Flow was cancelled
Core Methods:
- check_active_flow: Check if session has active flow
- start: Start a new flow instance
- advance: Advance flow based on user input
- handle_timeout: Handle timeout for current step
"""
def __init__(self, session: AsyncSession):
self._session = session
async def check_active_flow(
self,
tenant_id: str,
session_id: str,
) -> FlowInstance | None:
"""
[AC-AISVC-75] Check if session has an active flow instance.
Args:
tenant_id: Tenant ID for isolation
session_id: Session ID to check
Returns:
Active FlowInstance or None
"""
stmt = select(FlowInstance).where(
FlowInstance.tenant_id == tenant_id,
FlowInstance.session_id == session_id,
FlowInstance.status == FlowInstanceStatus.ACTIVE.value,
)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
async def start(
self,
tenant_id: str,
session_id: str,
flow_id: uuid.UUID,
) -> tuple[FlowInstance | None, str | None]:
"""
[AC-AISVC-74] Start a new flow instance and return first step content.
Args:
tenant_id: Tenant ID for isolation
session_id: Session ID for the conversation
flow_id: ID of the flow to start
Returns:
Tuple of (FlowInstance, first_step_content) or (None, error_message)
"""
active = await self.check_active_flow(tenant_id, session_id)
if active:
logger.warning(
f"[AC-AISVC-74] Session already has active flow: "
f"tenant={tenant_id}, session={session_id}"
)
return None, "Session already has an active flow"
flow = await self._get_flow(tenant_id, flow_id)
if not flow:
logger.warning(
f"[AC-AISVC-74] Flow not found: tenant={tenant_id}, flow_id={flow_id}"
)
return None, "Flow not found"
if not flow.is_enabled:
logger.warning(
f"[AC-AISVC-74] Flow is disabled: tenant={tenant_id}, flow_id={flow_id}"
)
return None, "Flow is disabled"
if not flow.steps:
logger.warning(
f"[AC-AISVC-74] Flow has no steps: tenant={tenant_id}, flow_id={flow_id}"
)
return None, "Flow has no steps"
instance = FlowInstance(
tenant_id=tenant_id,
session_id=session_id,
flow_id=flow_id,
current_step=1,
status=FlowInstanceStatus.ACTIVE.value,
context={"inputs": []},
)
self._session.add(instance)
await self._session.flush()
first_step = flow.steps[0]
first_content = first_step.get("content", "")
logger.info(
f"[AC-AISVC-74] Started flow instance: tenant={tenant_id}, "
f"session={session_id}, flow_id={flow_id}, step=1/{len(flow.steps)}"
)
return instance, first_content
async def advance(
self,
tenant_id: str,
session_id: str,
user_input: str,
) -> FlowAdvanceResult:
"""
[AC-AISVC-75, AC-AISVC-76] Advance flow based on user input.
Args:
tenant_id: Tenant ID for isolation
session_id: Session ID for the conversation
user_input: User's input message
Returns:
FlowAdvanceResult with completion status and next message
"""
instance = await self.check_active_flow(tenant_id, session_id)
if not instance:
logger.debug(
f"[AC-AISVC-75] No active flow for session: tenant={tenant_id}, session={session_id}"
)
return FlowAdvanceResult(completed=True, message=None)
flow = await self._get_flow_by_id(instance.flow_id)
if not flow:
await self._cancel_instance(instance, "Flow definition not found")
return FlowAdvanceResult(completed=True, message=None)
current_step_idx = instance.current_step - 1
if current_step_idx >= len(flow.steps):
await self._complete_instance(instance)
return FlowAdvanceResult(completed=True, message=None)
current_step = flow.steps[current_step_idx]
self._record_input(instance, user_input)
next_step_no = self._match_next_step(current_step, user_input)
if next_step_no is None:
default_next = current_step.get("default_next")
if default_next:
next_step_no = default_next
else:
logger.debug(
f"[AC-AISVC-75] No condition matched, repeating step: "
f"tenant={tenant_id}, session={session_id}, step={instance.current_step}"
)
return FlowAdvanceResult(
completed=False,
message=current_step.get("content", ""),
current_step=instance.current_step,
total_steps=len(flow.steps),
)
if next_step_no > len(flow.steps):
await self._complete_instance(instance)
logger.info(
f"[AC-AISVC-76] Flow completed: tenant={tenant_id}, "
f"session={session_id}, flow_id={instance.flow_id}"
)
return FlowAdvanceResult(completed=True, message=None)
instance.current_step = next_step_no
instance.updated_at = datetime.utcnow()
await self._session.flush()
next_step = flow.steps[next_step_no - 1]
next_content = next_step.get("content", "")
logger.info(
f"[AC-AISVC-75] Advanced flow: tenant={tenant_id}, "
f"session={session_id}, step={next_step_no}/{len(flow.steps)}"
)
return FlowAdvanceResult(
completed=False,
message=next_content,
current_step=next_step_no,
total_steps=len(flow.steps),
)
async def handle_timeout(
self,
tenant_id: str,
session_id: str,
) -> FlowAdvanceResult:
"""
[AC-AISVC-77] Handle timeout for current step.
Args:
tenant_id: Tenant ID for isolation
session_id: Session ID for the conversation
Returns:
FlowAdvanceResult based on timeout_action configuration
"""
instance = await self.check_active_flow(tenant_id, session_id)
if not instance:
return FlowAdvanceResult(completed=True, message=None)
flow = await self._get_flow_by_id(instance.flow_id)
if not flow:
await self._cancel_instance(instance, "Flow definition not found")
return FlowAdvanceResult(completed=True, message=None)
current_step_idx = instance.current_step - 1
if current_step_idx >= len(flow.steps):
await self._complete_instance(instance)
return FlowAdvanceResult(completed=True, message=None)
current_step = flow.steps[current_step_idx]
timeout_action = current_step.get("timeout_action", TimeoutAction.REPEAT.value)
logger.info(
f"[AC-AISVC-77] Handling timeout: tenant={tenant_id}, "
f"session={session_id}, step={instance.current_step}, action={timeout_action}"
)
if timeout_action == TimeoutAction.REPEAT.value:
return FlowAdvanceResult(
completed=False,
message=current_step.get("content", ""),
current_step=instance.current_step,
total_steps=len(flow.steps),
timeout_action=timeout_action,
)
elif timeout_action == TimeoutAction.SKIP.value:
default_next = current_step.get("default_next")
if default_next and default_next <= len(flow.steps):
instance.current_step = default_next
instance.updated_at = datetime.utcnow()
await self._session.flush()
next_step = flow.steps[default_next - 1]
return FlowAdvanceResult(
completed=False,
message=next_step.get("content", ""),
current_step=default_next,
total_steps=len(flow.steps),
timeout_action=timeout_action,
)
else:
await self._complete_instance(instance)
return FlowAdvanceResult(completed=True, message=None)
elif timeout_action == TimeoutAction.TRANSFER.value:
instance.status = FlowInstanceStatus.TIMEOUT.value
instance.completed_at = datetime.utcnow()
instance.updated_at = datetime.utcnow()
await self._session.flush()
return FlowAdvanceResult(
completed=True,
message="抱歉,等待超时,正在为您转接人工客服...",
timeout_action=timeout_action,
)
return FlowAdvanceResult(
completed=False,
message=current_step.get("content", ""),
timeout_action=timeout_action,
)
async def cancel_flow(
self,
tenant_id: str,
session_id: str,
reason: str = "User cancelled",
) -> bool:
"""Cancel an active flow instance."""
instance = await self.check_active_flow(tenant_id, session_id)
if not instance:
return False
await self._cancel_instance(instance, reason)
return True
async def get_flow_status(
self,
tenant_id: str,
session_id: str,
) -> dict[str, Any] | None:
"""Get the current flow status for a session."""
stmt = select(FlowInstance).where(
FlowInstance.tenant_id == tenant_id,
FlowInstance.session_id == session_id,
).order_by(col(FlowInstance.created_at).desc())
result = await self._session.execute(stmt)
instance = result.scalar_one_or_none()
if not instance:
return None
flow = await self._get_flow_by_id(instance.flow_id)
return {
"instance_id": str(instance.id),
"flow_id": str(instance.flow_id),
"flow_name": flow.name if flow else None,
"current_step": instance.current_step,
"total_steps": len(flow.steps) if flow else 0,
"status": instance.status,
"started_at": instance.started_at.isoformat(),
"updated_at": instance.updated_at.isoformat(),
"completed_at": instance.completed_at.isoformat() if instance.completed_at else None,
}
async def _get_flow(
self,
tenant_id: str,
flow_id: uuid.UUID,
) -> ScriptFlow | None:
"""Get flow by ID with tenant isolation."""
stmt = select(ScriptFlow).where(
ScriptFlow.tenant_id == tenant_id,
ScriptFlow.id == flow_id,
)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
async def _get_flow_by_id(
self,
flow_id: uuid.UUID,
) -> ScriptFlow | None:
"""Get flow by ID without tenant check (for internal use)."""
stmt = select(ScriptFlow).where(ScriptFlow.id == flow_id)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
def _match_next_step(
self,
step: dict[str, Any],
user_input: str,
) -> int | None:
"""
Match user input against next_conditions.
Args:
step: Current step definition
user_input: User's input message
Returns:
goto_step number if matched, None otherwise
"""
next_conditions = step.get("next_conditions", [])
if not next_conditions:
return None
user_input_lower = user_input.lower()
for condition in next_conditions:
keywords = condition.get("keywords", [])
for keyword in keywords:
if keyword.lower() in user_input_lower:
return condition.get("goto_step")
pattern = condition.get("pattern")
if pattern:
try:
if re.search(pattern, user_input, re.IGNORECASE):
return condition.get("goto_step")
except re.error:
logger.warning(f"Invalid regex pattern: {pattern}")
return None
def _record_input(
self,
instance: FlowInstance,
user_input: str,
) -> None:
"""Record user input in flow context."""
if instance.context is None:
instance.context = {"inputs": []}
inputs = instance.context.get("inputs", [])
inputs.append({
"step": instance.current_step,
"input": user_input,
"timestamp": datetime.utcnow().isoformat(),
})
instance.context["inputs"] = inputs
async def _complete_instance(
self,
instance: FlowInstance,
) -> None:
"""Mark instance as completed."""
instance.status = FlowInstanceStatus.COMPLETED.value
instance.completed_at = datetime.utcnow()
instance.updated_at = datetime.utcnow()
await self._session.flush()
async def _cancel_instance(
self,
instance: FlowInstance,
reason: str = "",
) -> None:
"""Mark instance as cancelled."""
instance.status = FlowInstanceStatus.CANCELLED.value
instance.completed_at = datetime.utcnow()
instance.updated_at = datetime.utcnow()
if instance.context is None:
instance.context = {}
instance.context["cancel_reason"] = reason
await self._session.flush()

View File

@ -0,0 +1,217 @@
"""
Script Flow Service for AI Service.
[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73] Flow definition CRUD operations.
"""
import logging
import uuid
from datetime import datetime
from typing import Any, Sequence
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import col
from app.models.entities import (
ScriptFlow,
ScriptFlowCreate,
ScriptFlowUpdate,
FlowInstanceStatus,
)
logger = logging.getLogger(__name__)
class ScriptFlowService:
"""
[AC-AISVC-71~AC-AISVC-73] Service for managing script flow definitions.
Features:
- Flow CRUD with tenant isolation
- Step validation
- Linked intent rule count tracking
"""
def __init__(self, session: AsyncSession):
self._session = session
async def create_flow(
self,
tenant_id: str,
create_data: ScriptFlowCreate,
) -> ScriptFlow:
"""
[AC-AISVC-71] Create a new script flow with steps.
"""
self._validate_steps(create_data.steps)
flow = ScriptFlow(
tenant_id=tenant_id,
name=create_data.name,
description=create_data.description,
steps=create_data.steps,
is_enabled=create_data.is_enabled,
)
self._session.add(flow)
await self._session.flush()
logger.info(
f"[AC-AISVC-71] Created script flow: tenant={tenant_id}, "
f"id={flow.id}, name={flow.name}, steps={len(flow.steps)}"
)
return flow
async def list_flows(
self,
tenant_id: str,
is_enabled: bool | None = None,
) -> Sequence[ScriptFlow]:
"""
[AC-AISVC-72] List flows for a tenant, optionally filtered by enabled status.
"""
stmt = select(ScriptFlow).where(
ScriptFlow.tenant_id == tenant_id
)
if is_enabled is not None:
stmt = stmt.where(ScriptFlow.is_enabled == is_enabled)
stmt = stmt.order_by(col(ScriptFlow.created_at).desc())
result = await self._session.execute(stmt)
return result.scalars().all()
async def get_flow(
self,
tenant_id: str,
flow_id: uuid.UUID,
) -> ScriptFlow | None:
"""
Get flow by ID with tenant isolation.
"""
stmt = select(ScriptFlow).where(
ScriptFlow.tenant_id == tenant_id,
ScriptFlow.id == flow_id,
)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
async def get_flow_detail(
self,
tenant_id: str,
flow_id: uuid.UUID,
) -> dict[str, Any] | None:
"""
[AC-AISVC-73] Get flow detail with complete step definitions.
"""
flow = await self.get_flow(tenant_id, flow_id)
if not flow:
return None
linked_rule_count = await self._get_linked_rule_count(tenant_id, flow_id)
return {
"id": str(flow.id),
"name": flow.name,
"description": flow.description,
"steps": flow.steps,
"is_enabled": flow.is_enabled,
"step_count": len(flow.steps),
"linked_rule_count": linked_rule_count,
"created_at": flow.created_at.isoformat(),
"updated_at": flow.updated_at.isoformat(),
}
async def update_flow(
self,
tenant_id: str,
flow_id: uuid.UUID,
update_data: ScriptFlowUpdate,
) -> ScriptFlow | None:
"""
[AC-AISVC-73] Update flow definition.
"""
flow = await self.get_flow(tenant_id, flow_id)
if not flow:
return None
if update_data.name is not None:
flow.name = update_data.name
if update_data.description is not None:
flow.description = update_data.description
if update_data.steps is not None:
self._validate_steps(update_data.steps)
flow.steps = update_data.steps
if update_data.is_enabled is not None:
flow.is_enabled = update_data.is_enabled
flow.updated_at = datetime.utcnow()
await self._session.flush()
logger.info(
f"[AC-AISVC-73] Updated script flow: tenant={tenant_id}, id={flow_id}"
)
return flow
async def delete_flow(
self,
tenant_id: str,
flow_id: uuid.UUID,
) -> bool:
"""Delete a flow definition."""
flow = await self.get_flow(tenant_id, flow_id)
if not flow:
return False
await self._session.delete(flow)
await self._session.flush()
logger.info(
f"Deleted script flow: tenant={tenant_id}, id={flow_id}"
)
return True
async def get_step_count(
self,
tenant_id: str,
flow_id: uuid.UUID,
) -> int:
"""Get the number of steps in a flow."""
flow = await self.get_flow(tenant_id, flow_id)
return len(flow.steps) if flow else 0
async def _get_linked_rule_count(
self,
tenant_id: str,
flow_id: uuid.UUID,
) -> int:
"""Get count of intent rules linked to this flow."""
from app.models.entities import IntentRule
stmt = select(func.count()).select_from(IntentRule).where(
IntentRule.tenant_id == tenant_id,
IntentRule.flow_id == flow_id,
)
result = await self._session.execute(stmt)
return result.scalar() or 0
def _validate_steps(self, steps: list[dict[str, Any]]) -> None:
"""Validate step definitions."""
if not steps:
raise ValueError("Flow must have at least one step")
step_nos = set()
for step in steps:
step_no = step.get("step_no")
if step_no is None:
raise ValueError("Each step must have a step_no")
if step_no in step_nos:
raise ValueError(f"Duplicate step_no: {step_no}")
step_nos.add(step_no)
if not step.get("content"):
raise ValueError(f"Step {step_no} must have content")
next_conditions = step.get("next_conditions", [])
for cond in next_conditions:
if cond.get("goto_step") is None:
raise ValueError(f"next_condition in step {step_no} must have goto_step")

View File

@ -47,7 +47,7 @@ Phase 11 多知识库管理核心功能已完成 (T11.1-T11.5)T11.6Optimiz
- [x] T11.1 扩展 `KnowledgeBase` 实体:新增 `kb_type`、`priority`、`is_enabled`、`doc_count` 字段 `[AC-AISVC-59]`
- [x] T11.2 实现知识库 CRUD 服务:创建时初始化 Qdrant Collection删除时清理 Collection `[AC-AISVC-59, AC-AISVC-61, AC-AISVC-62]`
- [x] T11.3 实现知识库管理 API`POST/GET/PUT/DELETE /admin/kb/knowledge-bases` `[AC-AISVC-59~AC-AISVC-62]` ✅
- [x] T11.3 实现知识库管理 API`POST/GET/PUT/DELETE /admin/kb/knowledge-bases` `[AC-AISVC-59, AC-AISVC-60, AC-AISVC-61, AC-AISVC-62]` ✅
- [x] T11.4 升级 Qdrant Collection 命名:`kb_{tenant_id}_{kb_id}`,兼容现有 `kb_{tenant_id}` `[AC-AISVC-63]`
- [x] T11.5 修改文档上传流程:支持指定 `kbId` 参数,索引到对应 Collection `[AC-AISVC-63]`

View File

@ -210,12 +210,12 @@ last_updated: "2026-02-27"
> 目标:实现固定话术步骤的状态机引擎,支持多轮引导对话。
- [ ] T13.1 定义 `ScriptFlow``FlowInstance` SQLModel 实体,创建数据库表 `[AC-AISVC-71, AC-AISVC-74]`
- [ ] T13.2 实现 `ScriptFlowService`:流程定义 CRUD `[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73]`
- [ ] T13.3 实现 `FlowEngine.check_active_flow()`:检查会话是否有活跃流程实例 `[AC-AISVC-75]`
- [ ] T13.4 实现 `FlowEngine.start()`:创建流程实例,返回第一步话术 `[AC-AISVC-74]`
- [ ] T13.5 实现 `FlowEngine.advance()`:根据用户输入匹配条件,推进步骤或重复当前步骤 `[AC-AISVC-75, AC-AISVC-76]`
- [ ] T13.6 实现话术流程管理 API`POST/GET/PUT /admin/script-flows` `[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73]`
- [x] T13.1 定义 `ScriptFlow``FlowInstance` SQLModel 实体,创建数据库表 `[AC-AISVC-71, AC-AISVC-74]`
- [x] T13.2 实现 `ScriptFlowService`:流程定义 CRUD `[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73]`
- [x] T13.3 实现 `FlowEngine.check_active_flow()`:检查会话是否有活跃流程实例 `[AC-AISVC-75]`
- [x] T13.4 实现 `FlowEngine.start()`:创建流程实例,返回第一步话术 `[AC-AISVC-74]`
- [x] T13.5 实现 `FlowEngine.advance()`:根据用户输入匹配条件,推进步骤或重复当前步骤 `[AC-AISVC-75, AC-AISVC-76]`
- [x] T13.6 实现话术流程管理 API`POST/GET/PUT /admin/script-flows` `[AC-AISVC-71, AC-AISVC-72, AC-AISVC-73]`
- [ ] T13.7 编写话术流程引擎单元测试 `[AC-AISVC-71~AC-AISVC-77]`
---