""" [v0.8.0] Fusion policy for hybrid intent routing. [AC-AISVC-115~AC-AISVC-117] Fusion decision logic for three-way matching. """ import logging from collections.abc import Callable from typing import TYPE_CHECKING from app.services.intent.models import ( FusionConfig, FusionResult, LlmJudgeResult, RouteTrace, RuleMatchResult, SemanticMatchResult, ) if TYPE_CHECKING: from app.models.entities import IntentRule logger = logging.getLogger(__name__) DecisionCondition = Callable[[RuleMatchResult, SemanticMatchResult, LlmJudgeResult], bool] class FusionPolicy: """ [AC-AISVC-115] Fusion decision policy for hybrid routing. Decision priority: 1. rule_high_confidence: RuleMatcher hit with score=1.0 2. llm_judge: LlmJudge triggered and returned valid intent 3. semantic_override: RuleMatcher missed but SemanticMatcher high confidence 4. rule_semantic_agree: Both match same intent 5. semantic_fallback: SemanticMatcher medium confidence 6. rule_fallback: Only rule matched 7. no_match: All low confidence """ DECISION_PRIORITY: list[tuple[str, DecisionCondition]] = [ ("rule_high_confidence", lambda r, s, llm: r.score == 1.0 and r.rule is not None), ("llm_judge", lambda r, s, llm: llm.triggered and llm.intent_id is not None), ( "semantic_override", lambda r, s, llm: r.score == 0 and s.top_score > 0.7 and not s.skipped and len(s.candidates) > 0, ), ( "rule_semantic_agree", lambda r, s, llm: r.score > 0 and s.top_score > 0.5 and not s.skipped and len(s.candidates) > 0 and r.rule_id == s.candidates[0].rule.id, ), ( "semantic_fallback", lambda r, s, llm: s.top_score > 0.5 and not s.skipped and len(s.candidates) > 0, ), ("rule_fallback", lambda r, s, llm: r.score > 0), ("no_match", lambda r, s, llm: True), ] def __init__(self, config: FusionConfig | None = None): """ Initialize fusion policy with configuration. Args: config: Fusion configuration, uses default if not provided """ self._config = config or FusionConfig() def fuse( self, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, llm_result: LlmJudgeResult | None, ) -> FusionResult: """ [AC-AISVC-115] Execute fusion decision. Args: rule_result: Rule matching result semantic_result: Semantic matching result llm_result: LLM judge result (may be None) Returns: FusionResult with final intent, confidence, and trace """ trace = self._build_trace(rule_result, semantic_result, llm_result) final_intent = None final_confidence = 0.0 decision_reason = "no_match" effective_llm_result = llm_result or LlmJudgeResult.empty() for reason, condition in self.DECISION_PRIORITY: if condition(rule_result, semantic_result, effective_llm_result): decision_reason = reason break if decision_reason == "rule_high_confidence": final_intent = rule_result.rule final_confidence = 1.0 elif decision_reason == "llm_judge" and llm_result: final_intent = self._find_rule_by_id( llm_result.intent_id, rule_result, semantic_result ) final_confidence = llm_result.score elif decision_reason == "semantic_override": final_intent = semantic_result.candidates[0].rule final_confidence = semantic_result.top_score elif decision_reason == "rule_semantic_agree": final_intent = rule_result.rule final_confidence = self._calculate_weighted_confidence( rule_result, semantic_result, llm_result ) elif decision_reason == "semantic_fallback": final_intent = semantic_result.candidates[0].rule final_confidence = semantic_result.top_score elif decision_reason == "rule_fallback": final_intent = rule_result.rule final_confidence = rule_result.score need_clarify = final_confidence < self._config.clarify_threshold clarify_candidates = None if need_clarify and len(semantic_result.candidates) > 1: clarify_candidates = [c.rule for c in semantic_result.candidates[:3]] trace.fusion = { "weights": { "w_rule": self._config.w_rule, "w_semantic": self._config.w_semantic, "w_llm": self._config.w_llm, }, "final_confidence": final_confidence, "decision_reason": decision_reason, "need_clarify": need_clarify, } logger.info( f"[AC-AISVC-115] Fusion decision: reason={decision_reason}, " f"confidence={final_confidence:.3f}, need_clarify={need_clarify}" ) return FusionResult( final_intent=final_intent, final_confidence=final_confidence, decision_reason=decision_reason, need_clarify=need_clarify, clarify_candidates=clarify_candidates, trace=trace, ) def _build_trace( self, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, llm_result: LlmJudgeResult | None, ) -> RouteTrace: """ [AC-AISVC-122] Build route trace log. """ return RouteTrace( rule_match={ "rule_id": str(rule_result.rule_id) if rule_result.rule_id else None, "rule_name": rule_result.rule.name if rule_result.rule else None, "match_type": rule_result.match_type, "matched_text": rule_result.matched_text, "score": rule_result.score, "duration_ms": rule_result.duration_ms, }, semantic_match={ "top_candidates": [ { "rule_id": str(c.rule.id), "rule_name": c.rule.name, "score": c.score, } for c in semantic_result.candidates ], "top_score": semantic_result.top_score, "duration_ms": semantic_result.duration_ms, "skipped": semantic_result.skipped, "skip_reason": semantic_result.skip_reason, }, llm_judge={ "triggered": llm_result.triggered if llm_result else False, "intent_id": llm_result.intent_id if llm_result else None, "intent_name": llm_result.intent_name if llm_result else None, "score": llm_result.score if llm_result else 0.0, "reasoning": llm_result.reasoning if llm_result else None, "duration_ms": llm_result.duration_ms if llm_result else 0, "tokens_used": llm_result.tokens_used if llm_result else 0, }, fusion={}, ) def _calculate_weighted_confidence( self, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, llm_result: LlmJudgeResult | None, ) -> float: """ [AC-AISVC-116] Calculate weighted confidence. Formula: final_confidence = (w_rule * rule_score + w_semantic * semantic_score + w_llm * llm_score) / total_weight Returns: Weighted confidence in [0.0, 1.0] """ rule_score = rule_result.score semantic_score = semantic_result.top_score if not semantic_result.skipped else 0.0 llm_score = llm_result.score if llm_result and llm_result.triggered else 0.0 total_weight = self._config.w_rule + self._config.w_semantic if llm_result and llm_result.triggered: total_weight += self._config.w_llm if total_weight == 0: return 0.0 confidence = ( self._config.w_rule * rule_score + self._config.w_semantic * semantic_score + self._config.w_llm * llm_score ) / total_weight return min(1.0, max(0.0, confidence)) def _find_rule_by_id( self, intent_id: str | None, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, ) -> "IntentRule | None": """Find rule by ID from rule or semantic results.""" if not intent_id: return None if rule_result.rule_id and str(rule_result.rule_id) == intent_id: return rule_result.rule for candidate in semantic_result.candidates: if str(candidate.rule.id) == intent_id: return candidate.rule return None