246 lines
7.5 KiB
Python
246 lines
7.5 KiB
Python
|
|
"""
|
|||
|
|
Intent rule tester for AI Service.
|
|||
|
|
[AC-AISVC-96] Intent rule testing service with conflict detection.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import logging
|
|||
|
|
import re
|
|||
|
|
from dataclasses import dataclass, field
|
|||
|
|
from typing import Any
|
|||
|
|
|
|||
|
|
from app.models.entities import IntentRule
|
|||
|
|
from app.services.intent.router import IntentRouter
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class IntentRuleTestCase:
|
|||
|
|
"""Result of testing a single message against a rule."""
|
|||
|
|
|
|||
|
|
message: str
|
|||
|
|
matched: bool
|
|||
|
|
matched_keywords: list[str] = field(default_factory=list)
|
|||
|
|
matched_patterns: list[str] = field(default_factory=list)
|
|||
|
|
match_type: str | None = None
|
|||
|
|
priority: int = 0
|
|||
|
|
priority_rank: int = 0
|
|||
|
|
conflict_rules: list[dict[str, Any]] = field(default_factory=list)
|
|||
|
|
reason: str | None = None
|
|||
|
|
|
|||
|
|
def to_dict(self) -> dict[str, Any]:
|
|||
|
|
return {
|
|||
|
|
"message": self.message,
|
|||
|
|
"matched": self.matched,
|
|||
|
|
"matchedKeywords": self.matched_keywords,
|
|||
|
|
"matchedPatterns": self.matched_patterns,
|
|||
|
|
"matchType": self.match_type,
|
|||
|
|
"priority": self.priority,
|
|||
|
|
"priorityRank": self.priority_rank,
|
|||
|
|
"conflictRules": self.conflict_rules,
|
|||
|
|
"reason": self.reason,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class IntentRuleTestResult:
|
|||
|
|
"""Result of testing multiple messages against a rule."""
|
|||
|
|
|
|||
|
|
rule_id: str
|
|||
|
|
rule_name: str
|
|||
|
|
results: list[IntentRuleTestCase]
|
|||
|
|
summary: dict[str, Any]
|
|||
|
|
|
|||
|
|
def to_dict(self) -> dict[str, Any]:
|
|||
|
|
return {
|
|||
|
|
"ruleId": self.rule_id,
|
|||
|
|
"ruleName": self.rule_name,
|
|||
|
|
"results": [r.to_dict() for r in self.results],
|
|||
|
|
"summary": self.summary,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
class IntentRuleTester:
|
|||
|
|
"""
|
|||
|
|
[AC-AISVC-96] Intent rule testing service.
|
|||
|
|
|
|||
|
|
Features:
|
|||
|
|
- Test single rule against multiple messages
|
|||
|
|
- Detect priority conflicts (other rules that also match)
|
|||
|
|
- Provide detailed match information
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
self._router = IntentRouter()
|
|||
|
|
|
|||
|
|
async def test_rule(
|
|||
|
|
self,
|
|||
|
|
rule: IntentRule,
|
|||
|
|
test_messages: list[str],
|
|||
|
|
all_rules: list[IntentRule],
|
|||
|
|
) -> IntentRuleTestResult:
|
|||
|
|
"""
|
|||
|
|
Test a rule against multiple messages and detect conflicts.
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
rule: The rule to test
|
|||
|
|
test_messages: List of test messages
|
|||
|
|
all_rules: All rules for conflict detection (ordered by priority DESC)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
IntentRuleTestResult with detailed test results
|
|||
|
|
"""
|
|||
|
|
results = []
|
|||
|
|
priority_rank = self._calculate_priority_rank(rule, all_rules)
|
|||
|
|
|
|||
|
|
for message in test_messages:
|
|||
|
|
test_case = self._test_single_message(rule, message, all_rules, priority_rank)
|
|||
|
|
results.append(test_case)
|
|||
|
|
|
|||
|
|
matched_count = sum(1 for r in results if r.matched)
|
|||
|
|
summary = {
|
|||
|
|
"totalTests": len(test_messages),
|
|||
|
|
"matchedCount": matched_count,
|
|||
|
|
"matchRate": matched_count / len(test_messages) if test_messages else 0,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(
|
|||
|
|
f"[AC-AISVC-96] Tested rule {rule.name}: "
|
|||
|
|
f"{matched_count}/{len(test_messages)} matched"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
return IntentRuleTestResult(
|
|||
|
|
rule_id=str(rule.id),
|
|||
|
|
rule_name=rule.name,
|
|||
|
|
results=results,
|
|||
|
|
summary=summary,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def _test_single_message(
|
|||
|
|
self,
|
|||
|
|
rule: IntentRule,
|
|||
|
|
message: str,
|
|||
|
|
all_rules: list[IntentRule],
|
|||
|
|
priority_rank: int,
|
|||
|
|
) -> IntentRuleTestCase:
|
|||
|
|
"""Test a single message against a rule."""
|
|||
|
|
matched_keywords = self._match_keywords(message, rule)
|
|||
|
|
matched_patterns = self._match_patterns(message, rule)
|
|||
|
|
|
|||
|
|
matched = len(matched_keywords) > 0 or len(matched_patterns) > 0
|
|||
|
|
match_type = None
|
|||
|
|
reason = None
|
|||
|
|
|
|||
|
|
if matched:
|
|||
|
|
if matched_keywords:
|
|||
|
|
match_type = "keyword"
|
|||
|
|
else:
|
|||
|
|
match_type = "regex"
|
|||
|
|
else:
|
|||
|
|
reason = self._determine_unmatch_reason(message, rule)
|
|||
|
|
|
|||
|
|
conflict_rules = self._detect_conflicts(rule, message, all_rules)
|
|||
|
|
|
|||
|
|
return IntentRuleTestCase(
|
|||
|
|
message=message,
|
|||
|
|
matched=matched,
|
|||
|
|
matched_keywords=matched_keywords,
|
|||
|
|
matched_patterns=matched_patterns,
|
|||
|
|
match_type=match_type,
|
|||
|
|
priority=rule.priority,
|
|||
|
|
priority_rank=priority_rank,
|
|||
|
|
conflict_rules=conflict_rules,
|
|||
|
|
reason=reason,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def _match_keywords(self, message: str, rule: IntentRule) -> list[str]:
|
|||
|
|
"""Match message against rule keywords."""
|
|||
|
|
matched = []
|
|||
|
|
keywords = rule.keywords or []
|
|||
|
|
message_lower = message.lower()
|
|||
|
|
|
|||
|
|
for keyword in keywords:
|
|||
|
|
if keyword and keyword.lower() in message_lower:
|
|||
|
|
matched.append(keyword)
|
|||
|
|
|
|||
|
|
return matched
|
|||
|
|
|
|||
|
|
def _match_patterns(self, message: str, rule: IntentRule) -> list[str]:
|
|||
|
|
"""Match message against rule regex patterns."""
|
|||
|
|
matched = []
|
|||
|
|
patterns = rule.patterns or []
|
|||
|
|
|
|||
|
|
for pattern in patterns:
|
|||
|
|
if not pattern:
|
|||
|
|
continue
|
|||
|
|
try:
|
|||
|
|
if re.search(pattern, message, re.IGNORECASE):
|
|||
|
|
matched.append(pattern)
|
|||
|
|
except re.error as e:
|
|||
|
|
logger.warning(f"Invalid regex pattern: {pattern}, error: {e}")
|
|||
|
|
|
|||
|
|
return matched
|
|||
|
|
|
|||
|
|
def _detect_conflicts(
|
|||
|
|
self,
|
|||
|
|
current_rule: IntentRule,
|
|||
|
|
message: str,
|
|||
|
|
all_rules: list[IntentRule],
|
|||
|
|
) -> list[dict[str, Any]]:
|
|||
|
|
"""Detect other rules that also match the message (priority conflicts)."""
|
|||
|
|
conflicts = []
|
|||
|
|
|
|||
|
|
for other_rule in all_rules:
|
|||
|
|
if str(other_rule.id) == str(current_rule.id):
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
if not other_rule.is_enabled:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
matched_keywords = self._match_keywords(message, other_rule)
|
|||
|
|
matched_patterns = self._match_patterns(message, other_rule)
|
|||
|
|
|
|||
|
|
if matched_keywords or matched_patterns:
|
|||
|
|
conflicts.append({
|
|||
|
|
"ruleId": str(other_rule.id),
|
|||
|
|
"ruleName": other_rule.name,
|
|||
|
|
"priority": other_rule.priority,
|
|||
|
|
"reason": f"同时匹配(优先级:{other_rule.priority})",
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
return conflicts
|
|||
|
|
|
|||
|
|
def _calculate_priority_rank(
|
|||
|
|
self,
|
|||
|
|
rule: IntentRule,
|
|||
|
|
all_rules: list[IntentRule],
|
|||
|
|
) -> int:
|
|||
|
|
"""Calculate the priority rank of a rule among all rules."""
|
|||
|
|
enabled_rules = [r for r in all_rules if r.is_enabled]
|
|||
|
|
sorted_rules = sorted(enabled_rules, key=lambda r: r.priority, reverse=True)
|
|||
|
|
|
|||
|
|
for rank, r in enumerate(sorted_rules, start=1):
|
|||
|
|
if str(r.id) == str(rule.id):
|
|||
|
|
return rank
|
|||
|
|
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
def _determine_unmatch_reason(self, message: str, rule: IntentRule) -> str:
|
|||
|
|
"""Determine why a message did not match a rule."""
|
|||
|
|
keywords = rule.keywords or []
|
|||
|
|
patterns = rule.patterns or []
|
|||
|
|
|
|||
|
|
if not keywords and not patterns:
|
|||
|
|
return "规则未配置关键词或正则表达式"
|
|||
|
|
|
|||
|
|
if keywords:
|
|||
|
|
keyword_str = "、".join(keywords[:3])
|
|||
|
|
if len(keywords) > 3:
|
|||
|
|
keyword_str += f"等{len(keywords)}个"
|
|||
|
|
return f"关键词不匹配(规则关键词:{keyword_str})"
|
|||
|
|
|
|||
|
|
if patterns:
|
|||
|
|
return f"正则表达式不匹配(规则模式:{len(patterns)}个)"
|
|||
|
|
|
|||
|
|
return "未匹配"
|