""" API Key management endpoints. [AC-AISVC-50] CRUD operations for API keys. """ import logging from datetime import datetime from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel, Field from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import get_session from app.models.entities import ApiKey, ApiKeyCreate from app.services.api_key import get_api_key_service logger = logging.getLogger(__name__) router = APIRouter(prefix="/admin/api-keys", tags=["API Keys"]) class ApiKeyResponse(BaseModel): """Response model for API key.""" id: str = Field(..., description="API key ID") key: str = Field(..., description="API key value") name: str = Field(..., description="API key name") is_active: bool = Field(..., description="Whether the key is active") expires_at: str | None = Field(default=None, description="Expiration time") allowed_ips: list[str] | None = Field(default=None, description="Optional client IP allowlist") rate_limit_qpm: int | None = Field(default=60, description="Per-minute quota") created_at: str = Field(..., description="Creation time") updated_at: str = Field(..., description="Last update time") class ApiKeyListResponse(BaseModel): """Response model for API key list.""" keys: list[ApiKeyResponse] = Field(..., description="List of API keys") total: int = Field(..., description="Total count") class CreateApiKeyRequest(BaseModel): """Request model for creating API key.""" name: str = Field(..., description="API key name/description") key: str | None = Field(default=None, description="Custom API key (auto-generated if not provided)") expires_at: datetime | None = Field(default=None, description="Expiration time; null means never expires") allowed_ips: list[str] | None = Field(default=None, description="Optional client IP allowlist") rate_limit_qpm: int | None = Field(default=60, ge=1, le=60000, description="Per-minute quota") class ToggleApiKeyRequest(BaseModel): """Request model for toggling API key status.""" is_active: bool = Field(..., description="New active status") def api_key_to_response(api_key: ApiKey) -> ApiKeyResponse: """Convert ApiKey entity to response model.""" return ApiKeyResponse( id=str(api_key.id), key=api_key.key, name=api_key.name, is_active=api_key.is_active, expires_at=api_key.expires_at.isoformat() if api_key.expires_at else None, allowed_ips=api_key.allowed_ips, rate_limit_qpm=api_key.rate_limit_qpm, created_at=api_key.created_at.isoformat(), updated_at=api_key.updated_at.isoformat(), ) @router.get("", response_model=ApiKeyListResponse) async def list_api_keys( session: Annotated[AsyncSession, Depends(get_session)], ): """ [AC-AISVC-50] List all API keys. """ service = get_api_key_service() keys = await service.list_keys(session) return ApiKeyListResponse( keys=[api_key_to_response(k) for k in keys], total=len(keys), ) @router.post("", response_model=ApiKeyResponse, status_code=status.HTTP_201_CREATED) async def create_api_key( request: CreateApiKeyRequest, session: Annotated[AsyncSession, Depends(get_session)], ): """ [AC-AISVC-50] Create a new API key. """ service = get_api_key_service() key_value = request.key or service.generate_key() key_create = ApiKeyCreate( key=key_value, name=request.name, is_active=True, expires_at=request.expires_at, allowed_ips=request.allowed_ips, rate_limit_qpm=request.rate_limit_qpm, ) api_key = await service.create_key(session, key_create) logger.info(f"[AC-AISVC-50] Created API key: {api_key.name}") return api_key_to_response(api_key) @router.delete("/{key_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_api_key( key_id: str, session: Annotated[AsyncSession, Depends(get_session)], ): """ [AC-AISVC-50] Delete an API key. """ service = get_api_key_service() deleted = await service.delete_key(session, key_id) if not deleted: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="API key not found", ) @router.patch("/{key_id}/toggle", response_model=ApiKeyResponse) async def toggle_api_key( key_id: str, request: ToggleApiKeyRequest, session: Annotated[AsyncSession, Depends(get_session)], ): """ [AC-AISVC-50] Toggle API key active status. """ service = get_api_key_service() api_key = await service.toggle_key(session, key_id, request.is_active) if not api_key: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="API key not found", ) return api_key_to_response(api_key) @router.post("/reload-cache", status_code=status.HTTP_204_NO_CONTENT) async def reload_api_key_cache( session: Annotated[AsyncSession, Depends(get_session)], ): """ [AC-AISVC-50] Reload API key cache from database. """ service = get_api_key_service() await service.reload_cache(session)