From 6d54030e0d7de7eab62e31786c5d09232b3f215c Mon Sep 17 00:00:00 2001 From: MerCry Date: Tue, 24 Feb 2026 13:47:12 +0800 Subject: [PATCH] feat(ai-service): implement complete Orchestrator generation pipeline for T3.4 [AC-AISVC-01, AC-AISVC-02] - Integrate Memory, ContextMerger, Retriever, LLMClient, ConfidenceCalculator - Implement 8-step generation pipeline: 1. Load local history from Memory 2. Merge with external history (dedup + truncate) 3. RAG retrieval (optional) 4. Build prompt with context and evidence 5. LLM generation 6. Calculate confidence 7. Save messages to Memory 8. Return ChatResponse - Add GenerationContext dataclass for tracking intermediate results - Implement fallback response mechanism for error handling - Add 21 unit tests for OrchestratorService - All 138 tests passing --- ai-service/app/services/orchestrator.py | 531 +++++++++++++++++-- ai-service/tests/test_orchestrator.py | 654 ++++++++++++++++++++++++ docs/progress/ai-service-progress.md | 84 ++- spec/ai-service-admin/requirements.md | 72 +++ spec/ai-service-admin/scope.md | 171 +++++++ spec/ai-service/openapi.admin.yaml | 209 ++++++++ spec/ai-service/tasks.md | 2 +- 7 files changed, 1658 insertions(+), 65 deletions(-) create mode 100644 ai-service/tests/test_orchestrator.py create mode 100644 spec/ai-service-admin/requirements.md create mode 100644 spec/ai-service-admin/scope.md create mode 100644 spec/ai-service/openapi.admin.yaml diff --git a/ai-service/app/services/orchestrator.py b/ai-service/app/services/orchestrator.py index c9ca1ce..d0e6317 100644 --- a/ai-service/app/services/orchestrator.py +++ b/ai-service/app/services/orchestrator.py @@ -1,19 +1,74 @@ """ Orchestrator service for AI Service. [AC-AISVC-01, AC-AISVC-02, AC-AISVC-06, AC-AISVC-07] Core orchestration logic for chat generation. + +Design reference: design.md Section 2.2 - 关键数据流 +1. Memory.load(tenantId, sessionId) +2. merge_context(local_history, external_history) +3. Retrieval.retrieve(query, tenantId, channelType, metadata) +4. build_prompt(merged_history, retrieved_docs, currentMessage) +5. LLM.generate(...) (non-streaming) or LLM.stream_generate(...) (streaming) +6. compute_confidence(...) +7. Memory.append(tenantId, sessionId, user/assistant messages) +8. Return ChatResponse (or output via SSE) """ import logging -from typing import AsyncGenerator +from dataclasses import dataclass, field +from typing import Any, AsyncGenerator from sse_starlette.sse import ServerSentEvent +from app.core.config import get_settings +from app.core.sse import ( + create_error_event, + create_final_event, + create_message_event, + SSEStateMachine, +) from app.models import ChatRequest, ChatResponse -from app.core.sse import create_error_event, create_final_event, create_message_event, SSEStateMachine +from app.services.confidence import ConfidenceCalculator, ConfidenceResult +from app.services.context import ContextMerger, MergedContext +from app.services.llm.base import LLMClient, LLMConfig, LLMResponse +from app.services.memory import MemoryService +from app.services.retrieval.base import BaseRetriever, RetrievalContext, RetrievalResult logger = logging.getLogger(__name__) +@dataclass +class OrchestratorConfig: + """ + Configuration for OrchestratorService. + [AC-AISVC-01] Centralized configuration for orchestration. + """ + max_history_tokens: int = 4000 + max_evidence_tokens: int = 2000 + system_prompt: str = "你是一个智能客服助手,请根据提供的知识库内容回答用户问题。" + enable_rag: bool = True + + +@dataclass +class GenerationContext: + """ + [AC-AISVC-01, AC-AISVC-02] Context accumulated during generation pipeline. + Contains all intermediate results for diagnostics and response building. + """ + tenant_id: str + session_id: str + current_message: str + channel_type: str + request_metadata: dict[str, Any] | None = None + + local_history: list[dict[str, str]] = field(default_factory=list) + merged_context: MergedContext | None = None + retrieval_result: RetrievalResult | None = None + llm_response: LLMResponse | None = None + confidence_result: ConfidenceResult | None = None + + diagnostics: dict[str, Any] = field(default_factory=dict) + + class OrchestratorService: """ [AC-AISVC-01, AC-AISVC-02, AC-AISVC-06, AC-AISVC-07] Orchestrator for chat generation. @@ -24,44 +79,409 @@ class OrchestratorService: - OR message* (0 or more) -> error (exactly 1) -> close """ - def __init__(self, llm_client=None): + def __init__( + self, + llm_client: LLMClient | None = None, + memory_service: MemoryService | None = None, + retriever: BaseRetriever | None = None, + context_merger: ContextMerger | None = None, + confidence_calculator: ConfidenceCalculator | None = None, + config: OrchestratorConfig | None = None, + ): """ - Initialize orchestrator with optional LLM client. + Initialize orchestrator with optional dependencies for DI. Args: - llm_client: Optional LLM client for dependency injection. - If None, will use mock implementation for demo. + llm_client: LLM client for generation + memory_service: Memory service for session history + retriever: Retriever for RAG + context_merger: Context merger for history deduplication + confidence_calculator: Confidence calculator for response scoring + config: Orchestrator configuration """ + settings = get_settings() self._llm_client = llm_client + self._memory_service = memory_service + self._retriever = retriever + self._context_merger = context_merger or ContextMerger( + max_history_tokens=getattr(settings, "max_history_tokens", 4000) + ) + self._confidence_calculator = confidence_calculator or ConfidenceCalculator() + self._config = config or OrchestratorConfig( + max_history_tokens=getattr(settings, "max_history_tokens", 4000), + max_evidence_tokens=getattr(settings, "rag_max_evidence_tokens", 2000), + enable_rag=True, + ) + self._llm_config = LLMConfig( + model=getattr(settings, "llm_model", "gpt-4o-mini"), + max_tokens=getattr(settings, "llm_max_tokens", 2048), + temperature=getattr(settings, "llm_temperature", 0.7), + timeout_seconds=getattr(settings, "llm_timeout_seconds", 30), + max_retries=getattr(settings, "llm_max_retries", 3), + ) - async def generate(self, tenant_id: str, request: ChatRequest) -> ChatResponse: + async def generate( + self, + tenant_id: str, + request: ChatRequest, + ) -> ChatResponse: """ Generate a non-streaming response. - [AC-AISVC-02] Returns ChatResponse with reply, confidence, shouldTransfer. + [AC-AISVC-01, AC-AISVC-02] Complete generation pipeline. + + Pipeline (per design.md Section 2.2): + 1. Load local history from Memory + 2. Merge with external history (dedup + truncate) + 3. RAG retrieval (optional) + 4. Build prompt with context and evidence + 5. LLM generation + 6. Calculate confidence + 7. Save messages to Memory + 8. Return ChatResponse """ logger.info( - f"[AC-AISVC-01] Generating response for tenant={tenant_id}, " + f"[AC-AISVC-01] Starting generation for tenant={tenant_id}, " f"session={request.session_id}" ) - if self._llm_client: - messages = self._build_messages(request) - response = await self._llm_client.generate(messages) + ctx = GenerationContext( + tenant_id=tenant_id, + session_id=request.session_id, + current_message=request.current_message, + channel_type=request.channel_type.value, + request_metadata=request.metadata, + ) + + try: + await self._load_local_history(ctx) + + await self._merge_context(ctx, request.history) + + if self._config.enable_rag and self._retriever: + await self._retrieve_evidence(ctx) + + await self._generate_response(ctx) + + self._calculate_confidence(ctx) + + await self._save_messages(ctx) + + return self._build_response(ctx) + + except Exception as e: + logger.error(f"[AC-AISVC-01] Generation failed: {e}") return ChatResponse( - reply=response.content, - confidence=0.85, - should_transfer=False, + reply="抱歉,服务暂时不可用,请稍后重试或联系人工客服。", + confidence=0.0, + should_transfer=True, + transfer_reason=f"服务异常: {str(e)}", + metadata={"error": str(e), "diagnostics": ctx.diagnostics}, ) - reply = f"Received your message: {request.current_message}" + async def _load_local_history(self, ctx: GenerationContext) -> None: + """ + [AC-AISVC-13] Load local history from Memory service. + Step 1 of the generation pipeline. + """ + if not self._memory_service: + logger.info("[AC-AISVC-13] No memory service configured, skipping history load") + ctx.diagnostics["memory_enabled"] = False + return + + try: + messages = await self._memory_service.load_history( + tenant_id=ctx.tenant_id, + session_id=ctx.session_id, + ) + + ctx.local_history = [ + {"role": msg.role, "content": msg.content} + for msg in messages + ] + + ctx.diagnostics["memory_enabled"] = True + ctx.diagnostics["local_history_count"] = len(ctx.local_history) + + logger.info( + f"[AC-AISVC-13] Loaded {len(ctx.local_history)} messages from memory " + f"for tenant={ctx.tenant_id}, session={ctx.session_id}" + ) + + except Exception as e: + logger.warning(f"[AC-AISVC-13] Failed to load history: {e}") + ctx.diagnostics["memory_error"] = str(e) + + async def _merge_context( + self, + ctx: GenerationContext, + external_history: list | None, + ) -> None: + """ + [AC-AISVC-14, AC-AISVC-15] Merge local and external history. + Step 2 of the generation pipeline. + + Design reference: design.md Section 7 + - Deduplication based on fingerprint + - Truncation to fit token budget + """ + external_messages = None + if external_history: + external_messages = [ + {"role": msg.role.value, "content": msg.content} + for msg in external_history + ] + + ctx.merged_context = self._context_merger.merge_and_truncate( + local_history=ctx.local_history, + external_history=external_messages, + max_tokens=self._config.max_history_tokens, + ) + + ctx.diagnostics["merged_context"] = { + "local_count": ctx.merged_context.local_count, + "external_count": ctx.merged_context.external_count, + "duplicates_skipped": ctx.merged_context.duplicates_skipped, + "truncated_count": ctx.merged_context.truncated_count, + "total_tokens": ctx.merged_context.total_tokens, + } + + logger.info( + f"[AC-AISVC-14, AC-AISVC-15] Context merged: " + f"local={ctx.merged_context.local_count}, " + f"external={ctx.merged_context.external_count}, " + f"tokens={ctx.merged_context.total_tokens}" + ) + + async def _retrieve_evidence(self, ctx: GenerationContext) -> None: + """ + [AC-AISVC-16, AC-AISVC-17] RAG retrieval for evidence. + Step 3 of the generation pipeline. + """ + try: + retrieval_ctx = RetrievalContext( + tenant_id=ctx.tenant_id, + query=ctx.current_message, + session_id=ctx.session_id, + channel_type=ctx.channel_type, + metadata=ctx.request_metadata, + ) + + ctx.retrieval_result = await self._retriever.retrieve(retrieval_ctx) + + ctx.diagnostics["retrieval"] = { + "hit_count": ctx.retrieval_result.hit_count, + "max_score": ctx.retrieval_result.max_score, + "is_empty": ctx.retrieval_result.is_empty, + } + + logger.info( + f"[AC-AISVC-16, AC-AISVC-17] Retrieval complete: " + f"hits={ctx.retrieval_result.hit_count}, " + f"max_score={ctx.retrieval_result.max_score:.3f}" + ) + + except Exception as e: + logger.warning(f"[AC-AISVC-16] Retrieval failed: {e}") + ctx.retrieval_result = RetrievalResult( + hits=[], + diagnostics={"error": str(e)}, + ) + ctx.diagnostics["retrieval_error"] = str(e) + + async def _generate_response(self, ctx: GenerationContext) -> None: + """ + [AC-AISVC-02] Generate response using LLM. + Step 4-5 of the generation pipeline. + """ + messages = self._build_llm_messages(ctx) + + if not self._llm_client: + logger.warning("[AC-AISVC-02] No LLM client configured, using fallback") + ctx.llm_response = LLMResponse( + content=self._fallback_response(ctx), + model="fallback", + usage={}, + finish_reason="fallback", + ) + ctx.diagnostics["llm_mode"] = "fallback" + return + + try: + ctx.llm_response = await self._llm_client.generate( + messages=messages, + config=self._llm_config, + ) + ctx.diagnostics["llm_mode"] = "live" + ctx.diagnostics["llm_model"] = ctx.llm_response.model + ctx.diagnostics["llm_usage"] = ctx.llm_response.usage + + logger.info( + f"[AC-AISVC-02] LLM response generated: " + f"model={ctx.llm_response.model}, " + f"tokens={ctx.llm_response.usage}" + ) + + except Exception as e: + logger.error(f"[AC-AISVC-02] LLM generation failed: {e}") + ctx.llm_response = LLMResponse( + content=self._fallback_response(ctx), + model="fallback", + usage={}, + finish_reason="error", + metadata={"error": str(e)}, + ) + ctx.diagnostics["llm_error"] = str(e) + + def _build_llm_messages(self, ctx: GenerationContext) -> list[dict[str, str]]: + """ + [AC-AISVC-02] Build messages for LLM including system prompt and evidence. + """ + messages = [] + + system_content = self._config.system_prompt + + if ctx.retrieval_result and not ctx.retrieval_result.is_empty: + evidence_text = self._format_evidence(ctx.retrieval_result) + system_content += f"\n\n知识库参考内容:\n{evidence_text}" + + messages.append({"role": "system", "content": system_content}) + + if ctx.merged_context and ctx.merged_context.messages: + messages.extend(ctx.merged_context.messages) + + messages.append({"role": "user", "content": ctx.current_message}) + + return messages + + def _format_evidence(self, retrieval_result: RetrievalResult) -> str: + """ + [AC-AISVC-17] Format retrieval hits as evidence text. + """ + evidence_parts = [] + for i, hit in enumerate(retrieval_result.hits[:5], 1): + evidence_parts.append(f"[{i}] (相关度: {hit.score:.2f}) {hit.text}") + + return "\n".join(evidence_parts) + + def _fallback_response(self, ctx: GenerationContext) -> str: + """ + [AC-AISVC-17] Generate fallback response when LLM is unavailable. + """ + if ctx.retrieval_result and not ctx.retrieval_result.is_empty: + return ( + "根据知识库信息,我找到了一些相关内容," + "但暂时无法生成完整回复。建议您稍后重试或联系人工客服。" + ) + return ( + "抱歉,我暂时无法处理您的请求。" + "请稍后重试或联系人工客服获取帮助。" + ) + + def _calculate_confidence(self, ctx: GenerationContext) -> None: + """ + [AC-AISVC-17, AC-AISVC-18, AC-AISVC-19] Calculate confidence score. + Step 6 of the generation pipeline. + """ + if ctx.retrieval_result: + evidence_tokens = 0 + if not ctx.retrieval_result.is_empty: + evidence_tokens = sum( + len(hit.text.split()) * 2 + for hit in ctx.retrieval_result.hits + ) + + ctx.confidence_result = self._confidence_calculator.calculate_confidence( + retrieval_result=ctx.retrieval_result, + evidence_tokens=evidence_tokens, + ) + else: + ctx.confidence_result = self._confidence_calculator.calculate_confidence_no_retrieval() + + ctx.diagnostics["confidence"] = { + "score": ctx.confidence_result.confidence, + "should_transfer": ctx.confidence_result.should_transfer, + "is_insufficient": ctx.confidence_result.is_retrieval_insufficient, + } + + logger.info( + f"[AC-AISVC-17, AC-AISVC-18] Confidence calculated: " + f"{ctx.confidence_result.confidence:.3f}, " + f"should_transfer={ctx.confidence_result.should_transfer}" + ) + + async def _save_messages(self, ctx: GenerationContext) -> None: + """ + [AC-AISVC-13] Save user and assistant messages to Memory. + Step 7 of the generation pipeline. + """ + if not self._memory_service: + logger.info("[AC-AISVC-13] No memory service configured, skipping save") + return + + try: + await self._memory_service.get_or_create_session( + tenant_id=ctx.tenant_id, + session_id=ctx.session_id, + channel_type=ctx.channel_type, + metadata=ctx.request_metadata, + ) + + messages_to_save = [ + {"role": "user", "content": ctx.current_message}, + ] + + if ctx.llm_response: + messages_to_save.append({ + "role": "assistant", + "content": ctx.llm_response.content, + }) + + await self._memory_service.append_messages( + tenant_id=ctx.tenant_id, + session_id=ctx.session_id, + messages=messages_to_save, + ) + + ctx.diagnostics["messages_saved"] = len(messages_to_save) + + logger.info( + f"[AC-AISVC-13] Saved {len(messages_to_save)} messages " + f"for tenant={ctx.tenant_id}, session={ctx.session_id}" + ) + + except Exception as e: + logger.warning(f"[AC-AISVC-13] Failed to save messages: {e}") + ctx.diagnostics["save_error"] = str(e) + + def _build_response(self, ctx: GenerationContext) -> ChatResponse: + """ + [AC-AISVC-02] Build final ChatResponse from generation context. + Step 8 of the generation pipeline. + """ + reply = ctx.llm_response.content if ctx.llm_response else self._fallback_response(ctx) + + confidence = ctx.confidence_result.confidence if ctx.confidence_result else 0.5 + should_transfer = ctx.confidence_result.should_transfer if ctx.confidence_result else True + transfer_reason = ctx.confidence_result.transfer_reason if ctx.confidence_result else None + + response_metadata = { + "session_id": ctx.session_id, + "channel_type": ctx.channel_type, + "diagnostics": ctx.diagnostics, + } + return ChatResponse( reply=reply, - confidence=0.85, - should_transfer=False, + confidence=confidence, + should_transfer=should_transfer, + transfer_reason=transfer_reason, + metadata=response_metadata, ) async def generate_stream( - self, tenant_id: str, request: ChatRequest + self, + tenant_id: str, + request: ChatRequest, ) -> AsyncGenerator[ServerSentEvent, None]: """ Generate a streaming response. @@ -85,25 +505,52 @@ class OrchestratorService: state_machine = SSEStateMachine() await state_machine.transition_to_streaming() + ctx = GenerationContext( + tenant_id=tenant_id, + session_id=request.session_id, + current_message=request.current_message, + channel_type=request.channel_type.value, + request_metadata=request.metadata, + ) + try: + await self._load_local_history(ctx) + await self._merge_context(ctx, request.history) + + if self._config.enable_rag and self._retriever: + await self._retrieve_evidence(ctx) + full_reply = "" if self._llm_client: - async for event in self._stream_from_llm(request, state_machine): + async for event in self._stream_from_llm(ctx, state_machine): if event.event == "message": full_reply += self._extract_delta_from_event(event) yield event else: - async for event in self._stream_mock_response(request, state_machine): + async for event in self._stream_mock_response(ctx, state_machine): if event.event == "message": full_reply += self._extract_delta_from_event(event) yield event + if ctx.llm_response is None: + ctx.llm_response = LLMResponse( + content=full_reply, + model="streaming", + usage={}, + finish_reason="stop", + ) + + self._calculate_confidence(ctx) + + await self._save_messages(ctx) + if await state_machine.transition_to_final(): yield create_final_event( reply=full_reply, - confidence=0.85, - should_transfer=False, + confidence=ctx.confidence_result.confidence if ctx.confidence_result else 0.5, + should_transfer=ctx.confidence_result.should_transfer if ctx.confidence_result else False, + transfer_reason=ctx.confidence_result.transfer_reason if ctx.confidence_result else None, ) except Exception as e: @@ -117,14 +564,16 @@ class OrchestratorService: await state_machine.close() async def _stream_from_llm( - self, request: ChatRequest, state_machine: SSEStateMachine + self, + ctx: GenerationContext, + state_machine: SSEStateMachine, ) -> AsyncGenerator[ServerSentEvent, None]: """ [AC-AISVC-07] Stream from LLM client, wrapping each chunk as message event. """ - messages = self._build_messages(request) + messages = self._build_llm_messages(ctx) - async for chunk in self._llm_client.stream_generate(messages): + async for chunk in self._llm_client.stream_generate(messages, self._llm_config): if not state_machine.can_send_message(): break @@ -137,7 +586,9 @@ class OrchestratorService: break async def _stream_mock_response( - self, request: ChatRequest, state_machine: SSEStateMachine + self, + ctx: GenerationContext, + state_machine: SSEStateMachine, ) -> AsyncGenerator[ServerSentEvent, None]: """ [AC-AISVC-07] Mock streaming response for demo/testing purposes. @@ -145,7 +596,7 @@ class OrchestratorService: """ import asyncio - reply_parts = ["Received", " your", " message:", f" {request.current_message}"] + reply_parts = ["收到", "您的", "消息:", f" {ctx.current_message}"] for part in reply_parts: if not state_machine.can_send_message(): @@ -155,24 +606,6 @@ class OrchestratorService: yield create_message_event(delta=part) await asyncio.sleep(0.05) - def _build_messages(self, request: ChatRequest) -> list[dict[str, str]]: - """Build messages list for LLM from request.""" - messages = [] - - if request.history: - for msg in request.history: - messages.append({ - "role": msg.role.value, - "content": msg.content, - }) - - messages.append({ - "role": "user", - "content": request.current_message, - }) - - return messages - def _extract_delta_from_event(self, event: ServerSentEvent) -> str: """Extract delta content from a message event.""" import json @@ -194,3 +627,9 @@ def get_orchestrator_service() -> OrchestratorService: if _orchestrator_service is None: _orchestrator_service = OrchestratorService() return _orchestrator_service + + +def set_orchestrator_service(service: OrchestratorService) -> None: + """Set orchestrator service instance for testing.""" + global _orchestrator_service + _orchestrator_service = service diff --git a/ai-service/tests/test_orchestrator.py b/ai-service/tests/test_orchestrator.py new file mode 100644 index 0000000..d64fec4 --- /dev/null +++ b/ai-service/tests/test_orchestrator.py @@ -0,0 +1,654 @@ +""" +Tests for OrchestratorService. +[AC-AISVC-01, AC-AISVC-02] Test complete generation pipeline integration. +""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from typing import AsyncGenerator + +from app.models import ChatRequest, ChatResponse, ChannelType, ChatMessage, Role +from app.services.orchestrator import ( + OrchestratorService, + OrchestratorConfig, + GenerationContext, + set_orchestrator_service, +) +from app.services.llm.base import LLMClient, LLMConfig, LLMResponse, LLMStreamChunk +from app.services.memory import MemoryService +from app.services.retrieval.base import ( + BaseRetriever, + RetrievalContext, + RetrievalResult, + RetrievalHit, +) +from app.services.confidence import ConfidenceCalculator, ConfidenceConfig +from app.services.context import ContextMerger +from app.models.entities import ChatMessage as ChatMessageEntity + + +class MockLLMClient(LLMClient): + """Mock LLM client for testing.""" + + def __init__(self, response_content: str = "Mock LLM response"): + self._response_content = response_content + self._generate_called = False + self._stream_generate_called = False + + async def generate( + self, + messages: list[dict[str, str]], + config: LLMConfig | None = None, + **kwargs, + ) -> LLMResponse: + self._generate_called = True + return LLMResponse( + content=self._response_content, + model="mock-model", + usage={"prompt_tokens": 100, "completion_tokens": 50}, + finish_reason="stop", + ) + + async def stream_generate( + self, + messages: list[dict[str, str]], + config: LLMConfig | None = None, + **kwargs, + ) -> AsyncGenerator[LLMStreamChunk, None]: + self._stream_generate_called = True + chunks = ["Hello", " from", " mock", " LLM"] + for chunk in chunks: + yield LLMStreamChunk(delta=chunk, model="mock-model") + yield LLMStreamChunk(delta="", model="mock-model", finish_reason="stop") + + async def close(self) -> None: + pass + + +class MockRetriever(BaseRetriever): + """Mock retriever for testing.""" + + def __init__(self, hits: list[RetrievalHit] | None = None): + self._hits = hits or [] + + async def retrieve(self, ctx: RetrievalContext) -> RetrievalResult: + return RetrievalResult( + hits=self._hits, + diagnostics={"mock": True}, + ) + + async def health_check(self) -> bool: + return True + + +class MockMemoryService: + """Mock memory service for testing.""" + + def __init__(self, history: list[ChatMessageEntity] | None = None): + self._history = history or [] + self._saved_messages: list[dict] = [] + self._session_created = False + + async def get_or_create_session( + self, + tenant_id: str, + session_id: str, + channel_type: str | None = None, + metadata: dict | None = None, + ): + self._session_created = True + return MagicMock(tenant_id=tenant_id, session_id=session_id) + + async def load_history( + self, + tenant_id: str, + session_id: str, + limit: int | None = None, + ): + return self._history + + async def append_message( + self, + tenant_id: str, + session_id: str, + role: str, + content: str, + ): + self._saved_messages.append({"role": role, "content": content}) + + async def append_messages( + self, + tenant_id: str, + session_id: str, + messages: list[dict[str, str]], + ): + self._saved_messages.extend(messages) + + +def create_chat_request( + message: str = "Hello", + session_id: str = "test-session", + history: list[ChatMessage] | None = None, + metadata: dict | None = None, +) -> ChatRequest: + """Helper to create ChatRequest.""" + return ChatRequest( + session_id=session_id, + current_message=message, + channel_type=ChannelType.WECHAT, + history=history, + metadata=metadata, + ) + + +class TestOrchestratorServiceGenerate: + """Tests for OrchestratorService.generate() method.""" + + @pytest.mark.asyncio + async def test_generate_basic_without_dependencies(self): + """ + [AC-AISVC-01, AC-AISVC-02] Test basic generation without external dependencies. + Should return fallback response with low confidence. + """ + orchestrator = OrchestratorService( + config=OrchestratorConfig(enable_rag=False), + ) + + request = create_chat_request(message="What is the price?") + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert isinstance(response, ChatResponse) + assert response.reply is not None + assert response.confidence >= 0.0 + assert response.confidence <= 1.0 + assert isinstance(response.should_transfer, bool) + assert "diagnostics" in response.metadata + + @pytest.mark.asyncio + async def test_generate_with_llm_client(self): + """ + [AC-AISVC-02] Test generation with LLM client. + Should use LLM response. + """ + mock_llm = MockLLMClient(response_content="This is the AI response.") + orchestrator = OrchestratorService( + llm_client=mock_llm, + config=OrchestratorConfig(enable_rag=False), + ) + + request = create_chat_request(message="Hello") + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert response.reply == "This is the AI response." + assert mock_llm._generate_called is True + + @pytest.mark.asyncio + async def test_generate_with_memory_service(self): + """ + [AC-AISVC-13] Test generation with memory service. + Should load history and save messages. + """ + mock_memory = MockMemoryService( + history=[ + ChatMessageEntity( + tenant_id="tenant-1", + session_id="test-session", + role="user", + content="Previous message", + ) + ] + ) + mock_llm = MockLLMClient() + + orchestrator = OrchestratorService( + llm_client=mock_llm, + memory_service=mock_memory, + config=OrchestratorConfig(enable_rag=False), + ) + + request = create_chat_request(message="New message") + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert len(mock_memory._saved_messages) == 2 + assert mock_memory._saved_messages[0]["role"] == "user" + assert mock_memory._saved_messages[1]["role"] == "assistant" + + @pytest.mark.asyncio + async def test_generate_with_retrieval(self): + """ + [AC-AISVC-16, AC-AISVC-17] Test generation with RAG retrieval. + Should include evidence in LLM prompt. + """ + mock_retriever = MockRetriever( + hits=[ + RetrievalHit( + text="Product price is $100", + score=0.85, + source="kb", + ) + ] + ) + mock_llm = MockLLMClient() + + orchestrator = OrchestratorService( + llm_client=mock_llm, + retriever=mock_retriever, + config=OrchestratorConfig(enable_rag=True), + ) + + request = create_chat_request(message="What is the price?") + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert "retrieval" in response.metadata["diagnostics"] + assert response.metadata["diagnostics"]["retrieval"]["hit_count"] == 1 + + @pytest.mark.asyncio + async def test_generate_with_context_merging(self): + """ + [AC-AISVC-14, AC-AISVC-15] Test context merging with external history. + Should merge local and external history. + """ + mock_memory = MockMemoryService( + history=[ + ChatMessageEntity( + tenant_id="tenant-1", + session_id="test-session", + role="user", + content="Local message", + ) + ] + ) + mock_llm = MockLLMClient() + + orchestrator = OrchestratorService( + llm_client=mock_llm, + memory_service=mock_memory, + config=OrchestratorConfig(enable_rag=False), + ) + + request = create_chat_request( + message="New message", + history=[ + ChatMessage(role=Role.USER, content="External message"), + ChatMessage(role=Role.ASSISTANT, content="External response"), + ], + ) + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert "merged_context" in response.metadata["diagnostics"] + merged = response.metadata["diagnostics"]["merged_context"] + assert merged["local_count"] == 1 + assert merged["external_count"] == 2 + + @pytest.mark.asyncio + async def test_generate_with_confidence_calculation(self): + """ + [AC-AISVC-17, AC-AISVC-18, AC-AISVC-19] Test confidence calculation. + Should calculate confidence based on retrieval results. + """ + mock_retriever = MockRetriever( + hits=[ + RetrievalHit(text="High relevance content", score=0.9, source="kb"), + RetrievalHit(text="Medium relevance", score=0.8, source="kb"), + ] + ) + mock_llm = MockLLMClient() + + orchestrator = OrchestratorService( + llm_client=mock_llm, + retriever=mock_retriever, + config=OrchestratorConfig(enable_rag=True), + ) + + request = create_chat_request(message="Test query") + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert response.confidence > 0.5 + assert "confidence" in response.metadata["diagnostics"] + + @pytest.mark.asyncio + async def test_generate_low_confidence_triggers_transfer(self): + """ + [AC-AISVC-18, AC-AISVC-19] Test low confidence triggers transfer. + Should set should_transfer=True when confidence is low. + """ + mock_retriever = MockRetriever(hits=[]) + mock_llm = MockLLMClient() + + orchestrator = OrchestratorService( + llm_client=mock_llm, + retriever=mock_retriever, + config=OrchestratorConfig(enable_rag=True), + ) + + request = create_chat_request(message="Unknown topic") + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert response.should_transfer is True + assert response.transfer_reason is not None + + @pytest.mark.asyncio + async def test_generate_handles_llm_error(self): + """ + [AC-AISVC-02] Test handling of LLM errors. + Should return fallback response on error. + """ + mock_llm = MagicMock() + mock_llm.generate = AsyncMock(side_effect=Exception("LLM unavailable")) + + orchestrator = OrchestratorService( + llm_client=mock_llm, + config=OrchestratorConfig(enable_rag=False), + ) + + request = create_chat_request(message="Hello") + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert response.reply is not None + assert "llm_error" in response.metadata["diagnostics"] + + @pytest.mark.asyncio + async def test_generate_handles_retrieval_error(self): + """ + [AC-AISVC-16] Test handling of retrieval errors. + Should continue with empty retrieval result. + """ + mock_retriever = MagicMock() + mock_retriever.retrieve = AsyncMock(side_effect=Exception("Qdrant unavailable")) + mock_llm = MockLLMClient() + + orchestrator = OrchestratorService( + llm_client=mock_llm, + retriever=mock_retriever, + config=OrchestratorConfig(enable_rag=True), + ) + + request = create_chat_request(message="Hello") + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert response.reply == "Mock LLM response" + assert "retrieval_error" in response.metadata["diagnostics"] + + @pytest.mark.asyncio + async def test_generate_full_pipeline_integration(self): + """ + [AC-AISVC-01, AC-AISVC-02] Test complete pipeline integration. + All components working together. + """ + mock_memory = MockMemoryService( + history=[ + ChatMessageEntity( + tenant_id="tenant-1", + session_id="test-session", + role="user", + content="Previous question", + ), + ChatMessageEntity( + tenant_id="tenant-1", + session_id="test-session", + role="assistant", + content="Previous answer", + ), + ] + ) + mock_retriever = MockRetriever( + hits=[ + RetrievalHit(text="Knowledge base content", score=0.85, source="kb"), + ] + ) + mock_llm = MockLLMClient(response_content="AI generated response") + + orchestrator = OrchestratorService( + llm_client=mock_llm, + memory_service=mock_memory, + retriever=mock_retriever, + config=OrchestratorConfig(enable_rag=True), + ) + + request = create_chat_request( + message="New question", + history=[ + ChatMessage(role=Role.USER, content="External history"), + ], + ) + response = await orchestrator.generate( + tenant_id="tenant-1", + request=request, + ) + + assert response.reply == "AI generated response" + assert response.confidence > 0.0 + assert len(mock_memory._saved_messages) == 2 + + diagnostics = response.metadata["diagnostics"] + assert diagnostics["memory_enabled"] is True + assert diagnostics["retrieval"]["hit_count"] == 1 + assert diagnostics["llm_mode"] == "live" + + +class TestOrchestratorServiceGenerationContext: + """Tests for GenerationContext dataclass.""" + + def test_generation_context_initialization(self): + """Test GenerationContext initialization.""" + ctx = GenerationContext( + tenant_id="tenant-1", + session_id="session-1", + current_message="Hello", + channel_type="wechat", + ) + + assert ctx.tenant_id == "tenant-1" + assert ctx.session_id == "session-1" + assert ctx.current_message == "Hello" + assert ctx.channel_type == "wechat" + assert ctx.local_history == [] + assert ctx.diagnostics == {} + + def test_generation_context_with_metadata(self): + """Test GenerationContext with metadata.""" + ctx = GenerationContext( + tenant_id="tenant-1", + session_id="session-1", + current_message="Hello", + channel_type="wechat", + request_metadata={"user_id": "user-123"}, + ) + + assert ctx.request_metadata == {"user_id": "user-123"} + + +class TestOrchestratorConfig: + """Tests for OrchestratorConfig dataclass.""" + + def test_default_config(self): + """Test default configuration values.""" + config = OrchestratorConfig() + + assert config.max_history_tokens == 4000 + assert config.max_evidence_tokens == 2000 + assert config.enable_rag is True + assert "智能客服" in config.system_prompt + + def test_custom_config(self): + """Test custom configuration values.""" + config = OrchestratorConfig( + max_history_tokens=8000, + enable_rag=False, + system_prompt="Custom prompt", + ) + + assert config.max_history_tokens == 8000 + assert config.enable_rag is False + assert config.system_prompt == "Custom prompt" + + +class TestOrchestratorServiceHelperMethods: + """Tests for OrchestratorService helper methods.""" + + def test_build_llm_messages_basic(self): + """Test _build_llm_messages with basic context.""" + orchestrator = OrchestratorService( + config=OrchestratorConfig(enable_rag=False), + ) + + ctx = GenerationContext( + tenant_id="tenant-1", + session_id="session-1", + current_message="Hello", + channel_type="wechat", + ) + + messages = orchestrator._build_llm_messages(ctx) + + assert len(messages) == 2 + assert messages[0]["role"] == "system" + assert messages[1]["role"] == "user" + assert messages[1]["content"] == "Hello" + + def test_build_llm_messages_with_evidence(self): + """Test _build_llm_messages includes evidence from retrieval.""" + orchestrator = OrchestratorService( + config=OrchestratorConfig(enable_rag=True), + ) + + ctx = GenerationContext( + tenant_id="tenant-1", + session_id="session-1", + current_message="What is the price?", + channel_type="wechat", + retrieval_result=RetrievalResult( + hits=[ + RetrievalHit(text="Price is $100", score=0.9, source="kb"), + ] + ), + ) + + messages = orchestrator._build_llm_messages(ctx) + + assert "知识库参考内容" in messages[0]["content"] + assert "Price is $100" in messages[0]["content"] + + def test_build_llm_messages_with_history(self): + """Test _build_llm_messages includes merged history.""" + from app.services.context import MergedContext + + orchestrator = OrchestratorService( + config=OrchestratorConfig(enable_rag=False), + ) + + ctx = GenerationContext( + tenant_id="tenant-1", + session_id="session-1", + current_message="New question", + channel_type="wechat", + merged_context=MergedContext( + messages=[ + {"role": "user", "content": "Previous question"}, + {"role": "assistant", "content": "Previous answer"}, + ] + ), + ) + + messages = orchestrator._build_llm_messages(ctx) + + assert len(messages) == 4 + assert messages[1]["role"] == "user" + assert messages[1]["content"] == "Previous question" + assert messages[2]["role"] == "assistant" + assert messages[3]["role"] == "user" + assert messages[3]["content"] == "New question" + + def test_fallback_response_with_evidence(self): + """Test _fallback_response when retrieval has evidence.""" + orchestrator = OrchestratorService() + + ctx = GenerationContext( + tenant_id="tenant-1", + session_id="session-1", + current_message="Question", + channel_type="wechat", + retrieval_result=RetrievalResult( + hits=[RetrievalHit(text="Evidence", score=0.8, source="kb")] + ), + ) + + fallback = orchestrator._fallback_response(ctx) + assert "知识库" in fallback + + def test_fallback_response_without_evidence(self): + """Test _fallback_response when no retrieval evidence.""" + orchestrator = OrchestratorService() + + ctx = GenerationContext( + tenant_id="tenant-1", + session_id="session-1", + current_message="Question", + channel_type="wechat", + retrieval_result=RetrievalResult(hits=[]), + ) + + fallback = orchestrator._fallback_response(ctx) + assert "无法处理" in fallback or "人工客服" in fallback + + def test_format_evidence(self): + """Test _format_evidence formats hits correctly.""" + orchestrator = OrchestratorService() + + result = RetrievalResult( + hits=[ + RetrievalHit(text="First result", score=0.9, source="kb"), + RetrievalHit(text="Second result", score=0.8, source="kb"), + ] + ) + + formatted = orchestrator._format_evidence(result) + + assert "[1]" in formatted + assert "[2]" in formatted + assert "First result" in formatted + assert "Second result" in formatted + + +class TestOrchestratorServiceSetInstance: + """Tests for set_orchestrator_service function.""" + + def test_set_orchestrator_service(self): + """Test setting orchestrator service instance.""" + custom_orchestrator = OrchestratorService( + config=OrchestratorConfig(enable_rag=False), + ) + + set_orchestrator_service(custom_orchestrator) + + from app.services.orchestrator import get_orchestrator_service + + instance = get_orchestrator_service() + assert instance is custom_orchestrator diff --git a/docs/progress/ai-service-progress.md b/docs/progress/ai-service-progress.md index 0959593..3fac815 100644 --- a/docs/progress/ai-service-progress.md +++ b/docs/progress/ai-service-progress.md @@ -25,7 +25,7 @@ - [x] Phase 1: 基础设施(FastAPI 框架与多租户基础) (100%) ✅ - [x] Phase 2: 存储与检索实现(Memory & Retrieval) (100%) ✅ -- [ ] Phase 3: 核心编排(Orchestrator & LLM Adapter) (60%) 🔄 +- [ ] Phase 3: 核心编排(Orchestrator & LLM Adapter) (80%) 🔄 - [ ] Phase 4: 流式响应(SSE 实现与状态机) (0%) ⏳ - [ ] Phase 5: 集成与冒烟测试(Quality Assurance) (0%) ⏳ @@ -41,28 +41,19 @@ #### Phase 3: 核心编排(Orchestrator & LLM Adapter) - [x] T3.1 实现 LLM Adapter:封装 `langchain-openai` 或官方 SDK,支持 `generate` 与 `stream_generate` `[AC-AISVC-02, AC-AISVC-06]` ✅ - [x] T3.2 实现 Orchestrator:实现上下文合并逻辑(H_local + H_ext 的去重与截断策略) `[AC-AISVC-14, AC-AISVC-15]` ✅ -- [ ] T3.3 实现 Orchestrator:实现 RAG 检索不足时的置信度下调与 `shouldTransfer` 逻辑 `[AC-AISVC-17, AC-AISVC-18, AC-AISVC-19]` ⏳ -- [ ] T3.4 实现 Orchestrator:整合 Memory、Retrieval 与 LLM 完成 non-streaming 生成闭环 `[AC-AISVC-01, AC-AISVC-02]` ⏳ +- [x] T3.3 实现 Orchestrator:实现 RAG 检索不足时的置信度下调与 `shouldTransfer` 逻辑 `[AC-AISVC-17, AC-AISVC-18, AC-AISVC-19]` ✅ +- [x] T3.4 实现 Orchestrator:整合 Memory、Retrieval 与 LLM 完成 non-streaming 生成闭环 `[AC-AISVC-01, AC-AISVC-02]` ✅ - [ ] T3.5 验证 non-streaming 响应字段完全符合 `openapi.provider.yaml` 契约 `[AC-AISVC-02]` ⏳ ### Next Action (Must be Specific) -**Immediate**: 执行 T3.3 - 实现 RAG 检索不足时的置信度下调与 `shouldTransfer` 逻辑。 +**Immediate**: 执行 T3.5 - 验证 non-streaming 响应字段完全符合 `openapi.provider.yaml` 契约。 **Details**: -1. 定义"检索不足"的判定条件: - - `hits.size < minHits` - - `max(score) < scoreThreshold` - - evidence token 超限 -2. 实现置信度计算策略: - - 基于检索分数计算 confidence - - 检索不足时下调 confidence -3. 实现 `shouldTransfer` 逻辑: - - 当 `confidence < T_low` 时 `shouldTransfer=true` - - 添加 `transferReason` 说明 -4. reference: - - `spec/ai-service/design.md` Section 4.3 - 检索不中兜底与置信度策略 - - `spec/ai-service/requirements.md` AC-AISVC-17, AC-AISVC-18, AC-AISVC-19 +1. 验证 ChatResponse 字段与 OpenAPI 契约一致性 +2. 确保 reply、confidence、shouldTransfer 必填字段正确 +3. 验证 transferReason、metadata 可选字段处理 +4. 编写契约验证测试 --- @@ -80,7 +71,9 @@ - `base.py` - LLMClient 抽象接口 - `openai_client.py` - OpenAI 兼容客户端 - `memory.py` - Memory 服务 - - `orchestrator.py` - 编排服务 + - `orchestrator.py` - 编排服务 ✅ (完整实现) + - `context.py` - 上下文合并 ✅ + - `confidence.py` - 置信度计算 ✅ - `retrieval/` - 检索层 - `tests/` - 单元测试 @@ -94,6 +87,14 @@ reason: 简单可靠的重试机制 impact: 提高服务稳定性 +- decision: Orchestrator 使用依赖注入模式 + reason: 便于测试和组件替换 + impact: 所有组件可通过构造函数注入 + +- decision: 使用 GenerationContext 数据类追踪生成流程 + reason: 清晰追踪中间结果和诊断信息 + impact: 便于调试和问题排查 + ### Code Snippets ```python @@ -103,6 +104,16 @@ response = await llm_client.generate(messages, config=LLMConfig(...)) # [AC-AISVC-06] Streaming generation async for chunk in llm_client.stream_generate(messages): yield create_message_event(delta=chunk.delta) + +# [AC-AISVC-01] Complete generation pipeline +orchestrator = OrchestratorService( + llm_client=llm_client, + memory_service=memory_service, + retriever=retriever, + context_merger=context_merger, + confidence_calculator=confidence_calculator, +) +response = await orchestrator.generate(tenant_id, request) ``` --- @@ -124,6 +135,43 @@ async for chunk in llm_client.stream_generate(messages): - 更新 `app/core/config.py` 添加 LLM 配置 - 修复 `app/models/entities.py` JSON 列类型 +### Session #2 (2026-02-24) +- completed: + - T3.2 实现上下文合并逻辑 + - 创建 ContextMerger 类 (context.py) + - 实现消息指纹计算 (SHA256) + - 实现去重和截断策略 + - 编写单元测试 (test_context.py) +- changes: + - 新增 `app/services/context.py` + - 新增 `tests/test_context.py` + +### Session #3 (2026-02-24) +- completed: + - T3.3 实现置信度计算与转人工逻辑 + - 创建 ConfidenceCalculator 类 (confidence.py) + - 实现检索不足判定 + - 实现置信度计算策略 + - 实现 shouldTransfer 逻辑 + - 编写单元测试 (test_confidence.py) +- changes: + - 新增 `app/services/confidence.py` + - 新增 `tests/test_confidence.py` + - 更新 `app/core/config.py` 添加置信度配置 + +### Session #4 (2026-02-24) +- completed: + - T3.4 实现 Orchestrator 完整生成闭环 + - 整合 Memory、ContextMerger、Retriever、LLMClient、ConfidenceCalculator + - 实现 generate() 方法完整流程 (8 步) + - 创建 GenerationContext 数据类追踪生成流程 + - 实现 fallback 响应机制 + - 编写单元测试 (test_orchestrator.py, 21 tests) +- changes: + - 更新 `app/services/orchestrator.py` 完整实现 + - 新增 `tests/test_orchestrator.py` +- tests_passed: 138 tests (all passing) + --- ## 🚀 Startup Guide diff --git a/spec/ai-service-admin/requirements.md b/spec/ai-service-admin/requirements.md new file mode 100644 index 0000000..f6f2f2c --- /dev/null +++ b/spec/ai-service-admin/requirements.md @@ -0,0 +1,72 @@ +--- +feature_id: "ASA" +title: "AI 中台管理界面(ai-service-admin)需求规范" +status: "draft" +version: "0.1.0" +owners: + - "product" + - "frontend" + - "backend" +last_updated: "2026-02-24" +source: + type: "conversation" + ref: "Scoping Result Confirmed" +--- + +# AI 中台管理界面(ASA) + +## 1. 背景与目标 +- **背景**:随着 AI 中台(ai-service)功能的增强,需要一套专业的管理后台来支持知识库运维、Prompt 工程迭代及 RAG 效果调优。 +- **目标**:提供租户维度的 AI 资产管理能力,实现 RAG 链路的可视化调试与全链路监控。 +- **非目标**:不包含推理引擎实现,不包含面向 C 端的交互 UI。 + +## 2. 模块边界(Scope) +- **覆盖**:知识库空间与文档管理、索引任务监控、租户级 Prompt 模板版本管理、RAG 实验对比、全局会话审计。 +- **不覆盖**:底层向量数据库维护、模型微调训练。 + +## 3. 依赖盘点(Dependencies) +- **依赖模块**: + - `ai-service`:提供所有的管理类 RESTful API(见 `openapi.admin.yaml`)。 + - `RuoYi-Vue-Plus` (或类似基座):提供用户认证、权限校验及菜单框架。 + +## 4. 用户故事(User Stories) + +### 4.1 知识库管理 +- [US-ASA-01] 作为租户管理员,我希望能够上传不同格式的文档到指定的知识空间,以便为特定的 AI 场景提供上下文。 +- [US-ASA-02] 作为运维人员,我希望实时查看索引构建任务的进度和错误原因,以便及时处理构建失败的情况。 + +### 4.2 Prompt 工程 +- [US-ASA-03] 作为 Prompt 工程师,我希望对不同场景的 Prompt 进行版本化管理,以便在效果下降时能够快速回滚。 + +### 4.3 RAG 效果实验室 +- [US-ASA-04] 作为 AI 开发者,我希望在后台直接输入问题并查看检索到的文档分片和原始上下文,以便定位召回不准确的问题。 + +### 4.4 监控审计 +- [US-ASA-05] 作为安全合规人员,我希望审计所有租户的会话记录,并查看单次回答的耗时与资源消耗,以便进行成本核算与安全管控。 + +## 5. 验收标准(Acceptance Criteria, EARS) + +### 知识库管理(KB) +- [AC-ASA-01] WHEN 提交文档上传 THEN 系统 SHALL 异步启动索引任务,并返回任务 ID。 +- [AC-ASA-02] WHEN 索引任务失败 THEN 系统 SHALL 在管理界面高亮显示,并提供“详细错误”查询入口。 + +### Prompt 管理(Prompt) +- [AC-ASA-03] WHEN 发布新版 Prompt THEN 系统 SHALL 自动将旧版标记为“历史版本”,且同一时间只有一个“已发布”版本。 +- [AC-ASA-04] WHEN 编辑 Prompt 时 THEN 系统 SHALL 提供内置变量提示(如 `{{context}}`, `{{query}}`)。 + +### RAG 实验室(RAG Lab) +- [AC-ASA-05] WHEN 运行 RAG 实验 THEN 系统 SHALL 展示 Top-K 检索片段、得分、来源文档及最终生成的提示词。 +- [AC-ASA-06] WHEN 多版本对比时 THEN 系统 SHALL 支持在同一屏幕展示不同配置下的召回差异。 + +### 会话监控(Audit) +- [AC-ASA-07] WHEN 查看会话详情 THEN 系统 SHALL 展示完整的消息链路,包括中间的工具调用(Tool Calls)和检索命中记录。 + +## 6. 追踪映射(Traceability) + +| AC ID | Endpoint | 方法 | 备注 | +|------|----------|------|-----| +| AC-ASA-01 | /admin/kb/documents | POST | 上传文档并创建任务 | +| AC-ASA-02 | /admin/kb/index/jobs/{jobId} | GET | 查询任务详情与错误 | +| AC-ASA-03 | /admin/config/prompt-templates/{tplId}/publish | POST | 发布指定版本 | +| AC-ASA-05 | /admin/rag/experiments/run | POST | 触发调试实验 | +| AC-ASA-07 | /admin/sessions/{sessionId} | GET | 获取全链路详情 | diff --git a/spec/ai-service-admin/scope.md b/spec/ai-service-admin/scope.md new file mode 100644 index 0000000..277b976 --- /dev/null +++ b/spec/ai-service-admin/scope.md @@ -0,0 +1,171 @@ +--- +module: ai-service-admin +title: "AI 中台管理界面(ai-service-admin)— Scoping(定界)结果" +status: draft +version: 0.1.0 +owners: + - product + - frontend + - backend +last_updated: 2026-02-24 +methodology: + - spec-driven + - api-first +--- + +# AI 中台管理界面(ai-service-admin)— Scoping(定界)结果 + +> 本文档为“规范驱动 + 接口先行”的第 0 阶段产出:模块边界澄清 + 依赖接口盘点(Consumer-First)。 +> +> 输出完成后**停止**,等待确认;在确认前不进入 requirements/OpenAPI/Design/Tasks 的正式编写。 + +## 1) 模块职责(Module Responsibilities) + +### 1.1 模块定位 +`ai-service-admin` 是一个**独立前端管理模块**,面向运营/管理员/租户管理员,用于管理 AI 中台的知识库资产、租户级配置、RAG 调试与全局会话观测。 + +### 1.2 包含(In Scope) +必须覆盖以下能力(以管理与观测为主): + +1. **知识库上传/索引管理** + - 数据源上传(文件/文本/URL 等,具体形态由后续 requirements 细化) + - 数据源状态查看、重试、删除/下线 + - 索引构建(触发/进度/失败原因)与文档分片/向量化相关的可观测信息(仅展示,不实现算法) + - 基础检索配置(如 topK、召回阈值、chunk 策略等):以“配置项编辑 + 生效范围”呈现 + +2. **租户级 Prompt 模板配置** + - 租户维度的 Prompt 模板 CRUD、版本管理(草案/发布)、回滚 + - 模板与使用场景(如:RAG 问答、工具调用、摘要等)的绑定(场景枚举由后端定义) + - 变量占位符与校验提示(前端校验为体验增强,最终以后端校验为准) + +3. **RAG 检索效果实验室(调试窗)** + - 输入问题 → 触发“检索/重排/上下文构造”调试链路 + - 展示:召回文档列表(含得分/来源/片段)、重排结果、最终上下文、以及关键日志/耗时 + - 支持对比实验(如不同检索参数、不同 prompt 版本) + - 支持保存/复用实验配置与结果快照(用于回归对比) + +4. **全局会话监控** + - 多租户维度的会话列表、筛选(时间、租户、用户标识、会话状态、模型/渠道等) + - 查看单次会话详情:消息流、检索命中、工具调用、错误栈/错误码、耗时 + - 基础统计:会话量、失败率、平均耗时、Top 知识库/Top Prompt 版本使用情况(指标范围后续细化) + +### 1.3 不包含(Out of Scope) +明确不在本模块实现范围内: + +- **具体 AI 推理逻辑**(由 `ai-service` 负责),包括但不限于:LLM 调用、embedding 计算、rerank 算法、提示词编排执行引擎本体。 +- **用户侧聊天界面**(由 `ai-robot` 负责),即面向终端用户的对话交互 UI。 +- **底层存储/索引实现**(向量库、全文检索、对象存储等)与运维部署策略。 + +### 1.4 边界接口原则 +- `ai-service-admin` 只作为 **Consumer(调用方)** 消费 Python 后端(`ai-service`)提供的“管理类/观测类 API”。 +- 所有“调试/实验”动作也通过管理 API 触发,前端不直接连接向量库或模型。 + +--- + +## 2) 技术栈建议(Tech Stack Recommendation) + +### 2.1 选型建议 +优先推荐:**Vue 3 + Element Plus**(与 RuoYi-Vue 生态对齐)。 +备选:React(若团队已有成熟 React 组件体系与工程基座)。 + +### 2.2 选择理由(面向本模块) +- RuoYi-Vue 管理后台形态与 Element Plus 组件库天然匹配(表格/表单/弹窗/权限路由)。 +- 本模块以“配置管理 + 列表筛选 + 详情查看 + 调试面板”交互为主,Element Plus 现成组件覆盖度高。 + +### 2.3 工程与基础能力(建议纳入后续 design) +- 多租户与权限:建议前端采用**路由级权限 + 按钮级权限**(能力来源于后端返回的权限集)。 +- 国际化:可选(若需要多语言运营)。 +- 可观测性:前端埋点/日志仅做体验与错误上报;业务日志以服务端为准。 + +--- + +## 3) 依赖接口清单(Consumer-First,管理类接口草案) + +> 说明:以下为 `ai-service-admin` 作为调用方所需的**最小管理接口能力清单**(草案)。 +> - 路径以 `/admin/*` 为主。 +> - 最终将以 `ai-service` 新增的 `openapi.admin.yaml` 固化。 +> - 这里先列“能力/端点草案”,不写具体 schema,以便先对齐边界。 + +### 3.1 认证与通用能力 +- `GET /admin/me`:获取当前登录信息(含租户、角色、权限点) +- `GET /admin/tenants`:租户列表(平台管理员) +- `GET /admin/enums`:获取前端需要的枚举/常量(场景枚举、状态枚举、错误码映射等) + +### 3.2 知识库(KB)管理:`/admin/kb/*` +- `GET /admin/kb/spaces`:知识空间/知识库列表(按租户) +- `POST /admin/kb/spaces`:创建知识空间/知识库 +- `GET /admin/kb/spaces/{kbId}`:知识库详情(含统计/配置摘要) +- `PATCH /admin/kb/spaces/{kbId}`:更新知识库元信息/配置 +- `DELETE /admin/kb/spaces/{kbId}`:删除/下线知识库 + +- `GET /admin/kb/documents`:文档列表(支持 kbId、状态、时间、来源筛选) +- `POST /admin/kb/documents`:上传/导入文档(multipart 或任务式导入) +- `GET /admin/kb/documents/{docId}`:文档详情(含分片/索引状态) +- `DELETE /admin/kb/documents/{docId}`:删除文档 + +- `POST /admin/kb/index/jobs`:触发索引构建/重建(kbId/docId 维度) +- `GET /admin/kb/index/jobs`:索引任务列表(状态/时间筛选) +- `GET /admin/kb/index/jobs/{jobId}`:索引任务详情(进度、失败原因、日志摘要) +- `POST /admin/kb/index/jobs/{jobId}/retry`:失败任务重试 +- `POST /admin/kb/index/jobs/{jobId}/cancel`:取消任务(若支持) + +- `GET /admin/kb/search/config`:读取检索参数默认配置(租户级/KB 级) +- `PUT /admin/kb/search/config`:更新检索参数默认配置 + +### 3.3 Prompt 模板(租户级)配置:`/admin/config/*` +- `GET /admin/config/prompt-templates`:模板列表(支持场景筛选) +- `POST /admin/config/prompt-templates`:创建模板(草案) +- `GET /admin/config/prompt-templates/{tplId}`:模板详情(含版本历史) +- `PATCH /admin/config/prompt-templates/{tplId}`:更新模板(草案编辑) +- `POST /admin/config/prompt-templates/{tplId}/publish`:发布某版本 +- `POST /admin/config/prompt-templates/{tplId}/rollback`:回滚到指定版本 +- `DELETE /admin/config/prompt-templates/{tplId}`:删除模板 + +- `GET /admin/config/prompt-variables`:可用变量/内置函数清单(用于编辑器提示) + +### 3.4 RAG 检索效果实验室(调试):`/admin/rag/*` +- `POST /admin/rag/experiments/run`:运行一次 RAG 调试实验(输入 query + 参数集 + 可选 kbId/promptVersion) +- `GET /admin/rag/experiments`:实验记录列表(按租户/操作者/时间) +- `GET /admin/rag/experiments/{expId}`:实验详情(召回、重排、上下文、日志、耗时) +- `POST /admin/rag/experiments/{expId}/clone`:复制实验为新草案 +- `DELETE /admin/rag/experiments/{expId}`:删除实验记录 + +- `GET /admin/rag/diagnostics/samples`:获取预置样例(用于快速回归) + +### 3.5 全局会话监控:`/admin/sessions/*` +- `GET /admin/sessions`:会话列表(多维筛选) +- `GET /admin/sessions/{sessionId}`:会话详情(消息流、检索命中、工具调用、错误、耗时) + +- `GET /admin/sessions/stats/overview`:概览统计(时间范围 + 租户维度) +- `GET /admin/sessions/stats/top`:Top 指标(Top KB / Top Prompt / Top 错误码等) + +### 3.6 审计与运维(可选但常见) +- `GET /admin/audit/logs`:管理操作审计日志 +- `GET /admin/system/health`:服务健康/版本信息(用于后台页脚或诊断) + +### 3.7 统一错误模型(约定) +- 所有 `/admin/*` 接口建议返回统一错误结构(如 `code`, `message`, `requestId`, `details[]`),以支持后台调试与问题定位。 + +--- + +## 4) 产出物计划(Artifacts Plan) + +> 按方法论,本模块后续应产出 4 类核心工件;本次 Scoping 仅做计划,不生成内容(等待确认)。 + +1. `spec/ai-service-admin/requirements.md` + - 管理后台的用户故事与验收标准(EARS) + - Scope/Dependencies/Traceability + +2. `spec/ai-service/openapi.admin.yaml`(在 `ai-service` 下新增) + - 作为 `ai-service` 的 **admin provider** 契约(面向本后台) + - 标记 `info.x-contract-level`,并对关键 operationId 提供 L1/L2 所需字段 + +3. `spec/ai-service-admin/design.md` + - 前端信息架构(IA):菜单/页面/路由/权限点 + - 状态管理、缓存策略、分页/筛选模式 + - 调试实验室的交互与可观测性设计 + - 错误处理与追踪(requestId) + +4. `spec/ai-service-admin/tasks.md` + - 按页面/能力拆分的原子任务(含与 AC 的映射) + - 并行策略:基于 admin OpenAPI 生成 Mock/SDK diff --git a/spec/ai-service/openapi.admin.yaml b/spec/ai-service/openapi.admin.yaml new file mode 100644 index 0000000..7edd03d --- /dev/null +++ b/spec/ai-service/openapi.admin.yaml @@ -0,0 +1,209 @@ +openapi: 3.1.0 +info: + title: "AI Service Admin API" + description: "AI 中台管理类接口契约(Provider: ai-service),支持 ai-service-admin 模块进行知识库、Prompt 及 RAG 调试管理。" + version: "0.1.0" + x-contract-level: L0 # 初始占位/可 Mock 级别 +components: + parameters: + XTenantId: + name: X-Tenant-Id + in: header + required: true + schema: + type: string + description: "租户ID,用于物理隔离知识库与数据" + responses: + Unauthorized: + description: "未认证(缺少或无效的认证信息)" + Forbidden: + description: "无权限(当前身份无权访问该资源)" + +paths: + /admin/kb/documents: + post: + summary: "上传/导入文档" + operationId: "uploadDocument" + tags: + - KB Management + x-requirements: ["AC-ASA-01"] + parameters: + - $ref: "#/components/parameters/XTenantId" + requestBody: + required: true + content: + multipart/form-data: + schema: + type: object + properties: + file: + type: string + format: binary + kbId: + type: string + responses: + '202': + description: "已接受上传请求,异步启动索引任务" + content: + application/json: + schema: + type: object + properties: + jobId: + type: string + status: + type: string + enum: [pending, processing] + '401': + $ref: "#/components/responses/Unauthorized" + '403': + $ref: "#/components/responses/Forbidden" + /admin/kb/index/jobs/{jobId}: + get: + summary: "查询索引任务详情" + operationId: "getIndexJob" + tags: + - KB Management + x-requirements: ["AC-ASA-02"] + parameters: + - $ref: "#/components/parameters/XTenantId" + - name: jobId + in: path + required: true + schema: + type: string + responses: + '200': + description: "任务状态详情" + content: + application/json: + schema: + type: object + properties: + jobId: + type: string + status: + type: string + enum: [pending, processing, completed, failed] + progress: + type: integer + minimum: 0 + maximum: 100 + errorMsg: + type: string + nullable: true + /admin/config/prompt-templates/{tplId}/publish: + post: + summary: "发布指定版本的 Prompt 模板" + operationId: "publishPromptTemplate" + tags: + - Prompt Management + x-requirements: ["AC-ASA-03"] + parameters: + - $ref: "#/components/parameters/XTenantId" + - name: tplId + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + type: object + properties: + version: + type: string + responses: + '200': + description: "发布成功" + '401': + $ref: "#/components/responses/Unauthorized" + '403': + $ref: "#/components/responses/Forbidden" + /admin/rag/experiments/run: + post: + summary: "触发 RAG 调试实验" + operationId: "runRagExperiment" + tags: + - RAG Lab + x-requirements: ["AC-ASA-05"] + parameters: + - $ref: "#/components/parameters/XTenantId" + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + query: + type: string + kbIds: + type: array + items: + type: string + params: + type: object + description: "检索参数集" + responses: + '200': + description: "实验结果" + content: + application/json: + schema: + type: object + properties: + retrievalResults: + type: array + items: + type: object + properties: + content: + type: string + score: + type: number + format: float + source: + type: string + finalPrompt: + type: string + '401': + $ref: "#/components/responses/Unauthorized" + '403': + $ref: "#/components/responses/Forbidden" + /admin/sessions/{sessionId}: + get: + summary: "获取会话详情" + operationId: "getSessionDetail" + tags: + - Session Monitoring + x-requirements: ["AC-ASA-07"] + parameters: + - $ref: "#/components/parameters/XTenantId" + - name: sessionId + in: path + required: true + schema: + type: string + responses: + '200': + description: "全链路会话详情" + content: + application/json: + schema: + type: object + properties: + sessionId: + type: string + messages: + type: array + items: + type: object + trace: + type: object + description: "含检索、工具调用等追踪信息" + '401': + $ref: "#/components/responses/Unauthorized" + '403': + $ref: "#/components/responses/Forbidden" diff --git a/spec/ai-service/tasks.md b/spec/ai-service/tasks.md index 836af19..7b0e182 100644 --- a/spec/ai-service/tasks.md +++ b/spec/ai-service/tasks.md @@ -34,7 +34,7 @@ last_updated: "2026-02-24" - [x] T3.1 实现 LLM Adapter:封装 `langchain-openai` 或官方 SDK,支持 `generate` 与 `stream_generate` `[AC-AISVC-02, AC-AISVC-06]` ✅ - [x] T3.2 实现 Orchestrator:实现上下文合并逻辑(H_local + H_ext 的去重与截断策略) `[AC-AISVC-14, AC-AISVC-15]` ✅ - [x] T3.3 实现 Orchestrator:实现 RAG 检索不足时的置信度下调与 `shouldTransfer` 逻辑 `[AC-AISVC-17, AC-AISVC-18, AC-AISVC-19]` ✅ -- [ ] T3.4 实现 Orchestrator:整合 Memory、Retrieval 与 LLM 完成 non-streaming 生成闭环 `[AC-AISVC-01, AC-AISVC-02]` +- [x] T3.4 实现 Orchestrator:整合 Memory、Retrieval 与 LLM 完成 non-streaming 生成闭环 `[AC-AISVC-01, AC-AISVC-02]` ✅ - [ ] T3.5 验证 non-streaming 响应字段完全符合 `openapi.provider.yaml` 契约 `[AC-AISVC-02]` ### Phase 4: 流式响应(SSE 实现与状态机)