ai-robot-core/ai-service/app/services/api_key.py

250 lines
6.2 KiB
Python

"""
API Key management service.
[AC-AISVC-50] Lightweight authentication with in-memory cache.
"""
import logging
import secrets
from datetime import datetime
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.entities import ApiKey, ApiKeyCreate
logger = logging.getLogger(__name__)
class ApiKeyService:
"""
[AC-AISVC-50] API Key management service.
Features:
- In-memory cache for fast validation
- Database persistence
- Hot-reload support
"""
def __init__(self):
self._keys_cache: set[str] = set()
self._initialized: bool = False
async def initialize(self, session: AsyncSession) -> None:
"""
Load all active API keys from database into memory.
Should be called on application startup.
"""
result = await session.execute(
select(ApiKey).where(ApiKey.is_active == True)
)
keys = result.scalars().all()
self._keys_cache = {key.key for key in keys}
self._initialized = True
logger.info(f"[AC-AISVC-50] Loaded {len(self._keys_cache)} API keys into memory")
def validate_key(self, key: str) -> bool:
"""
Validate an API key against the in-memory cache.
Args:
key: The API key to validate
Returns:
True if the key is valid, False otherwise
"""
if not self._initialized:
logger.warning("[AC-AISVC-50] API key service not initialized")
return False
return key in self._keys_cache
def generate_key(self) -> str:
"""
Generate a new secure API key.
Returns:
A URL-safe random string
"""
return secrets.token_urlsafe(32)
async def create_key(
self,
session: AsyncSession,
key_create: ApiKeyCreate
) -> ApiKey:
"""
Create a new API key.
Args:
session: Database session
key_create: Key creation data
Returns:
The created ApiKey entity
"""
api_key = ApiKey(
key=key_create.key,
name=key_create.name,
is_active=key_create.is_active,
)
session.add(api_key)
await session.commit()
await session.refresh(api_key)
if api_key.is_active:
self._keys_cache.add(api_key.key)
logger.info(f"[AC-AISVC-50] Created API key: {api_key.name}")
return api_key
async def create_default_key(self, session: AsyncSession) -> Optional[ApiKey]:
"""
Create a default API key if none exists.
Returns:
The created ApiKey or None if keys already exist
"""
result = await session.execute(select(ApiKey).limit(1))
existing = result.scalar_one_or_none()
if existing:
return None
default_key = secrets.token_urlsafe(32)
api_key = ApiKey(
key=default_key,
name="Default API Key",
is_active=True,
)
session.add(api_key)
await session.commit()
await session.refresh(api_key)
self._keys_cache.add(api_key.key)
logger.info(f"[AC-AISVC-50] Created default API key: {api_key.key}")
return api_key
async def delete_key(
self,
session: AsyncSession,
key_id: str
) -> bool:
"""
Delete an API key.
Args:
session: Database session
key_id: The key ID to delete
Returns:
True if deleted, False if not found
"""
import uuid
try:
key_uuid = uuid.UUID(key_id)
except ValueError:
return False
result = await session.execute(
select(ApiKey).where(ApiKey.id == key_uuid)
)
api_key = result.scalar_one_or_none()
if not api_key:
return False
key_value = api_key.key
await session.delete(api_key)
await session.commit()
self._keys_cache.discard(key_value)
logger.info(f"[AC-AISVC-50] Deleted API key: {api_key.name}")
return True
async def toggle_key(
self,
session: AsyncSession,
key_id: str,
is_active: bool
) -> Optional[ApiKey]:
"""
Toggle API key active status.
Args:
session: Database session
key_id: The key ID to toggle
is_active: New active status
Returns:
The updated ApiKey or None if not found
"""
import uuid
try:
key_uuid = uuid.UUID(key_id)
except ValueError:
return None
result = await session.execute(
select(ApiKey).where(ApiKey.id == key_uuid)
)
api_key = result.scalar_one_or_none()
if not api_key:
return None
api_key.is_active = is_active
api_key.updated_at = datetime.utcnow()
session.add(api_key)
await session.commit()
await session.refresh(api_key)
if is_active:
self._keys_cache.add(api_key.key)
else:
self._keys_cache.discard(api_key.key)
logger.info(f"[AC-AISVC-50] Toggled API key {api_key.name}: active={is_active}")
return api_key
async def list_keys(self, session: AsyncSession) -> list[ApiKey]:
"""
List all API keys.
Args:
session: Database session
Returns:
List of all ApiKey entities
"""
result = await session.execute(select(ApiKey))
return list(result.scalars().all())
async def reload_cache(self, session: AsyncSession) -> None:
"""
Reload all API keys from database into memory.
"""
self._keys_cache.clear()
await self.initialize(session)
logger.info("[AC-AISVC-50] API key cache reloaded")
# Process-wide ApiKeyService instance; stays None until first requested.
_api_key_service: ApiKeyService | None = None


def get_api_key_service() -> ApiKeyService:
    """Return the shared ApiKeyService, constructing it on the first call."""
    global _api_key_service
    service = _api_key_service
    if service is None:
        # First use: build the instance and remember it for later callers.
        service = _api_key_service = ApiKeyService()
    return service