# Source: ai-robot-core/ai-service/app/api/admin/monitoring.py (716 lines, 24 KiB, Python)

"""
Monitoring API for AI Service Admin.
[AC-AISVC-97~AC-AISVC-100] Intent rule and prompt template monitoring.
[AC-AISVC-103, AC-AISVC-104] Flow monitoring.
[AC-AISVC-106, AC-AISVC-107] Guardrail monitoring.
[AC-AISVC-108~AC-AISVC-110] Conversation tracking and export.
"""
import asyncio
import csv
import json
import logging
import uuid
from datetime import datetime, timedelta
from typing import Any
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sqlalchemy import desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_session
from app.models.entities import (
ChatMessage,
ExportTask,
ExportTaskStatus,
FlowInstance,
FlowTestRecord,
IntentRule,
PromptTemplate,
)
from app.services.monitoring.flow_monitor import FlowMonitor
from app.services.monitoring.guardrail_monitor import GuardrailMonitor
from app.services.monitoring.intent_monitor import IntentMonitor
from app.services.monitoring.prompt_monitor import PromptMonitor
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/admin/monitoring", tags=["Monitoring"])
def get_tenant_id(x_tenant_id: str = Header(..., alias="X-Tenant-Id")) -> str:
    """Resolve the tenant identifier from the X-Tenant-Id request header.

    Raises:
        HTTPException: 400 when the header value is empty.
    """
    if x_tenant_id:
        return x_tenant_id
    raise HTTPException(status_code=400, detail="X-Tenant-Id header is required")
@router.get("/intent-rules")
async def get_intent_rule_stats(
    tenant_id: str = Depends(get_tenant_id),
    start_date: datetime | None = Query(None, description="Start date filter"),
    end_date: datetime | None = Query(None, description="End date filter"),
    response_type: str | None = Query(None, description="Response type filter"),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-97] Return aggregated hit statistics for every intent rule.
    """
    logger.info(
        f"[AC-AISVC-97] Getting intent rule stats for tenant={tenant_id}, "
        f"start={start_date}, end={end_date}"
    )
    stats = await IntentMonitor(session).get_rule_stats(
        tenant_id, start_date, end_date, response_type
    )
    return stats.to_dict()
@router.get("/intent-rules/{rule_id}/hits")
async def get_intent_rule_hits(
    rule_id: uuid.UUID,
    tenant_id: str = Depends(get_tenant_id),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Page size"),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-98] Return a paginated list of hit records for one intent rule.
    """
    logger.info(
        f"[AC-AISVC-98] Getting intent rule hits for tenant={tenant_id}, "
        f"rule_id={rule_id}, page={page}"
    )
    records, total = await IntentMonitor(session).get_rule_hits(
        tenant_id, rule_id, page, page_size
    )
    payload = [record.to_dict() for record in records]
    return {"data": payload, "page": page, "pageSize": page_size, "total": total}
@router.get("/script-flows")
async def get_flow_stats(
    tenant_id: str = Depends(get_tenant_id),
    start_date: datetime | None = Query(None, description="Start date filter"),
    end_date: datetime | None = Query(None, description="End date filter"),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-103] Return aggregated execution statistics for every script flow.
    """
    logger.info(
        f"[AC-AISVC-103] Getting flow stats for tenant={tenant_id}, "
        f"start={start_date}, end={end_date}"
    )
    stats = await FlowMonitor(session).get_flow_stats(tenant_id, start_date, end_date)
    return stats.to_dict()
@router.get("/script-flows/{flow_id}/executions")
async def get_flow_executions(
    flow_id: uuid.UUID,
    tenant_id: str = Depends(get_tenant_id),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Page size"),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-104] Return a paginated list of execution records for one flow.
    """
    logger.info(
        f"[AC-AISVC-104] Getting flow executions for tenant={tenant_id}, "
        f"flow_id={flow_id}, page={page}"
    )
    records, total = await FlowMonitor(session).get_flow_executions(
        tenant_id, flow_id, page, page_size
    )
    payload = [record.to_dict() for record in records]
    return {"data": payload, "page": page, "pageSize": page_size, "total": total}
@router.get("/guardrails")
async def get_guardrail_stats(
    tenant_id: str = Depends(get_tenant_id),
    category: str | None = Query(None, description="Category filter"),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-106] Return aggregated block statistics for every guardrail.
    """
    logger.info(
        f"[AC-AISVC-106] Getting guardrail stats for tenant={tenant_id}, "
        f"category={category}"
    )
    stats = await GuardrailMonitor(session).get_guardrail_stats(tenant_id, category)
    return stats.to_dict()
@router.get("/guardrails/{word_id}/blocks")
async def get_guardrail_blocks(
    word_id: uuid.UUID,
    tenant_id: str = Depends(get_tenant_id),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Page size"),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-107] Return a paginated list of block records for one forbidden word.
    """
    logger.info(
        f"[AC-AISVC-107] Getting guardrail blocks for tenant={tenant_id}, "
        f"word_id={word_id}, page={page}"
    )
    records, total = await GuardrailMonitor(session).get_word_blocks(
        tenant_id, word_id, page, page_size
    )
    payload = [record.to_dict() for record in records]
    return {"data": payload, "page": page, "pageSize": page_size, "total": total}
@router.get("/prompt-templates")
async def get_prompt_template_stats(
    tenant_id: str = Depends(get_tenant_id),
    start_date: datetime | None = Query(None, description="Start date filter"),
    end_date: datetime | None = Query(None, description="End date filter"),
    scene: str | None = Query(None, description="Scene filter"),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-100] Return aggregated usage statistics for every prompt template.
    """
    logger.info(
        f"[AC-AISVC-100] Getting prompt template stats for tenant={tenant_id}, "
        f"start={start_date}, end={end_date}, scene={scene}"
    )
    stats = await PromptMonitor(session).get_template_stats(
        tenant_id, scene, start_date, end_date
    )
    return stats.to_dict()
def _truncate_preview(text: str, limit: int = 200) -> str:
    """Clip *text* to *limit* characters, appending "..." when truncated."""
    return text[:limit] + "..." if len(text) > limit else text


@router.get("/conversations")
async def list_conversations(
    tenant_id: str = Depends(get_tenant_id),
    session_id: str | None = Query(None, description="Filter by session ID"),
    start_date: datetime | None = Query(None, description="Start date filter"),
    end_date: datetime | None = Query(None, description="End date filter"),
    has_flow: bool | None = Query(None, description="Filter by flow involvement"),
    has_guardrail: bool | None = Query(None, description="Filter by guardrail trigger"),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Page size"),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-108] List conversations with filters.

    A "conversation" is a user message paired with the first assistant reply
    created after it in the same session. Returns a paginated list with
    200-character previews of both sides, newest first.
    """
    logger.info(
        f"[AC-AISVC-108] Listing conversations for tenant={tenant_id}, "
        f"session={session_id}, page={page}"
    )
    # Base query: one row per user turn for this tenant, plus optional filters.
    stmt = select(ChatMessage).where(
        ChatMessage.tenant_id == tenant_id,
        ChatMessage.role == "user",
    )
    if session_id:
        stmt = stmt.where(ChatMessage.session_id == session_id)
    if start_date:
        stmt = stmt.where(ChatMessage.created_at >= start_date)
    if end_date:
        stmt = stmt.where(ChatMessage.created_at <= end_date)
    if has_flow is not None:
        if has_flow:
            stmt = stmt.where(ChatMessage.flow_instance_id.is_not(None))
        else:
            stmt = stmt.where(ChatMessage.flow_instance_id.is_(None))
    if has_guardrail is not None:
        if has_guardrail:
            stmt = stmt.where(ChatMessage.guardrail_triggered.is_(True))
        else:
            stmt = stmt.where(ChatMessage.guardrail_triggered.is_(False))
    # Count before pagination so `total` reflects all matching rows.
    count_stmt = select(func.count()).select_from(stmt.subquery())
    total = (await session.execute(count_stmt)).scalar() or 0
    stmt = (
        stmt.order_by(desc(ChatMessage.created_at))
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    messages = (await session.execute(stmt)).scalars().all()
    conversations = []
    for msg in messages:
        # NOTE: one extra query per row (N+1); bounded by page_size <= 100.
        assistant_stmt = (
            select(ChatMessage)
            .where(
                ChatMessage.tenant_id == tenant_id,
                ChatMessage.session_id == msg.session_id,
                ChatMessage.role == "assistant",
                ChatMessage.created_at > msg.created_at,
            )
            .order_by(ChatMessage.created_at)
            .limit(1)
        )
        assistant_msg = (await session.execute(assistant_stmt)).scalar_one_or_none()
        conversations.append({
            "id": str(msg.id),
            "sessionId": msg.session_id,
            "userMessage": _truncate_preview(msg.content),
            "aiReply": _truncate_preview(assistant_msg.content) if assistant_msg else None,
            "hasFlow": msg.flow_instance_id is not None,
            "hasGuardrail": msg.guardrail_triggered,
            "createdAt": msg.created_at.isoformat(),
        })
    return {
        "data": conversations,
        "page": page,
        "pageSize": page_size,
        "total": total,
    }
@router.get("/conversations/{message_id}")
async def get_conversation_detail(
    message_id: uuid.UUID,
    tenant_id: str = Depends(get_tenant_id),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-109] Get conversation detail with execution chain.
    Returns detailed execution steps for debugging.
    """
    logger.info(
        f"[AC-AISVC-109] Getting conversation detail for tenant={tenant_id}, "
        f"message_id={message_id}"
    )
    # Anchor the conversation on the user turn; the tenant filter doubles as
    # an authorization guard (404 rather than leaking other tenants' data).
    user_msg_stmt = select(ChatMessage).where(
        ChatMessage.id == message_id,
        ChatMessage.tenant_id == tenant_id,
        ChatMessage.role == "user",
    )
    result = await session.execute(user_msg_stmt)
    user_msg = result.scalar_one_or_none()
    if not user_msg:
        raise HTTPException(status_code=404, detail="Conversation not found")
    # The paired AI reply is the first assistant message in the same session
    # created after the user turn.
    assistant_stmt = select(ChatMessage).where(
        ChatMessage.tenant_id == tenant_id,
        ChatMessage.session_id == user_msg.session_id,
        ChatMessage.role == "assistant",
        ChatMessage.created_at > user_msg.created_at,
    ).order_by(ChatMessage.created_at).limit(1)
    assistant_result = await session.execute(assistant_stmt)
    assistant_msg = assistant_result.scalar_one_or_none()
    # Resolve the intent rule that routed this message, if one was recorded.
    triggered_rules = []
    if user_msg.intent_rule_id:
        rule_stmt = select(IntentRule).where(IntentRule.id == user_msg.intent_rule_id)
        rule_result = await session.execute(rule_stmt)
        rule = rule_result.scalar_one_or_none()
        if rule:
            triggered_rules.append({
                "id": str(rule.id),
                "name": rule.name,
                "responseType": rule.response_type,
            })
    # Prompt template used to generate the assistant reply, if recorded.
    used_template = None
    if assistant_msg and assistant_msg.prompt_template_id:
        template_stmt = select(PromptTemplate).where(
            PromptTemplate.id == assistant_msg.prompt_template_id
        )
        template_result = await session.execute(template_stmt)
        template = template_result.scalar_one_or_none()
        if template:
            used_template = {
                "id": str(template.id),
                "name": template.name,
            }
    # Flow instance the user turn was attached to, if any.
    used_flow = None
    if user_msg.flow_instance_id:
        flow_stmt = select(FlowInstance).where(
            FlowInstance.id == user_msg.flow_instance_id
        )
        flow_result = await session.execute(flow_stmt)
        flow_instance = flow_result.scalar_one_or_none()
        if flow_instance:
            used_flow = {
                "id": str(flow_instance.id),
                "flowId": str(flow_instance.flow_id),
                "status": flow_instance.status,
                "currentStep": flow_instance.current_step,
            }
    # The latest flow test record for the session supplies step-by-step traces.
    # NOTE(review): this lookup is not filtered by tenant_id — presumably
    # session_id values are globally unique; confirm against the data model.
    execution_steps = None
    test_record_stmt = select(FlowTestRecord).where(
        FlowTestRecord.session_id == user_msg.session_id,
    ).order_by(desc(FlowTestRecord.created_at)).limit(1)
    test_result = await session.execute(test_record_stmt)
    test_record = test_result.scalar_one_or_none()
    if test_record:
        execution_steps = test_record.steps
    return {
        "conversationId": str(user_msg.id),
        "sessionId": user_msg.session_id,
        "userMessage": user_msg.content,
        "aiReply": assistant_msg.content if assistant_msg else None,
        "triggeredRules": triggered_rules,
        "usedTemplate": used_template,
        "usedFlow": used_flow,
        "executionTimeMs": assistant_msg.latency_ms if assistant_msg else None,
        "confidence": None,  # not tracked yet; placeholder for the admin UI
        "shouldTransfer": False,  # not tracked yet; placeholder for the admin UI
        "guardrailTriggered": user_msg.guardrail_triggered,
        "guardrailWords": user_msg.guardrail_words,
        "executionSteps": execution_steps,
        "routeTrace": user_msg.route_trace,
        "createdAt": user_msg.created_at.isoformat(),
    }
class ExportRequest(BaseModel):
    """Export request schema."""
    # Output format: "json" or "csv" (the export writer treats any
    # non-"json" value as CSV).
    format: str = "json"
    # Optional session filter; None exports all sessions for the tenant.
    session_id: str | None = None
    # Optional inclusive lower bound on message created_at.
    start_date: datetime | None = None
    # Optional inclusive upper bound on message created_at.
    end_date: datetime | None = None
# Strong references to in-flight export tasks. The event loop only keeps a
# weak reference to tasks created with asyncio.create_task, so a
# fire-and-forget task with no other reference can be garbage-collected
# before it finishes; this set prevents that.
_background_exports: set["asyncio.Task[None]"] = set()


@router.post("/conversations/export")
async def export_conversations(
    request: ExportRequest,
    tenant_id: str = Depends(get_tenant_id),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-110] Export conversations to file.
    Supports JSON and CSV formats.

    Creates an ExportTask row in PROCESSING state, starts the export in the
    background, and immediately returns the task id for status polling.
    """
    logger.info(
        f"[AC-AISVC-110] Exporting conversations for tenant={tenant_id}, "
        f"format={request.format}"
    )
    export_task = ExportTask(
        tenant_id=tenant_id,
        status=ExportTaskStatus.PROCESSING.value,
        format=request.format,
        filters={
            "session_id": request.session_id,
            "start_date": request.start_date.isoformat() if request.start_date else None,
            "end_date": request.end_date.isoformat() if request.end_date else None,
        },
        # Download links expire after 24 hours.
        expires_at=datetime.utcnow() + timedelta(hours=24),
    )
    session.add(export_task)
    await session.commit()
    await session.refresh(export_task)
    task = asyncio.create_task(
        _process_export(
            export_task.id,
            tenant_id,
            request.format,
            request.session_id,
            request.start_date,
            request.end_date,
        )
    )
    # Hold a reference until completion (see _background_exports above).
    _background_exports.add(task)
    task.add_done_callback(_background_exports.discard)
    return {
        "taskId": str(export_task.id),
        "status": export_task.status,
        "format": export_task.format,
        "createdAt": export_task.created_at.isoformat(),
    }
@router.get("/conversations/export/{task_id}")
async def get_export_status(
    task_id: uuid.UUID,
    tenant_id: str = Depends(get_tenant_id),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    """
    [AC-AISVC-110] Report the current state of an export task.
    """
    result = await session.execute(
        select(ExportTask).where(
            ExportTask.id == task_id,
            ExportTask.tenant_id == tenant_id,
        )
    )
    task = result.scalar_one_or_none()
    if task is None:
        raise HTTPException(status_code=404, detail="Export task not found")
    payload: dict[str, Any] = {
        "taskId": str(task.id),
        "status": task.status,
        "format": task.format,
        "createdAt": task.created_at.isoformat(),
    }
    # Completed tasks expose file metadata; failed tasks expose the error.
    if task.status == ExportTaskStatus.COMPLETED.value:
        payload["fileName"] = task.file_name
        payload["fileSize"] = task.file_size
        payload["totalRows"] = task.total_rows
        payload["completedAt"] = (
            task.completed_at.isoformat() if task.completed_at else None
        )
    elif task.status == ExportTaskStatus.FAILED.value:
        payload["errorMessage"] = task.error_message
    return payload
@router.get("/conversations/export/{task_id}/download")
async def download_export(
    task_id: uuid.UUID,
    tenant_id: str = Depends(get_tenant_id),
    session: AsyncSession = Depends(get_session),
) -> StreamingResponse:
    """
    [AC-AISVC-110] Download exported file.

    Streams the file in fixed-size chunks instead of loading it entirely
    into memory (the previous version read the whole file before responding,
    defeating the point of StreamingResponse for large exports).
    """
    import os

    stmt = select(ExportTask).where(
        ExportTask.id == task_id,
        ExportTask.tenant_id == tenant_id,
    )
    result = await session.execute(stmt)
    task = result.scalar_one_or_none()
    if not task:
        raise HTTPException(status_code=404, detail="Export task not found")
    if task.status != ExportTaskStatus.COMPLETED.value:
        raise HTTPException(status_code=400, detail="Export not completed")
    if not task.file_path:
        raise HTTPException(status_code=404, detail="Export file not found")
    # Check existence up front so a missing/expired file yields a clean 404
    # rather than a broken stream after the response headers are sent.
    if not os.path.isfile(task.file_path):
        raise HTTPException(status_code=404, detail="Export file expired or not found")

    def _iter_file(path: str, chunk_size: int = 64 * 1024):
        """Yield the file's bytes in chunks; closes the handle when done."""
        with open(path, "rb") as f:
            while chunk := f.read(chunk_size):
                yield chunk

    media_type = "application/json" if task.format == "json" else "text/csv"
    return StreamingResponse(
        _iter_file(task.file_path),
        media_type=media_type,
        headers={
            "Content-Disposition": f'attachment; filename="{task.file_name}"',
        },
    )
async def _process_export(
    task_id: uuid.UUID,
    tenant_id: str,
    format: str,
    session_id: str | None,
    start_date: datetime | None,
    end_date: datetime | None,
) -> None:
    """Background task that materializes an export file and updates its task row.

    Loads the tenant's chat messages (optionally filtered by session and date
    range), folds them into user/assistant conversation pairs, writes a JSON
    or CSV file under ./exports, then marks the ExportTask COMPLETED — or
    FAILED with the error message if anything raises.
    """
    import os

    from app.core.database import async_session_maker

    async with async_session_maker() as session:
        try:
            task = await session.get(ExportTask, task_id)
            if not task:
                return
            msg_stmt = (
                select(ChatMessage)
                .where(ChatMessage.tenant_id == tenant_id)
                .order_by(ChatMessage.created_at)
            )
            if session_id:
                msg_stmt = msg_stmt.where(ChatMessage.session_id == session_id)
            if start_date:
                msg_stmt = msg_stmt.where(ChatMessage.created_at >= start_date)
            if end_date:
                msg_stmt = msg_stmt.where(ChatMessage.created_at <= end_date)
            result = await session.execute(msg_stmt)
            messages = result.scalars().all()
            # Fold the flat, time-ordered message list into conversations:
            # each user turn opens a record; the next assistant turn fills it.
            conversations = []
            current_conv = None
            for msg in messages:
                if msg.role == "user":
                    if current_conv:
                        conversations.append(current_conv)
                    current_conv = {
                        "session_id": msg.session_id,
                        "user_message": msg.content,
                        "ai_reply": None,
                        "created_at": msg.created_at.isoformat(),
                        "intent_rule_id": str(msg.intent_rule_id) if msg.intent_rule_id else None,
                        "flow_instance_id": str(msg.flow_instance_id) if msg.flow_instance_id else None,
                        "guardrail_triggered": msg.guardrail_triggered,
                    }
                elif msg.role == "assistant" and current_conv:
                    current_conv["ai_reply"] = msg.content
                    current_conv["latency_ms"] = msg.latency_ms
                    current_conv["prompt_template_id"] = (
                        str(msg.prompt_template_id) if msg.prompt_template_id else None
                    )
            if current_conv:
                conversations.append(current_conv)
            export_dir = "exports"
            os.makedirs(export_dir, exist_ok=True)
            file_name = f"conversations_{tenant_id}_{task_id}.{format}"
            file_path = os.path.join(export_dir, file_name)
            if format == "json":
                content = json.dumps(conversations, indent=2, ensure_ascii=False)
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(content)
            else:
                # Any non-JSON format falls through to CSV.
                with open(file_path, "w", encoding="utf-8", newline="") as f:
                    writer = csv.writer(f)
                    writer.writerow([
                        "session_id", "user_message", "ai_reply", "created_at",
                        "intent_rule_id", "flow_instance_id", "guardrail_triggered",
                        "latency_ms", "prompt_template_id"
                    ])
                    for conv in conversations:
                        writer.writerow([
                            conv.get("session_id"),
                            conv.get("user_message"),
                            conv.get("ai_reply"),
                            conv.get("created_at"),
                            conv.get("intent_rule_id"),
                            conv.get("flow_instance_id"),
                            conv.get("guardrail_triggered"),
                            conv.get("latency_ms"),
                            conv.get("prompt_template_id"),
                        ])
            file_size = os.path.getsize(file_path)
            task.status = ExportTaskStatus.COMPLETED.value
            task.file_path = file_path
            task.file_name = file_name
            task.file_size = file_size
            task.total_rows = len(conversations)
            task.completed_at = datetime.utcnow()
            await session.commit()
            logger.info(
                f"[AC-AISVC-110] Export completed: task_id={task_id}, "
                f"rows={len(conversations)}, size={file_size}"
            )
        except Exception as e:
            # BUG FIX: the previous version referenced an undefined name
            # (`task_status.get(...)`) here, raising NameError and leaving the
            # task stuck in PROCESSING forever. Roll back any half-applied
            # work, re-fetch the row, and record the failure.
            logger.error(f"[AC-AISVC-110] Export failed: task_id={task_id}, error={e}")
            await session.rollback()
            task = await session.get(ExportTask, task_id)
            if task:
                task.status = ExportTaskStatus.FAILED.value
                task.error_message = str(e)
                await session.commit()
@router.get("/clarification-metrics")
async def get_clarification_metrics(
    tenant_id: str = Depends(get_tenant_id),
    total_requests: int = Query(100, ge=1, description="Total requests for rate calculation"),
) -> dict[str, Any]:
    """
    [AC-CLARIFY] Report clarification counters and derived rates.

    Rates returned:
    - clarify_trigger_rate: how often clarification was triggered
    - clarify_converge_rate: how often the dialog converged after clarifying
    - misroute_rate: how often a flow was entered by mistake
    """
    from app.services.intent.clarification import get_clarify_metrics

    metrics = get_clarify_metrics()
    return {
        "counts": metrics.get_metrics(),
        "rates": metrics.get_rates(total_requests),
        "thresholds": {
            "t_high": 0.75,
            "t_low": 0.45,
            "max_retry": 3,
        },
    }
@router.post("/clarification-metrics/reset")
async def reset_clarification_metrics(
    tenant_id: str = Depends(get_tenant_id),
) -> dict[str, Any]:
    """
    [AC-CLARIFY] Zero out the in-memory clarification counters.
    """
    from app.services.intent.clarification import get_clarify_metrics

    get_clarify_metrics().reset()
    return {
        "status": "reset",
        "message": "Clarification metrics have been reset.",
    }