feat: add metadata validation in KB upload and unify metadata storage [AC-IDSMETA-15, AC-IDSMETA-16]

This commit is contained in:
MerCry 2026-03-02 22:15:19 +08:00
parent c432f457b8
commit d3ae92dec5
4 changed files with 94 additions and 16 deletions

View File

@ -7,7 +7,7 @@ Knowledge Base management endpoints.
import logging
import uuid
from dataclasses import dataclass
from typing import Annotated, Optional
from typing import Annotated, Any, Optional
import tiktoken
from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, Query, UploadFile
@ -479,7 +479,7 @@ async def list_documents(
"/documents",
operation_id="uploadDocument",
summary="Upload/import document",
description="[AC-ASA-01, AC-AISVC-63] Upload document to specified knowledge base and trigger indexing job.",
description="[AC-ASA-01, AC-AISVC-63, AC-IDSMETA-15] Upload document to specified knowledge base and trigger indexing job.",
responses={
202: {"description": "Accepted - async indexing job started"},
400: {"description": "Bad Request - unsupported format or invalid kb_id"},
@ -493,17 +493,28 @@ async def upload_document(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
kb_id: str = Form(...),
metadata: str = Form(default="{}", description="元数据 JSON 字符串,根据元数据模式配置动态字段"),
) -> JSONResponse:
"""
[AC-ASA-01, AC-AISVC-63] Upload document to specified knowledge base.
[AC-ASA-01, AC-AISVC-63, AC-IDSMETA-15] Upload document to specified knowledge base.
Creates KB if not exists, indexes to corresponding Qdrant Collection.
[AC-IDSMETA-15] 支持动态元数据校验:
- metadata: JSON 格式的元数据字段根据元数据模式配置
- 根据 scope=kb_document 的字段定义进行 required/type/enum 校验
示例 metadata:
- 教育行业: {"grade": "初一", "subject": "语文", "type": "痛点"}
- 医疗行业: {"department": "内科", "disease_type": "慢性病", "content_type": "科普"}
"""
import json
from pathlib import Path
from app.services.document import get_supported_document_formats
from app.services.metadata_field_definition_service import MetadataFieldDefinitionService
logger.info(
f"[AC-AISVC-63] Uploading document: tenant={tenant_id}, "
f"[AC-IDSMETA-15] Uploading document: tenant={tenant_id}, "
f"kb_id={kb_id}, filename={file.filename}"
)
@ -522,6 +533,36 @@ async def upload_document(
},
)
try:
metadata_dict = json.loads(metadata) if metadata else {}
except json.JSONDecodeError:
return JSONResponse(
status_code=400,
content={
"code": "INVALID_METADATA",
"message": "Invalid JSON format for metadata",
},
)
field_def_service = MetadataFieldDefinitionService(session)
is_valid, validation_errors = await field_def_service.validate_metadata_for_create(
tenant_id, metadata_dict, "kb_document"
)
if not is_valid:
logger.warning(f"[AC-IDSMETA-15] Metadata validation failed: {validation_errors}")
return JSONResponse(
status_code=400,
content={
"code": "METADATA_VALIDATION_ERROR",
"message": "Metadata validation failed",
"details": {
"errors": validation_errors,
},
},
)
kb_service = KnowledgeBaseService(session)
try:
@ -529,7 +570,7 @@ async def upload_document(
if not kb:
kb = await kb_service.get_or_create_default_kb(tenant_id)
kb_id = str(kb.id)
logger.info(f"[AC-AISVC-63] KB not found, using default: {kb_id}")
logger.info(f"[AC-IDSMETA-15] KB not found, using default: {kb_id}")
else:
kb_id = str(kb.id)
except Exception:
@ -550,7 +591,7 @@ async def upload_document(
await session.commit()
background_tasks.add_task(
_index_document, tenant_id, kb_id, str(job.id), str(document.id), file_content, file.filename
_index_document, tenant_id, kb_id, str(job.id), str(document.id), file_content, file.filename, metadata_dict
)
return JSONResponse(
@ -560,6 +601,7 @@ async def upload_document(
"docId": str(document.id),
"kbId": kb_id,
"status": job.status,
"metadata": metadata_dict,
},
)
@ -571,11 +613,15 @@ async def _index_document(
doc_id: str,
content: bytes,
filename: str | None = None,
metadata: dict[str, Any] | None = None,
):
"""
Background indexing task.
[AC-AISVC-33, AC-AISVC-34, AC-AISVC-35, AC-AISVC-63] Uses document parsing and pluggable embedding.
Indexes to the specified knowledge base's Qdrant Collection.
Args:
metadata: 动态元数据字段根据元数据模式配置
"""
import asyncio
import tempfile
@ -704,6 +750,10 @@ async def _index_document(
points = []
total_chunks = len(all_chunks)
doc_metadata = metadata or {}
logger.info(f"[INDEX] Document metadata: {doc_metadata}")
for i, chunk in enumerate(all_chunks):
payload = {
"text": chunk.text,
@ -712,6 +762,7 @@ async def _index_document(
"chunk_index": i,
"start_token": chunk.start_token,
"end_token": chunk.end_token,
"metadata": doc_metadata,
}
if chunk.page is not None:
payload["page"] = chunk.page

View File

@ -42,6 +42,7 @@ class ScriptFlowService:
) -> ScriptFlow:
"""
[AC-AISVC-71] Create a new script flow with steps.
[AC-IDSMETA-16] Support metadata field.
"""
self._validate_steps(create_data.steps)
@ -51,12 +52,13 @@ class ScriptFlowService:
description=create_data.description,
steps=create_data.steps,
is_enabled=create_data.is_enabled,
metadata_=create_data.metadata_,
)
self._session.add(flow)
await self._session.flush()
logger.info(
f"[AC-AISVC-71] Created script flow: tenant={tenant_id}, "
f"[AC-AISVC-71][AC-IDSMETA-16] Created script flow: tenant={tenant_id}, "
f"id={flow.id}, name={flow.name}, steps={len(flow.steps)}"
)
return flow
@ -102,6 +104,7 @@ class ScriptFlowService:
) -> dict[str, Any] | None:
"""
[AC-AISVC-73] Get flow detail with complete step definitions.
[AC-IDSMETA-16] Include metadata field.
"""
flow = await self.get_flow(tenant_id, flow_id)
if not flow:
@ -117,6 +120,7 @@ class ScriptFlowService:
"is_enabled": flow.is_enabled,
"step_count": len(flow.steps),
"linked_rule_count": linked_rule_count,
"metadata": flow.metadata_,
"created_at": flow.created_at.isoformat(),
"updated_at": flow.updated_at.isoformat(),
}
@ -129,6 +133,7 @@ class ScriptFlowService:
) -> ScriptFlow | None:
"""
[AC-AISVC-73] Update flow definition.
[AC-IDSMETA-16] Support metadata field.
"""
flow = await self.get_flow(tenant_id, flow_id)
if not flow:
@ -143,12 +148,14 @@ class ScriptFlowService:
flow.steps = update_data.steps
if update_data.is_enabled is not None:
flow.is_enabled = update_data.is_enabled
if update_data.metadata_ is not None:
flow.metadata_ = update_data.metadata_
flow.updated_at = datetime.utcnow()
await self._session.flush()
logger.info(
f"[AC-AISVC-73] Updated script flow: tenant={tenant_id}, id={flow_id}"
f"[AC-AISVC-73][AC-IDSMETA-16] Updated script flow: tenant={tenant_id}, id={flow_id}"
)
return flow

View File

@ -83,6 +83,7 @@ class IntentRuleService:
) -> IntentRule:
"""
[AC-AISVC-65] Create a new intent rule.
[AC-IDSMETA-16] Support metadata field.
"""
flow_id_uuid = None
if create_data.flow_id:
@ -104,6 +105,7 @@ class IntentRuleService:
transfer_message=create_data.transfer_message,
is_enabled=True,
hit_count=0,
metadata_=create_data.metadata_,
)
self._session.add(rule)
await self._session.flush()
@ -111,7 +113,7 @@ class IntentRuleService:
self._cache.invalidate(tenant_id)
logger.info(
f"[AC-AISVC-65] Created intent rule: tenant={tenant_id}, "
f"[AC-AISVC-65][AC-IDSMETA-16] Created intent rule: tenant={tenant_id}, "
f"id={rule.id}, name={rule.name}, response_type={rule.response_type}"
)
return rule
@ -162,6 +164,7 @@ class IntentRuleService:
) -> IntentRule | None:
"""
[AC-AISVC-67] Update an intent rule.
[AC-IDSMETA-16] Support metadata field.
"""
rule = await self.get_rule(tenant_id, rule_id)
if not rule:
@ -190,6 +193,8 @@ class IntentRuleService:
rule.transfer_message = update_data.transfer_message
if update_data.is_enabled is not None:
rule.is_enabled = update_data.is_enabled
if update_data.metadata_ is not None:
rule.metadata_ = update_data.metadata_
rule.updated_at = datetime.utcnow()
await self._session.flush()
@ -197,7 +202,7 @@ class IntentRuleService:
self._cache.invalidate(tenant_id)
logger.info(
f"[AC-AISVC-67] Updated intent rule: tenant={tenant_id}, id={rule_id}"
f"[AC-AISVC-67][AC-IDSMETA-16] Updated intent rule: tenant={tenant_id}, id={rule_id}"
)
return rule
@ -294,6 +299,7 @@ class IntentRuleService:
"transfer_message": rule.transfer_message,
"is_enabled": rule.is_enabled,
"hit_count": rule.hit_count,
"metadata": rule.metadata_,
"created_at": rule.created_at.isoformat(),
"updated_at": rule.updated_at.isoformat(),
}

View File

@ -95,6 +95,7 @@ class PromptTemplateService:
) -> PromptTemplate:
"""
[AC-AISVC-52] Create a new prompt template with initial version.
[AC-IDSMETA-16] Support metadata field.
"""
template = PromptTemplate(
tenant_id=tenant_id,
@ -102,6 +103,7 @@ class PromptTemplateService:
scene=create_data.scene,
description=create_data.description,
is_default=create_data.is_default,
metadata_=create_data.metadata_,
)
self._session.add(template)
await self._session.flush()
@ -117,7 +119,7 @@ class PromptTemplateService:
await self._session.flush()
logger.info(
f"[AC-AISVC-52] Created prompt template: tenant={tenant_id}, "
f"[AC-AISVC-52][AC-IDSMETA-16] Created prompt template: tenant={tenant_id}, "
f"id={template.id}, name={template.name}"
)
return template
@ -182,6 +184,7 @@ class PromptTemplateService:
"scene": template.scene,
"description": template.description,
"is_default": template.is_default,
"metadata": template.metadata_,
"current_version": {
"version": current_version.version,
"status": current_version.status,
@ -208,6 +211,7 @@ class PromptTemplateService:
) -> PromptTemplate | None:
"""
[AC-AISVC-53] Update template and create a new version.
[AC-IDSMETA-16] Support metadata field.
"""
template = await self.get_template(tenant_id, template_id)
if not template:
@ -221,6 +225,8 @@ class PromptTemplateService:
template.description = update_data.description
if update_data.is_default is not None:
template.is_default = update_data.is_default
if update_data.metadata_ is not None:
template.metadata_ = update_data.metadata_
template.updated_at = datetime.utcnow()
if update_data.system_instruction is not None:
@ -241,7 +247,7 @@ class PromptTemplateService:
self._cache.invalidate(tenant_id, template.scene)
logger.info(
f"[AC-AISVC-53] Updated prompt template: tenant={tenant_id}, id={template_id}"
f"[AC-AISVC-53][AC-IDSMETA-16] Updated prompt template: tenant={tenant_id}, id={template_id}"
)
return template
@ -400,11 +406,19 @@ class PromptTemplateService:
if not template:
return False
versions = await self._get_versions(template_id)
for v in versions:
await self._session.delete(v)
from sqlalchemy import delete
await self._session.delete(template)
await self._session.execute(
delete(PromptTemplateVersion).where(
PromptTemplateVersion.template_id == template_id
)
)
await self._session.execute(
delete(PromptTemplate).where(
PromptTemplate.id == template_id
)
)
await self._session.flush()
self._cache.invalidate(tenant_id, template.scene)