ai-robot-core/ai-service/app/services/flow/engine.py

445 lines
14 KiB
Python
Raw Normal View History

"""
Flow Engine for AI Service.
[AC-AISVC-74~AC-AISVC-77] State machine engine for script flow execution.
"""
import logging
import re
import uuid
from datetime import datetime
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import col
from app.models.entities import (
FlowAdvanceResult,
FlowInstance,
FlowInstanceStatus,
ScriptFlow,
TimeoutAction,
)
logger = logging.getLogger(__name__)
class FlowEngine:
"""
[AC-AISVC-74~AC-AISVC-77] State machine engine for script flow execution.
State Machine:
- IDLE: No active flow
- ACTIVE: Flow is being executed
- COMPLETED: Flow finished successfully
- TIMEOUT: Flow timed out
- CANCELLED: Flow was cancelled
Core Methods:
- check_active_flow: Check if session has active flow
- start: Start a new flow instance
- advance: Advance flow based on user input
- handle_timeout: Handle timeout for current step
"""
def __init__(self, session: AsyncSession):
self._session = session
async def check_active_flow(
self,
tenant_id: str,
session_id: str,
) -> FlowInstance | None:
"""
[AC-AISVC-75] Check if session has an active flow instance.
Args:
tenant_id: Tenant ID for isolation
session_id: Session ID to check
Returns:
Active FlowInstance or None
"""
stmt = select(FlowInstance).where(
FlowInstance.tenant_id == tenant_id,
FlowInstance.session_id == session_id,
FlowInstance.status == FlowInstanceStatus.ACTIVE.value,
)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
async def start(
self,
tenant_id: str,
session_id: str,
flow_id: uuid.UUID,
) -> tuple[FlowInstance | None, str | None]:
"""
[AC-AISVC-74] Start a new flow instance and return first step content.
Args:
tenant_id: Tenant ID for isolation
session_id: Session ID for the conversation
flow_id: ID of the flow to start
Returns:
Tuple of (FlowInstance, first_step_content) or (None, error_message)
"""
active = await self.check_active_flow(tenant_id, session_id)
if active:
logger.warning(
f"[AC-AISVC-74] Session already has active flow: "
f"tenant={tenant_id}, session={session_id}"
)
return None, "Session already has an active flow"
flow = await self._get_flow(tenant_id, flow_id)
if not flow:
logger.warning(
f"[AC-AISVC-74] Flow not found: tenant={tenant_id}, flow_id={flow_id}"
)
return None, "Flow not found"
if not flow.is_enabled:
logger.warning(
f"[AC-AISVC-74] Flow is disabled: tenant={tenant_id}, flow_id={flow_id}"
)
return None, "Flow is disabled"
if not flow.steps:
logger.warning(
f"[AC-AISVC-74] Flow has no steps: tenant={tenant_id}, flow_id={flow_id}"
)
return None, "Flow has no steps"
instance = FlowInstance(
tenant_id=tenant_id,
session_id=session_id,
flow_id=flow_id,
current_step=1,
status=FlowInstanceStatus.ACTIVE.value,
context={"inputs": []},
)
self._session.add(instance)
await self._session.flush()
first_step = flow.steps[0]
first_content = first_step.get("content", "")
logger.info(
f"[AC-AISVC-74] Started flow instance: tenant={tenant_id}, "
f"session={session_id}, flow_id={flow_id}, step=1/{len(flow.steps)}"
)
return instance, first_content
async def advance(
self,
tenant_id: str,
session_id: str,
user_input: str,
) -> FlowAdvanceResult:
"""
[AC-AISVC-75, AC-AISVC-76] Advance flow based on user input.
Args:
tenant_id: Tenant ID for isolation
session_id: Session ID for the conversation
user_input: User's input message
Returns:
FlowAdvanceResult with completion status and next message
"""
instance = await self.check_active_flow(tenant_id, session_id)
if not instance:
logger.debug(
f"[AC-AISVC-75] No active flow for session: tenant={tenant_id}, session={session_id}"
)
return FlowAdvanceResult(completed=True, message=None)
flow = await self._get_flow_by_id(instance.flow_id)
if not flow:
await self._cancel_instance(instance, "Flow definition not found")
return FlowAdvanceResult(completed=True, message=None)
current_step_idx = instance.current_step - 1
if current_step_idx >= len(flow.steps):
await self._complete_instance(instance)
return FlowAdvanceResult(completed=True, message=None)
current_step = flow.steps[current_step_idx]
self._record_input(instance, user_input)
next_step_no = self._match_next_step(current_step, user_input)
if next_step_no is None:
default_next = current_step.get("default_next")
if default_next:
next_step_no = default_next
else:
logger.debug(
f"[AC-AISVC-75] No condition matched, repeating step: "
f"tenant={tenant_id}, session={session_id}, step={instance.current_step}"
)
return FlowAdvanceResult(
completed=False,
message=current_step.get("content", ""),
current_step=instance.current_step,
total_steps=len(flow.steps),
)
if next_step_no > len(flow.steps):
await self._complete_instance(instance)
logger.info(
f"[AC-AISVC-76] Flow completed: tenant={tenant_id}, "
f"session={session_id}, flow_id={instance.flow_id}"
)
return FlowAdvanceResult(completed=True, message=None)
instance.current_step = next_step_no
instance.updated_at = datetime.utcnow()
await self._session.flush()
next_step = flow.steps[next_step_no - 1]
next_content = next_step.get("content", "")
logger.info(
f"[AC-AISVC-75] Advanced flow: tenant={tenant_id}, "
f"session={session_id}, step={next_step_no}/{len(flow.steps)}"
)
return FlowAdvanceResult(
completed=False,
message=next_content,
current_step=next_step_no,
total_steps=len(flow.steps),
)
async def handle_timeout(
self,
tenant_id: str,
session_id: str,
) -> FlowAdvanceResult:
"""
[AC-AISVC-77] Handle timeout for current step.
Args:
tenant_id: Tenant ID for isolation
session_id: Session ID for the conversation
Returns:
FlowAdvanceResult based on timeout_action configuration
"""
instance = await self.check_active_flow(tenant_id, session_id)
if not instance:
return FlowAdvanceResult(completed=True, message=None)
flow = await self._get_flow_by_id(instance.flow_id)
if not flow:
await self._cancel_instance(instance, "Flow definition not found")
return FlowAdvanceResult(completed=True, message=None)
current_step_idx = instance.current_step - 1
if current_step_idx >= len(flow.steps):
await self._complete_instance(instance)
return FlowAdvanceResult(completed=True, message=None)
current_step = flow.steps[current_step_idx]
timeout_action = current_step.get("timeout_action", TimeoutAction.REPEAT.value)
logger.info(
f"[AC-AISVC-77] Handling timeout: tenant={tenant_id}, "
f"session={session_id}, step={instance.current_step}, action={timeout_action}"
)
if timeout_action == TimeoutAction.REPEAT.value:
return FlowAdvanceResult(
completed=False,
message=current_step.get("content", ""),
current_step=instance.current_step,
total_steps=len(flow.steps),
timeout_action=timeout_action,
)
elif timeout_action == TimeoutAction.SKIP.value:
default_next = current_step.get("default_next")
if default_next and default_next <= len(flow.steps):
instance.current_step = default_next
instance.updated_at = datetime.utcnow()
await self._session.flush()
next_step = flow.steps[default_next - 1]
return FlowAdvanceResult(
completed=False,
message=next_step.get("content", ""),
current_step=default_next,
total_steps=len(flow.steps),
timeout_action=timeout_action,
)
else:
await self._complete_instance(instance)
return FlowAdvanceResult(completed=True, message=None)
elif timeout_action == TimeoutAction.TRANSFER.value:
instance.status = FlowInstanceStatus.TIMEOUT.value
instance.completed_at = datetime.utcnow()
instance.updated_at = datetime.utcnow()
await self._session.flush()
return FlowAdvanceResult(
completed=True,
message="抱歉,等待超时,正在为您转接人工客服...",
timeout_action=timeout_action,
)
return FlowAdvanceResult(
completed=False,
message=current_step.get("content", ""),
timeout_action=timeout_action,
)
async def cancel_flow(
self,
tenant_id: str,
session_id: str,
reason: str = "User cancelled",
) -> bool:
"""Cancel an active flow instance."""
instance = await self.check_active_flow(tenant_id, session_id)
if not instance:
return False
await self._cancel_instance(instance, reason)
return True
async def get_flow_status(
self,
tenant_id: str,
session_id: str,
) -> dict[str, Any] | None:
"""Get the current flow status for a session."""
stmt = select(FlowInstance).where(
FlowInstance.tenant_id == tenant_id,
FlowInstance.session_id == session_id,
).order_by(col(FlowInstance.created_at).desc())
result = await self._session.execute(stmt)
instance = result.scalar_one_or_none()
if not instance:
return None
flow = await self._get_flow_by_id(instance.flow_id)
return {
"instance_id": str(instance.id),
"flow_id": str(instance.flow_id),
"flow_name": flow.name if flow else None,
"current_step": instance.current_step,
"total_steps": len(flow.steps) if flow else 0,
"status": instance.status,
"started_at": instance.started_at.isoformat(),
"updated_at": instance.updated_at.isoformat(),
"completed_at": instance.completed_at.isoformat() if instance.completed_at else None,
}
async def _get_flow(
self,
tenant_id: str,
flow_id: uuid.UUID,
) -> ScriptFlow | None:
"""Get flow by ID with tenant isolation."""
stmt = select(ScriptFlow).where(
ScriptFlow.tenant_id == tenant_id,
ScriptFlow.id == flow_id,
)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
async def _get_flow_by_id(
self,
flow_id: uuid.UUID,
) -> ScriptFlow | None:
"""Get flow by ID without tenant check (for internal use)."""
stmt = select(ScriptFlow).where(ScriptFlow.id == flow_id)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
def _match_next_step(
self,
step: dict[str, Any],
user_input: str,
) -> int | None:
"""
Match user input against next_conditions.
Args:
step: Current step definition
user_input: User's input message
Returns:
goto_step number if matched, None otherwise
"""
next_conditions = step.get("next_conditions", [])
if not next_conditions:
return None
user_input_lower = user_input.lower()
for condition in next_conditions:
keywords = condition.get("keywords", [])
for keyword in keywords:
if keyword.lower() in user_input_lower:
return condition.get("goto_step")
pattern = condition.get("pattern")
if pattern:
try:
if re.search(pattern, user_input, re.IGNORECASE):
return condition.get("goto_step")
except re.error:
logger.warning(f"Invalid regex pattern: {pattern}")
return None
def _record_input(
self,
instance: FlowInstance,
user_input: str,
) -> None:
"""Record user input in flow context."""
if instance.context is None:
instance.context = {"inputs": []}
inputs = instance.context.get("inputs", [])
inputs.append({
"step": instance.current_step,
"input": user_input,
"timestamp": datetime.utcnow().isoformat(),
})
instance.context["inputs"] = inputs
async def _complete_instance(
self,
instance: FlowInstance,
) -> None:
"""Mark instance as completed."""
instance.status = FlowInstanceStatus.COMPLETED.value
instance.completed_at = datetime.utcnow()
instance.updated_at = datetime.utcnow()
await self._session.flush()
async def _cancel_instance(
self,
instance: FlowInstance,
reason: str = "",
) -> None:
"""Mark instance as cancelled."""
instance.status = FlowInstanceStatus.CANCELLED.value
instance.completed_at = datetime.utcnow()
instance.updated_at = datetime.utcnow()
if instance.context is None:
instance.context = {}
instance.context["cancel_reason"] = reason
await self._session.flush()