13 KiB
13 KiB
| feature_id | title | status | version | last_updated | inputs | |||
|---|---|---|---|---|---|---|---|---|
| AISVC | Python AI 中台(ai-service)技术设计 | draft | 0.1.0 | 2026-02-24 |
|
Python AI 中台(ai-service)技术设计(AISVC)
1. 设计目标与约束
1.1 设计目标
- 落地
POST /ai/chat的 non-streaming JSON 与 SSE streaming 两种返回模式,并确保与契约一致:- non-streaming 响应字段必须包含
reply/confidence/shouldTransfer。 - streaming 通过
Accept: text/event-stream输出message/final/error事件序列。
- non-streaming 响应字段必须包含
- 实现 AI 侧会话记忆:基于
(tenantId, sessionId)持久化与加载。 - 实现 RAG(MVP:向量检索)并预留图谱检索(Neo4j)插件点。
- 多租户隔离:
- Qdrant:一租户一 collection(或一租户一 collection 前缀)。
- PostgreSQL:按
tenant_id分区/索引,保证跨租户不可见。
1.2 硬约束(来自契约与需求)
- API 对齐:
/ai/chat、/ai/health的路径/方法/状态码与 Java 侧 deps 对齐。 - 多租户:请求必须携带
X-Tenant-Id(网关/拦截器易处理),所有数据访问必须以tenant_id过滤。 - SSE:事件类型固定为
message/final/error,并保证顺序与异常语义清晰。
2. 总体架构与模块分层
2.1 分层概览
本服务按“职责单一 + 可插拔”的原则分为五层:
- API 层(Transport / Controller)
- 职责:
- HTTP 请求解析、参数校验(含
X-Tenant-Id)、鉴权/限流(如后续需要)。 - 根据
Accept头选择 non-streaming 或 SSE streaming。 - 统一错误映射为
ErrorResponse。
- HTTP 请求解析、参数校验(含
- 输入:
X-Tenant-Idheader +ChatRequestbody。 - 输出:
- JSON:
ChatResponse - SSE:
message/final/error事件流。
- JSON:
- 编排层(Orchestrator / Use Case)
- 职责:
- 整体流程编排:加载会话记忆 → 合并上下文 →(可选)RAG 检索 → 组装 prompt → 调用 LLM → 计算置信度与转人工建议 → 写回记忆。
- 在 streaming 模式下,将 LLM 的增量输出转为 SSE
message事件,同时维护最终reply。
- 输入:
tenantId, sessionId, currentMessage, channelType, history?, metadata? - 输出:
- non-streaming:一次性
ChatResponse - streaming:增量 token(或片段)流 + 最终
ChatResponse。
- non-streaming:一次性
- 记忆层(Memory)
- 职责:
- 持久化会话消息与摘要/记忆(最小:消息列表)。
- 提供按
(tenantId, sessionId)查询的会话上下文读取 API。
- 存储:PostgreSQL。
- 检索层(Retrieval)
- 职责:
- 提供统一
Retriever抽象接口。 - MVP 实现:向量检索(Qdrant)。
- 插件点:图谱检索(Neo4j)实现可新增而不改动 Orchestrator。
- 提供统一
- LLM 适配层(LLM Adapter)
- 职责:
- 屏蔽不同 LLM 提供方差异(请求格式、流式回调、重试策略)。
- 提供:一次性生成接口 + 流式生成接口(yield token/delta)。
2.2 关键数据流(文字版)
- API 层接收请求 → 提取
tenantId(Header)与 body → 调用 Orchestrator。 - Orchestrator:
- Memory.load(tenantId, sessionId)
- merge_context(local_history, external_history)
- Retrieval.retrieve(query, tenantId, channelType, metadata)(MVP 向量检索)
- build_prompt(merged_history, retrieved_docs, currentMessage)
- LLM.generate(...)(non-streaming)或 LLM.stream_generate(...)(streaming)
- compute_confidence(…)
- Memory.append(tenantId, sessionId, user/assistant messages)
- 返回
ChatResponse(或通过 SSE 输出)。
3. API 与协议设计要点
3.1 tenantId 放置与处理
- 主入口:
X-Tenant-Idheader(契约已声明 required)。 - Orchestrator 与所有下游组件调用均显式传入
tenantId。 - 禁止使用仅
sessionId定位会话,必须(tenantId, sessionId)。
3.2 streaming / non-streaming 模式判定
- 以
Accept头作为唯一判定依据:Accept: text/event-stream→ SSE streaming。- 其他 → non-streaming JSON。
4. RAG 管道设计
4.1 MVP:向量检索(Qdrant)流程
4.1.1 步骤
- Query 规范化
- 输入:
currentMessage(可结合channelType与 metadata)。 - 规则:去噪、截断(防止超长)、可选的 query rewrite(MVP 可不做)。
- Embedding
- 由
EmbeddingProvider生成向量(可复用 LLM 适配层或独立适配层)。 - ✅ 已确认:Token 计数统一使用
tiktoken进行精确计算(用于 history 截断与证据预算)。
- 向量检索(Qdrant)
- 按租户隔离选择 collection(见 5.1)。
- 使用 topK + score threshold 过滤。
- 上下文构建
- 将检索结果转为 “证据片段列表”,限制总 token 与片段数。
- 生成 prompt 时区分:系统指令 / 对话历史 / 证据 / 当前问题。
- 生成与引用策略
- 生成回答必须优先依据证据。
- 若证据不足:触发兜底策略(见 4.3)。
4.1.2 关键参数(MVP 默认,可配置)
- topK(例如 5~10)
- scoreThreshold(相似度阈值)
- minHits(最小命中文档数)
- maxEvidenceTokens(证据总 token 上限)
4.2 图谱检索插件点(Neo4j)
4.2.1 Retriever 抽象接口(概念设计)
设计统一接口,使 Orchestrator 不关心向量/图谱差异:
Retriever.retrieve(ctx) -> RetrievalResult- 输入
ctx:包含tenantId,query,sessionId,channelType,metadata等。 - 输出
RetrievalResult:hits[]:证据条目(统一为 text + score + source + metadata)diagnostics:检索调试信息(可选)
- 输入
MVP 提供 VectorRetriever(Qdrant)。
4.2.2 Neo4j 接入方式(未来扩展)
新增实现类 GraphRetriever(Neo4j),实现同一接口:
- tenant 隔离:Neo4j 可采用 database per tenant / label+tenantId 过滤 / subgraph per tenant(视规模与授权能力选择)。
- 输出同构
RetrievalResult,由 ContextBuilder 使用。
约束:新增 GraphRetriever 不应要求修改 API 层与 Orchestrator 的业务流程,只需配置切换(策略模式/依赖注入)。
4.3 检索不中兜底与置信度策略(对应 AC-AISVC-17/18/19)
定义“检索不足”的判定:
hits.size < minHits或max(score) < scoreThreshold或 evidence token 超限导致可用证据过少。
兜底动作:
- 回复策略:
- 明确表达“未从知识库确认/建议咨询人工/提供可执行下一步”。
- 避免编造具体事实性结论。
- 置信度:
- 以
T_low为阈值(可配置),检索不足场景通常产生较低confidence。
- 转人工建议:
confidence < T_low时shouldTransfer=true,可附transferReason。- ✅ 已确认:MVP 阶段
confidence优先基于 RAG 检索分数(Score)计算(并结合检索不中兜底下调)。
5. 多租户隔离方案
5.1 Qdrant(向量库)隔离:一租户一 Collection
5.1.1 命名规则
- collection 命名:
kb_{tenantId}(或kb_{tenantId}_{kbName}为未来多知识库预留)。
5.1.2 读写路径
- 所有 upsert/search 操作必须先基于
tenantId解析目标 collection。 - 禁止在同一 collection 内通过 payload filter 做租户隔离作为默认方案(可作为兜底/迁移手段),原因:
- 更容易出现误用导致跨租户泄露。
- 运维与配额更难隔离(单租户删除、重建、统计)。
5.1.3 租户生命周期
- tenant 创建:初始化 collection(含向量维度与 index 参数)。
- ✅ 已确认:采用提前预置模式,不通过业务请求动态创建 collection。
- tenant 删除:删除 collection。
- tenant 扩容:独立配置 HNSW 参数或分片(依赖 Qdrant 部署模式)。
5.2 PostgreSQL(会话库)分区与约束
5.2.1 表设计(概念)
-
chat_sessionstenant_id(NOT NULL)session_id(NOT NULL)created_at,updated_at- 主键/唯一约束:
(tenant_id, session_id)
-
chat_messagestenant_id(NOT NULL)session_id(NOT NULL)message_id(UUID 或 bigserial)role(user/assistant)content(text)created_at
5.2.2 分区策略
根据租户规模选择:
方案 A(MVP 推荐):逻辑分区 + 复合索引
- 不做 PG 分区表。
- 建立索引:
chat_messages(tenant_id, session_id, created_at)chat_sessions(tenant_id, session_id)
- 好处:实现与运维简单。
方案 B(规模化):按 tenant_id 做 LIST/HASH 分区
chat_messages按tenant_id分区(LIST 或 HASH)。- 适合租户数量有限且单租户数据量大,或需要更强隔离与清理效率。
5.2.3 防串租约束
- 所有查询必须带
tenant_id条件;在代码层面提供TenantScopedRepository强制注入。 - 可选:启用 Row Level Security(RLS)并通过
SET app.tenant_id做隔离(实现复杂度较高,后续可选)。
6. SSE 状态机设计(顺序与异常保证)
6.1 状态机
定义连接级状态:
INIT:已建立连接,尚未输出。STREAMING:持续输出message事件。FINAL_SENT:已输出final,准备关闭。ERROR_SENT:已输出error,准备关闭。CLOSED:连接关闭。
6.2 事件顺序保证
- 在一次请求生命周期内,事件序列必须满足:
message*(0 次或多次) → 且仅一次final→ close- 或
message*(0 次或多次) → 且仅一次error→ close
- 禁止
final之后再发送message。 - 禁止同时发送
final与error。
实现策略(概念):
- Orchestrator 维护一个原子状态变量(或单线程事件循环保证),在发送
final/error时 CAS 切换状态。 - 对 LLM 流式回调进行包装:
- 每个 delta 输出前检查状态必须为
STREAMING。 - 发生异常立即进入
ERROR_SENT并输出error。
- 每个 delta 输出前检查状态必须为
6.3 异常处理
-
参数错误:在进入流式生成前即可判定,直接发送
error(或返回 400,取决于是否已经选择 SSE;建议 SSE 模式同样用event:error输出 ErrorResponse)。 -
下游依赖错误(LLM/Qdrant/PG):
- 若尚未开始输出:可直接返回 503/500 JSON(non-streaming)或发送
event:error(streaming)。 - 若已输出部分
message:必须以event:error收尾。
- 若尚未开始输出:可直接返回 503/500 JSON(non-streaming)或发送
-
客户端断开:
- 立即停止 LLM 流(如果适配层支持 cancel),并避免继续写入 response。
-
✅ 已确认:必须实现 SSE 心跳(Keep-alive),以注释行形式定期发送
: ping(不改变事件模型),防止网关/中间件断开连接。 -
✅ 已确认:Python 内部设置 20s 硬超时(包含 LLM 调用与检索/存储等关键步骤的总体超时控制),防止资源泄露与请求堆积。
7. 上下文合并规则(Java history + 本地持久化 history)
7.1 合并输入
H_local:Memory 层基于(tenantId, sessionId)读取到的历史(按时间排序)。H_ext:Java 请求中可选的history(按传入顺序)。
7.2 去重规则(确定性)
为避免重复注入导致 prompt 膨胀,定义 message 指纹:
fingerprint = hash(role + "|" + normalized(content))- normalized:trim + 统一空白(MVP 简化:trim)。
去重策略:
- 先以
H_local构建seen集合。 - 遍历
H_ext:若 fingerprint 未出现,则追加到 merged;否则跳过。
解释:优先信任本地持久化历史,外部 history 作为补充。
7.3 优先级与冲突处理
- 若
H_ext与H_local在末尾存在重复但内容略有差异:- MVP 采取“以 local 为准”策略(保持服务端一致性)。
- 将差异记录到 diagnostics(可选)供后续排查。
7.4 截断策略(控制 token)
合并后历史 H_merged 需受 token 预算约束:
- 预算 =
maxHistoryTokens(可配置)。 - 截断策略:保留最近的 N 条(从尾部向前累加 token 直到阈值)。
- 可选增强(后续):对更早历史做摘要并作为系统记忆注入。
8. 关键接口(内部)与可插拔点
8.1 Orchestrator 依赖接口(概念)
MemoryStoreload_history(tenantId, sessionId) -> messages[]append_messages(tenantId, sessionId, messages[])
Retrieverretrieve(tenantId, query, metadata) -> RetrievalResult
LLMClientgenerate(prompt, params) -> textstream_generate(prompt, params) -> iterator[delta]
8.2 插件点
- Retrieval:VectorRetriever / GraphRetriever / HybridRetriever
- LLM:OpenAICompatibleClient / LocalModelClient
- ConfidencePolicy:可替换策略(基于检索质量 + 模型信号)
9. 风险与后续工作
- SSE 的网关兼容性:需确认网关是否支持
text/event-stream透传与超时策略。 - 租户级 collection 数量增长:若租户数量巨大,Qdrant collection 管理成本上升;可在规模化阶段切换为“单 collection + payload tenant filter”并加强隔离校验。
- 上下文膨胀:仅截断可能影响长会话体验;后续可引入摘要记忆与检索式记忆。
- 置信度定义:MVP 先以规则/阈值实现,后续引入离线评测与校准。