"""
Chat endpoint for AI Service.
[AC-AISVC-01, AC-AISVC-02, AC-AISVC-06, AC-AISVC-08, AC-AISVC-09]
Main chat endpoint with streaming/non-streaming modes.
"""

import logging
from typing import Annotated, Any

from fastapi import APIRouter, Depends, Header, Request
from fastapi.responses import JSONResponse
from sse_starlette.sse import EventSourceResponse

from app.core.middleware import get_response_mode, is_sse_request
from app.core.sse import SSEStateMachine, create_error_event
from app.core.tenant import get_tenant_id
from app.models import ChatRequest, ChatResponse, ErrorResponse
from app.services.orchestrator import OrchestratorService, get_orchestrator_service

logger = logging.getLogger(__name__)

router = APIRouter(tags=["AI Chat"])


@router.post(
    "/ai/chat",
    operation_id="generateReply",
    summary="Generate AI reply",
    description="""
    [AC-AISVC-01, AC-AISVC-02, AC-AISVC-06] Generate AI reply based on user message.

    Response mode is determined by Accept header:
    - Accept: text/event-stream -> SSE streaming response
    - Other -> JSON response
    """,
    responses={
        200: {
            "description": "Success - JSON or SSE stream",
            "content": {
                "application/json": {"schema": {"$ref": "#/components/schemas/ChatResponse"}},
                "text/event-stream": {"schema": {"type": "string"}},
            },
        },
        400: {"description": "Invalid request", "model": ErrorResponse},
        500: {"description": "Internal error", "model": ErrorResponse},
        503: {"description": "Service unavailable", "model": ErrorResponse},
    },
)
async def generate_reply(
    request: Request,
    chat_request: ChatRequest,
    accept: Annotated[str | None, Header()] = None,
    orchestrator: OrchestratorService = Depends(get_orchestrator_service),
) -> Any:
    """
    [AC-AISVC-06] Generate AI reply with automatic response mode switching.

    Based on Accept header:
    - text/event-stream: Returns SSE stream with message/final/error events
    - Other: Returns JSON ChatResponse

    Args:
        request: Incoming request; passed to ``is_sse_request`` and
            ``get_response_mode`` to decide the response mode.
        chat_request: Parsed request body.
        accept: Value of the ``Accept`` header. It is not read in this
            function body (mode detection goes through ``request``);
            NOTE(review): presumably declared so the header shows up in the
            generated API schema - confirm before removing.
        orchestrator: Service that produces the reply, injected by FastAPI.

    Returns:
        An ``EventSourceResponse`` for SSE requests, otherwise a
        ``JSONResponse``.

    Raises:
        MissingTenantIdException: If ``get_tenant_id()`` returns a falsy value.
    """
    tenant_id = get_tenant_id()
    if not tenant_id:
        # Imported lazily, as in the rest of this module's exception handling.
        from app.core.exceptions import MissingTenantIdException

        raise MissingTenantIdException()

    logger.info(
        f"[AC-AISVC-06] Processing chat request: tenant={tenant_id}, "
        f"session={chat_request.session_id}, mode={get_response_mode(request)}"
    )

    if is_sse_request(request):
        return await _handle_streaming_request(tenant_id, chat_request, orchestrator)
    return await _handle_json_request(tenant_id, chat_request, orchestrator)


async def _handle_json_request(
    tenant_id: str,
    chat_request: ChatRequest,
    orchestrator: OrchestratorService,
) -> JSONResponse:
    """
    [AC-AISVC-02] Handle non-streaming JSON request.

    Calls ``orchestrator.generate`` and serialises the result with
    ``model_dump(exclude_none=True, by_alias=True)``.

    Args:
        tenant_id: Tenant the request belongs to.
        chat_request: Parsed request body.
        orchestrator: Service that produces the reply.

    Returns:
        A ``JSONResponse`` wrapping the serialised orchestrator result.

    Raises:
        AIServiceException: Re-raised unchanged when the orchestrator (or
            serialisation) raises one; any other exception is wrapped in an
            ``AIServiceException`` with ``ErrorCode.INTERNAL_ERROR`` and the
            original exception attached as ``__cause__``.
    """
    logger.info(f"[AC-AISVC-02] Processing JSON request for tenant={tenant_id}")

    try:
        response = await orchestrator.generate(tenant_id, chat_request)
        return JSONResponse(
            content=response.model_dump(exclude_none=True, by_alias=True),
        )
    except Exception as e:
        # logger.exception logs at ERROR level and keeps the traceback.
        logger.exception(f"[AC-AISVC-04] Error generating response: {e}")

        from app.core.exceptions import AIServiceException, ErrorCode

        if isinstance(e, AIServiceException):
            # Already a service-level error: propagate with its original traceback.
            raise
        # Chain the cause explicitly so the root error stays visible upstream.
        raise AIServiceException(
            code=ErrorCode.INTERNAL_ERROR,
            message=str(e),
        ) from e


async def _handle_streaming_request(
    tenant_id: str,
    chat_request: ChatRequest,
    orchestrator: OrchestratorService,
) -> EventSourceResponse:
    """
    [AC-AISVC-06, AC-AISVC-07, AC-AISVC-08, AC-AISVC-09] Handle SSE streaming request.

    SSE Event Sequence (per design.md Section 6.2):
    - message* (0 or more) -> final (exactly 1) -> close
    - OR message* (0 or more) -> error (exactly 1) -> close

    State machine ensures:
    - No events after final/error
    - Only one final OR one error event
    - Proper connection close

    Args:
        tenant_id: Tenant the request belongs to.
        chat_request: Parsed request body.
        orchestrator: Service whose ``generate_stream`` yields the events.

    Returns:
        An ``EventSourceResponse`` driven by the inner event generator.
    """
    logger.info(f"[AC-AISVC-06] Processing SSE request for tenant={tenant_id}")

    state_machine = SSEStateMachine()

    async def event_generator():
        """
        [AC-AISVC-08, AC-AISVC-09] Event generator with state machine enforcement.
        Ensures proper event sequence and error handling.
        """
        await state_machine.transition_to_streaming()

        try:
            async for event in orchestrator.generate_stream(tenant_id, chat_request):
                if not state_machine.can_send_message():
                    logger.warning(
                        "[AC-AISVC-08] Received event after state machine closed, ignoring"
                    )
                    break

                if event.event == "final":
                    # Only emit if the state machine accepts the transition;
                    # stop consuming the stream either way.
                    if await state_machine.transition_to_final():
                        logger.info("[AC-AISVC-08] Sending final event and closing stream")
                        yield event
                    break

                elif event.event == "error":
                    if await state_machine.transition_to_error():
                        logger.info("[AC-AISVC-09] Sending error event and closing stream")
                        yield event
                    break

                elif event.event == "message":
                    yield event
                # Events of any other type are not forwarded.

        except Exception as e:
            # Keep the traceback in the log; the client only gets the message.
            logger.exception(f"[AC-AISVC-09] Streaming error: {e}")
            if await state_machine.transition_to_error():
                yield create_error_event(
                    code="STREAMING_ERROR",
                    message=str(e),
                )

        finally:
            await state_machine.close()
            logger.debug("[AC-AISVC-08] SSE connection closed")

    # ping=15 is forwarded to EventSourceResponse unchanged
    # (keep-alive ping interval - see sse-starlette documentation).
    return EventSourceResponse(event_generator(), ping=15)