""" API Key management service. [AC-AISVC-50] Lightweight authentication with in-memory cache. """ import logging import secrets from datetime import datetime from typing import Optional from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.entities import ApiKey, ApiKeyCreate logger = logging.getLogger(__name__) class ApiKeyService: """ [AC-AISVC-50] API Key management service. Features: - In-memory cache for fast validation - Database persistence - Hot-reload support """ def __init__(self): self._keys_cache: set[str] = set() self._initialized: bool = False async def initialize(self, session: AsyncSession) -> None: """ Load all active API keys from database into memory. Should be called on application startup. """ result = await session.execute( select(ApiKey).where(ApiKey.is_active == True) ) keys = result.scalars().all() self._keys_cache = {key.key for key in keys} self._initialized = True logger.info(f"[AC-AISVC-50] Loaded {len(self._keys_cache)} API keys into memory") def validate_key(self, key: str) -> bool: """ Validate an API key against the in-memory cache. Args: key: The API key to validate Returns: True if the key is valid, False otherwise """ if not self._initialized: logger.warning("[AC-AISVC-50] API key service not initialized") return False return key in self._keys_cache def generate_key(self) -> str: """ Generate a new secure API key. Returns: A URL-safe random string """ return secrets.token_urlsafe(32) async def create_key( self, session: AsyncSession, key_create: ApiKeyCreate ) -> ApiKey: """ Create a new API key. Args: session: Database session key_create: Key creation data Returns: The created ApiKey entity """ api_key = ApiKey( key=key_create.key, name=key_create.name, is_active=key_create.is_active, ) session.add(api_key) await session.commit() await session.refresh(api_key) if api_key.is_active: self._keys_cache.add(api_key.key) logger.info(f"[AC-AISVC-50] Created API key: {api_key.name}") return api_key async def create_default_key(self, session: AsyncSession) -> Optional[ApiKey]: """ Create a default API key if none exists. Returns: The created ApiKey or None if keys already exist """ result = await session.execute(select(ApiKey).limit(1)) existing = result.scalar_one_or_none() if existing: return None default_key = secrets.token_urlsafe(32) api_key = ApiKey( key=default_key, name="Default API Key", is_active=True, ) session.add(api_key) await session.commit() await session.refresh(api_key) self._keys_cache.add(api_key.key) logger.info(f"[AC-AISVC-50] Created default API key: {api_key.key}") return api_key async def delete_key( self, session: AsyncSession, key_id: str ) -> bool: """ Delete an API key. Args: session: Database session key_id: The key ID to delete Returns: True if deleted, False if not found """ import uuid try: key_uuid = uuid.UUID(key_id) except ValueError: return False result = await session.execute( select(ApiKey).where(ApiKey.id == key_uuid) ) api_key = result.scalar_one_or_none() if not api_key: return False key_value = api_key.key await session.delete(api_key) await session.commit() self._keys_cache.discard(key_value) logger.info(f"[AC-AISVC-50] Deleted API key: {api_key.name}") return True async def toggle_key( self, session: AsyncSession, key_id: str, is_active: bool ) -> Optional[ApiKey]: """ Toggle API key active status. Args: session: Database session key_id: The key ID to toggle is_active: New active status Returns: The updated ApiKey or None if not found """ import uuid try: key_uuid = uuid.UUID(key_id) except ValueError: return None result = await session.execute( select(ApiKey).where(ApiKey.id == key_uuid) ) api_key = result.scalar_one_or_none() if not api_key: return None api_key.is_active = is_active api_key.updated_at = datetime.utcnow() session.add(api_key) await session.commit() await session.refresh(api_key) if is_active: self._keys_cache.add(api_key.key) else: self._keys_cache.discard(api_key.key) logger.info(f"[AC-AISVC-50] Toggled API key {api_key.name}: active={is_active}") return api_key async def list_keys(self, session: AsyncSession) -> list[ApiKey]: """ List all API keys. Args: session: Database session Returns: List of all ApiKey entities """ result = await session.execute(select(ApiKey)) return list(result.scalars().all()) async def reload_cache(self, session: AsyncSession) -> None: """ Reload all API keys from database into memory. """ self._keys_cache.clear() await self.initialize(session) logger.info("[AC-AISVC-50] API key cache reloaded") _api_key_service: ApiKeyService | None = None def get_api_key_service() -> ApiKeyService: """Get the global API key service instance.""" global _api_key_service if _api_key_service is None: _api_key_service = ApiKeyService() return _api_key_service