# Source: ai-robot-core/ai-service/app/api/chat.py

"""
Chat endpoint for AI Service.
[AC-AISVC-01, AC-AISVC-02, AC-AISVC-06, AC-AISVC-08, AC-AISVC-09] Main chat endpoint with streaming/non-streaming modes.
"""
import logging
from typing import Annotated, Any
from fastapi import APIRouter, Depends, Header, Request
from fastapi.responses import JSONResponse
from sse_starlette.sse import EventSourceResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_session
from app.core.middleware import get_response_mode, is_sse_request
from app.core.sse import SSEStateMachine, create_error_event
from app.core.tenant import get_tenant_id
from app.models import ChatRequest, ChatResponse, ErrorResponse
from app.services.memory import MemoryService
from app.services.orchestrator import OrchestratorService
logger = logging.getLogger(__name__)
router = APIRouter(tags=["AI Chat"])
async def get_orchestrator_service_with_memory(
    session: Annotated[AsyncSession, Depends(get_session)],
) -> OrchestratorService:
    """
    [AC-AISVC-13] Build a request-scoped OrchestratorService.

    Wires together a fresh MemoryService bound to this request's database
    session, the configured LLM client, and the vector retriever.
    """
    # NOTE(review): imported at call time rather than module level —
    # presumably to avoid an import cycle; confirm before hoisting.
    from app.services.llm.factory import get_llm_config_manager
    from app.services.retrieval.vector_retriever import get_vector_retriever

    retriever = await get_vector_retriever()
    client = get_llm_config_manager().get_client()
    return OrchestratorService(
        llm_client=client,
        memory_service=MemoryService(session),
        retriever=retriever,
    )
@router.post(
    "/ai/chat",
    operation_id="generateReply",
    summary="Generate AI reply",
    description="""
    [AC-AISVC-01, AC-AISVC-02, AC-AISVC-06] Generate AI reply based on user message.
    Response mode is determined by Accept header:
    - Accept: text/event-stream -> SSE streaming response
    - Other -> JSON response
    """,
    responses={
        200: {
            "description": "Success - JSON or SSE stream",
            "content": {
                "application/json": {"schema": {"$ref": "#/components/schemas/ChatResponse"}},
                "text/event-stream": {"schema": {"type": "string"}},
            },
        },
        400: {"description": "Invalid request", "model": ErrorResponse},
        500: {"description": "Internal error", "model": ErrorResponse},
        503: {"description": "Service unavailable", "model": ErrorResponse},
    },
)
async def generate_reply(
    request: Request,
    chat_request: ChatRequest,
    # Declared even though unused in the body — presumably so the Accept
    # header shows up in the OpenAPI schema; confirm before removing.
    accept: Annotated[str | None, Header()] = None,
    orchestrator: OrchestratorService = Depends(get_orchestrator_service_with_memory),
) -> Any:
    """
    [AC-AISVC-06] Generate an AI reply, switching response mode on Accept.

    - text/event-stream: SSE stream of message/final/error events
    - anything else: a single JSON ChatResponse
    """
    tenant_id = get_tenant_id()
    if not tenant_id:
        from app.core.exceptions import MissingTenantIdException
        raise MissingTenantIdException()

    logger.info(
        f"[AC-AISVC-06] Processing chat request: tenant={tenant_id}, "
        f"session={chat_request.session_id}, mode={get_response_mode(request)}"
    )

    # Pick the handler by content negotiation, then delegate.
    handler = (
        _handle_streaming_request if is_sse_request(request) else _handle_json_request
    )
    return await handler(tenant_id, chat_request, orchestrator)
async def _handle_json_request(
    tenant_id: str,
    chat_request: ChatRequest,
    orchestrator: OrchestratorService,
) -> JSONResponse:
    """
    [AC-AISVC-02] Handle non-streaming JSON request.

    Args:
        tenant_id: Tenant identifier resolved from the request context.
        chat_request: Validated chat payload.
        orchestrator: Request-scoped orchestrator used to generate the reply.

    Returns:
        JSONResponse wrapping the ChatResponse (aliased keys, nulls omitted).

    Raises:
        AIServiceException: Propagated unchanged when the orchestrator raises
            a domain error; any other failure is wrapped as INTERNAL_ERROR
            with the original exception chained as its cause.
    """
    logger.info(f"[AC-AISVC-02] Processing JSON request for tenant={tenant_id}")
    try:
        response = await orchestrator.generate(tenant_id, chat_request)
        return JSONResponse(
            content=response.model_dump(exclude_none=True, by_alias=True),
        )
    except Exception as e:
        logger.error(f"[AC-AISVC-04] Error generating response: {e}")
        from app.core.exceptions import AIServiceException, ErrorCode
        if isinstance(e, AIServiceException):
            # Domain errors already carry code/message; re-raise without
            # rewriting the traceback (bare raise, not `raise e`).
            raise
        # Wrap unexpected failures; chain the cause for diagnostics.
        raise AIServiceException(
            code=ErrorCode.INTERNAL_ERROR,
            message=str(e),
        ) from e
async def _handle_streaming_request(
    tenant_id: str,
    chat_request: ChatRequest,
    orchestrator: OrchestratorService,
) -> EventSourceResponse:
    """
    [AC-AISVC-06, AC-AISVC-07, AC-AISVC-08, AC-AISVC-09] Handle SSE streaming request.

    SSE Event Sequence (per design.md Section 6.2):
    - message* (0 or more) -> final (exactly 1) -> close
    - OR message* (0 or more) -> error (exactly 1) -> close

    State machine ensures:
    - No events after final/error
    - Only one final OR one error event
    - Proper connection close

    Args:
        tenant_id: Tenant identifier resolved from the request context.
        chat_request: Validated chat payload.
        orchestrator: Request-scoped orchestrator producing the event stream.

    Returns:
        EventSourceResponse driving the generator below (ping every 15s to
        keep the connection alive).
    """
    logger.info(f"[AC-AISVC-06] Processing SSE request for tenant={tenant_id}")
    # Shared with the generator closure; tracks which events are still legal.
    state_machine = SSEStateMachine()
    async def event_generator():
        """
        [AC-AISVC-08, AC-AISVC-09] Event generator with state machine enforcement.
        Ensures proper event sequence and error handling.
        """
        await state_machine.transition_to_streaming()
        try:
            async for event in orchestrator.generate_stream(tenant_id, chat_request):
                # Guard: drop anything produced after a terminal transition.
                if not state_machine.can_send_message():
                    logger.warning("[AC-AISVC-08] Received event after state machine closed, ignoring")
                    break
                if event.event == "final":
                    # transition_to_final() is assumed single-shot (True only on
                    # the first terminal transition) so at most one final goes out.
                    if await state_machine.transition_to_final():
                        logger.info("[AC-AISVC-08] Sending final event and closing stream")
                        yield event
                        break
                elif event.event == "error":
                    # Same single-shot guarantee for the error terminal event.
                    if await state_machine.transition_to_error():
                        logger.info("[AC-AISVC-09] Sending error event and closing stream")
                        yield event
                        break
                elif event.event == "message":
                    # Intermediate chunk; stream through unchanged.
                    yield event
        except Exception as e:
            logger.error(f"[AC-AISVC-09] Streaming error: {e}")
            # Emit a synthetic error event only if no terminal event was sent yet.
            if await state_machine.transition_to_error():
                yield create_error_event(
                    code="STREAMING_ERROR",
                    message=str(e),
                )
        finally:
            # Always close the machine so any late events are discarded.
            await state_machine.close()
            logger.debug("[AC-AISVC-08] SSE connection closed")
    return EventSourceResponse(event_generator(), ping=15)