ai-robot-core/spec/ai-service/design.md

13 KiB
Raw Permalink Blame History

feature_id title status version last_updated inputs
AISVC Python AI 中台ai-service技术设计 draft 0.1.0 2026-02-24
spec/ai-service/requirements.md
spec/ai-service/openapi.provider.yaml
java/openapi.deps.yaml

Python AI 中台ai-service技术设计AISVC

1. 设计目标与约束

1.1 设计目标

  • 落地 POST /ai/chatnon-streaming JSONSSE streaming 两种返回模式,并确保与契约一致:
    • non-streaming 响应字段必须包含 reply/confidence/shouldTransfer
    • streaming 通过 Accept: text/event-stream 输出 message/final/error 事件序列。
  • 实现 AI 侧会话记忆:基于 (tenantId, sessionId) 持久化与加载。
  • 实现 RAGMVP向量检索并预留图谱检索Neo4j插件点。
  • 多租户隔离:
    • Qdrant一租户一 collection或一租户一 collection 前缀)。
    • PostgreSQLtenant_id 分区/索引,保证跨租户不可见。

1.2 硬约束(来自契约与需求)

  • API 对齐:/ai/chat/ai/health 的路径/方法/状态码与 Java 侧 deps 对齐。
  • 多租户:请求必须携带 X-Tenant-Id(网关/拦截器易处理),所有数据访问必须以 tenant_id 过滤。
  • SSE事件类型固定为 message/final/error,并保证顺序与异常语义清晰。

2. 总体架构与模块分层

2.1 分层概览

本服务按“职责单一 + 可插拔”的原则分为五层:

  1. API 层Transport / Controller
  • 职责:
    • HTTP 请求解析、参数校验(含 X-Tenant-Id)、鉴权/限流(如后续需要)。
    • 根据 Accept 头选择 non-streaming 或 SSE streaming。
    • 统一错误映射为 ErrorResponse
  • 输入:X-Tenant-Id header + ChatRequest body。
  • 输出:
    • JSONChatResponse
    • SSEmessage/final/error 事件流。
  1. 编排层Orchestrator / Use Case
  • 职责:
    • 整体流程编排:加载会话记忆 → 合并上下文 →可选RAG 检索 → 组装 prompt → 调用 LLM → 计算置信度与转人工建议 → 写回记忆。
    • 在 streaming 模式下,将 LLM 的增量输出转为 SSE message 事件,同时维护最终 reply
  • 输入:tenantId, sessionId, currentMessage, channelType, history?, metadata?
  • 输出:
    • non-streaming一次性 ChatResponse
    • streaming增量 token或片段流 + 最终 ChatResponse
  1. 记忆层Memory
  • 职责:
    • 持久化会话消息与摘要/记忆(最小:消息列表)。
    • 提供按 (tenantId, sessionId) 查询的会话上下文读取 API。
  • 存储PostgreSQL。
  1. 检索层Retrieval
  • 职责:
    • 提供统一 Retriever 抽象接口。
    • MVP 实现向量检索Qdrant
    • 插件点图谱检索Neo4j实现可新增而不改动 Orchestrator。
  1. LLM 适配层LLM Adapter
  • 职责:
    • 屏蔽不同 LLM 提供方差异(请求格式、流式回调、重试策略)。
    • 提供:一次性生成接口 + 流式生成接口yield token/delta

2.2 关键数据流(文字版)

  • API 层接收请求 → 提取 tenantIdHeader与 body → 调用 Orchestrator。
  • Orchestrator
    1. Memory.load(tenantId, sessionId)
    2. merge_context(local_history, external_history)
    3. Retrieval.retrieve(query, tenantId, channelType, metadata)MVP 向量检索)
    4. build_prompt(merged_history, retrieved_docs, currentMessage)
    5. LLM.generate(...)non-streaming或 LLM.stream_generate(...)streaming
    6. compute_confidence(…)
    7. Memory.append(tenantId, sessionId, user/assistant messages)
    8. 返回 ChatResponse(或通过 SSE 输出)。

3. API 与协议设计要点

3.1 tenantId 放置与处理

  • 主入口X-Tenant-Id header契约已声明 required
  • Orchestrator 与所有下游组件调用均显式传入 tenantId
  • 禁止使用仅 sessionId 定位会话,必须 (tenantId, sessionId)

3.2 streaming / non-streaming 模式判定

  • Accept 头作为唯一判定依据:
    • Accept: text/event-stream → SSE streaming。
    • 其他 → non-streaming JSON。

4. RAG 管道设计

4.1 MVP向量检索Qdrant流程

4.1.1 步骤

  1. Query 规范化
  • 输入:currentMessage(可结合 channelType 与 metadata
  • 规则:去噪、截断(防止超长)、可选的 query rewriteMVP 可不做)。
  1. Embedding
  • EmbeddingProvider 生成向量(可复用 LLM 适配层或独立适配层)。
  • 已确认Token 计数统一使用 tiktoken 进行精确计算(用于 history 截断与证据预算)。
  1. 向量检索Qdrant
  • 按租户隔离选择 collection见 5.1)。
  • 使用 topK + score threshold 过滤。
  1. 上下文构建
  • 将检索结果转为 “证据片段列表”,限制总 token 与片段数。
  • 生成 prompt 时区分:系统指令 / 对话历史 / 证据 / 当前问题。
  1. 生成与引用策略
  • 生成回答必须优先依据证据。
  • 若证据不足:触发兜底策略(见 4.3)。

4.1.2 关键参数MVP 默认,可配置)

  • topK例如 5~10
  • scoreThreshold相似度阈值
  • minHits最小命中文档数
  • maxEvidenceTokens证据总 token 上限)

4.2 图谱检索插件点Neo4j

4.2.1 Retriever 抽象接口(概念设计)

设计统一接口,使 Orchestrator 不关心向量/图谱差异:

  • Retriever.retrieve(ctx) -> RetrievalResult
    • 输入 ctx:包含 tenantId, query, sessionId, channelType, metadata 等。
    • 输出 RetrievalResult
      • hits[]:证据条目(统一为 text + score + source + metadata
      • diagnostics:检索调试信息(可选)

MVP 提供 VectorRetriever(Qdrant)

4.2.2 Neo4j 接入方式(未来扩展)

新增实现类 GraphRetriever(Neo4j),实现同一接口:

  • tenant 隔离Neo4j 可采用 database per tenant / label+tenantId 过滤 / subgraph per tenant视规模与授权能力选择
  • 输出同构 RetrievalResult,由 ContextBuilder 使用。

约束:新增 GraphRetriever 不应要求修改 API 层与 Orchestrator 的业务流程,只需配置切换(策略模式/依赖注入)。

4.3 检索不中兜底与置信度策略(对应 AC-AISVC-17/18/19

定义“检索不足”的判定:

  • hits.size < minHitsmax(score) < scoreThreshold 或 evidence token 超限导致可用证据过少。

兜底动作:

  1. 回复策略:
  • 明确表达“未从知识库确认/建议咨询人工/提供可执行下一步”。
  • 避免编造具体事实性结论。
  1. 置信度:
  • T_low 为阈值(可配置),检索不足场景通常产生较低 confidence
  1. 转人工建议:
  • confidence < T_lowshouldTransfer=true,可附 transferReason
  • 已确认MVP 阶段 confidence 优先基于 RAG 检索分数Score计算并结合检索不中兜底下调

5. 多租户隔离方案

5.1 Qdrant向量库隔离一租户一 Collection

5.1.1 命名规则

  • collection 命名:kb_{tenantId}(或 kb_{tenantId}_{kbName} 为未来多知识库预留)。

5.1.2 读写路径

  • 所有 upsert/search 操作必须先基于 tenantId 解析目标 collection。
  • 禁止在同一 collection 内通过 payload filter 做租户隔离作为默认方案(可作为兜底/迁移手段),原因:
    • 更容易出现误用导致跨租户泄露。
    • 运维与配额更难隔离(单租户删除、重建、统计)。

5.1.3 租户生命周期

  • tenant 创建:初始化 collection含向量维度与 index 参数)。
    • 已确认:采用提前预置模式,不通过业务请求动态创建 collection。
  • tenant 删除:删除 collection。
  • tenant 扩容:独立配置 HNSW 参数或分片(依赖 Qdrant 部署模式)。

5.2 PostgreSQL会话库分区与约束

5.2.1 表设计(概念)

  • chat_sessions

    • tenant_id (NOT NULL)
    • session_id (NOT NULL)
    • created_at, updated_at
    • 主键/唯一约束:(tenant_id, session_id)
  • chat_messages

    • tenant_id (NOT NULL)
    • session_id (NOT NULL)
    • message_id (UUID 或 bigserial)
    • role (user/assistant)
    • content (text)
    • created_at

5.2.2 分区策略

根据租户规模选择:

方案 AMVP 推荐):逻辑分区 + 复合索引

  • 不做 PG 分区表。
  • 建立索引:
    • chat_messages(tenant_id, session_id, created_at)
    • chat_sessions(tenant_id, session_id)
  • 好处:实现与运维简单。

方案 B规模化按 tenant_id 做 LIST/HASH 分区

  • chat_messagestenant_id 分区LIST 或 HASH
  • 适合租户数量有限且单租户数据量大,或需要更强隔离与清理效率。

5.2.3 防串租约束

  • 所有查询必须带 tenant_id 条件;在代码层面提供 TenantScopedRepository 强制注入。
  • 可选:启用 Row Level SecurityRLS并通过 SET app.tenant_id 做隔离(实现复杂度较高,后续可选)。

6. SSE 状态机设计(顺序与异常保证)

6.1 状态机

定义连接级状态:

  • INIT:已建立连接,尚未输出。
  • STREAMING:持续输出 message 事件。
  • FINAL_SENT:已输出 final,准备关闭。
  • ERROR_SENT:已输出 error,准备关闭。
  • CLOSED:连接关闭。

6.2 事件顺序保证

  • 在一次请求生命周期内,事件序列必须满足:
    • message*0 次或多次) → 且仅一次 final → close
    • message*0 次或多次) → 且仅一次 error → close
  • 禁止 final 之后再发送 message
  • 禁止同时发送 finalerror

实现策略(概念):

  • Orchestrator 维护一个原子状态变量(或单线程事件循环保证),在发送 final/error 时 CAS 切换状态。
  • 对 LLM 流式回调进行包装:
    • 每个 delta 输出前检查状态必须为 STREAMING
    • 发生异常立即进入 ERROR_SENT 并输出 error

6.3 异常处理

  • 参数错误:在进入流式生成前即可判定,直接发送 error(或返回 400取决于是否已经选择 SSE建议 SSE 模式同样用 event:error 输出 ErrorResponse

  • 下游依赖错误LLM/Qdrant/PG

    • 若尚未开始输出:可直接返回 503/500 JSONnon-streaming或发送 event:errorstreaming
    • 若已输出部分 message:必须以 event:error 收尾。
  • 客户端断开:

    • 立即停止 LLM 流(如果适配层支持 cancel并避免继续写入 response。
  • 已确认:必须实现 SSE 心跳Keep-alive以注释行形式定期发送 : ping(不改变事件模型),防止网关/中间件断开连接。

  • 已确认Python 内部设置 20s 硬超时(包含 LLM 调用与检索/存储等关键步骤的总体超时控制),防止资源泄露与请求堆积。


7. 上下文合并规则Java history + 本地持久化 history

7.1 合并输入

  • H_localMemory 层基于 (tenantId, sessionId) 读取到的历史(按时间排序)。
  • H_extJava 请求中可选的 history(按传入顺序)。

7.2 去重规则(确定性)

为避免重复注入导致 prompt 膨胀,定义 message 指纹:

  • fingerprint = hash(role + "|" + normalized(content))
  • normalizedtrim + 统一空白MVP 简化trim

去重策略:

  1. 先以 H_local 构建 seen 集合。
  2. 遍历 H_ext:若 fingerprint 未出现,则追加到 merged否则跳过。

解释:优先信任本地持久化历史,外部 history 作为补充。

7.3 优先级与冲突处理

  • H_extH_local 在末尾存在重复但内容略有差异:
    • MVP 采取“以 local 为准”策略(保持服务端一致性)。
    • 将差异记录到 diagnostics可选供后续排查。

7.4 截断策略(控制 token

合并后历史 H_merged 需受 token 预算约束:

  • 预算 = maxHistoryTokens(可配置)。
  • 截断策略:保留最近的 N 条(从尾部向前累加 token 直到阈值)。
  • 可选增强(后续):对更早历史做摘要并作为系统记忆注入。

8. 关键接口(内部)与可插拔点

8.1 Orchestrator 依赖接口(概念)

  • MemoryStore
    • load_history(tenantId, sessionId) -> messages[]
    • append_messages(tenantId, sessionId, messages[])
  • Retriever
    • retrieve(tenantId, query, metadata) -> RetrievalResult
  • LLMClient
    • generate(prompt, params) -> text
    • stream_generate(prompt, params) -> iterator[delta]

8.2 插件点

  • RetrievalVectorRetriever / GraphRetriever / HybridRetriever
  • LLMOpenAICompatibleClient / LocalModelClient
  • ConfidencePolicy可替换策略基于检索质量 + 模型信号)

9. 风险与后续工作

  • SSE 的网关兼容性:需确认网关是否支持 text/event-stream 透传与超时策略。
  • 租户级 collection 数量增长若租户数量巨大Qdrant collection 管理成本上升;可在规模化阶段切换为“单 collection + payload tenant filter”并加强隔离校验。
  • 上下文膨胀:仅截断可能影响长会话体验;后续可引入摘要记忆与检索式记忆。
  • 置信度定义MVP 先以规则/阈值实现,后续引入离线评测与校准。