+
{{ flowTestResult.final_response.reply }}
-
- 置信度: {{ (flowTestResult.finalResponse.confidence * 100).toFixed(1) }}%
+
+ 置信度: {{ (flowTestResult.final_response.confidence * 100).toFixed(1) }}%
-
+
需转人工
diff --git a/ai-service/app/services/orchestrator.py b/ai-service/app/services/orchestrator.py
index 8e63209..efad651 100644
--- a/ai-service/app/services/orchestrator.py
+++ b/ai-service/app/services/orchestrator.py
@@ -116,6 +116,7 @@ class GenerationContext:
behavior_rules: list[str] = field(default_factory=list)
diagnostics: dict[str, Any] = field(default_factory=dict)
+ execution_steps: list[dict[str, Any]] = field(default_factory=list)
class OrchestratorService:
@@ -204,6 +205,28 @@ class OrchestratorService:
self._behavior_rule_service = behavior_rule_service
self._output_filter = output_filter
+ def _record_step(
+ self,
+ ctx: GenerationContext,
+ step_no: int,
+ name: str,
+ status: str = "success",
+ duration_ms: int = 0,
+ input_data: Any = None,
+ output_data: Any = None,
+ error: str | None = None,
+ ) -> None:
+ """Record execution step for flow test visualization."""
+ ctx.execution_steps.append({
+ "step": step_no,
+ "name": name,
+ "status": status,
+ "duration_ms": duration_ms,
+ "input": input_data,
+ "output": output_data,
+ "error": error,
+ })
+
async def generate(
self,
tenant_id: str,
@@ -242,42 +265,110 @@ class OrchestratorService:
)
try:
+            import time  # NOTE(review): prefer a module-level import; kept local here to limit diff scope
+
# Step 1: InputScanner - Scan user input for forbidden words
+ step_start = time.time()
await self._scan_input(ctx)
+ self._record_step(ctx, 1, "InputScanner", "success", int((time.time() - step_start) * 1000),
+ input_data={"text": ctx.current_message[:200]},
+ output_data=ctx.diagnostics.get("input_scan"))
# Load local history and merge context (original pipeline)
await self._load_local_history(ctx)
await self._merge_context(ctx, request.history)
# Step 2: FlowEngine - Check if session has active script flow
+ step_start = time.time()
await self._check_active_flow(ctx)
+ self._record_step(ctx, 2, "FlowEngine", "success", int((time.time() - step_start) * 1000),
+ input_data={"session_id": ctx.session_id},
+ output_data={"active_flow": bool(ctx.active_flow), "flow_name": getattr(ctx.active_flow, 'flow_name', None) if ctx.active_flow else None})
# Step 3: IntentRouter - Match intent rules and route
+ step_start = time.time()
await self._match_intent(ctx)
+ intent_output = {"matched": bool(ctx.intent_match)}
+ if ctx.intent_match:
+ intent_output["rule_name"] = getattr(ctx.intent_match, 'rule_name', None)
+ intent_output["confidence"] = getattr(ctx.intent_match, 'confidence', None)
+ self._record_step(ctx, 3, "IntentRouter", "success", int((time.time() - step_start) * 1000),
+ input_data={"query": ctx.current_message[:100]},
+ output_data=intent_output)
# Step 4: QueryRewriter - (Optional, skipped in MVP)
- # ctx.query_rewritten = ctx.current_message
+ self._record_step(ctx, 4, "QueryRewriter", "skipped", 0,
+ input_data={"query": ctx.current_message[:100]},
+ output_data={"note": "Skipped in MVP", "rewritten": None})
# Step 5-6: Multi-KB Retrieval + ResultRanker
+ step_start = time.time()
if self._config.enable_rag and self._retriever:
await self._retrieve_evidence(ctx)
+ retrieval_output = {
+ "hit_count": len(ctx.retrieval_result.hits) if ctx.retrieval_result else 0,
+ "max_score": ctx.retrieval_result.max_score if ctx.retrieval_result else 0,
+ }
+ if ctx.retrieval_result and ctx.retrieval_result.hits:
+ retrieval_output["top_hits"] = [
+ {
+                        "content": (hit.text[:200] + "...") if len(hit.text) > 200 else hit.text,
+ "score": round(hit.score, 4),
+ "source": hit.source,
+ }
+ for hit in ctx.retrieval_result.hits[:5]
+ ]
+ self._record_step(ctx, 5, "MultiKBRetrieval", "success", int((time.time() - step_start) * 1000),
+ input_data={"query": ctx.current_message[:100], "top_k": 3},
+ output_data=retrieval_output)
+ else:
+ self._record_step(ctx, 5, "MultiKBRetrieval", "skipped", 0,
+ input_data={"query": ctx.current_message[:100]},
+ output_data={"note": "RAG disabled or no retriever"})
# Step 7: PromptBuilder - Load template + inject behavior rules
+ step_start = time.time()
await self._build_system_prompt(ctx)
+ self._record_step(ctx, 7, "PromptBuilder", "success", int((time.time() - step_start) * 1000),
+ input_data={"template_id": getattr(ctx, 'template_id', None), "behavior_rules": ctx.behavior_rules[:3] if ctx.behavior_rules else []},
+                              output_data={"prompt_length": len(ctx.system_prompt) if ctx.system_prompt else 0, "prompt_preview": (ctx.system_prompt[:300] + "...") if ctx.system_prompt and len(ctx.system_prompt) > 300 else ctx.system_prompt})
# Step 8: LLM.generate - Generate response
+ step_start = time.time()
await self._generate_response(ctx)
+ llm_model = ctx.llm_response.model if ctx.llm_response else "unknown"
+ self._record_step(ctx, 8, "LLMGenerate", "success", int((time.time() - step_start) * 1000),
+                              input_data={"model": llm_model, "messages_count": len(self._build_llm_messages(ctx)) if hasattr(self, '_build_llm_messages') else 1},  # NOTE(review): re-invokes the message builder (and hasattr-guards our own method) just for a count — consider recording the count during _generate_response instead
+                              output_data={"reply_length": len(ctx.llm_response.content) if ctx.llm_response else 0, "reply_preview": (ctx.llm_response.content[:200] + "...") if ctx.llm_response and len(ctx.llm_response.content) > 200 else (ctx.llm_response.content if ctx.llm_response else None)})
# Step 9: OutputFilter - Filter forbidden words in output
+ step_start = time.time()
await self._filter_output(ctx)
+            filter_output = {"filtered": (ctx.filtered_reply != ctx.llm_response.content) if ctx.llm_response else False}
+ if ctx.diagnostics.get("output_filter"):
+ filter_output["triggered_words"] = ctx.diagnostics.get("output_filter", {}).get("triggered_words", [])
+ self._record_step(ctx, 9, "OutputFilter", "success", int((time.time() - step_start) * 1000),
+ input_data={"text_length": len(ctx.llm_response.content) if ctx.llm_response else 0},
+ output_data=filter_output)
# Step 10: Confidence - Calculate confidence score
+ step_start = time.time()
self._calculate_confidence(ctx)
+ self._record_step(ctx, 10, "Confidence", "success", int((time.time() - step_start) * 1000),
+ input_data={"reply_length": len(ctx.filtered_reply) if ctx.filtered_reply else 0, "hit_count": len(ctx.retrieval_result.hits) if ctx.retrieval_result else 0},
+ output_data={"confidence": ctx.confidence_result.confidence if ctx.confidence_result else 0, "should_transfer": ctx.confidence_result.should_transfer if ctx.confidence_result else True})
# Step 11: Memory - Save messages
+ step_start = time.time()
await self._save_messages(ctx)
+ self._record_step(ctx, 11, "Memory", "success", int((time.time() - step_start) * 1000),
+ input_data={"session_id": ctx.session_id},
+ output_data={"saved": True})
# Step 12: Return - Build and return ChatResponse
+ self._record_step(ctx, 12, "Response", "success", 0,
+ input_data={"confidence": ctx.confidence_result.confidence if ctx.confidence_result else 0, "should_transfer": ctx.confidence_result.should_transfer if ctx.confidence_result else True},
+                              output_data={"reply_length": len(ctx.filtered_reply) if ctx.filtered_reply else 0, "reply_preview": (ctx.filtered_reply[:200] + "...") if ctx.filtered_reply and len(ctx.filtered_reply) > 200 else ctx.filtered_reply})
return self._build_response(ctx)
except Exception as e:
@@ -944,6 +1035,7 @@ class OrchestratorService:
"session_id": ctx.session_id,
"channel_type": ctx.channel_type,
"diagnostics": ctx.diagnostics,
+ "execution_steps": ctx.execution_steps,
}
return ChatResponse(