feat: refactor intent_hint and high_risk_check tools to only consume routing_signal fields [AC-MRS-13]

This commit is contained in:
MerCry 2026-03-05 17:19:16 +08:00
parent f9fe6ec615
commit 6e7c162195
2 changed files with 828 additions and 0 deletions

View File

@ -0,0 +1,476 @@
"""
High Risk Check Tool for Mid Platform.
[AC-IDMP-05, AC-IDMP-20] 高风险场景检测工具支持元数据驱动配置
[AC-MRS-13] 只消费 field_roles 包含 routing_signal 的字段
核心特性
- 基于租户元数据配置进行风险判定不是写死关键词
- 支持关键词 + 正则 + 优先级
- 租户隔离tenant_id 必须参与查询
- 超时 <= 500ms可配置
- 返回结构化结果不抛硬异常
- 工具失败时返回可降级结果
- 只消费 routing_signal 角色的字段
高风险场景最小集
1. refund退款
2. complaint_escalation投诉升级
3. privacy_sensitive_promise隐私敏感承诺
4. transfer转人工
"""
from __future__ import annotations
import asyncio
import logging
import re
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.entities import HighRiskPolicy
from app.models.entities import FieldRole
from app.models.mid.schemas import (
ExecutionMode,
HighRiskCheckResult,
HighRiskScenario,
ToolCallStatus,
ToolCallTrace,
ToolType,
)
from app.services.mid.role_based_field_provider import RoleBasedFieldProvider
if TYPE_CHECKING:
from app.services.mid.tool_registry import ToolRegistry
logger = logging.getLogger(__name__)
HIGH_RISK_CHECK_TOOL_NAME = "high_risk_check"
DEFAULT_TIMEOUT_MS = 500
DEFAULT_CONFIDENCE = 0.9
@dataclass
class HighRiskCheckConfig:
"""高风险检测工具配置。"""
enabled: bool = True
timeout_ms: int = DEFAULT_TIMEOUT_MS
default_confidence: float = DEFAULT_CONFIDENCE
@dataclass
class CompiledRule:
"""编译后的高风险规则。"""
rule_id: str
scenario: HighRiskScenario
handler_mode: ExecutionMode
flow_id: str | None
transfer_message: str | None
priority: int
keywords: list[str] = field(default_factory=list)
patterns: list[re.Pattern] = field(default_factory=list)
class HighRiskCheckTool:
"""
[AC-IDMP-05, AC-IDMP-20] 高风险场景检测工具
[AC-MRS-13] 只消费 field_roles 包含 routing_signal 的字段
通过元数据配置动态加载风险规则支持
- 关键词匹配大小写不敏感
- 正则表达式匹配
- 优先级排序优先匹配高优先级规则
- 租户隔离
- 只消费 routing_signal 角色的字段
工具输入
- message: str用户消息
- tenant_id: str租户ID
- domain?: str领域可选
- scene?: str场景可选
- context?: dict上下文可选只消费 routing_signal 字段
工具输出
- matched: bool
- risk_scenario: refund|complaint_escalation|privacy_sensitive_promise|transfer|none
- confidence: float
- recommended_mode: micro_flow|transfer|agent
- rule_id?: str
- reason?: str
- fallback_reason_code?: str
"""
def __init__(
self,
session: AsyncSession,
config: HighRiskCheckConfig | None = None,
):
self._session = session
self._config = config or HighRiskCheckConfig()
self._rules_cache: dict[str, list[CompiledRule]] = {}
self._cache_time: dict[str, float] = {}
self._cache_ttl_seconds = 60
self._role_provider = RoleBasedFieldProvider(session)
@property
def name(self) -> str:
return HIGH_RISK_CHECK_TOOL_NAME
@property
def description(self) -> str:
return (
"高风险场景检测工具。"
"基于租户配置的风险规则,检测用户消息是否命中退款、投诉、隐私承诺、转人工等高风险场景。"
"返回结构化风险结果供 policy_router 使用。"
)
def get_tool_schema(self) -> dict[str, Any]:
return {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "用户消息内容",
},
"tenant_id": {
"type": "string",
"description": "租户ID",
},
"domain": {
"type": "string",
"description": "领域标识(可选)",
},
"scene": {
"type": "string",
"description": "场景标识(可选)",
},
"context": {
"type": "object",
"description": "额外上下文信息",
},
},
"required": ["message", "tenant_id"],
},
}
async def execute(
self,
message: str,
tenant_id: str,
domain: str | None = None,
scene: str | None = None,
context: dict[str, Any] | None = None,
) -> HighRiskCheckResult:
"""
[AC-IDMP-05, AC-IDMP-20] 执行高风险检测
[AC-MRS-13] 只消费 routing_signal 角色的字段
Args:
message: 用户消息
tenant_id: 租户ID
domain: 领域可选
scene: 场景可选
context: 上下文可选只消费 routing_signal 字段
Returns:
HighRiskCheckResult 结构化结果
"""
if not self._config.enabled:
logger.info(f"[AC-IDMP-05] High risk check disabled for tenant={tenant_id}")
return HighRiskCheckResult(
matched=False,
fallback_reason_code="TOOL_DISABLED",
)
start_time = time.time()
routing_signal_fields = await self._role_provider.get_routing_signal_field_keys(tenant_id)
logger.info(
f"[AC-MRS-13] Retrieved {len(routing_signal_fields)} routing_signal fields for tenant={tenant_id}: {routing_signal_fields}"
)
routing_context = {}
if context:
routing_context = {k: v for k, v in context.items() if k in routing_signal_fields}
if routing_context:
logger.info(
f"[AC-MRS-13] Applied routing_signal context: {list(routing_context.keys())}"
)
logger.info(
f"[AC-IDMP-05] Starting high risk check: tenant={tenant_id}, "
f"message={message[:50]}..."
)
try:
timeout_seconds = self._config.timeout_ms / 1000.0
rules = await asyncio.wait_for(
self._get_rules(tenant_id),
timeout=timeout_seconds,
)
if not rules:
logger.info(f"[AC-IDMP-05] No high risk rules for tenant={tenant_id}")
duration_ms = int((time.time() - start_time) * 1000)
return HighRiskCheckResult(
matched=False,
duration_ms=duration_ms,
)
sorted_rules = sorted(rules, key=lambda r: r.priority, reverse=True)
for rule in sorted_rules:
match_result = self._match_rule(message, rule)
if match_result:
duration_ms = int((time.time() - start_time) * 1000)
recommended_mode = rule.handler_mode
logger.info(
f"[AC-IDMP-05] High risk matched: tenant={tenant_id}, "
f"scenario={rule.scenario.value}, rule_id={rule.rule_id}, "
f"duration_ms={duration_ms}"
)
return HighRiskCheckResult(
matched=True,
risk_scenario=rule.scenario,
confidence=self._config.default_confidence,
recommended_mode=recommended_mode,
rule_id=rule.rule_id,
reason=f"匹配到高风险规则: {rule.scenario.value}",
duration_ms=duration_ms,
matched_text=match_result.get("text"),
matched_pattern=match_result.get("pattern"),
)
duration_ms = int((time.time() - start_time) * 1000)
logger.info(
f"[AC-IDMP-05] No high risk matched: tenant={tenant_id}, "
f"duration_ms={duration_ms}"
)
return HighRiskCheckResult(
matched=False,
duration_ms=duration_ms,
)
except asyncio.TimeoutError:
duration_ms = int((time.time() - start_time) * 1000)
logger.warning(
f"[AC-IDMP-05] High risk check timeout: tenant={tenant_id}, "
f"duration_ms={duration_ms}"
)
return HighRiskCheckResult(
matched=False,
fallback_reason_code="CHECK_TIMEOUT",
duration_ms=duration_ms,
)
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(
f"[AC-IDMP-05] High risk check error: tenant={tenant_id}, "
f"error={e}"
)
return HighRiskCheckResult(
matched=False,
fallback_reason_code="CHECK_ERROR",
duration_ms=duration_ms,
)
async def _get_rules(self, tenant_id: str) -> list[CompiledRule]:
"""
获取租户的高风险规则带缓存
只返回
- is_enabled = True
- 状态生效的规则
"""
current_time = time.time()
if (
tenant_id in self._rules_cache
and tenant_id in self._cache_time
and current_time - self._cache_time[tenant_id] < self._cache_ttl_seconds
):
return self._rules_cache[tenant_id]
rules = await self._load_rules_from_db(tenant_id)
self._rules_cache[tenant_id] = rules
self._cache_time[tenant_id] = current_time
return rules
async def _load_rules_from_db(self, tenant_id: str) -> list[CompiledRule]:
"""从数据库加载租户的高风险规则。"""
stmt = select(HighRiskPolicy).where(
HighRiskPolicy.tenant_id == tenant_id,
HighRiskPolicy.is_enabled.is_(True),
).order_by(HighRiskPolicy.priority.desc())
result = await self._session.execute(stmt)
policies = result.scalars().all()
rules = []
for policy in policies:
try:
scenario = HighRiskScenario(policy.scenario)
except ValueError:
logger.warning(
f"[AC-IDMP-05] Unknown scenario: {policy.scenario}, "
f"policy_id={policy.id}"
)
continue
handler_mode = ExecutionMode.MICRO_FLOW
if policy.handler_mode == "transfer":
handler_mode = ExecutionMode.TRANSFER
compiled_patterns = []
if policy.patterns:
for pattern_str in policy.patterns:
try:
compiled = re.compile(pattern_str, re.IGNORECASE)
compiled_patterns.append(compiled)
except re.error as e:
logger.warning(
f"[AC-IDMP-05] Invalid pattern: {pattern_str}, "
f"error={e}"
)
rule = CompiledRule(
rule_id=str(policy.id),
scenario=scenario,
handler_mode=handler_mode,
flow_id=str(policy.flow_id) if policy.flow_id else None,
transfer_message=policy.transfer_message,
priority=policy.priority,
keywords=policy.keywords or [],
patterns=compiled_patterns,
)
rules.append(rule)
logger.info(
f"[AC-IDMP-05] Loaded {len(rules)} high risk rules for tenant={tenant_id}"
)
return rules
def _match_rule(self, message: str, rule: CompiledRule) -> dict[str, Any] | None:
"""
检查消息是否匹配规则
优先匹配关键词再匹配正则
Returns:
匹配结果字典 {"text": str, "pattern": str} None
"""
message_lower = message.lower()
for keyword in rule.keywords:
if keyword.lower() in message_lower:
return {
"text": keyword,
"pattern": f"keyword:{keyword}",
}
for pattern in rule.patterns:
match = pattern.search(message)
if match:
return {
"text": match.group(),
"pattern": f"regex:{pattern.pattern}",
}
return None
def create_trace(
self,
result: HighRiskCheckResult,
tenant_id: str,
) -> ToolCallTrace:
"""创建工具调用追踪记录。"""
status = ToolCallStatus.OK
error_code = None
if result.fallback_reason_code:
if "TIMEOUT" in result.fallback_reason_code:
status = ToolCallStatus.TIMEOUT
else:
status = ToolCallStatus.ERROR
error_code = result.fallback_reason_code
return ToolCallTrace(
tool_name=self.name,
tool_type=ToolType.INTERNAL,
duration_ms=result.duration_ms,
status=status,
error_code=error_code,
args_digest=f"tenant={tenant_id}",
result_digest=f"matched={result.matched},scenario={result.risk_scenario}",
)
def register_high_risk_check_tool(
registry: ToolRegistry,
session: AsyncSession,
config: HighRiskCheckConfig | None = None,
) -> None:
"""
[AC-IDMP-05, AC-IDMP-20] high_risk_check 注册到 ToolRegistry
[AC-MRS-13] 只消费 routing_signal 角色的字段
Args:
registry: ToolRegistry 实例
session: 数据库会话
config: 工具配置
"""
from app.services.mid.tool_registry import ToolType as RegistryToolType
async def handler(
message: str,
tenant_id: str = "",
domain: str | None = None,
scene: str | None = None,
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
tool = HighRiskCheckTool(
session=session,
config=config,
)
result = await tool.execute(
message=message,
tenant_id=tenant_id,
domain=domain,
scene=scene,
context=context,
)
return result.model_dump()
registry.register(
name=HIGH_RISK_CHECK_TOOL_NAME,
description="[AC-IDMP-05, AC-IDMP-20, AC-MRS-13] 高风险场景检测工具,基于租户配置检测退款、投诉、隐私承诺、转人工等高风险场景 (only consumes routing_signal fields)",
handler=handler,
tool_type=RegistryToolType.INTERNAL,
version="1.0.0",
auth_required=False,
timeout_ms=config.timeout_ms if config else DEFAULT_TIMEOUT_MS,
enabled=True,
metadata={
"supports_metadata_driven": True,
"min_scenarios": ["refund", "complaint_escalation", "privacy_sensitive_promise", "transfer"],
"supports_routing_signal_filter": True,
},
)
logger.info(f"[AC-IDMP-05] Tool registered: {HIGH_RISK_CHECK_TOOL_NAME}")

View File

@ -0,0 +1,352 @@
"""
Intent Hint Tool for Mid Platform.
[AC-IDMP-02, AC-IDMP-16] Lightweight intent recognition and routing suggestion.
[AC-MRS-13] 只消费 field_roles 包含 routing_signal 的字段
This tool provides a "soft signal" for policy routing decisions.
It does NOT make final decisions - policy_router retains final authority.
"""
import logging
import time
from dataclasses import dataclass
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.mid.schemas import (
ExecutionMode,
HighRiskScenario,
IntentHintOutput,
ToolCallStatus,
ToolCallTrace,
ToolType,
)
from app.models.entities import FieldRole
from app.services.intent.router import IntentRouter
from app.services.intent.rule_service import IntentRuleService
from app.services.mid.role_based_field_provider import RoleBasedFieldProvider
logger = logging.getLogger(__name__)
DEFAULT_HIGH_RISK_KEYWORDS: dict[HighRiskScenario, list[str]] = {
HighRiskScenario.REFUND: ["退款", "退货", "退钱", "退费", "还钱", "退款申请"],
HighRiskScenario.COMPLAINT_ESCALATION: ["投诉", "升级投诉", "举报", "12315", "消费者协会"],
HighRiskScenario.PRIVACY_SENSITIVE_PROMISE: ["承诺", "保证", "一定", "肯定能", "绝对", "担保"],
HighRiskScenario.TRANSFER: ["转人工", "人工客服", "人工服务", "真人", "人工"],
}
LOW_CONFIDENCE_THRESHOLD = 0.3
DEFAULT_TIMEOUT_MS = 500
@dataclass
class IntentHintConfig:
"""Configuration for intent hint tool."""
enabled: bool = True
timeout_ms: int = DEFAULT_TIMEOUT_MS
top_n: int = 3
low_confidence_threshold: float = LOW_CONFIDENCE_THRESHOLD
class IntentHintTool:
"""
[AC-IDMP-02, AC-IDMP-16] Lightweight intent hint tool.
[AC-MRS-13] 只消费 field_roles 包含 routing_signal 的字段
Provides soft signals for policy routing:
- Intent recognition via existing rule engine
- High-risk scenario detection
- Routing suggestions (not final decisions)
The policy_router consumes these hints but retains final decision authority.
"""
def __init__(
self,
session: AsyncSession,
config: IntentHintConfig | None = None,
):
self._session = session
self._config = config or IntentHintConfig()
self._rule_service = IntentRuleService(session)
self._router = IntentRouter()
self._role_provider = RoleBasedFieldProvider(session)
async def execute(
self,
message: str,
tenant_id: str,
history: list[dict[str, Any]] | None = None,
top_n: int | None = None,
context: dict[str, Any] | None = None,
) -> IntentHintOutput:
"""
[AC-IDMP-02] Execute intent hint analysis.
[AC-MRS-13] 只消费 routing_signal 角色的字段
Args:
message: User input message
tenant_id: Tenant ID for rule lookup
history: Optional conversation history
top_n: Number of top suggestions (default from config)
context: Optional context with routing_signal fields
Returns:
IntentHintOutput with routing suggestions
"""
start_time = time.time()
routing_signal_fields = await self._role_provider.get_routing_signal_field_keys(tenant_id)
logger.info(
f"[AC-MRS-13] Retrieved {len(routing_signal_fields)} routing_signal fields for tenant={tenant_id}: {routing_signal_fields}"
)
routing_context = {}
if context:
routing_context = {k: v for k, v in context.items() if k in routing_signal_fields}
if routing_context:
logger.info(
f"[AC-MRS-13] Applied routing_signal context: {list(routing_context.keys())}"
)
if not self._config.enabled:
return IntentHintOutput(
intent=None,
confidence=0.0,
response_type=None,
suggested_mode=None,
fallback_reason_code="tool_disabled",
duration_ms=0,
)
try:
high_risk_scenario = self._check_high_risk(message)
if high_risk_scenario:
logger.info(
f"[AC-IDMP-05, AC-IDMP-20] High-risk detected in hint: {high_risk_scenario}"
)
duration_ms = int((time.time() - start_time) * 1000)
return IntentHintOutput(
intent=None,
confidence=1.0,
response_type="flow" if high_risk_scenario != HighRiskScenario.TRANSFER else "transfer",
suggested_mode=ExecutionMode.TRANSFER if high_risk_scenario == HighRiskScenario.TRANSFER else ExecutionMode.MICRO_FLOW,
high_risk_detected=True,
fallback_reason_code=f"high_risk_{high_risk_scenario.value}",
duration_ms=duration_ms,
)
rules = await self._rule_service.get_enabled_rules_for_matching(tenant_id)
if not rules:
logger.info(f"[AC-IDMP-02] No intent rules found for tenant: {tenant_id}")
duration_ms = int((time.time() - start_time) * 1000)
return IntentHintOutput(
intent=None,
confidence=0.0,
response_type="rag",
suggested_mode=ExecutionMode.AGENT,
fallback_reason_code="no_rules_configured",
duration_ms=duration_ms,
)
match_result = self._router.match(message, rules)
if match_result:
rule = match_result.rule
confidence = 0.8
suggested_mode = self._determine_suggested_mode(
rule.response_type,
confidence,
)
duration_ms = int((time.time() - start_time) * 1000)
logger.info(
f"[AC-IDMP-02] Intent hint matched: intent={rule.name}, "
f"response_type={rule.response_type}, confidence={confidence}"
)
return IntentHintOutput(
intent=rule.name,
confidence=confidence,
response_type=rule.response_type,
suggested_mode=suggested_mode,
target_flow_id=str(rule.flow_id) if rule.flow_id else None,
target_kb_ids=rule.target_kb_ids,
duration_ms=duration_ms,
)
duration_ms = int((time.time() - start_time) * 1000)
logger.info(
f"[AC-IDMP-02] No intent matched, suggesting agent mode with low confidence"
)
return IntentHintOutput(
intent=None,
confidence=0.2,
response_type="rag",
suggested_mode=ExecutionMode.AGENT,
fallback_reason_code="no_intent_matched",
duration_ms=duration_ms,
)
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(f"[AC-IDMP-02] Intent hint failed: {e}")
return IntentHintOutput(
intent=None,
confidence=0.0,
response_type=None,
suggested_mode=ExecutionMode.FIXED,
fallback_reason_code=f"hint_error: {str(e)[:50]}",
duration_ms=duration_ms,
)
def _check_high_risk(self, message: str) -> HighRiskScenario | None:
"""
[AC-IDMP-05, AC-IDMP-20] Check for high-risk scenarios.
Returns the first matched high-risk scenario or None.
"""
message_lower = message.lower()
for scenario, keywords in DEFAULT_HIGH_RISK_KEYWORDS.items():
for keyword in keywords:
if keyword.lower() in message_lower:
return scenario
return None
def _determine_suggested_mode(
self,
response_type: str,
confidence: float,
) -> ExecutionMode:
"""
Determine suggested execution mode based on response type and confidence.
"""
if confidence < self._config.low_confidence_threshold:
return ExecutionMode.FIXED
mode_mapping = {
"fixed": ExecutionMode.FIXED,
"transfer": ExecutionMode.TRANSFER,
"flow": ExecutionMode.MICRO_FLOW,
"rag": ExecutionMode.AGENT,
}
return mode_mapping.get(response_type, ExecutionMode.AGENT)
def create_trace(
self,
result: IntentHintOutput,
) -> ToolCallTrace:
"""Create ToolCallTrace for this tool execution."""
status = ToolCallStatus.OK
if result.fallback_reason_code and "error" in result.fallback_reason_code:
status = ToolCallStatus.ERROR
return ToolCallTrace(
tool_name="intent_hint",
tool_type=ToolType.INTERNAL,
duration_ms=result.duration_ms,
status=status,
error_code=result.fallback_reason_code if status == ToolCallStatus.ERROR else None,
result_digest=f"intent={result.intent}, mode={result.suggested_mode}",
)
async def intent_hint_handler(
message: str,
tenant_id: str,
history: list[dict[str, Any]] | None = None,
top_n: int | None = None,
context: dict[str, Any] | None = None,
session: AsyncSession | None = None,
config: IntentHintConfig | None = None,
) -> dict[str, Any]:
"""
Handler function for ToolRegistry registration.
[AC-MRS-13] 支持 context 参数用于 routing_signal 字段
This is the async handler that gets registered to ToolRegistry.
"""
if not session:
return {
"success": False,
"error": "Database session required",
"output": None,
}
tool = IntentHintTool(session=session, config=config)
result = await tool.execute(
message=message,
tenant_id=tenant_id,
history=history,
top_n=top_n,
context=context,
)
return {
"success": True,
"output": result.model_dump(),
"hint": result,
}
def register_intent_hint_tool(
registry: Any,
session: AsyncSession,
config: IntentHintConfig | None = None,
) -> None:
"""
[AC-IDMP-19] Register intent_hint tool to ToolRegistry.
[AC-MRS-13] 支持 context 参数用于 routing_signal 字段
Args:
registry: ToolRegistry instance
session: Database session for intent rule lookup
config: Tool configuration
"""
from app.models.mid.schemas import ToolType
effective_config = config or IntentHintConfig()
async def handler(
message: str,
tenant_id: str,
history: list[dict[str, Any]] | None = None,
top_n: int | None = None,
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
return await intent_hint_handler(
message=message,
tenant_id=tenant_id,
history=history,
top_n=top_n,
context=context,
session=session,
config=effective_config,
)
registry.register(
name="intent_hint",
description="[AC-IDMP-02, AC-IDMP-16, AC-MRS-13] Lightweight intent recognition and routing suggestion tool (only consumes routing_signal fields)",
handler=handler,
tool_type=ToolType.INTERNAL,
version="1.0.0",
enabled=True,
timeout_ms=min(effective_config.timeout_ms, 500),
metadata={
"low_confidence_threshold": effective_config.low_confidence_threshold,
"top_n": effective_config.top_n,
"supports_routing_signal_filter": True,
},
)
logger.info(
f"[AC-IDMP-19] intent_hint tool registered: timeout_ms={effective_config.timeout_ms}"
)