"""
Input scanner for AI Service.
[AC-AISVC-83] User input pre-detection (logging only, no blocking).
"""

import logging
from typing import Any

from app.models.entities import (
    ForbiddenWord,
    InputScanResult,
)
from app.services.guardrail.word_service import ForbiddenWordService

logger = logging.getLogger(__name__)


class InputScanner:
    """
    [AC-AISVC-83] Input scanner for pre-detection of forbidden words.

    Features:
    - Scans user input for forbidden words
    - Records matched words and categories in metadata
    - Does NOT block the request (only logging)
    - Used for monitoring and analytics
    """

    def __init__(self, word_service: ForbiddenWordService):
        """
        Store the word service used to load forbidden words and record hits.

        Args:
            word_service: Service providing the tenant's enabled forbidden
                words and the hit-count increment operation.
        """
        self._word_service = word_service

    async def scan(
        self,
        text: str,
        tenant_id: str,
    ) -> InputScanResult:
        """
        [AC-AISVC-83] Scan user input for forbidden words.

        Matching is a plain, case-sensitive substring test of each enabled
        word against ``text``. Every matched word has its hit count
        incremented; a failed increment is logged as a warning and does not
        abort the scan.

        Args:
            text: User input text to scan
            tenant_id: Tenant ID for isolation

        Returns:
            InputScanResult with flagged status and matched words. Blank
            input, or a tenant without enabled words, yields
            ``InputScanResult(flagged=False)``.
        """
        if not text or not text.strip():
            return InputScanResult(flagged=False)

        words = await self._word_service.get_enabled_words_for_filtering(tenant_id)
        if not words:
            return InputScanResult(flagged=False)

        matched_words: list[str] = []
        matched_categories: list[str] = []
        matched_word_entities: list[ForbiddenWord] = []

        for word in words:
            # An empty string is a substring of every text (and None would
            # raise TypeError), so a blank entry must never count as a match.
            if not word.word:
                continue
            if word.word in text:
                matched_words.append(word.word)
                # Categories are de-duplicated while keeping first-seen order.
                if word.category not in matched_categories:
                    matched_categories.append(word.category)
                matched_word_entities.append(word)

        if matched_words:
            logger.info(
                "[AC-AISVC-83] Input flagged: tenant=%s, "
                "matched_words=%s, categories=%s",
                tenant_id,
                matched_words,
                matched_categories,
            )

            for word_entity in matched_word_entities:
                # Best-effort statistics: a failed increment must not turn a
                # logging-only scan into a request failure.
                try:
                    await self._word_service.increment_hit_count(
                        tenant_id, word_entity.id
                    )
                except Exception as e:
                    logger.warning(
                        "Failed to increment hit count for word %s: %s",
                        word_entity.id,
                        e,
                    )

        return InputScanResult(
            flagged=bool(matched_words),
            matched_words=matched_words,
            matched_categories=matched_categories,
        )

    async def scan_and_enrich_metadata(
        self,
        text: str,
        tenant_id: str,
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """
        [AC-AISVC-83] Scan input and enrich metadata with scan result.

        The supplied ``metadata`` dict is updated in place with the entries
        of ``result.to_dict()`` and then returned; a new dict is created
        when ``metadata`` is None.

        Args:
            text: User input text to scan
            tenant_id: Tenant ID for isolation
            metadata: Existing metadata dict to enrich

        Returns:
            Enriched metadata with input_flagged and matched info
        """
        result = await self.scan(text, tenant_id)

        if metadata is None:
            metadata = {}

        metadata.update(result.to_dict())

        return metadata