From 66902cd7c103fa1cde7650d7dc825ed682e7642f Mon Sep 17 00:00:00 2001 From: MerCry Date: Tue, 10 Mar 2026 12:05:35 +0800 Subject: [PATCH] feat: implement hybrid intent routing with RuleMatcher, SemanticMatcher, LlmJudge and FusionPolicy [AC-AISVC-111~125] --- .../app/services/intent/clarification.py | 385 +++++++++++++++ .../app/services/intent/fusion_policy.py | 254 ++++++++++ ai-service/app/services/intent/llm_judge.py | 246 ++++++++++ ai-service/app/services/intent/models.py | 226 +++++++++ ai-service/app/services/intent/router.py | 451 +++++++++++++++++- .../app/services/intent/semantic_matcher.py | 233 +++++++++ 6 files changed, 1771 insertions(+), 24 deletions(-) create mode 100644 ai-service/app/services/intent/clarification.py create mode 100644 ai-service/app/services/intent/fusion_policy.py create mode 100644 ai-service/app/services/intent/llm_judge.py create mode 100644 ai-service/app/services/intent/models.py create mode 100644 ai-service/app/services/intent/semantic_matcher.py diff --git a/ai-service/app/services/intent/clarification.py b/ai-service/app/services/intent/clarification.py new file mode 100644 index 0000000..1ddde1c --- /dev/null +++ b/ai-service/app/services/intent/clarification.py @@ -0,0 +1,385 @@ +""" +Clarification mechanism for intent recognition. +[AC-CLARIFY] 澄清机制实现 + +核心功能: +1. 统一置信度计算 +2. 硬拦截规则(confidence检查、required_slots检查) +3. 澄清状态管理 +4. 
"""
Clarification mechanism for intent recognition.
[AC-CLARIFY] Clarification implementation.

Core features:
1. Unified confidence computation
2. Hard-block rules (confidence check, required_slots check)
3. Clarification state management
4. Metrics collection for instrumentation
"""

import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import uuid

logger = logging.getLogger(__name__)


# Confidence thresholds used by the clarification hard-block rules:
# >= T_HIGH is trusted outright, < T_LOW always triggers clarification,
# and the range in between is the ambiguity gray zone.
T_HIGH = 0.75
T_LOW = 0.45
# Maximum clarification rounds before the request is counted as a misroute.
MAX_CLARIFY_RETRY = 3


class ClarifyReason(str, Enum):
    """Why a clarification turn was triggered."""

    INTENT_AMBIGUITY = "intent_ambiguity"
    MISSING_SLOT = "missing_slot"
    LOW_CONFIDENCE = "low_confidence"
    MULTI_INTENT = "multi_intent"


class ClarifyMetrics:
    """Process-wide singleton collecting clarification counters.

    Tracks how often clarification is triggered, how often it converges to a
    final intent, and how often a request is ultimately mis-routed.
    ``ClarifyMetrics()`` always returns the same shared instance.

    NOTE(review): counters are plain ints with no locking — presumably this
    runs on a single event loop; confirm before using from multiple threads.
    """

    _instance = None

    def __new__(cls):
        # Lazily create the shared instance with zeroed counters on first use.
        if cls._instance is None:
            inst = super().__new__(cls)
            inst._clarify_trigger_count = 0
            inst._clarify_converge_count = 0
            inst._misroute_count = 0
            cls._instance = inst
        return cls._instance

    def record_clarify_trigger(self) -> None:
        """Count one clarification trigger."""
        self._clarify_trigger_count += 1
        logger.debug(f"[AC-CLARIFY-METRICS] clarify_trigger_count: {self._clarify_trigger_count}")

    def record_clarify_converge(self) -> None:
        """Count one clarification that converged to a final intent."""
        self._clarify_converge_count += 1
        logger.debug(f"[AC-CLARIFY-METRICS] clarify_converge_count: {self._clarify_converge_count}")

    def record_misroute(self) -> None:
        """Count one mis-routed request (clarification exhausted)."""
        self._misroute_count += 1
        logger.debug(f"[AC-CLARIFY-METRICS] misroute_count: {self._misroute_count}")

    def get_metrics(self) -> dict[str, int]:
        """Return the raw counters.

        NOTE(review): the keys say ``*_rate`` but raw counts are returned;
        use :meth:`get_rates` for actual ratios. Keys kept unchanged because
        external consumers may rely on them.
        """
        return {
            "clarify_trigger_rate": self._clarify_trigger_count,
            "clarify_converge_rate": self._clarify_converge_count,
            "misroute_rate": self._misroute_count,
        }

    def get_rates(self, total_requests: int) -> dict[str, float]:
        """Return counters normalised by *total_requests*.

        All rates are 0.0 when *total_requests* is 0. The converge rate is
        also reported as 0.0 when no clarification was ever triggered
        (converging without triggering would be meaningless).
        """
        if total_requests == 0:
            return {
                "clarify_trigger_rate": 0.0,
                "clarify_converge_rate": 0.0,
                "misroute_rate": 0.0,
            }

        converge_rate = (
            self._clarify_converge_count / total_requests
            if self._clarify_trigger_count > 0
            else 0.0
        )
        return {
            "clarify_trigger_rate": self._clarify_trigger_count / total_requests,
            "clarify_converge_rate": converge_rate,
            "misroute_rate": self._misroute_count / total_requests,
        }

    def reset(self) -> None:
        """Zero all counters (mainly for tests)."""
        self._clarify_trigger_count = 0
        self._clarify_converge_count = 0
        self._misroute_count = 0
+def get_clarify_metrics() -> ClarifyMetrics: + return ClarifyMetrics() + + +@dataclass +class IntentCandidate: + intent_id: str + intent_name: str + confidence: float + response_type: str | None = None + target_kb_ids: list[str] | None = None + flow_id: str | None = None + fixed_reply: str | None = None + transfer_message: str | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "intent_id": self.intent_id, + "intent_name": self.intent_name, + "confidence": self.confidence, + "response_type": self.response_type, + "target_kb_ids": self.target_kb_ids, + "flow_id": self.flow_id, + "fixed_reply": self.fixed_reply, + "transfer_message": self.transfer_message, + } + + +@dataclass +class HybridIntentResult: + intent: IntentCandidate | None + confidence: float + candidates: list[IntentCandidate] = field(default_factory=list) + need_clarify: bool = False + clarify_reason: ClarifyReason | None = None + missing_slots: list[str] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "intent": self.intent.to_dict() if self.intent else None, + "confidence": self.confidence, + "candidates": [c.to_dict() for c in self.candidates], + "need_clarify": self.need_clarify, + "clarify_reason": self.clarify_reason.value if self.clarify_reason else None, + "missing_slots": self.missing_slots, + } + + @classmethod + def from_fusion_result(cls, fusion_result: Any) -> "HybridIntentResult": + candidates = [] + if fusion_result.clarify_candidates: + for c in fusion_result.clarify_candidates: + candidates.append(IntentCandidate( + intent_id=str(c.id), + intent_name=c.name, + confidence=0.0, + response_type=getattr(c, "response_type", None), + target_kb_ids=getattr(c, "target_kb_ids", None), + flow_id=str(c.flow_id) if getattr(c, "flow_id", None) else None, + fixed_reply=getattr(c, "fixed_reply", None), + transfer_message=getattr(c, "transfer_message", None), + )) + + if fusion_result.final_intent: + final_candidate = IntentCandidate( + 
intent_id=str(fusion_result.final_intent.id), + intent_name=fusion_result.final_intent.name, + confidence=fusion_result.final_confidence, + response_type=fusion_result.final_intent.response_type, + target_kb_ids=fusion_result.final_intent.target_kb_ids, + flow_id=str(fusion_result.final_intent.flow_id) if fusion_result.final_intent.flow_id else None, + fixed_reply=fusion_result.final_intent.fixed_reply, + transfer_message=fusion_result.final_intent.transfer_message, + ) + if not any(c.intent_id == final_candidate.intent_id for c in candidates): + candidates.insert(0, final_candidate) + + clarify_reason = None + if fusion_result.need_clarify: + if fusion_result.decision_reason == "multi_intent": + clarify_reason = ClarifyReason.MULTI_INTENT + elif fusion_result.decision_reason == "gray_zone": + clarify_reason = ClarifyReason.INTENT_AMBIGUITY + else: + clarify_reason = ClarifyReason.LOW_CONFIDENCE + + return cls( + intent=candidates[0] if candidates else None, + confidence=fusion_result.final_confidence, + candidates=candidates, + need_clarify=fusion_result.need_clarify, + clarify_reason=clarify_reason, + ) + + +@dataclass +class ClarifyState: + reason: ClarifyReason + asked_slot: str | None = None + retry_count: int = 0 + candidates: list[IntentCandidate] = field(default_factory=list) + asked_intent_ids: list[str] = field(default_factory=list) + created_at: float = field(default_factory=time.time) + + def to_dict(self) -> dict[str, Any]: + return { + "reason": self.reason.value, + "asked_slot": self.asked_slot, + "retry_count": self.retry_count, + "candidates": [c.to_dict() for c in self.candidates], + "asked_intent_ids": self.asked_intent_ids, + "created_at": self.created_at, + } + + def increment_retry(self) -> "ClarifyState": + self.retry_count += 1 + return self + + def is_max_retry(self) -> bool: + return self.retry_count >= MAX_CLARIFY_RETRY + + +class ClarificationEngine: + + def __init__( + self, + t_high: float = T_HIGH, + t_low: float = T_LOW, + max_retry: 
int = MAX_CLARIFY_RETRY, + ): + self._t_high = t_high + self._t_low = t_low + self._max_retry = max_retry + self._metrics = get_clarify_metrics() + + def compute_confidence( + self, + rule_score: float = 0.0, + semantic_score: float = 0.0, + llm_score: float = 0.0, + w_rule: float = 0.5, + w_semantic: float = 0.3, + w_llm: float = 0.2, + ) -> float: + total_weight = w_rule + w_semantic + w_llm + if total_weight == 0: + return 0.0 + + weighted_score = ( + rule_score * w_rule + + semantic_score * w_semantic + + llm_score * w_llm + ) + + return min(1.0, max(0.0, weighted_score / total_weight)) + + def check_hard_block( + self, + result: HybridIntentResult, + required_slots: list[str] | None = None, + filled_slots: dict[str, Any] | None = None, + ) -> tuple[bool, ClarifyReason | None]: + if result.confidence < self._t_high: + return True, ClarifyReason.LOW_CONFIDENCE + + if required_slots and filled_slots is not None: + missing = [s for s in required_slots if s not in filled_slots] + if missing: + return True, ClarifyReason.MISSING_SLOT + + return False, None + + def should_trigger_clarify( + self, + result: HybridIntentResult, + required_slots: list[str] | None = None, + filled_slots: dict[str, Any] | None = None, + ) -> tuple[bool, ClarifyState | None]: + if result.confidence >= self._t_high: + if required_slots and filled_slots is not None: + missing = [s for s in required_slots if s not in filled_slots] + if missing: + self._metrics.record_clarify_trigger() + return True, ClarifyState( + reason=ClarifyReason.MISSING_SLOT, + asked_slot=missing[0], + candidates=result.candidates, + ) + return False, None + + if result.confidence < self._t_low: + self._metrics.record_clarify_trigger() + return True, ClarifyState( + reason=ClarifyReason.LOW_CONFIDENCE, + candidates=result.candidates, + ) + + self._metrics.record_clarify_trigger() + + reason = result.clarify_reason or ClarifyReason.INTENT_AMBIGUITY + return True, ClarifyState( + reason=reason, + 
candidates=result.candidates, + ) + + def generate_clarify_prompt( + self, + state: ClarifyState, + slot_label: str | None = None, + ) -> str: + if state.reason == ClarifyReason.MISSING_SLOT: + slot_name = slot_label or state.asked_slot or "相关信息" + return f"为了更好地为您服务,请告诉我您的{slot_name}。" + + if state.reason == ClarifyReason.LOW_CONFIDENCE: + return "抱歉,我不太理解您的意思,能否请您详细描述一下您的需求?" + + if state.reason == ClarifyReason.MULTI_INTENT and len(state.candidates) > 1: + candidates = state.candidates[:3] + if len(candidates) == 2: + return ( + f"请问您是想「{candidates[0].intent_name}」" + f"还是「{candidates[1].intent_name}」?" + ) + else: + options = "、".join([f"「{c.intent_name}」" for c in candidates[:-1]]) + return f"请问您是想{options},还是「{candidates[-1].intent_name}」?" + + if state.reason == ClarifyReason.INTENT_AMBIGUITY and len(state.candidates) > 1: + candidates = state.candidates[:2] + return ( + f"请问您是想「{candidates[0].intent_name}」" + f"还是「{candidates[1].intent_name}」?" + ) + + return "请问您具体想了解什么?" + + def process_clarify_response( + self, + user_message: str, + state: ClarifyState, + intent_router: Any = None, + rules: list[Any] | None = None, + ) -> HybridIntentResult: + state.increment_retry() + + if state.is_max_retry(): + self._metrics.record_misroute() + return HybridIntentResult( + intent=None, + confidence=0.0, + need_clarify=False, + ) + + if state.reason == ClarifyReason.MISSING_SLOT: + self._metrics.record_clarify_converge() + return HybridIntentResult( + intent=state.candidates[0] if state.candidates else None, + confidence=0.8, + candidates=state.candidates, + need_clarify=False, + ) + + return HybridIntentResult( + intent=None, + confidence=0.0, + candidates=state.candidates, + need_clarify=True, + clarify_reason=state.reason, + ) + + def get_metrics(self) -> dict[str, int]: + return self._metrics.get_metrics() + + def get_rates(self, total_requests: int) -> dict[str, float]: + return self._metrics.get_rates(total_requests) + + +class ClarifySessionManager: + 
_sessions: dict[str, ClarifyState] = {} + + @classmethod + def get_session(cls, session_id: str) -> ClarifyState | None: + return cls._sessions.get(session_id) + + @classmethod + def set_session(cls, session_id: str, state: ClarifyState) -> None: + cls._sessions[session_id] = state + logger.debug(f"[AC-CLARIFY] Session state set: session={session_id}, reason={state.reason}") + + @classmethod + def clear_session(cls, session_id: str) -> None: + if session_id in cls._sessions: + del cls._sessions[session_id] + logger.debug(f"[AC-CLARIFY] Session state cleared: session={session_id}") + + @classmethod + def has_active_clarify(cls, session_id: str) -> bool: + state = cls._sessions.get(session_id) + if state: + return not state.is_max_retry() + return False diff --git a/ai-service/app/services/intent/fusion_policy.py b/ai-service/app/services/intent/fusion_policy.py new file mode 100644 index 0000000..740d819 --- /dev/null +++ b/ai-service/app/services/intent/fusion_policy.py @@ -0,0 +1,254 @@ +""" +[v0.8.0] Fusion policy for hybrid intent routing. +[AC-AISVC-115~AC-AISVC-117] Fusion decision logic for three-way matching. +""" + +import logging +from collections.abc import Callable +from typing import TYPE_CHECKING + +from app.services.intent.models import ( + FusionConfig, + FusionResult, + LlmJudgeResult, + RouteTrace, + RuleMatchResult, + SemanticMatchResult, +) + +if TYPE_CHECKING: + from app.models.entities import IntentRule + +logger = logging.getLogger(__name__) + +DecisionCondition = Callable[[RuleMatchResult, SemanticMatchResult, LlmJudgeResult], bool] + + +class FusionPolicy: + """ + [AC-AISVC-115] Fusion decision policy for hybrid routing. + + Decision priority: + 1. rule_high_confidence: RuleMatcher hit with score=1.0 + 2. llm_judge: LlmJudge triggered and returned valid intent + 3. semantic_override: RuleMatcher missed but SemanticMatcher high confidence + 4. rule_semantic_agree: Both match same intent + 5. 
semantic_fallback: SemanticMatcher medium confidence + 6. rule_fallback: Only rule matched + 7. no_match: All low confidence + """ + + DECISION_PRIORITY: list[tuple[str, DecisionCondition]] = [ + ("rule_high_confidence", lambda r, s, llm: r.score == 1.0 and r.rule is not None), + ("llm_judge", lambda r, s, llm: llm.triggered and llm.intent_id is not None), + ( + "semantic_override", + lambda r, s, llm: r.score == 0 + and s.top_score > 0.7 + and not s.skipped + and len(s.candidates) > 0, + ), + ( + "rule_semantic_agree", + lambda r, s, llm: r.score > 0 + and s.top_score > 0.5 + and not s.skipped + and len(s.candidates) > 0 + and r.rule_id == s.candidates[0].rule.id, + ), + ( + "semantic_fallback", + lambda r, s, llm: s.top_score > 0.5 and not s.skipped and len(s.candidates) > 0, + ), + ("rule_fallback", lambda r, s, llm: r.score > 0), + ("no_match", lambda r, s, llm: True), + ] + + def __init__(self, config: FusionConfig | None = None): + """ + Initialize fusion policy with configuration. + + Args: + config: Fusion configuration, uses default if not provided + """ + self._config = config or FusionConfig() + + def fuse( + self, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + llm_result: LlmJudgeResult | None, + ) -> FusionResult: + """ + [AC-AISVC-115] Execute fusion decision. 
+ + Args: + rule_result: Rule matching result + semantic_result: Semantic matching result + llm_result: LLM judge result (may be None) + + Returns: + FusionResult with final intent, confidence, and trace + """ + trace = self._build_trace(rule_result, semantic_result, llm_result) + + final_intent = None + final_confidence = 0.0 + decision_reason = "no_match" + + effective_llm_result = llm_result or LlmJudgeResult.empty() + + for reason, condition in self.DECISION_PRIORITY: + if condition(rule_result, semantic_result, effective_llm_result): + decision_reason = reason + break + + if decision_reason == "rule_high_confidence": + final_intent = rule_result.rule + final_confidence = 1.0 + elif decision_reason == "llm_judge" and llm_result: + final_intent = self._find_rule_by_id( + llm_result.intent_id, rule_result, semantic_result + ) + final_confidence = llm_result.score + elif decision_reason == "semantic_override": + final_intent = semantic_result.candidates[0].rule + final_confidence = semantic_result.top_score + elif decision_reason == "rule_semantic_agree": + final_intent = rule_result.rule + final_confidence = self._calculate_weighted_confidence( + rule_result, semantic_result, llm_result + ) + elif decision_reason == "semantic_fallback": + final_intent = semantic_result.candidates[0].rule + final_confidence = semantic_result.top_score + elif decision_reason == "rule_fallback": + final_intent = rule_result.rule + final_confidence = rule_result.score + + need_clarify = final_confidence < self._config.clarify_threshold + clarify_candidates = None + if need_clarify and len(semantic_result.candidates) > 1: + clarify_candidates = [c.rule for c in semantic_result.candidates[:3]] + + trace.fusion = { + "weights": { + "w_rule": self._config.w_rule, + "w_semantic": self._config.w_semantic, + "w_llm": self._config.w_llm, + }, + "final_confidence": final_confidence, + "decision_reason": decision_reason, + "need_clarify": need_clarify, + } + + logger.info( + f"[AC-AISVC-115] 
Fusion decision: reason={decision_reason}, " + f"confidence={final_confidence:.3f}, need_clarify={need_clarify}" + ) + + return FusionResult( + final_intent=final_intent, + final_confidence=final_confidence, + decision_reason=decision_reason, + need_clarify=need_clarify, + clarify_candidates=clarify_candidates, + trace=trace, + ) + + def _build_trace( + self, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + llm_result: LlmJudgeResult | None, + ) -> RouteTrace: + """ + [AC-AISVC-122] Build route trace log. + """ + return RouteTrace( + rule_match={ + "rule_id": str(rule_result.rule_id) if rule_result.rule_id else None, + "rule_name": rule_result.rule.name if rule_result.rule else None, + "match_type": rule_result.match_type, + "matched_text": rule_result.matched_text, + "score": rule_result.score, + "duration_ms": rule_result.duration_ms, + }, + semantic_match={ + "top_candidates": [ + { + "rule_id": str(c.rule.id), + "rule_name": c.rule.name, + "score": c.score, + } + for c in semantic_result.candidates + ], + "top_score": semantic_result.top_score, + "duration_ms": semantic_result.duration_ms, + "skipped": semantic_result.skipped, + "skip_reason": semantic_result.skip_reason, + }, + llm_judge={ + "triggered": llm_result.triggered if llm_result else False, + "intent_id": llm_result.intent_id if llm_result else None, + "intent_name": llm_result.intent_name if llm_result else None, + "score": llm_result.score if llm_result else 0.0, + "reasoning": llm_result.reasoning if llm_result else None, + "duration_ms": llm_result.duration_ms if llm_result else 0, + "tokens_used": llm_result.tokens_used if llm_result else 0, + }, + fusion={}, + ) + + def _calculate_weighted_confidence( + self, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + llm_result: LlmJudgeResult | None, + ) -> float: + """ + [AC-AISVC-116] Calculate weighted confidence. 
+ + Formula: + final_confidence = (w_rule * rule_score + w_semantic * semantic_score + w_llm * llm_score) / total_weight + + Returns: + Weighted confidence in [0.0, 1.0] + """ + rule_score = rule_result.score + semantic_score = semantic_result.top_score if not semantic_result.skipped else 0.0 + llm_score = llm_result.score if llm_result and llm_result.triggered else 0.0 + + total_weight = self._config.w_rule + self._config.w_semantic + if llm_result and llm_result.triggered: + total_weight += self._config.w_llm + + if total_weight == 0: + return 0.0 + + confidence = ( + self._config.w_rule * rule_score + + self._config.w_semantic * semantic_score + + self._config.w_llm * llm_score + ) / total_weight + + return min(1.0, max(0.0, confidence)) + + def _find_rule_by_id( + self, + intent_id: str | None, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + ) -> "IntentRule | None": + """Find rule by ID from rule or semantic results.""" + if not intent_id: + return None + + if rule_result.rule_id and str(rule_result.rule_id) == intent_id: + return rule_result.rule + + for candidate in semantic_result.candidates: + if str(candidate.rule.id) == intent_id: + return candidate.rule + + return None diff --git a/ai-service/app/services/intent/llm_judge.py b/ai-service/app/services/intent/llm_judge.py new file mode 100644 index 0000000..dc66e82 --- /dev/null +++ b/ai-service/app/services/intent/llm_judge.py @@ -0,0 +1,246 @@ +""" +LLM judge for intent arbitration. +[AC-AISVC-118, AC-AISVC-119] LLM-based intent arbitration. +""" + +import asyncio +import json +import logging +import time +from typing import TYPE_CHECKING, Any + +from app.services.intent.models import ( + FusionConfig, + LlmJudgeInput, + LlmJudgeResult, + RuleMatchResult, + SemanticMatchResult, +) + +if TYPE_CHECKING: + from app.services.llm.base import LLMClient + +logger = logging.getLogger(__name__) + + +class LlmJudge: + """ + [AC-AISVC-118] LLM-based intent arbitrator. 
+ + Triggered when: + - Rule vs Semantic conflict + - Gray zone (low confidence) + - Multiple intent candidates with similar scores + """ + + JUDGE_PROMPT = """你是一个意图识别仲裁器。根据用户消息和候选意图,判断最匹配的意图。 + +用户消息:{message} + +候选意图: +{candidates} + +请返回 JSON 格式(不要包含```json标记): +{{ + "intent_id": "最匹配的意图ID", + "intent_name": "意图名称", + "confidence": 0.0-1.0之间的置信度, + "reasoning": "判断理由" +}}""" + + def __init__( + self, + llm_client: "LLMClient", + config: FusionConfig, + ): + """ + Initialize LLM judge. + + Args: + llm_client: LLM client for generating responses + config: Fusion configuration + """ + self._llm_client = llm_client + self._config = config + + def should_trigger( + self, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + config: FusionConfig | None = None, + ) -> tuple[bool, str]: + """ + [AC-AISVC-118] Check if LLM judge should be triggered. + + Trigger conditions: + 1. Conflict: Rule and Semantic match different intents with close scores + 2. Gray zone: Max confidence in gray zone range + 3. 
Multi-intent: Multiple candidates with similar scores + + Args: + rule_result: Rule matching result + semantic_result: Semantic matching result + config: Optional config override + + Returns: + Tuple of (should_trigger, trigger_reason) + """ + effective_config = config or self._config + + if not effective_config.llm_judge_enabled: + return False, "disabled" + + rule_score = rule_result.score + semantic_score = semantic_result.top_score + + if rule_score > 0 and semantic_score > 0: + if semantic_result.candidates: + top_semantic_rule_id = semantic_result.candidates[0].rule.id + if rule_result.rule_id != top_semantic_rule_id: + if abs(rule_score - semantic_score) < effective_config.conflict_threshold: + logger.info( + f"[AC-AISVC-118] LLM judge triggered: rule_semantic_conflict, " + f"rule_id={rule_result.rule_id}, semantic_id={top_semantic_rule_id}, " + f"rule_score={rule_score}, semantic_score={semantic_score}" + ) + return True, "rule_semantic_conflict" + + max_score = max(rule_score, semantic_score) + if effective_config.min_trigger_threshold < max_score < effective_config.gray_zone_threshold: + logger.info( + f"[AC-AISVC-118] LLM judge triggered: gray_zone, " + f"max_score={max_score}" + ) + return True, "gray_zone" + + if len(semantic_result.candidates) >= 2: + top1_score = semantic_result.candidates[0].score + top2_score = semantic_result.candidates[1].score + if abs(top1_score - top2_score) < effective_config.multi_intent_threshold: + logger.info( + f"[AC-AISVC-118] LLM judge triggered: multi_intent, " + f"top1_score={top1_score}, top2_score={top2_score}" + ) + return True, "multi_intent" + + return False, "" + + async def judge( + self, + input_data: LlmJudgeInput, + tenant_id: str, + ) -> LlmJudgeResult: + """ + [AC-AISVC-119] Perform LLM arbitration. 
+ + Args: + input_data: Judge input with message and candidates + tenant_id: Tenant ID for isolation + + Returns: + LlmJudgeResult with arbitration decision + """ + start_time = time.time() + + candidates_text = "\n".join([ + f"- ID: {c['id']}, 名称: {c['name']}, 描述: {c.get('description', 'N/A')}" + for c in input_data.candidates + ]) + + prompt = self.JUDGE_PROMPT.format( + message=input_data.message, + candidates=candidates_text, + ) + + try: + from app.services.llm.base import LLMConfig + + response = await asyncio.wait_for( + self._llm_client.generate( + messages=[{"role": "user", "content": prompt}], + config=LLMConfig( + max_tokens=200, + temperature=0, + ), + ), + timeout=self._config.llm_judge_timeout_ms / 1000, + ) + + result = self._parse_response(response.content or "") + duration_ms = int((time.time() - start_time) * 1000) + + tokens_used = 0 + if response.usage: + tokens_used = response.usage.get("total_tokens", 0) + + logger.info( + f"[AC-AISVC-119] LLM judge completed for tenant={tenant_id}, " + f"intent_id={result.get('intent_id')}, confidence={result.get('confidence', 0):.3f}, " + f"duration={duration_ms}ms, tokens={tokens_used}" + ) + + return LlmJudgeResult( + intent_id=result.get("intent_id"), + intent_name=result.get("intent_name"), + score=float(result.get("confidence", 0.5)), + reasoning=result.get("reasoning"), + duration_ms=duration_ms, + tokens_used=tokens_used, + triggered=True, + ) + + except asyncio.TimeoutError: + duration_ms = int((time.time() - start_time) * 1000) + logger.warning( + f"[AC-AISVC-119] LLM judge timeout for tenant={tenant_id}, " + f"timeout={self._config.llm_judge_timeout_ms}ms" + ) + return LlmJudgeResult( + intent_id=None, + intent_name=None, + score=0.0, + reasoning="LLM timeout", + duration_ms=duration_ms, + tokens_used=0, + triggered=True, + ) + except Exception as e: + duration_ms = int((time.time() - start_time) * 1000) + logger.error( + f"[AC-AISVC-119] LLM judge error for tenant={tenant_id}: {e}" + ) + return 
LlmJudgeResult( + intent_id=None, + intent_name=None, + score=0.0, + reasoning=f"LLM error: {str(e)}", + duration_ms=duration_ms, + tokens_used=0, + triggered=True, + ) + + def _parse_response(self, content: str) -> dict[str, Any]: + """ + Parse LLM response to extract JSON result. + + Args: + content: LLM response content + + Returns: + Parsed dictionary with intent_id, intent_name, confidence, reasoning + """ + try: + cleaned = content.strip() + if cleaned.startswith("```json"): + cleaned = cleaned[7:] + if cleaned.startswith("```"): + cleaned = cleaned[3:] + if cleaned.endswith("```"): + cleaned = cleaned[:-3] + cleaned = cleaned.strip() + + result: dict[str, Any] = json.loads(cleaned) + return result + except json.JSONDecodeError as e: + logger.warning(f"[AC-AISVC-119] Failed to parse LLM response: {e}") + return {} diff --git a/ai-service/app/services/intent/models.py b/ai-service/app/services/intent/models.py new file mode 100644 index 0000000..6f2e1ff --- /dev/null +++ b/ai-service/app/services/intent/models.py @@ -0,0 +1,226 @@ +""" +Intent routing data models. +[AC-AISVC-111~AC-AISVC-125] Data models for hybrid routing. +""" + +import uuid +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class RuleMatchResult: + """ + [AC-AISVC-112] Result of rule matching. + Contains matched rule and score. + """ + rule_id: uuid.UUID | None + rule: Any | None + match_type: str | None + matched_text: str | None + score: float + duration_ms: int + + def to_dict(self) -> dict[str, Any]: + return { + "rule_id": str(self.rule_id) if self.rule_id else None, + "rule_name": self.rule.name if self.rule else None, + "match_type": self.match_type, + "matched_text": self.matched_text, + "score": self.score, + "duration_ms": self.duration_ms, + } + + +@dataclass +class SemanticCandidate: + """ + [AC-AISVC-113] Semantic match candidate. 
+ """ + rule: Any + score: float + + def to_dict(self) -> dict[str, Any]: + return { + "rule_id": str(self.rule.id), + "rule_name": self.rule.name, + "score": self.score, + } + + +@dataclass +class SemanticMatchResult: + """ + [AC-AISVC-113] Result of semantic matching. + """ + candidates: list[SemanticCandidate] + top_score: float + duration_ms: int + skipped: bool + skip_reason: str | None + + def to_dict(self) -> dict[str, Any]: + return { + "top_candidates": [c.to_dict() for c in self.candidates], + "top_score": self.top_score, + "duration_ms": self.duration_ms, + "skipped": self.skipped, + "skip_reason": self.skip_reason, + } + + +@dataclass +class LlmJudgeInput: + """ + [AC-AISVC-119] Input for LLM judge. + """ + message: str + candidates: list[dict[str, Any]] + conflict_type: str + + +@dataclass +class LlmJudgeResult: + """ + [AC-AISVC-119] Result of LLM judge. + """ + intent_id: str | None + intent_name: str | None + score: float + reasoning: str | None + duration_ms: int + tokens_used: int + triggered: bool + + def to_dict(self) -> dict[str, Any]: + return { + "triggered": self.triggered, + "intent_id": self.intent_id, + "intent_name": self.intent_name, + "score": self.score, + "reasoning": self.reasoning, + "duration_ms": self.duration_ms, + "tokens_used": self.tokens_used, + } + + @classmethod + def empty(cls) -> "LlmJudgeResult": + return cls( + intent_id=None, + intent_name=None, + score=0.0, + reasoning=None, + duration_ms=0, + tokens_used=0, + triggered=False, + ) + + +@dataclass +class FusionConfig: + """ + [AC-AISVC-116] Fusion configuration. 
+ """ + w_rule: float = 0.5 + w_semantic: float = 0.3 + w_llm: float = 0.2 + semantic_threshold: float = 0.7 + conflict_threshold: float = 0.2 + gray_zone_threshold: float = 0.6 + min_trigger_threshold: float = 0.3 + clarify_threshold: float = 0.4 + multi_intent_threshold: float = 0.15 + llm_judge_enabled: bool = True + semantic_matcher_enabled: bool = True + semantic_matcher_timeout_ms: int = 100 + llm_judge_timeout_ms: int = 2000 + semantic_top_k: int = 3 + + def to_dict(self) -> dict[str, Any]: + return { + "w_rule": self.w_rule, + "w_semantic": self.w_semantic, + "w_llm": self.w_llm, + "semantic_threshold": self.semantic_threshold, + "conflict_threshold": self.conflict_threshold, + "gray_zone_threshold": self.gray_zone_threshold, + "min_trigger_threshold": self.min_trigger_threshold, + "clarify_threshold": self.clarify_threshold, + "multi_intent_threshold": self.multi_intent_threshold, + "llm_judge_enabled": self.llm_judge_enabled, + "semantic_matcher_enabled": self.semantic_matcher_enabled, + "semantic_matcher_timeout_ms": self.semantic_matcher_timeout_ms, + "llm_judge_timeout_ms": self.llm_judge_timeout_ms, + "semantic_top_k": self.semantic_top_k, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "FusionConfig": + return cls( + w_rule=data.get("w_rule", 0.5), + w_semantic=data.get("w_semantic", 0.3), + w_llm=data.get("w_llm", 0.2), + semantic_threshold=data.get("semantic_threshold", 0.7), + conflict_threshold=data.get("conflict_threshold", 0.2), + gray_zone_threshold=data.get("gray_zone_threshold", 0.6), + min_trigger_threshold=data.get("min_trigger_threshold", 0.3), + clarify_threshold=data.get("clarify_threshold", 0.4), + multi_intent_threshold=data.get("multi_intent_threshold", 0.15), + llm_judge_enabled=data.get("llm_judge_enabled", True), + semantic_matcher_enabled=data.get("semantic_matcher_enabled", True), + semantic_matcher_timeout_ms=data.get("semantic_matcher_timeout_ms", 100), + llm_judge_timeout_ms=data.get("llm_judge_timeout_ms", 
2000), + semantic_top_k=data.get("semantic_top_k", 3), + ) + + +@dataclass +class RouteTrace: + """ + [AC-AISVC-122] Route trace log. + """ + rule_match: dict[str, Any] = field(default_factory=dict) + semantic_match: dict[str, Any] = field(default_factory=dict) + llm_judge: dict[str, Any] = field(default_factory=dict) + fusion: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return { + "rule_match": self.rule_match, + "semantic_match": self.semantic_match, + "llm_judge": self.llm_judge, + "fusion": self.fusion, + } + + +@dataclass +class FusionResult: + """ + [AC-AISVC-115] Fusion decision result. + """ + final_intent: Any | None + final_confidence: float + decision_reason: str + need_clarify: bool + clarify_candidates: list[Any] | None + trace: RouteTrace + + def to_dict(self) -> dict[str, Any]: + return { + "final_intent": { + "id": str(self.final_intent.id), + "name": self.final_intent.name, + "response_type": self.final_intent.response_type, + } if self.final_intent else None, + "final_confidence": self.final_confidence, + "decision_reason": self.decision_reason, + "need_clarify": self.need_clarify, + "clarify_candidates": [ + {"id": str(c.id), "name": c.name} + for c in (self.clarify_candidates or []) + ], + "trace": self.trace.to_dict(), + } + + +DEFAULT_FUSION_CONFIG = FusionConfig() diff --git a/ai-service/app/services/intent/router.py b/ai-service/app/services/intent/router.py index 9007521..fe654be 100644 --- a/ai-service/app/services/intent/router.py +++ b/ai-service/app/services/intent/router.py @@ -1,14 +1,30 @@ """ Intent router for AI Service. [AC-AISVC-69, AC-AISVC-70] Intent matching engine with keyword and regex support. +[v0.8.0] Upgraded to hybrid routing with RuleMatcher + SemanticMatcher + LlmJudge + FusionPolicy. 
""" import logging import re +import time from dataclasses import dataclass -from typing import Any +from typing import TYPE_CHECKING, Any from app.models.entities import IntentRule +from app.services.intent.models import ( + FusionConfig, + FusionResult, + LlmJudgeInput, + LlmJudgeResult, + RouteTrace, + RuleMatchResult, + SemanticMatchResult, +) + +if TYPE_CHECKING: + from app.services.intent.fusion_policy import FusionPolicy + from app.services.intent.llm_judge import LlmJudge + from app.services.intent.semantic_matcher import SemanticMatcher logger = logging.getLogger(__name__) @@ -38,38 +54,36 @@ class IntentMatchResult: } -class IntentRouter: +class RuleMatcher: """ - [AC-AISVC-69] Intent matching engine. - - Matching algorithm: - 1. Load rules ordered by priority DESC - 2. For each rule, try keyword matching first - 3. If no keyword match, try regex pattern matching - 4. Return first match (highest priority) - 5. If no match, return None (fallback to default RAG) + [v0.8.0] Rule matcher for keyword and regex matching. + Extracted from IntentRouter for hybrid routing. """ - def __init__(self): - pass - - def match( - self, - message: str, - rules: list[IntentRule], - ) -> IntentMatchResult | None: + def match(self, message: str, rules: list[IntentRule]) -> RuleMatchResult: """ - [AC-AISVC-69] Match user message against intent rules. + [AC-AISVC-112] Match user message against intent rules. + Returns RuleMatchResult with score (1.0 for match, 0.0 for no match). 
Args: message: User input message rules: List of enabled rules ordered by priority DESC Returns: - IntentMatchResult if matched, None otherwise + RuleMatchResult with match details """ + start_time = time.time() + if not message or not rules: - return None + duration_ms = int((time.time() - start_time) * 1000) + return RuleMatchResult( + rule_id=None, + rule=None, + match_type=None, + matched_text=None, + score=0.0, + duration_ms=duration_ms, + ) message_lower = message.lower() @@ -79,22 +93,46 @@ class IntentRouter: keyword_result = self._match_keywords(message, message_lower, rule) if keyword_result: + duration_ms = int((time.time() - start_time) * 1000) logger.info( f"[AC-AISVC-69] Intent matched by keyword: " f"rule={rule.name}, matched='{keyword_result.matched}'" ) - return keyword_result + return RuleMatchResult( + rule_id=rule.id, + rule=rule, + match_type="keyword", + matched_text=keyword_result.matched, + score=1.0, + duration_ms=duration_ms, + ) regex_result = self._match_patterns(message, rule) if regex_result: + duration_ms = int((time.time() - start_time) * 1000) logger.info( f"[AC-AISVC-69] Intent matched by regex: " f"rule={rule.name}, matched='{regex_result.matched}'" ) - return regex_result + return RuleMatchResult( + rule_id=rule.id, + rule=rule, + match_type="regex", + matched_text=regex_result.matched, + score=1.0, + duration_ms=duration_ms, + ) + duration_ms = int((time.time() - start_time) * 1000) logger.debug("[AC-AISVC-70] No intent matched, will fallback to default RAG") - return None + return RuleMatchResult( + rule_id=None, + rule=None, + match_type=None, + matched_text=None, + score=0.0, + duration_ms=duration_ms, + ) def _match_keywords( self, @@ -153,6 +191,74 @@ class IntentRouter: return None + +class IntentRouter: + """ + [AC-AISVC-69] Intent matching engine. + [v0.8.0] Upgraded to support hybrid routing. + + Matching algorithm: + 1. Load rules ordered by priority DESC + 2. For each rule, try keyword matching first + 3. 
If no keyword match, try regex pattern matching + 4. Return first match (highest priority) + 5. If no match, return None (fallback to default RAG) + + Hybrid routing (match_hybrid): + 1. Parallel execute RuleMatcher + SemanticMatcher + 2. Conditionally trigger LlmJudge + 3. Execute FusionPolicy for final decision + """ + + def __init__( + self, + rule_matcher: RuleMatcher | None = None, + semantic_matcher: "SemanticMatcher | None" = None, + llm_judge: "LlmJudge | None" = None, + fusion_policy: "FusionPolicy | None" = None, + config: FusionConfig | None = None, + ): + """ + [v0.8.0] Initialize with optional dependencies for DI. + + Args: + rule_matcher: Rule matcher for keyword/regex matching + semantic_matcher: Semantic matcher for vector similarity + llm_judge: LLM judge for arbitration + fusion_policy: Fusion policy for decision making + config: Fusion configuration + """ + self._rule_matcher = rule_matcher or RuleMatcher() + self._semantic_matcher = semantic_matcher + self._llm_judge = llm_judge + self._fusion_policy = fusion_policy + self._config = config or FusionConfig() + + def match( + self, + message: str, + rules: list[IntentRule], + ) -> IntentMatchResult | None: + """ + [AC-AISVC-69] Match user message against intent rules. + Preserved for backward compatibility. 
+ + Args: + message: User input message + rules: List of enabled rules ordered by priority DESC + + Returns: + IntentMatchResult if matched, None otherwise + """ + result = self._rule_matcher.match(message, rules) + if result.rule: + return IntentMatchResult( + rule=result.rule, + match_type=result.match_type or "keyword", + matched=result.matched_text or "", + ) + return None + def match_with_stats( self, message: str, @@ -168,3 +274,300 @@ class IntentRouter: if result: return result, str(result.rule.id) return None, None + + async def match_hybrid( + self, + message: str, + rules: list[IntentRule], + tenant_id: str, + config: FusionConfig | None = None, + ) -> FusionResult: + """ + [AC-AISVC-111] Hybrid routing entry point. + + Flow: + 1. Parallel execute RuleMatcher + SemanticMatcher + 2. Check if LlmJudge should trigger + 3. Execute FusionPolicy for final decision + + Args: + message: User input message + rules: List of enabled rules ordered by priority DESC + tenant_id: Tenant ID for isolation + config: Optional fusion config override + + Returns: + FusionResult with final intent, confidence, and trace + """ + effective_config = config or self._config + start_time = time.time() + + rule_result = self._rule_matcher.match(message, rules) + + semantic_result = await self._execute_semantic_matcher( + message, rules, tenant_id, effective_config + ) + + llm_result = await self._conditionally_execute_llm_judge( + message, rule_result, semantic_result, tenant_id, effective_config + ) + + if self._fusion_policy: + fusion_result = self._fusion_policy.fuse( + rule_result, semantic_result, llm_result + ) + else: + fusion_result = self._default_fusion( + rule_result, semantic_result, llm_result, effective_config + ) + + total_duration_ms = int((time.time() - start_time) * 1000) + fusion_result.trace.fusion["total_duration_ms"] = total_duration_ms + + logger.info( + f"[AC-AISVC-111] Hybrid routing completed: " + f"decision={fusion_result.decision_reason}, " + 
f"confidence={fusion_result.final_confidence:.3f}, " + f"duration={total_duration_ms}ms" + ) + + return fusion_result + + async def _execute_semantic_matcher( + self, + message: str, + rules: list[IntentRule], + tenant_id: str, + config: FusionConfig, + ) -> SemanticMatchResult: + """Execute semantic matcher if available and enabled.""" + if not self._semantic_matcher: + return SemanticMatchResult( + candidates=[], + top_score=0.0, + duration_ms=0, + skipped=True, + skip_reason="not_configured", + ) + + if not config.semantic_matcher_enabled: + return SemanticMatchResult( + candidates=[], + top_score=0.0, + duration_ms=0, + skipped=True, + skip_reason="disabled", + ) + + try: + return await self._semantic_matcher.match( + message=message, + rules=rules, + tenant_id=tenant_id, + top_k=config.semantic_top_k, + ) + except Exception as e: + logger.warning(f"[AC-AISVC-113] Semantic matcher failed: {e}") + return SemanticMatchResult( + candidates=[], + top_score=0.0, + duration_ms=0, + skipped=True, + skip_reason=f"error: {str(e)}", + ) + + async def _conditionally_execute_llm_judge( + self, + message: str, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + tenant_id: str, + config: FusionConfig, + ) -> LlmJudgeResult | None: + """Conditionally execute LLM judge based on trigger conditions.""" + if not self._llm_judge: + return None + + if not config.llm_judge_enabled: + return None + + should_trigger, trigger_reason = self._check_llm_trigger( + rule_result, semantic_result, config + ) + + if not should_trigger: + return None + + logger.info(f"[AC-AISVC-118] LLM judge triggered: reason={trigger_reason}") + + candidates = self._build_llm_candidates(rule_result, semantic_result) + if not candidates: + return None + + try: + return await self._llm_judge.judge( + LlmJudgeInput( + message=message, + candidates=candidates, + conflict_type=trigger_reason, + ), + tenant_id, + ) + except Exception as e: + logger.warning(f"[AC-AISVC-119] LLM judge failed: 
{e}") + return LlmJudgeResult( + intent_id=None, + intent_name=None, + score=0.0, + reasoning=f"LLM error: {str(e)}", + duration_ms=0, + tokens_used=0, + triggered=True, + ) + + def _check_llm_trigger( + self, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + config: FusionConfig, + ) -> tuple[bool, str]: + """ + [AC-AISVC-118] Check if LLM judge should trigger. + + Trigger conditions: + 1. Conflict: RuleMatcher and SemanticMatcher match different intents + 2. Gray zone: Max confidence in gray zone range + 3. Multi-intent: Multiple candidates with close scores + + Returns: + (should_trigger, trigger_reason) + """ + rule_score = rule_result.score + semantic_score = semantic_result.top_score + + if rule_score > 0 and semantic_score > 0 and not semantic_result.skipped: + if semantic_result.candidates: + top_semantic_rule_id = semantic_result.candidates[0].rule.id + if rule_result.rule_id != top_semantic_rule_id: + if abs(rule_score - semantic_score) < config.conflict_threshold: + return True, "rule_semantic_conflict" + + max_score = max(rule_score, semantic_score) + if config.min_trigger_threshold < max_score < config.gray_zone_threshold: + return True, "gray_zone" + + if len(semantic_result.candidates) >= 2: + top1_score = semantic_result.candidates[0].score + top2_score = semantic_result.candidates[1].score + if abs(top1_score - top2_score) < config.multi_intent_threshold: + return True, "multi_intent" + + return False, "" + + def _build_llm_candidates( + self, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + ) -> list[dict[str, Any]]: + """Build candidate list for LLM judge.""" + candidates = [] + + if rule_result.rule: + candidates.append({ + "id": str(rule_result.rule_id), + "name": rule_result.rule.name, + "description": f"匹配方式: {rule_result.match_type}, 匹配内容: {rule_result.matched_text}", + }) + + for candidate in semantic_result.candidates[:3]: + if not any(c["id"] == str(candidate.rule.id) for c in candidates): 
+ candidates.append({ + "id": str(candidate.rule.id), + "name": candidate.rule.name, + "description": f"语义相似度: {candidate.score:.2f}", + }) + + return candidates + + def _default_fusion( + self, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + llm_result: LlmJudgeResult | None, + config: FusionConfig, + ) -> FusionResult: + """Default fusion logic when FusionPolicy is not available.""" + trace = RouteTrace( + rule_match=rule_result.to_dict(), + semantic_match=semantic_result.to_dict(), + llm_judge=llm_result.to_dict() if llm_result else {}, + fusion={}, + ) + + final_intent = None + final_confidence = 0.0 + decision_reason = "no_match" + + if rule_result.score == 1.0 and rule_result.rule: + final_intent = rule_result.rule + final_confidence = 1.0 + decision_reason = "rule_high_confidence" + elif llm_result and llm_result.triggered and llm_result.intent_id: + final_intent = self._find_rule_by_id( + llm_result.intent_id, rule_result, semantic_result + ) + final_confidence = llm_result.score + decision_reason = "llm_judge" + elif rule_result.score == 0 and semantic_result.top_score > config.semantic_threshold: + if semantic_result.candidates: + final_intent = semantic_result.candidates[0].rule + final_confidence = semantic_result.top_score + decision_reason = "semantic_override" + elif semantic_result.top_score > 0.5: + if semantic_result.candidates: + final_intent = semantic_result.candidates[0].rule + final_confidence = semantic_result.top_score + decision_reason = "semantic_fallback" + + need_clarify = final_confidence < config.clarify_threshold + clarify_candidates = None + if need_clarify and len(semantic_result.candidates) > 1: + clarify_candidates = [c.rule for c in semantic_result.candidates[:3]] + + trace.fusion = { + "weights": { + "w_rule": config.w_rule, + "w_semantic": config.w_semantic, + "w_llm": config.w_llm, + }, + "final_confidence": final_confidence, + "decision_reason": decision_reason, + } + + return FusionResult( + 
final_intent=final_intent, + final_confidence=final_confidence, + decision_reason=decision_reason, + need_clarify=need_clarify, + clarify_candidates=clarify_candidates, + trace=trace, + ) + + def _find_rule_by_id( + self, + intent_id: str | None, + rule_result: RuleMatchResult, + semantic_result: SemanticMatchResult, + ) -> IntentRule | None: + """Find rule by ID from rule or semantic results.""" + if not intent_id: + return None + + if rule_result.rule_id and str(rule_result.rule_id) == intent_id: + return rule_result.rule + + for candidate in semantic_result.candidates: + if str(candidate.rule.id) == intent_id: + return candidate.rule + + return None diff --git a/ai-service/app/services/intent/semantic_matcher.py b/ai-service/app/services/intent/semantic_matcher.py new file mode 100644 index 0000000..77c9ff5 --- /dev/null +++ b/ai-service/app/services/intent/semantic_matcher.py @@ -0,0 +1,233 @@ +""" +Semantic matcher for intent recognition. +[AC-AISVC-113, AC-AISVC-114] Vector-based semantic matching. +""" + +import asyncio +import logging +import time +from typing import TYPE_CHECKING + +import numpy as np + +from app.services.intent.models import ( + FusionConfig, + SemanticCandidate, + SemanticMatchResult, +) + +if TYPE_CHECKING: + from app.models.entities import IntentRule + from app.services.embedding.base import EmbeddingProvider + +logger = logging.getLogger(__name__) + + +class SemanticMatcher: + """ + [AC-AISVC-113] Semantic matcher using vector similarity. + + Supports two matching modes: + - Mode A: Use pre-computed intent_vector for direct similarity calculation + - Mode B: Use semantic_examples for dynamic vector computation + """ + + def __init__( + self, + embedding_provider: "EmbeddingProvider", + config: FusionConfig, + ): + """ + Initialize semantic matcher. 
+ + Args: + embedding_provider: Provider for generating embeddings + config: Fusion configuration + """ + self._embedding_provider = embedding_provider + self._config = config + + async def match( + self, + message: str, + rules: list["IntentRule"], + tenant_id: str, + top_k: int | None = None, + ) -> SemanticMatchResult: + """ + [AC-AISVC-113] Perform vector semantic matching. + + Args: + message: User message + rules: List of intent rules + tenant_id: Tenant ID for isolation + top_k: Number of top candidates to return + + Returns: + SemanticMatchResult with candidates and scores + """ + start_time = time.time() + effective_top_k = top_k or self._config.semantic_top_k + + if not self._config.semantic_matcher_enabled: + return SemanticMatchResult( + candidates=[], + top_score=0.0, + duration_ms=0, + skipped=True, + skip_reason="disabled", + ) + + rules_with_semantic = [r for r in rules if self._has_semantic_config(r)] + if not rules_with_semantic: + duration_ms = int((time.time() - start_time) * 1000) + logger.debug( + f"[AC-AISVC-113] No rules with semantic config for tenant={tenant_id}" + ) + return SemanticMatchResult( + candidates=[], + top_score=0.0, + duration_ms=duration_ms, + skipped=True, + skip_reason="no_semantic_config", + ) + + try: + message_vector = await asyncio.wait_for( + self._embedding_provider.embed(message), + timeout=self._config.semantic_matcher_timeout_ms / 1000, + ) + except asyncio.TimeoutError: + duration_ms = int((time.time() - start_time) * 1000) + logger.warning( + f"[AC-AISVC-113] Embedding timeout for tenant={tenant_id}, " + f"timeout={self._config.semantic_matcher_timeout_ms}ms" + ) + return SemanticMatchResult( + candidates=[], + top_score=0.0, + duration_ms=duration_ms, + skipped=True, + skip_reason="embedding_timeout", + ) + except Exception as e: + duration_ms = int((time.time() - start_time) * 1000) + logger.error( + f"[AC-AISVC-113] Embedding error for tenant={tenant_id}: {e}" + ) + return SemanticMatchResult( + 
candidates=[], + top_score=0.0, + duration_ms=duration_ms, + skipped=True, + skip_reason=f"embedding_error: {str(e)}", + ) + + candidates = [] + for rule in rules_with_semantic: + try: + score = await self._calculate_similarity(message_vector, rule) + if score > 0: + candidates.append(SemanticCandidate(rule=rule, score=score)) + except Exception as e: + logger.warning( + f"[AC-AISVC-114] Similarity calculation failed for rule={rule.id}: {e}" + ) + continue + + candidates.sort(key=lambda x: x.score, reverse=True) + candidates = candidates[:effective_top_k] + + duration_ms = int((time.time() - start_time) * 1000) + logger.info( + f"[AC-AISVC-113] Semantic match completed for tenant={tenant_id}, " + f"candidates={len(candidates)}, top_score={candidates[0].score if candidates else 0:.3f}, " + f"duration={duration_ms}ms" + ) + + return SemanticMatchResult( + candidates=candidates, + top_score=candidates[0].score if candidates else 0.0, + duration_ms=duration_ms, + skipped=False, + skip_reason=None, + ) + + def _has_semantic_config(self, rule: "IntentRule") -> bool: + """ + Check if rule has semantic configuration. + + Args: + rule: Intent rule to check + + Returns: + True if rule has intent_vector or semantic_examples + """ + return bool(rule.intent_vector) or bool(rule.semantic_examples) + + async def _calculate_similarity( + self, + message_vector: list[float], + rule: "IntentRule", + ) -> float: + """ + [AC-AISVC-114] Calculate similarity between message and rule. 
+ + Mode A: Use pre-computed intent_vector + Mode B: Use semantic_examples for dynamic computation + + Args: + message_vector: Message embedding vector + rule: Intent rule with semantic config + + Returns: + Similarity score (0.0 ~ 1.0) + """ + if rule.intent_vector: + return self._cosine_similarity(message_vector, rule.intent_vector) + elif rule.semantic_examples: + try: + example_vectors = await self._embedding_provider.embed_batch( + rule.semantic_examples + ) + similarities = [ + self._cosine_similarity(message_vector, v) + for v in example_vectors + ] + return max(similarities) if similarities else 0.0 + except Exception as e: + logger.warning( + f"[AC-AISVC-114] Failed to compute example vectors for rule={rule.id}: {e}" + ) + return 0.0 + return 0.0 + + def _cosine_similarity( + self, + v1: list[float], + v2: list[float], + ) -> float: + """ + Calculate cosine similarity between two vectors. + + Args: + v1: First vector + v2: Second vector + + Returns: + Cosine similarity (0.0 ~ 1.0) + """ + if not v1 or not v2: + return 0.0 + + v1_arr = np.array(v1) + v2_arr = np.array(v2) + + norm1 = np.linalg.norm(v1_arr) + norm2 = np.linalg.norm(v2_arr) + + if norm1 == 0 or norm2 == 0: + return 0.0 + + similarity = float(np.dot(v1_arr, v2_arr) / (norm1 * norm2)) + return max(0.0, min(1.0, similarity))