--- feature_id: "AISVC" title: "Python AI 中台(ai-service)技术设计" status: "draft" version: "0.7.0" last_updated: "2026-02-27" inputs: - "spec/ai-service/requirements.md" - "spec/ai-service/openapi.provider.yaml" - "spec/ai-service/openapi.admin.yaml" - "java/openapi.deps.yaml" --- # Python AI 中台(ai-service)技术设计(AISVC) ## 1. 设计目标与约束 ### 1.1 设计目标 - 落地 `POST /ai/chat` 的 **non-streaming JSON** 与 **SSE streaming** 两种返回模式,并确保与契约一致: - non-streaming 响应字段必须包含 `reply/confidence/shouldTransfer`。 - streaming 通过 `Accept: text/event-stream` 输出 `message/final/error` 事件序列。 - 实现 AI 侧会话记忆:基于 `(tenantId, sessionId)` 持久化与加载。 - 实现 RAG(MVP:向量检索)并预留图谱检索(Neo4j)插件点。 - 多租户隔离: - Qdrant:一租户一 collection(或一租户一 collection 前缀)。 - PostgreSQL:按 `tenant_id` 分区/索引,保证跨租户不可见。 ### 1.2 硬约束(来自契约与需求) - API 对齐:`/ai/chat`、`/ai/health` 的路径/方法/状态码与 Java 侧 deps 对齐。 - 多租户:请求必须携带 `X-Tenant-Id`(网关/拦截器易处理),所有数据访问必须以 `tenant_id` 过滤。 - SSE:事件类型固定为 `message/final/error`,并保证顺序与异常语义清晰。 --- ## 2. 总体架构与模块分层 ### 2.1 分层概览 本服务按“职责单一 + 可插拔”的原则分为五层: 1) **API 层(Transport / Controller)** - 职责: - HTTP 请求解析、参数校验(含 `X-Tenant-Id`)、鉴权/限流(如后续需要)。 - 根据 `Accept` 头选择 non-streaming 或 SSE streaming。 - 统一错误映射为 `ErrorResponse`。 - 输入:`X-Tenant-Id` header + `ChatRequest` body。 - 输出: - JSON:`ChatResponse` - SSE:`message/final/error` 事件流。 2) **编排层(Orchestrator / Use Case)** - 职责: - 整体流程编排:加载会话记忆 → 合并上下文 →(可选)RAG 检索 → 组装 prompt → 调用 LLM → 计算置信度与转人工建议 → 写回记忆。 - 在 streaming 模式下,将 LLM 的增量输出转为 SSE `message` 事件,同时维护最终 `reply`。 - 输入:`tenantId, sessionId, currentMessage, channelType, history?, metadata?` - 输出: - non-streaming:一次性 `ChatResponse` - streaming:增量 token(或片段)流 + 最终 `ChatResponse`。 3) **记忆层(Memory)** - 职责: - 持久化会话消息与摘要/记忆(最小:消息列表)。 - 提供按 `(tenantId, sessionId)` 查询的会话上下文读取 API。 - 存储:PostgreSQL。 4) **检索层(Retrieval)** - 职责: - 提供统一 `Retriever` 抽象接口。 - MVP 实现:向量检索(Qdrant)。 - 插件点:图谱检索(Neo4j)实现可新增而不改动 Orchestrator。 5) **LLM 适配层(LLM Adapter)** - 职责: - 屏蔽不同 LLM 提供方差异(请求格式、流式回调、重试策略)。 - 提供:一次性生成接口 + 流式生成接口(yield token/delta)。 ### 2.2 关键数据流(文字版) - API 层接收请求 → 提取 `tenantId`(Header)与 body → 调用 Orchestrator。 - Orchestrator: 1) Memory.load(tenantId, sessionId) 2) merge_context(local_history, external_history) 3) Retrieval.retrieve(query, tenantId, channelType, metadata)(MVP 向量检索) 4) build_prompt(merged_history, retrieved_docs, currentMessage) 5) LLM.generate(...)(non-streaming)或 LLM.stream_generate(...)(streaming) 6) compute_confidence(…) 7) Memory.append(tenantId, sessionId, user/assistant messages) 8) 返回 `ChatResponse`(或通过 SSE 输出)。 --- ## 3. API 与协议设计要点 ### 3.1 tenantId 放置与处理 - **主入口**:`X-Tenant-Id` header(契约已声明 required)。 - Orchestrator 与所有下游组件调用均显式传入 `tenantId`。 - 禁止使用仅 `sessionId` 定位会话,必须 `(tenantId, sessionId)`。 ### 3.2 streaming / non-streaming 模式判定 - 以 `Accept` 头作为唯一判定依据: - `Accept: text/event-stream` → SSE streaming。 - 其他 → non-streaming JSON。 --- ## 4. RAG 管道设计 ### 4.1 MVP:向量检索(Qdrant)流程 #### 4.1.1 步骤 1) **Query 规范化** - 输入:`currentMessage`(可结合 `channelType` 与 metadata)。 - 规则:去噪、截断(防止超长)、可选的 query rewrite(MVP 可不做)。 2) **Embedding** - 由 `EmbeddingProvider` 生成向量(可复用 LLM 适配层或独立适配层)。 - ✅ 已确认:Token 计数统一使用 `tiktoken` 进行精确计算(用于 history 截断与证据预算)。 3) **向量检索**(Qdrant) - 按租户隔离选择 collection(见 5.1)。 - 使用 topK + score threshold 过滤。 4) **上下文构建** - 将检索结果转为 “证据片段列表”,限制总 token 与片段数。 - 生成 prompt 时区分:系统指令 / 对话历史 / 证据 / 当前问题。 5) **生成与引用策略** - 生成回答必须优先依据证据。 - 若证据不足:触发兜底策略(见 4.3)。 #### 4.1.2 关键参数(MVP 默认,可配置) - topK(例如 5~10) - scoreThreshold(相似度阈值) - minHits(最小命中文档数) - maxEvidenceTokens(证据总 token 上限) ### 4.2 图谱检索插件点(Neo4j) #### 4.2.1 Retriever 抽象接口(概念设计) 设计统一接口,使 Orchestrator 不关心向量/图谱差异: - `Retriever.retrieve(ctx) -> RetrievalResult` - 输入 `ctx`:包含 `tenantId`, `query`, `sessionId`, `channelType`, `metadata` 等。 - 输出 `RetrievalResult`: - `hits[]`:证据条目(统一为 text + score + source + metadata) - `diagnostics`:检索调试信息(可选) MVP 提供 `VectorRetriever(Qdrant)`。 #### 4.2.2 Neo4j 接入方式(未来扩展) 新增实现类 `GraphRetriever(Neo4j)`,实现同一接口: - tenant 隔离:Neo4j 可采用 database per tenant / label+tenantId 过滤 / subgraph per tenant(视规模与授权能力选择)。 - 输出同构 `RetrievalResult`,由 ContextBuilder 使用。 > 约束:新增 GraphRetriever 不应要求修改 API 层与 Orchestrator 的业务流程,只需配置切换(策略模式/依赖注入)。 ### 4.3 检索不中兜底与置信度策略(对应 AC-AISVC-17/18/19) 定义“检索不足”的判定: - `hits.size < minHits` 或 `max(score) < scoreThreshold` 或 evidence token 超限导致可用证据过少。 兜底动作: 1) 回复策略: - 明确表达“未从知识库确认/建议咨询人工/提供可执行下一步”。 - 避免编造具体事实性结论。 2) 置信度: - 以 `T_low` 为阈值(可配置),检索不足场景通常产生较低 `confidence`。 3) 转人工建议: - `confidence < T_low` 时 `shouldTransfer=true`,可附 `transferReason`。 - ✅ 已确认:MVP 阶段 `confidence` 优先基于 RAG 检索分数(Score)计算(并结合检索不中兜底下调)。 --- ## 5. 多租户隔离方案 ### 5.1 Qdrant(向量库)隔离:一租户一 Collection #### 5.1.1 命名规则 - collection 命名:`kb_{tenantId}`(或 `kb_{tenantId}_{kbName}` 为未来多知识库预留)。 #### 5.1.2 读写路径 - 所有 upsert/search 操作必须先基于 `tenantId` 解析目标 collection。 - 禁止在同一 collection 内通过 payload filter 做租户隔离作为默认方案(可作为兜底/迁移手段),原因: - 更容易出现误用导致跨租户泄露。 - 运维与配额更难隔离(单租户删除、重建、统计)。 #### 5.1.3 租户生命周期 - tenant 创建:初始化 collection(含向量维度与 index 参数)。 - ✅ 已确认:采用**提前预置**模式,不通过业务请求动态创建 collection。 - tenant 删除:删除 collection。 - tenant 扩容:独立配置 HNSW 参数或分片(依赖 Qdrant 部署模式)。 ### 5.2 PostgreSQL(会话库)分区与约束 #### 5.2.1 表设计(概念) - `chat_sessions` - `tenant_id` (NOT NULL) - `session_id` (NOT NULL) - `created_at`, `updated_at` - 主键/唯一约束:`(tenant_id, session_id)` - `chat_messages` - `tenant_id` (NOT NULL) - `session_id` (NOT NULL) - `message_id` (UUID 或 bigserial) - `role` (user/assistant) - `content` (text) - `created_at` #### 5.2.2 分区策略 根据租户规模选择: **方案 A(MVP 推荐):逻辑分区 + 复合索引** - 不做 PG 分区表。 - 建立索引: - `chat_messages(tenant_id, session_id, created_at)` - `chat_sessions(tenant_id, session_id)` - 好处:实现与运维简单。 **方案 B(规模化):按 tenant_id 做 LIST/HASH 分区** - `chat_messages` 按 `tenant_id` 分区(LIST 或 HASH)。 - 适合租户数量有限且单租户数据量大,或需要更强隔离与清理效率。 #### 5.2.3 防串租约束 - 所有查询必须带 `tenant_id` 条件;在代码层面提供 `TenantScopedRepository` 强制注入。 - 可选:启用 Row Level Security(RLS)并通过 `SET app.tenant_id` 做隔离(实现复杂度较高,后续可选)。 --- ## 6. SSE 状态机设计(顺序与异常保证) ### 6.1 状态机 定义连接级状态: - `INIT`:已建立连接,尚未输出。 - `STREAMING`:持续输出 `message` 事件。 - `FINAL_SENT`:已输出 `final`,准备关闭。 - `ERROR_SENT`:已输出 `error`,准备关闭。 - `CLOSED`:连接关闭。 ### 6.2 事件顺序保证 - 在一次请求生命周期内,事件序列必须满足: - `message*`(0 次或多次) → **且仅一次** `final` → close - 或 `message*`(0 次或多次) → **且仅一次** `error` → close - 禁止 `final` 之后再发送 `message`。 - 禁止同时发送 `final` 与 `error`。 实现策略(概念): - Orchestrator 维护一个原子状态变量(或单线程事件循环保证),在发送 `final/error` 时 CAS 切换状态。 - 对 LLM 流式回调进行包装: - 每个 delta 输出前检查状态必须为 `STREAMING`。 - 发生异常立即进入 `ERROR_SENT` 并输出 `error`。 ### 6.3 异常处理 - 参数错误:在进入流式生成前即可判定,直接发送 `error`(或返回 400,取决于是否已经选择 SSE;建议 SSE 模式同样用 `event:error` 输出 ErrorResponse)。 - 下游依赖错误(LLM/Qdrant/PG): - 若尚未开始输出:可直接返回 503/500 JSON(non-streaming)或发送 `event:error`(streaming)。 - 若已输出部分 `message`:必须以 `event:error` 收尾。 - 客户端断开: - 立即停止 LLM 流(如果适配层支持 cancel),并避免继续写入 response。 - ✅ 已确认:必须实现 SSE 心跳(Keep-alive),以注释行形式定期发送 `: ping`(不改变事件模型),防止网关/中间件断开连接。 - ✅ 已确认:Python 内部设置 **20s 硬超时**(包含 LLM 调用与检索/存储等关键步骤的总体超时控制),防止资源泄露与请求堆积。 --- ## 7. 上下文合并规则(Java history + 本地持久化 history) ### 7.1 合并输入 - `H_local`:Memory 层基于 `(tenantId, sessionId)` 读取到的历史(按时间排序)。 - `H_ext`:Java 请求中可选的 `history`(按传入顺序)。 ### 7.2 去重规则(确定性) 为避免重复注入导致 prompt 膨胀,定义 message 指纹: - `fingerprint = hash(role + "|" + normalized(content))` - normalized:trim + 统一空白(MVP 简化:trim)。 去重策略: 1) 先以 `H_local` 构建 `seen` 集合。 2) 遍历 `H_ext`:若 fingerprint 未出现,则追加到 merged;否则跳过。 > 解释:优先信任本地持久化历史,外部 history 作为补充。 ### 7.3 优先级与冲突处理 - 若 `H_ext` 与 `H_local` 在末尾存在重复但内容略有差异: - MVP 采取“以 local 为准”策略(保持服务端一致性)。 - 将差异记录到 diagnostics(可选)供后续排查。 ### 7.4 截断策略(控制 token) 合并后历史 `H_merged` 需受 token 预算约束: - 预算 = `maxHistoryTokens`(可配置)。 - 截断策略:保留最近的 N 条(从尾部向前累加 token 直到阈值)。 - 可选增强(后续):对更早历史做摘要并作为系统记忆注入。 --- ## 8. 关键接口(内部)与可插拔点 ### 8.1 Orchestrator 依赖接口(概念) - `MemoryStore` - `load_history(tenantId, sessionId) -> messages[]` - `append_messages(tenantId, sessionId, messages[])` - `Retriever` - `retrieve(tenantId, query, metadata) -> RetrievalResult` - `LLMClient` - `generate(prompt, params) -> text` - `stream_generate(prompt, params) -> iterator[delta]` ### 8.2 插件点 - Retrieval:VectorRetriever / GraphRetriever / HybridRetriever - LLM:OpenAICompatibleClient / LocalModelClient - ConfidencePolicy:可替换策略(基于检索质量 + 模型信号) --- ## 9. 风险与后续工作 - SSE 的网关兼容性:需确认网关是否支持 `text/event-stream` 透传与超时策略。 - 租户级 collection 数量增长:若租户数量巨大,Qdrant collection 管理成本上升;可在规模化阶段切换为“单 collection + payload tenant filter”并加强隔离校验。 - 上下文膨胀:仅截断可能影响长会话体验;后续可引入摘要记忆与检索式记忆。 - 置信度定义:MVP 先以规则/阈值实现,后续引入离线评测与校准。 --- ## 10. v0.6.0 智能客服增强 — 总体架构升级 ### 10.1 升级后的 Orchestrator 数据流 原有 8 步 pipeline 升级为 12 步,新增步骤用 `[NEW]` 标记: ``` API 层接收请求 → 提取 tenantId + body → 调用 Orchestrator Orchestrator: 1) Memory.load(tenantId, sessionId) 2) merge_context(local_history, external_history) 3) [NEW] InputGuardrail.scan(currentMessage) → 前置禁词检测(仅记录,不阻断) 4) [NEW] FlowEngine.check_active_flow(sessionId) → 检查是否有进行中的话术流程 ├─ 有活跃流程 → FlowEngine.advance(user_input) → 返回话术内容 → 跳到步骤 11 └─ 无活跃流程 → 继续步骤 5 5) [NEW] IntentRouter.match(currentMessage, tenantId) → 意图识别(关键词+正则) ├─ fixed → 返回固定回复 → 跳到步骤 11 ├─ flow → FlowEngine.start(flowId, sessionId) → 返回首步话术 → 跳到步骤 11 ├─ transfer → shouldTransfer=true + 转人工话术 → 跳到步骤 11 ├─ rag → 设置 target_kb_ids → 继续步骤 6 └─ 未命中 → target_kb_ids=按优先级全部 → 继续步骤 6 6) [NEW] QueryRewriter.rewrite(currentMessage, history) → Query 改写(LLM 调用,解析指代词) 7) Retrieval.retrieve(rewritten_query, tenantId, target_kb_ids) → 多知识库定向检索 8) [NEW] ResultRanker.rank(hits, kb_priorities) → 分层排序(按知识库类型优先级) 9) [NEW] PromptBuilder.build(template, evidence, history, message) → 从数据库模板构建 Prompt 10) LLM.generate(messages) 或 LLM.stream_generate(messages) 11) [NEW] OutputGuardrail.filter(reply) → 后置禁词过滤(mask/replace/block) 12) compute_confidence(retrieval_result) 13) Memory.append(tenantId, sessionId, user + assistant messages) 14) 返回 ChatResponse ``` ### 10.2 新增模块与现有模块的关系 ``` app/ ├── services/ │ ├── orchestrator.py # [修改] 升级为 12 步 pipeline │ ├── prompt/ # [新增] Prompt 模板服务 │ │ ├── template_service.py # 模板 CRUD + 版本管理 + 缓存 │ │ └── variable_resolver.py # 变量替换引擎 │ ├── intent/ # [新增] 意图识别与路由 │ │ ├── router.py # IntentRouter:规则匹配引擎 │ │ └── rule_service.py # 规则 CRUD │ ├── flow/ # [新增] 话术流程引擎 │ │ ├── engine.py # FlowEngine:状态机执行 │ │ └── flow_service.py # 流程 CRUD │ ├── guardrail/ # [新增] 输出护栏 │ │ ├── input_scanner.py # 输入前置检测 │ │ ├── output_filter.py # 输出后置过滤 │ │ └── word_service.py # 禁词/行为规则 CRUD │ ├── retrieval/ │ │ ├── optimized_retriever.py # [修改] 支持 target_kb_ids 参数 │ │ ├── query_rewriter.py # [新增] Query 改写 │ │ └── result_ranker.py # [新增] 分层排序 │ ├── kb.py # [修改] 支持多知识库 CRUD │ └── ...(现有模块不变) ├── api/ │ └── admin/ │ ├── prompt_templates.py # [新增] Prompt 模板管理 API │ ├── intent_rules.py # [新增] 意图规则管理 API │ ├── script_flows.py # [新增] 话术流程管理 API │ ├── guardrails.py # [新增] 护栏管理 API(禁词+行为规则) │ ├── kb.py # [修改] 新增知识库 CRUD 端点 │ └── ...(现有 API 不变) ├── models/ │ └── entities.py # [修改] 新增实体定义 └── core/ └── prompts.py # [修改] 改为从数据库加载,保留硬编码作为 fallback ``` --- ## 11. Prompt 模板系统设计 ### 11.1 数据模型 ``` prompt_templates 表 ├── id: UUID (PK) ├── tenant_id: VARCHAR (NOT NULL, FK) ├── name: VARCHAR (模板名称,如"默认客服人设") ├── scene: VARCHAR (场景标签:chat/rag_qa/greeting/farewell) ├── description: TEXT (模板描述) ├── is_default: BOOLEAN (是否为该场景的默认模板) ├── created_at: TIMESTAMP ├── updated_at: TIMESTAMP └── INDEX: (tenant_id, scene) prompt_template_versions 表 ├── id: UUID (PK) ├── template_id: UUID (FK → prompt_templates.id) ├── version: INTEGER (自增版本号) ├── status: VARCHAR (draft/published/archived) ├── system_instruction: TEXT (系统指令内容,支持 {{variable}} 占位符) ├── variables: JSONB (变量定义列表,如 [{"name":"persona_name","default":"小N","description":"人设名称"}]) ├── created_at: TIMESTAMP └── INDEX: (template_id, status) └── UNIQUE: 同一 template_id 下仅一个 status=published ``` ### 11.2 变量替换引擎 内置变量(系统自动注入,无需用户定义): | 变量 | 说明 | 示例值 | |------|------|--------| | `{{persona_name}}` | 人设名称 | 小N | | `{{current_time}}` | 当前时间 | 2026-02-27 14:30 | | `{{channel_type}}` | 渠道类型 | wechat | | `{{tenant_name}}` | 租户名称 | 某某公司 | | `{{session_id}}` | 会话ID | kf_001_wx123 | 自定义变量:由模板定义,管理员在模板中声明变量名和默认值。 替换流程: 1. 加载已发布版本的 `system_instruction` 2. 合并内置变量 + 自定义变量默认值 3. 执行 `{{variable}}` 模式替换 4. 注入行为规则(从 guardrails 加载,追加到系统指令末尾) 5. 输出最终 system message ### 11.3 缓存策略 - 使用内存缓存(dict),key = `(tenant_id, scene)`,value = 已发布版本的完整模板 - 发布/回滚操作时主动失效缓存 - 缓存 TTL = 300s(兜底过期,防止分布式场景下缓存不一致) - fallback:缓存未命中且数据库无模板时,使用现有硬编码的 `SYSTEM_PROMPT` 作为兜底 --- ## 12. 多知识库设计 ### 12.1 数据模型 ``` knowledge_bases 表(扩展现有 KnowledgeBase 实体) ├── id: VARCHAR (PK, 如 "kb_product_001") ├── tenant_id: VARCHAR (NOT NULL) ├── name: VARCHAR (知识库名称) ├── kb_type: VARCHAR (product/faq/script/policy/general) ├── description: TEXT ├── priority: INTEGER (优先级权重,数值越大越优先,默认 0) ├── is_enabled: BOOLEAN (默认 true) ├── doc_count: INTEGER (文档数量,冗余统计) ├── created_at: TIMESTAMP ├── updated_at: TIMESTAMP └── INDEX: (tenant_id, kb_type) ``` ### 12.2 Qdrant Collection 命名升级 现有:`kb_{tenant_id}`(单 collection) 升级为:`kb_{tenant_id}_{kb_id}`(每个知识库独立 collection) 兼容策略: - 新创建的知识库使用新命名 - 现有 `kb_{tenant_id}` collection 映射为 `kb_default` 知识库(自动迁移) - 检索时如果 target_kb_ids 包含 `kb_default`,同时搜索新旧两种命名的 collection ### 12.3 多知识库检索流程 ``` target_kb_ids(来自意图路由或默认全部) → 按 kb_type 优先级排序:script > faq > product > policy > general → 并行检索各 collection(使用现有 OptimizedRetriever) → 合并结果,按 (kb_type_priority, score) 双维度排序 → 截断到 maxEvidenceTokens → 输出 ranked_hits ``` --- ## 13. 意图识别与规则引擎设计 ### 13.1 数据模型 ``` intent_rules 表 ├── id: UUID (PK) ├── tenant_id: VARCHAR (NOT NULL) ├── name: VARCHAR (意图名称,如"退货意图") ├── keywords: JSONB (关键词列表,如 ["退货","退款","不想要了"]) ├── patterns: JSONB (正则模式列表,如 ["退.*货","怎么退"]) ├── priority: INTEGER (优先级,数值越大越先匹配) ├── response_type: VARCHAR (flow/rag/fixed/transfer) ├── target_kb_ids: JSONB (rag 类型时关联的知识库 ID 列表) ├── flow_id: UUID (flow 类型时关联的流程 ID) ├── fixed_reply: TEXT (fixed 类型时的固定回复内容) ├── transfer_message: TEXT (transfer 类型时的转人工话术) ├── is_enabled: BOOLEAN (默认 true) ├── hit_count: BIGINT (命中统计,默认 0) ├── created_at: TIMESTAMP ├── updated_at: TIMESTAMP └── INDEX: (tenant_id, is_enabled, priority DESC) ``` ### 13.2 匹配算法 ```python class IntentRouter: def match(self, message: str, tenant_id: str) -> Optional[IntentMatchResult]: rules = self._load_rules(tenant_id) # 按 priority DESC 排序,已缓存 for rule in rules: if not rule.is_enabled: continue # 1. 关键词匹配(任一命中即匹配) for keyword in rule.keywords: if keyword in message: return IntentMatchResult(rule=rule, match_type="keyword", matched=keyword) # 2. 正则匹配(任一命中即匹配) for pattern in rule.patterns: if re.search(pattern, message): return IntentMatchResult(rule=rule, match_type="regex", matched=pattern) return None # 未命中,走默认 RAG ``` ### 13.3 缓存策略 - 规则列表按 `tenant_id` 缓存在内存中 - 规则 CRUD 操作时主动失效缓存 - 缓存 TTL = 60s --- ## 14. 话术流程引擎设计 ### 14.1 数据模型 ``` script_flows 表 ├── id: UUID (PK) ├── tenant_id: VARCHAR (NOT NULL) ├── name: VARCHAR (流程名称) ├── description: TEXT ├── steps: JSONB (步骤列表,见下方结构) ├── is_enabled: BOOLEAN (默认 true) ├── created_at: TIMESTAMP ├── updated_at: TIMESTAMP └── INDEX: (tenant_id) steps JSONB 结构: [ { "step_no": 1, "content": "您好,请问您的订单号是多少?", "wait_input": true, "timeout_seconds": 120, "timeout_action": "repeat", // repeat/skip/transfer "next_conditions": [ {"keywords": ["不知道","忘了"], "goto_step": 3}, {"pattern": "\\d{10,}", "goto_step": 2} ], "default_next": 2 // 无条件匹配时的下一步 }, ... ] flow_instances 表(运行时状态) ├── id: UUID (PK) ├── tenant_id: VARCHAR (NOT NULL) ├── session_id: VARCHAR (NOT NULL) ├── flow_id: UUID (FK → script_flows.id) ├── current_step: INTEGER (当前步骤序号) ├── status: VARCHAR (active/completed/timeout/cancelled) ├── context: JSONB (流程执行上下文,存储用户输入等) ├── started_at: TIMESTAMP ├── updated_at: TIMESTAMP ├── completed_at: TIMESTAMP (nullable) └── UNIQUE: (tenant_id, session_id, status='active') -- 同一会话同时只有一个活跃流程 ``` ### 14.2 状态机 ``` ┌─────────────┐ │ IDLE │ (无活跃流程) └──────┬──────┘ │ 意图命中 flow 规则 ▼ ┌─────────────┐ ┌────►│ ACTIVE │◄────┐ │ └──────┬──────┘ │ │ │ │ │ 用户输入匹配条件 │ 用户输入不匹配 │ │ │ → 重复当前步骤 │ ▼ │ │ 推进到下一步 ────────┘ │ │ │ 到达最后一步 │ │ │ ▼ │ ┌─────────────┐ │ │ COMPLETED │ │ └─────────────┘ │ │ 超时 / 用户触发退出 │ │ │ ▼ │ ┌─────────────┐ └─────│ TIMEOUT / │ │ CANCELLED │ └─────────────┘ ``` ### 14.3 FlowEngine 核心逻辑 ```python class FlowEngine: async def check_active_flow(self, tenant_id: str, session_id: str) -> Optional[FlowInstance]: """检查会话是否有活跃流程""" return await self.repo.get_active_instance(tenant_id, session_id) async def start(self, flow_id: str, tenant_id: str, session_id: str) -> str: """启动流程,返回第一步话术""" flow = await self.repo.get_flow(flow_id) instance = FlowInstance(flow_id=flow_id, session_id=session_id, current_step=1, status="active") await self.repo.save_instance(instance) return flow.steps[0]["content"] async def advance(self, instance: FlowInstance, user_input: str) -> FlowAdvanceResult: """根据用户输入推进流程""" flow = await self.repo.get_flow(instance.flow_id) current = flow.steps[instance.current_step - 1] # 匹配下一步条件 next_step = self._match_next(current, user_input) if next_step > len(flow.steps): # 流程结束 instance.status = "completed" await self.repo.save_instance(instance) return FlowAdvanceResult(completed=True, message=None) instance.current_step = next_step await self.repo.save_instance(instance) return FlowAdvanceResult(completed=False, message=flow.steps[next_step - 1]["content"]) ``` --- ## 15. 输出护栏设计 ### 15.1 数据模型 ``` forbidden_words 表 ├── id: UUID (PK) ├── tenant_id: VARCHAR (NOT NULL) ├── word: VARCHAR (禁词) ├── category: VARCHAR (competitor/sensitive/political/custom) ├── strategy: VARCHAR (mask/replace/block) ├── replacement: TEXT (replace 策略时的替换文本) ├── fallback_reply: TEXT (block 策略时的兜底话术) ├── is_enabled: BOOLEAN (默认 true) ├── hit_count: BIGINT (命中统计,默认 0) ├── created_at: TIMESTAMP ├── updated_at: TIMESTAMP └── INDEX: (tenant_id, is_enabled) behavior_rules 表 ├── id: UUID (PK) ├── tenant_id: VARCHAR (NOT NULL) ├── rule_text: TEXT (行为约束描述,如"不允许承诺具体赔偿金额") ├── category: VARCHAR (compliance/tone/boundary/custom) ├── is_enabled: BOOLEAN (默认 true) ├── created_at: TIMESTAMP ├── updated_at: TIMESTAMP └── INDEX: (tenant_id, is_enabled) ``` ### 15.2 输出过滤流程 ```python class OutputGuardrail: async def filter(self, reply: str, tenant_id: str) -> GuardrailResult: words = self._load_words(tenant_id) # 已缓存 triggered = [] filtered_reply = reply for word in words: if not word.is_enabled: continue if word.word in filtered_reply: triggered.append(word) if word.strategy == "block": # 整条拦截,返回兜底话术 return GuardrailResult( reply=word.fallback_reply or "抱歉,让我换个方式回答您", blocked=True, triggered_words=[w.word for w in triggered] ) elif word.strategy == "mask": filtered_reply = filtered_reply.replace(word.word, "*" * len(word.word)) elif word.strategy == "replace": filtered_reply = filtered_reply.replace(word.word, word.replacement) return GuardrailResult( reply=filtered_reply, blocked=False, triggered_words=[w.word for w in triggered] ) ``` ### 15.3 Streaming 模式下的护栏处理 SSE 流式输出时,禁词过滤需要特殊处理: - 维护一个滑动窗口缓冲区(buffer),大小 = 最长禁词长度 - 每次收到 LLM delta 时追加到 buffer - 当 buffer 长度超过窗口大小时,对已确认安全的前缀执行输出 - 在 `final` 事件前对剩余 buffer 做最终检查 - `block` 策略在流式模式下:检测到禁词后立即停止输出,发送 `error` 事件并附带兜底话术 ### 15.4 行为规则注入 行为规则不做运行时检测,而是注入到 Prompt 中作为 LLM 的行为约束: ``` [系统指令] {模板内容} [行为约束 - 以下规则必须严格遵守] 1. 不允许承诺具体赔偿金额 2. 不允许透露内部流程 3. 不允许评价竞品 ... ``` --- ## 16. 智能 RAG 增强设计 ### 16.1 Query 改写 ```python class QueryRewriter: REWRITE_PROMPT = """根据对话历史,改写用户的最新问题,使其语义完整、适合知识库检索。 规则: - 解析指代词("它"、"这个"等),替换为具体实体 - 补全省略的主语或宾语 - 保持原意,不添加额外信息 - 如果问题已经足够清晰,直接返回原文 对话历史: {history} 用户最新问题:{query} 改写后的检索查询:""" async def rewrite(self, query: str, history: list, llm_client: LLMClient) -> str: if not history or len(history) < 2: return query # 无历史或历史太短,不改写 messages = [{"role": "user", "content": self.REWRITE_PROMPT.format( history=self._format_history(history[-6:]), # 最近 3 轮 query=query )}] result = await llm_client.generate(messages, max_tokens=100, temperature=0) return result.content.strip() or query ``` ### 16.2 分层排序 ```python KB_TYPE_PRIORITY = { "script": 50, # 话术模板最高 "faq": 40, # FAQ 次之 "product": 30, # 产品知识 "policy": 20, # 政策规范 "general": 10, # 通用最低 } class ResultRanker: def rank(self, hits: list[RetrievalHit], kb_map: dict[str, KnowledgeBase]) -> list[RetrievalHit]: """按 (kb_type_priority DESC, score DESC) 双维度排序""" def sort_key(hit): kb = kb_map.get(hit.kb_id) type_priority = KB_TYPE_PRIORITY.get(kb.kb_type, 0) if kb else 0 custom_priority = kb.priority if kb else 0 return (-(type_priority + custom_priority), -hit.score) return sorted(hits, key=sort_key) ``` --- ## 17. 新增数据库实体汇总 v0.6.0 新增以下 SQLModel 实体(均包含 `tenant_id` 字段,遵循现有多租户隔离模式): | 实体 | 表名 | 用途 | |------|------|------| | PromptTemplate | prompt_templates | Prompt 模板主表 | | PromptTemplateVersion | prompt_template_versions | 模板版本表 | | KnowledgeBase(扩展) | knowledge_bases | 知识库主表(新增 kb_type/priority/is_enabled) | | IntentRule | intent_rules | 意图规则表 | | ScriptFlow | script_flows | 话术流程定义表 | | FlowInstance | flow_instances | 流程运行实例表 | | ForbiddenWord | forbidden_words | 禁词表 | | BehaviorRule | behavior_rules | 行为规则表 | --- ## 18. v0.6.0 风险与待澄清 - Query 改写的 LLM 调用会增加约 0.5-1s 延迟和额外 token 消耗;可通过配置开关控制是否启用。 - 流式模式下的禁词滑动窗口可能导致输出延迟(等待 buffer 填满);需要在实时性和安全性之间权衡窗口大小。 - 多知识库并行检索会增加 Qdrant 负载;需要评估并发 collection 搜索的性能影响。 - 话术流程的超时检测依赖调用方(Java 侧)触发;需要与 Java 侧约定超时回调机制。 - 现有 `kb_default` 到多知识库的数据迁移需要平滑过渡,不能中断现有服务。 --- ## 19. v0.7.0 测试与监控系统设计 ### 19.1 设计目标与范围 #### 19.1.1 核心目标 - **可测试性**:为 v0.6.0 新增的四大功能(Prompt 模板、意图规则、话术流程、输出护栏)提供独立测试能力。 - **可观测性**:提供细粒度的运行时监控数据,支持规则命中率、流程执行状态、护栏拦截统计等。 - **可追溯性**:完整记录对话流程的 12 步执行细节,支持问题排查与效果评估。 - **可导出性**:支持对话数据导出,便于离线分析与模型优化。 #### 19.1.2 设计约束 - **性能优先**:监控数据采集不能显著影响对话生成性能(目标:<5% 延迟增加)。 - **存储可控**:完整流程测试的详细日志仅保留 7 天,避免存储膨胀。 - **租户隔离**:所有测试与监控数据必须按 `tenant_id` 隔离。 - **向后兼容**:新增监控不影响现有 `/ai/chat` 接口的行为与性能。 --- ### 19.2 总体架构 #### 19.2.1 监控数据流 ``` ┌─────────────────────────────────────────────────────────────┐ │ Admin API Layer │ │ /admin/test/* /admin/monitoring/* /admin/dashboard/* │ └────────────┬────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ Monitoring Service Layer │ │ ├─ FlowTestService (完整流程测试) │ │ ├─ IntentMonitor (意图规则监控) │ │ ├─ PromptMonitor (Prompt 模板监控) │ │ ├─ FlowMonitor (话术流程监控) │ │ ├─ GuardrailMonitor (护栏监控) │ │ └─ ConversationTracker (对话追踪) │ └────────────┬────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ Orchestrator (增强) │ │ 12-step pipeline + 监控埋点 (可选开关) │ └────────────┬────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ Data Storage Layer │ │ ├─ PostgreSQL (统计数据、对话记录) │ │ └─ Redis (缓存、实时统计) │ └─────────────────────────────────────────────────────────────┘ ``` #### 19.2.2 监控模式 **生产模式(默认)**: - 仅记录关键指标(命中次数、错误率、平均延迟) - 不记录详细的步骤执行日志 - 性能影响 <2% **测试模式(显式开启)**: - 记录完整的 12 步执行细节 - 包含每步的输入/输出/耗时/错误 - 仅用于 `/admin/test/*` 端点 - 数据保留 7 天 --- ### 19.3 Dashboard 统计增强设计 #### 19.3.1 新增统计指标(对应 AC-AISVC-91, AC-AISVC-92) 在现有 `GET /admin/dashboard/stats` 响应中新增以下字段: ```python # 意图规则统计 intentRuleHitRate: float # 命中率 = 命中次数 / 总对话次数 intentRuleHitCount: int # 总命中次数 intentRuleTopHits: list[dict] # Top 5 命中规则 [{"ruleId", "name", "hitCount"}] # Prompt 模板统计 promptTemplateUsageCount: int # 模板使用总次数 promptTemplateActiveCount: int # 已发布模板数量 promptTemplateTopUsed: list[dict] # Top 5 使用模板 [{"templateId", "name", "usageCount"}] # 话术流程统计 scriptFlowActivationCount: int # 流程激活总次数 scriptFlowCompletionRate: float # 完成率 = 完成次数 / 激活次数 scriptFlowTopActive: list[dict] # Top 5 活跃流程 [{"flowId", "name", "activationCount"}] # 护栏统计 guardrailBlockCount: int # 拦截总次数 guardrailBlockRate: float # 拦截率 = 拦截次数 / 总对话次数 guardrailTopWords: list[dict] # Top 5 触发禁词 [{"word", "category", "hitCount"}] ``` #### 19.3.2 数据来源与计算 **实时统计(Redis)**: - 使用 Redis Hash 存储租户级计数器 - Key 格式:`stats:{tenant_id}:{metric}:{date}` - 每次对话结束时异步更新(不阻塞响应) - TTL = 90 天 **聚合统计(PostgreSQL)**: - 从现有表的 `hit_count` 字段聚合(intent_rules, forbidden_words) - 从 `flow_instances` 表统计流程激活与完成 - 从 `chat_messages` 表关联 `prompt_template_id`(需新增字段) #### 19.3.3 性能优化 - Dashboard 统计结果缓存 60 秒(Redis) - Top N 排行榜每 5 分钟预计算一次(后台任务) - 避免实时聚合大表,使用增量计数器 --- ### 19.4 完整流程测试台设计 #### 19.4.1 测试接口(对应 AC-AISVC-93 ~ AC-AISVC-96) **端点**:`POST /admin/test/flow-execution` **请求体**: ```json { "message": "我想退货", "sessionId": "test_session_001", "channelType": "wechat", "history": [...], // 可选 "metadata": {...}, // 可选 "enableDetailedLog": true // 是否记录详细日志 } ``` **响应体**: ```json { "executionId": "exec_uuid", "steps": [ { "step": 1, "name": "Memory.load", "status": "success", "durationMs": 12, "input": {"sessionId": "test_session_001"}, "output": {"messageCount": 5}, "error": null, "metadata": {} }, { "step": 3, "name": "InputGuardrail.scan", "status": "success", "durationMs": 8, "input": {"message": "我想退货"}, "output": {"triggered": false, "words": []}, "error": null, "metadata": {} }, { "step": 5, "name": "IntentRouter.match", "status": "success", "durationMs": 15, "input": {"message": "我想退货"}, "output": { "matched": true, "ruleId": "rule_001", "ruleName": "退货意图", "responseType": "flow", "flowId": "flow_return_001" }, "error": null, "metadata": {"priority": 100, "matchType": "keyword"} }, // ... 其他步骤 ], "finalResponse": { "reply": "您好,请问您的订单号是多少?", "confidence": 0.95, "shouldTransfer": false }, "totalDurationMs": 1250, "summary": { "successSteps": 11, "failedSteps": 0, "skippedSteps": 1 } } ``` #### 19.4.2 实现策略 **Orchestrator 增强**: ```python class Orchestrator: async def generate_with_monitoring( self, request: ChatRequest, tenant_id: str, enable_detailed_log: bool = False ) -> tuple[ChatResponse, Optional[list[StepLog]]]: """ 增强版生成方法,支持可选的详细日志记录 """ step_logs = [] if enable_detailed_log else None # Step 1: Memory.load step_start = time.time() try: history = await self.memory.load_history(tenant_id, request.sessionId) if step_logs is not None: step_logs.append(StepLog( step=1, name="Memory.load", status="success", durationMs=int((time.time() - step_start) * 1000), input={"sessionId": request.sessionId}, output={"messageCount": len(history)} )) except Exception as e: if step_logs is not None: step_logs.append(StepLog( step=1, name="Memory.load", status="failed", durationMs=int((time.time() - step_start) * 1000), error=str(e) )) raise # ... 其他步骤类似 return response, step_logs ``` **测试端点实现**: ```python @router.post("/admin/test/flow-execution") async def test_flow_execution( request: FlowTestRequest, tenant_id: str = Depends(get_current_tenant_id), session: AsyncSession = Depends(get_session) ): # 调用增强版 Orchestrator response, step_logs = await orchestrator.generate_with_monitoring( ChatRequest( message=request.message, sessionId=request.sessionId, channelType=request.channelType, history=request.history, metadata=request.metadata ), tenant_id=tenant_id, enable_detailed_log=request.enableDetailedLog ) # 保存测试记录(可选,用于历史查询) if request.enableDetailedLog: test_record = FlowTestRecord( tenant_id=tenant_id, session_id=request.sessionId, steps=step_logs, final_response=response, created_at=datetime.utcnow() ) await save_test_record(session, test_record) return FlowExecutionResult( executionId=str(uuid.uuid4()), steps=step_logs, finalResponse=response, totalDurationMs=sum(s.durationMs for s in step_logs), summary={ "successSteps": sum(1 for s in step_logs if s.status == "success"), "failedSteps": sum(1 for s in step_logs if s.status == "failed"), "skippedSteps": sum(1 for s in step_logs if s.status == "skipped") } ) ``` --- ### 19.5 意图规则测试与监控设计 #### 19.5.1 独立测试接口(对应 AC-AISVC-97 ~ AC-AISVC-99) **端点**:`POST /admin/intent-rules/{ruleId}/test` **请求体**: ```json { "testMessages": [ "我想退货", "能退款吗", "这个产品怎么样" ] } ``` **响应体**: ```json { "ruleId": "rule_001", "ruleName": "退货意图", "results": [ { "message": "我想退货", "matched": true, "matchedKeywords": ["退货"], "matchedPatterns": [], "matchType": "keyword", "priority": 100, "conflictRules": [] }, { "message": "能退款吗", "matched": true, "matchedKeywords": ["退款"], "matchedPatterns": [], "matchType": "keyword", "priority": 100, "conflictRules": [] }, { "message": "这个产品怎么样", "matched": false, "matchedKeywords": [], "matchedPatterns": [], "matchType": null, "priority": 100, "conflictRules": [ { "ruleId": "rule_002", "ruleName": "产品咨询", "priority": 80, "reason": "可能匹配产品咨询规则" } ] } ], "summary": { "totalTests": 3, "matchedCount": 2, "matchRate": 0.67 } } ``` #### 19.5.2 冲突检测算法 ```python class IntentRuleTester: async def test_rule( self, rule: IntentRule, test_messages: list[str], tenant_id: str ) -> IntentRuleTestResult: """测试意图规则并检测冲突""" all_rules = await self.rule_service.get_rules(tenant_id) results = [] for message in test_messages: # 测试当前规则 matched = self._match_rule(rule, message) # 检测冲突:查找其他也能匹配的规则 conflict_rules = [] for other_rule in all_rules: if other_rule.id == rule.id: continue if self._match_rule(other_rule, message): conflict_rules.append({ "ruleId": other_rule.id, "ruleName": other_rule.name, "priority": other_rule.priority, "reason": f"同时匹配(优先级:{other_rule.priority})" }) results.append(IntentRuleTestCase( message=message, matched=matched, conflictRules=conflict_rules )) return IntentRuleTestResult( ruleId=rule.id, ruleName=rule.name, results=results, summary={ "totalTests": len(test_messages), "matchedCount": sum(1 for r in results if r.matched), "matchRate": sum(1 for r in results if r.matched) / len(test_messages) } ) ``` #### 19.5.3 监控统计接口(对应 AC-AISVC-100) **端点**:`GET /admin/monitoring/intent-rules` **查询参数**: - `startDate`: 开始日期(ISO 8601) - `endDate`: 结束日期(ISO 8601) - `limit`: 返回数量(默认 10) **响应体**: ```json { "totalHits": 1250, "totalConversations": 5000, "hitRate": 0.25, "rules": [ { "ruleId": "rule_001", "ruleName": "退货意图", "hitCount": 450, "hitRate": 0.09, "responseType": "flow", "avgResponseTime": 1200, "lastHitAt": "2026-02-27T14:30:00Z" } ], "timeSeriesData": [ { "date": "2026-02-27", "totalHits": 120, "ruleBreakdown": { "rule_001": 45, "rule_002": 30, "rule_003": 45 } } ] } ``` --- ### 19.6 Prompt 模板测试与监控设计 #### 19.6.1 模板预览接口(对应 AC-AISVC-101) **端点**:`POST /admin/prompt-templates/{templateId}/preview` **请求体**: ```json { "variables": { "persona_name": "小助手", "custom_var": "测试值" }, "sampleHistory": [ {"role": "user", "content": "你好"}, {"role": "assistant", "content": "您好,有什么可以帮您?"} ], "sampleMessage": "我想了解产品信息" } ``` **响应体**: ```json { "templateId": "tpl_001", "templateName": "默认客服人设", "version": 3, "renderedSystemPrompt": "你是小助手,一个专业的客服助手...\n\n[行为约束]\n1. 不允许承诺具体赔偿金额\n...", "finalMessages": [ { "role": "system", "content": "你是小助手,一个专业的客服助手..." }, { "role": "user", "content": "你好" }, { "role": "assistant", "content": "您好,有什么可以帮您?" }, { "role": "user", "content": "我想了解产品信息" } ], "tokenCount": { "systemPrompt": 450, "history": 120, "currentMessage": 30, "total": 600 }, "warnings": [] } ``` #### 19.6.2 模板使用统计(对应 AC-AISVC-102) **端点**:`GET /admin/monitoring/prompt-templates` **响应体**: ```json { "totalUsage": 5000, "templates": [ { "templateId": "tpl_001", "templateName": "默认客服人设", "scene": "chat", "usageCount": 3500, "usageRate": 0.70, "currentVersion": 3, "avgTokenCount": 450, "lastUsedAt": "2026-02-27T14:30:00Z" } ], "sceneBreakdown": { "chat": 3500, "rag_qa": 1200, "greeting": 300 } } ``` #### 19.6.3 实现策略 **Token 计数**: ```python import tiktoken class PromptTemplateMonitor: def __init__(self): self.tokenizer = tiktoken.get_encoding("cl100k_base") async def preview_template( self, template: PromptTemplate, variables: dict, sample_history: list[dict], sample_message: str ) -> PromptPreviewResult: """预览模板渲染结果并计算 token""" # 1. 渲染系统指令 version = await self.get_published_version(template.id) system_prompt = self.variable_resolver.resolve( version.system_instruction, variables ) # 2. 注入行为规则 behavior_rules = await self.get_behavior_rules(template.tenant_id) if behavior_rules: system_prompt += "\n\n[行为约束]\n" + "\n".join( f"{i+1}. {rule.rule_text}" for i, rule in enumerate(behavior_rules) ) # 3. 构建完整消息列表 messages = [{"role": "system", "content": system_prompt}] messages.extend(sample_history) messages.append({"role": "user", "content": sample_message}) # 4. 计算 token token_counts = { "systemPrompt": len(self.tokenizer.encode(system_prompt)), "history": sum( len(self.tokenizer.encode(msg["content"])) for msg in sample_history ), "currentMessage": len(self.tokenizer.encode(sample_message)), } token_counts["total"] = sum(token_counts.values()) # 5. 检查警告 warnings = [] if token_counts["total"] > 4000: warnings.append("总 token 数超过 4000,可能影响性能") if token_counts["systemPrompt"] > 2000: warnings.append("系统指令过长,建议精简") return PromptPreviewResult( templateId=template.id, renderedSystemPrompt=system_prompt, finalMessages=messages, tokenCount=token_counts, warnings=warnings ) ``` --- ### 19.7 话术流程测试与监控设计 #### 19.7.1 流程模拟测试接口(对应 AC-AISVC-103 ~ AC-AISVC-105) **端点**:`POST /admin/script-flows/{flowId}/simulate` **请求体**: ```json { "userInputs": [ "12345678901234", "质量问题", "是的" ] } ``` **响应体**: ```json { "flowId": "flow_001", "flowName": "退货流程", "simulation": [ { "stepNo": 1, "botMessage": "您好,请问您的订单号是多少?", "userInput": "12345678901234", "matchedCondition": { "type": "pattern", "pattern": "\\d{10,}", "gotoStep": 2 }, "nextStep": 2, "durationMs": 50 }, { "stepNo": 2, "botMessage": "请问退货原因是什么?", "userInput": "质量问题", "matchedCondition": { "type": "default", "gotoStep": 3 }, "nextStep": 3, "durationMs": 45 }, { "stepNo": 3, "botMessage": "已为您登记退货申请,是否需要上门取件?", "userInput": "是的", "matchedCondition": { "type": "keyword", "keywords": ["是", "需要"], "gotoStep": 4 }, "nextStep": 4, "durationMs": 40 } ], "result": { "completed": true, "totalSteps": 3, "totalDurationMs": 135, "finalMessage": "好的,我们会在 24 小时内安排快递上门取件。" }, "coverage": { "totalSteps": 5, "coveredSteps": 3, "coverageRate": 0.60, "uncoveredSteps": [4, 5] }, "issues": [ "流程覆盖率低于 80%,建议增加测试用例" ] } ``` #### 19.7.2 流程覆盖率分析 ```python class ScriptFlowTester: async def simulate_flow( self, flow: ScriptFlow, user_inputs: list[str] ) -> FlowSimulationResult: """模拟流程执行并分析覆盖率""" simulation = [] current_step = 1 visited_steps = set() for user_input in user_inputs: if current_step > len(flow.steps): break step_def = flow.steps[current_step - 1] visited_steps.add(current_step) # 匹配下一步条件 matched_condition, next_step = self._match_next_step( step_def, user_input ) simulation.append(FlowSimulationStep( stepNo=current_step, botMessage=step_def["content"], userInput=user_input, matchedCondition=matched_condition, nextStep=next_step )) current_step = next_step # 分析覆盖率 total_steps = len(flow.steps) covered_steps = len(visited_steps) coverage_rate = covered_steps / total_steps # 检测问题 issues = [] if coverage_rate < 0.8: issues.append("流程覆盖率低于 80%,建议增加测试用例") # 检测死循环 if len(simulation) > total_steps * 2: issues.append("检测到可能的死循环") # 检测未覆盖的分支 uncovered_steps = set(range(1, total_steps + 1)) - visited_steps if uncovered_steps: issues.append(f"未覆盖步骤:{sorted(uncovered_steps)}") return FlowSimulationResult( flowId=flow.id, simulation=simulation, coverage={ "totalSteps": total_steps, "coveredSteps": covered_steps, "coverageRate": coverage_rate, "uncoveredSteps": list(uncovered_steps) }, issues=issues ) ``` #### 19.7.3 流程监控统计(对应 AC-AISVC-106) **端点**:`GET /admin/monitoring/script-flows` **响应体**: ```json { "totalActivations": 850, "totalCompletions": 680, "completionRate": 0.80, "flows": [ { "flowId": "flow_001", "flowName": "退货流程", "activationCount": 450, "completionCount": 380, "completionRate": 0.84, "avgDuration": 180, "avgStepsCompleted": 4.2, "dropOffPoints": [ { "stepNo": 2, "dropOffCount": 50, "dropOffRate": 0.11 } ], "lastActivatedAt": "2026-02-27T14:30:00Z" } ] } ``` --- ### 19.8 输出护栏测试与监控设计 #### 19.8.1 禁词测试接口(对应 AC-AISVC-107) **端点**:`POST /admin/guardrails/forbidden-words/test` **请求体**: ```json { "testTexts": [ "我们的产品比竞品 A 更好", "可以给您赔偿 1000 元", "这是正常的回复" ] } ``` **响应体**: ```json { "results": [ { "originalText": "我们的产品比竞品 A 更好", "triggered": true, "triggeredWords": [ { "word": "竞品 A", "category": "competitor", "strategy": "replace", "replacement": "其他品牌" } ], "filteredText": "我们的产品比其他品牌更好", "blocked": false }, { "originalText": "可以给您赔偿 1000 元", "triggered": true, "triggeredWords": [ { "word": "赔偿", "category": "sensitive", "strategy": "block", "fallbackReply": "关于补偿问题,请联系人工客服处理" } ], "filteredText": "关于补偿问题,请联系人工客服处理", "blocked": true }, { "originalText": "这是正常的回复", "triggered": false, "triggeredWords": [], "filteredText": "这是正常的回复", "blocked": false } ], "summary": { "totalTests": 3, "triggeredCount": 2, "blockedCount": 1, "triggerRate": 0.67 } } ``` #### 19.8.2 护栏监控统计(对应 AC-AISVC-108) **端点**:`GET /admin/monitoring/guardrails` **响应体**: ```json { "totalBlocks": 120, "totalTriggers": 450, "blockRate": 0.024, "words": [ { "wordId": "word_001", "word": "竞品 A", "category": "competitor", "strategy": "replace", "hitCount": 85, "blockCount": 0, "lastHitAt": "2026-02-27T14:30:00Z" }, { "wordId": "word_002", "word": "赔偿", "category": "sensitive", "strategy": "block", "hitCount": 45, "blockCount": 45, "lastHitAt": "2026-02-27T14:25:00Z" } ], "categoryBreakdown": { "competitor": 85, "sensitive": 45, "political": 0, "custom": 20 } } ``` --- ### 19.9 对话追踪与导出设计 #### 19.9.1 对话追踪接口(对应 AC-AISVC-109) **端点**:`GET /admin/monitoring/conversations` **查询参数**: - `startDate`: 开始日期(ISO 8601) - `endDate`: 结束日期(ISO 8601) - `sessionId`: 会话 ID(可选) - `channelType`: 渠道类型(可选) - `hasError`: 是否包含错误(可选) - `limit`: 返回数量(默认 20) - `offset`: 偏移量(默认 0) **响应体**: ```json { "total": 1250, "conversations": [ { "sessionId": "kf_001_wx123", "channelType": "wechat", "messageCount": 12, "startTime": "2026-02-27T14:00:00Z", "lastMessageTime": "2026-02-27T14:15:00Z", "duration": 900, "intentRulesHit": [ {"ruleId": "rule_001", "ruleName": "退货意图", "hitCount": 2} ], "flowsActivated": [ {"flowId": "flow_001", "flowName": "退货流程", "status": "completed"} ], "guardrailTriggered": true, "errorCount": 0, "avgConfidence": 0.85, "transferRequested": false } ] } ``` #### 19.9.2 对话详情接口 **端点**:`GET /admin/monitoring/conversations/{sessionId}` **响应体**: ```json { "sessionId": "kf_001_wx123", "channelType": "wechat", "startTime": "2026-02-27T14:00:00Z", "messages": [ { "messageId": "msg_001", "role": "user", "content": "我想退货", "timestamp": "2026-02-27T14:00:00Z" }, { "messageId": "msg_002", "role": "assistant", "content": "您好,请问您的订单号是多少?", "timestamp": "2026-02-27T14:00:02Z", "confidence": 0.95, "intentMatched": { "ruleId": "rule_001", "ruleName": "退货意图", "responseType": "flow" }, "flowActivated": { "flowId": "flow_001", "flowName": "退货流程", "currentStep": 1 }, "guardrailResult": { "triggered": false, "words": [] }, "latencyMs": 1200, "totalTokens": 450, "promptTokens": 380, "completionTokens": 70 } ], "summary": { "totalMessages": 12, "userMessages": 6, "assistantMessages": 6, "avgConfidence": 0.85, "avgLatency": 1150, "totalTokens": 5400, "intentRulesHit": 2, "flowsActivated": 1, "guardrailTriggered": false, "errorOccurred": false } } ``` #### 19.9.3 对话导出接口(对应 AC-AISVC-110) **端点**:`POST /admin/monitoring/conversations/export` **请求体**: ```json { "startDate": "2026-02-20T00:00:00Z", "endDate": "2026-02-27T23:59:59Z", "format": "csv", "filters": { "channelType": "wechat", "hasError": false, "minConfidence": 0.7 }, "fields": [ "sessionId", "channelType", "messageCount", "avgConfidence", "intentRulesHit", "flowsActivated" ] } ``` **响应体**: ```json { "exportId": "export_uuid", "status": "processing", "estimatedRows": 1250, "downloadUrl": null, "expiresAt": null } ``` **导出状态查询**:`GET /admin/monitoring/conversations/export/{exportId}` **响应体**: ```json { "exportId": "export_uuid", "status": "completed", "totalRows": 1250, "downloadUrl": "/admin/monitoring/conversations/export/export_uuid/download", "expiresAt": "2026-02-28T14:30:00Z", "createdAt": "2026-02-27T14:25:00Z" } ``` #### 19.9.4 实现策略 **异步导出处理**: ```python import asyncio import csv from io import StringIO class ConversationExporter: async def export_conversations( self, tenant_id: str, filters: dict, fields: list[str], format: str = "csv" ) -> str: """异步导出对话数据""" export_id = str(uuid.uuid4()) # 创建导出任务记录 export_task = ExportTask( id=export_id, tenant_id=tenant_id, status="processing", created_at=datetime.utcnow() ) await self.save_export_task(export_task) # 异步执行导出 asyncio.create_task(self._process_export( export_id, tenant_id, filters, fields, format )) return export_id async def _process_export( self, export_id: str, tenant_id: str, filters: dict, fields: list[str], format: str ): """后台处理导出任务""" try: # 1. 查询对话数据(分批处理,避免内存溢出) batch_size = 1000 offset = 0 output = StringIO() writer = csv.DictWriter(output, fieldnames=fields) writer.writeheader() while True: conversations = await self.query_conversations( tenant_id, filters, limit=batch_size, offset=offset ) if not conversations: break for conv in conversations: row = {field: conv.get(field) for field in fields} writer.writerow(row) offset += batch_size # 2. 保存到临时文件 file_path = f"/tmp/exports/{export_id}.csv" with open(file_path, "w", encoding="utf-8") as f: f.write(output.getvalue()) # 3. 更新导出任务状态 await self.update_export_task( export_id, status="completed", file_path=file_path, total_rows=offset, expires_at=datetime.utcnow() + timedelta(hours=24) ) except Exception as e: logger.error(f"Export failed: {e}") await self.update_export_task( export_id, status="failed", error=str(e) ) ``` --- ### 19.10 新增数据库实体汇总 v0.7.0 新增以下 SQLModel 实体(均包含 `tenant_id` 字段,遵循现有多租户隔离模式): | 实体 | 表名 | 用途 | 关键字段 | |------|------|------|----------| | FlowTestRecord | flow_test_records | 完整流程测试记录 | session_id, steps (JSONB), final_response (JSONB) | | ExportTask | export_tasks | 对话导出任务 | status, file_path, total_rows, expires_at | **扩展现有实体**: | 实体 | 表名 | 新增字段 | 用途 | |------|------|----------|------| | ChatMessage | chat_messages | prompt_template_id (UUID, nullable) | 关联使用的 Prompt 模板 | | ChatMessage | chat_messages | intent_rule_id (UUID, nullable) | 关联命中的意图规则 | | ChatMessage | chat_messages | flow_instance_id (UUID, nullable) | 关联的话术流程实例 | | ChatMessage | chat_messages | guardrail_triggered (BOOLEAN) | 是否触发护栏 | | ChatMessage | chat_messages | guardrail_words (JSONB, nullable) | 触发的禁词列表 | **索引优化**: ```sql -- 监控查询优化 CREATE INDEX idx_chat_messages_monitoring ON chat_messages(tenant_id, created_at DESC, role); -- 意图规则统计优化 CREATE INDEX idx_chat_messages_intent ON chat_messages(tenant_id, intent_rule_id) WHERE intent_rule_id IS NOT NULL; -- 流程统计优化 CREATE INDEX idx_flow_instances_monitoring ON flow_instances(tenant_id, status, started_at DESC); -- 护栏统计优化 CREATE INDEX idx_chat_messages_guardrail ON chat_messages(tenant_id, guardrail_triggered, created_at DESC) WHERE guardrail_triggered = true; ``` --- ### 19.11 性能优化策略 #### 19.11.1 缓存策略 **Redis 缓存层次**: ``` Level 1: Dashboard 统计(TTL 60s) - Key: stats:{tenant_id}:dashboard - 内容:聚合统计数据 Level 2: Top N 排行榜(TTL 300s) - Key: stats:{tenant_id}:top:intent_rules - Key: stats:{tenant_id}:top:prompt_templates - Key: stats:{tenant_id}:top:script_flows - Key: stats:{tenant_id}:top:forbidden_words Level 3: 实时计数器(TTL 90天) - Key: stats:{tenant_id}:counter:{metric}:{date} - 内容:增量计数器 ``` **缓存更新策略**: - Dashboard 统计:每次对话结束后异步更新计数器 - Top N 排行榜:后台任务每 5 分钟重新计算 - 实时计数器:使用 Redis INCR 原子操作 #### 19.11.2 数据库优化 **分区策略**: ```sql -- flow_test_records 按日期分区(保留 7 天) CREATE TABLE flow_test_records ( id UUID PRIMARY KEY, tenant_id VARCHAR NOT NULL, created_at TIMESTAMP NOT NULL, ... ) PARTITION BY RANGE (created_at); CREATE TABLE flow_test_records_2026_02_27 PARTITION OF flow_test_records FOR VALUES FROM ('2026-02-27') TO ('2026-02-28'); -- 自动清理过期分区(定时任务) DROP TABLE IF EXISTS flow_test_records_2026_02_20; ``` **查询优化**: - 使用覆盖索引减少回表查询 - 对大表使用 LIMIT + 游标分页 - 避免 SELECT *,只查询需要的字段 - 使用 EXPLAIN ANALYZE 分析慢查询 #### 19.11.3 监控埋点优化 **最小化性能影响**: ```python class MonitoringMiddleware: async def __call__(self, request, call_next): # 仅在测试模式或采样时记录详细日志 enable_detailed_log = ( request.url.path.startswith("/admin/test/") or self._should_sample() # 1% 采样率 ) if enable_detailed_log: # 记录详细步骤日志 request.state.monitoring_enabled = True response = await call_next(request) # 异步更新统计(不阻塞响应) if hasattr(request.state, "monitoring_data"): asyncio.create_task( self._update_stats_async(request.state.monitoring_data) ) return response def _should_sample(self) -> bool: """1% 采样率""" return random.random() < 0.01 ``` --- ### 19.12 v0.7.0 风险与待澄清 #### 19.12.1 性能风险 - **完整流程测试**:记录 12 步详细日志会增加 10-15% 的延迟,仅用于测试环境。 - **对话导出**:大批量导出(>10000 条)可能导致内存压力,需要流式处理。 - **实时统计**:高并发场景下 Redis 计数器可能成为瓶颈,考虑使用 Redis Cluster。 #### 19.12.2 存储风险 - **测试日志膨胀**:完整流程测试日志每条约 5KB,需严格执行 7 天清理策略。 - **导出文件管理**:导出文件需要定期清理(24 小时过期),避免磁盘占用。 - **索引膨胀**:新增多个索引会增加写入开销,需监控索引使用率。 #### 19.12.3 功能待澄清 - **对话导出格式**:是否需要支持 JSON/Excel 格式?当前仅实现 CSV。 - **实时监控推送**:是否需要 WebSocket 实时推送监控数据?当前仅支持轮询。 - **历史数据迁移**:现有对话数据是否需要回填 `prompt_template_id` 等新字段? - **权限控制**:测试与监控接口是否需要更细粒度的权限控制(如只读/读写)? #### 19.12.4 兼容性风险 - **数据库迁移**:新增字段和索引需要在生产环境谨慎执行,建议分批迁移。 - **API 版本**:新增监控接口不影响现有 `/ai/chat` 接口,向后兼容。 - **前端适配**:Dashboard 新增统计字段需要前端同步更新,否则显示为空。 --- ## 20. 总结 v0.7.0 测试与监控系统为 AI 中台提供了完整的可观测性与可测试性能力: **核心价值**: - **独立测试**:为四大功能(Prompt 模板、意图规则、话术流程、输出护栏)提供独立测试能力 - **完整追踪**:12 步流程的详细执行日志,支持问题排查与效果评估 - **实时监控**:细粒度的运行时统计,支持规则命中率、流程完成率、护栏拦截率等 - **数据导出**:支持对话数据导出,便于离线分析与模型优化 **技术亮点**: - **性能优先**:生产模式性能影响 <2%,测试模式仅在显式开启时生效 - **存储可控**:测试日志 7 天自动清理,导出文件 24 小时过期 - **租户隔离**:所有监控数据按 `tenant_id` 隔离,保证多租户安全 - **向后兼容**:新增监控不影响现有接口行为与性能 **实施建议**: 1. 优先实现 Dashboard 统计增强(AC-AISVC-91, AC-AISVC-92) 2. 其次实现完整流程测试台(AC-AISVC-93 ~ AC-AISVC-96) 3. 再实现各功能的独立测试接口(AC-AISVC-97 ~ AC-AISVC-108) 4. 最后实现对话追踪与导出(AC-AISVC-109, AC-AISVC-110)