111 lines
3.1 KiB
Python
111 lines
3.1 KiB
Python
"""
|
|
Input scanner for AI Service.
|
|
[AC-AISVC-83] User input pre-detection (logging only, no blocking).
|
|
"""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from app.models.entities import (
|
|
ForbiddenWord,
|
|
InputScanResult,
|
|
)
|
|
from app.services.guardrail.word_service import ForbiddenWordService
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class InputScanner:
    """
    [AC-AISVC-83] Input scanner for pre-detection of forbidden words.

    Features:
    - Scans user input for forbidden words
    - Records matched words and categories in the scan result
    - Does NOT block the request (only logging)
    - Used for monitoring and analytics
    """

    def __init__(self, word_service: ForbiddenWordService):
        """
        Keep a reference to the forbidden-word service.

        Args:
            word_service: Service used to load the enabled forbidden words
                for a tenant and to increment their hit counters.
        """
        self._word_service = word_service

    async def scan(
        self,
        text: str,
        tenant_id: str,
    ) -> InputScanResult:
        """
        [AC-AISVC-83] Scan user input for forbidden words.

        Matching is a plain, case-sensitive substring test of each enabled
        word against ``text``. Every matched word entity has its hit count
        incremented; a failure to do so is logged and otherwise ignored.

        Args:
            text: User input text to scan
            tenant_id: Tenant ID for isolation

        Returns:
            InputScanResult with flagged status, matched words and the
            de-duplicated list of matched categories (in first-seen order).
            Empty or whitespace-only input, or a tenant with no enabled
            words, yields ``InputScanResult(flagged=False)``.
        """
        # Nothing to scan: empty or whitespace-only input is never flagged.
        if not text or not text.strip():
            return InputScanResult(flagged=False)

        words = await self._word_service.get_enabled_words_for_filtering(tenant_id)
        if not words:
            return InputScanResult(flagged=False)

        matched_words: list[str] = []
        matched_categories: list[str] = []
        matched_word_entities: list[ForbiddenWord] = []

        for word in words:
            # An empty string is a substring of every string, so a blank
            # entry would flag all input (and ``None`` would raise on ``in``).
            # Skip entries that carry no text to match.
            if not word.word:
                continue
            if word.word not in text:
                continue

            matched_words.append(word.word)
            # Categories are de-duplicated while preserving first-seen order.
            if word.category not in matched_categories:
                matched_categories.append(word.category)
            matched_word_entities.append(word)

        if matched_words:
            logger.info(
                "[AC-AISVC-83] Input flagged: tenant=%s, "
                "matched_words=%s, categories=%s",
                tenant_id,
                matched_words,
                matched_categories,
            )

            for word_entity in matched_word_entities:
                # Hit counting is best-effort: a failure here must not
                # prevent the scan result from being returned.
                try:
                    await self._word_service.increment_hit_count(
                        tenant_id, word_entity.id
                    )
                except Exception as e:
                    logger.warning(
                        "Failed to increment hit count for word %s: %s",
                        word_entity.id,
                        e,
                    )

        return InputScanResult(
            flagged=bool(matched_words),
            matched_words=matched_words,
            matched_categories=matched_categories,
        )

    async def scan_and_enrich_metadata(
        self,
        text: str,
        tenant_id: str,
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """
        [AC-AISVC-83] Scan input and enrich metadata with scan result.

        Args:
            text: User input text to scan
            tenant_id: Tenant ID for isolation
            metadata: Existing metadata dict to enrich. When provided it is
                updated in place; when ``None`` a new dict is created.

        Returns:
            The metadata dict, updated with the entries produced by the scan
            result's ``to_dict()``.
        """
        result = await self.scan(text, tenant_id)

        if metadata is None:
            metadata = {}

        # Scan-result keys overwrite any same-named keys already present.
        metadata.update(result.to_dict())

        return metadata
|