326 lines
10 KiB
Python
326 lines
10 KiB
Python
"""
|
||
Metadata Schema Service.
|
||
动态元数据模式管理服务,支持租户自定义元数据字段配置。
|
||
"""
|
||
|
||
import logging
|
||
import uuid
|
||
from datetime import datetime
|
||
from typing import Any
|
||
|
||
from sqlalchemy import select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlmodel import col
|
||
|
||
from app.models.entities import (
|
||
MetadataField,
|
||
MetadataFieldType,
|
||
MetadataSchema,
|
||
MetadataSchemaCreate,
|
||
MetadataSchemaUpdate,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MetadataSchemaService:
|
||
"""
|
||
元数据模式服务
|
||
管理租户的动态元数据字段配置
|
||
"""
|
||
|
||
def __init__(self, session: AsyncSession):
|
||
self._session = session
|
||
|
||
async def get_schema(
|
||
self,
|
||
tenant_id: str,
|
||
schema_id: str | None = None,
|
||
) -> MetadataSchema | None:
|
||
"""
|
||
获取元数据模式
|
||
|
||
Args:
|
||
tenant_id: 租户 ID
|
||
schema_id: 模式 ID(可选,不传则获取默认模式)
|
||
|
||
Returns:
|
||
MetadataSchema 或 None
|
||
"""
|
||
if schema_id:
|
||
stmt = select(MetadataSchema).where(
|
||
MetadataSchema.tenant_id == tenant_id,
|
||
MetadataSchema.id == uuid.UUID(schema_id),
|
||
)
|
||
else:
|
||
stmt = select(MetadataSchema).where(
|
||
MetadataSchema.tenant_id == tenant_id,
|
||
MetadataSchema.is_default == True,
|
||
MetadataSchema.is_enabled == True,
|
||
).order_by(col(MetadataSchema.created_at).desc())
|
||
|
||
result = await self._session.execute(stmt)
|
||
return result.scalar_one_or_none()
|
||
|
||
async def list_schemas(
|
||
self,
|
||
tenant_id: str,
|
||
include_disabled: bool = False,
|
||
) -> list[MetadataSchema]:
|
||
"""
|
||
列出租户所有元数据模式
|
||
|
||
Args:
|
||
tenant_id: 租户 ID
|
||
include_disabled: 是否包含禁用的模式
|
||
|
||
Returns:
|
||
MetadataSchema 列表
|
||
"""
|
||
stmt = select(MetadataSchema).where(
|
||
MetadataSchema.tenant_id == tenant_id,
|
||
)
|
||
if not include_disabled:
|
||
stmt = stmt.where(MetadataSchema.is_enabled == True)
|
||
|
||
stmt = stmt.order_by(col(MetadataSchema.created_at).desc())
|
||
|
||
result = await self._session.execute(stmt)
|
||
return list(result.scalars().all())
|
||
|
||
async def create_schema(
|
||
self,
|
||
tenant_id: str,
|
||
schema_create: MetadataSchemaCreate,
|
||
) -> MetadataSchema:
|
||
"""
|
||
创建元数据模式
|
||
|
||
Args:
|
||
tenant_id: 租户 ID
|
||
schema_create: 创建数据
|
||
|
||
Returns:
|
||
创建的 MetadataSchema
|
||
"""
|
||
schema = MetadataSchema(
|
||
tenant_id=tenant_id,
|
||
name=schema_create.name,
|
||
description=schema_create.description,
|
||
fields=[f.model_dump() if hasattr(f, 'model_dump') else f for f in schema_create.fields],
|
||
is_default=schema_create.is_default,
|
||
is_enabled=True,
|
||
)
|
||
|
||
self._session.add(schema)
|
||
await self._session.flush()
|
||
|
||
logger.info(
|
||
f"[MetadataSchemaService] Created schema: tenant={tenant_id}, "
|
||
f"name={schema.name}, fields_count={len(schema.fields)}"
|
||
)
|
||
|
||
return schema
|
||
|
||
async def update_schema(
|
||
self,
|
||
tenant_id: str,
|
||
schema_id: str,
|
||
schema_update: MetadataSchemaUpdate,
|
||
) -> MetadataSchema | None:
|
||
"""
|
||
更新元数据模式
|
||
|
||
Args:
|
||
tenant_id: 租户 ID
|
||
schema_id: 模式 ID
|
||
schema_update: 更新数据
|
||
|
||
Returns:
|
||
更新后的 MetadataSchema 或 None
|
||
"""
|
||
stmt = select(MetadataSchema).where(
|
||
MetadataSchema.tenant_id == tenant_id,
|
||
MetadataSchema.id == uuid.UUID(schema_id),
|
||
)
|
||
result = await self._session.execute(stmt)
|
||
schema = result.scalar_one_or_none()
|
||
|
||
if not schema:
|
||
return None
|
||
|
||
if schema_update.name is not None:
|
||
schema.name = schema_update.name
|
||
if schema_update.description is not None:
|
||
schema.description = schema_update.description
|
||
if schema_update.fields is not None:
|
||
schema.fields = schema_update.fields
|
||
if schema_update.is_default is not None:
|
||
if schema_update.is_default:
|
||
await self._unset_other_defaults(tenant_id, schema_id)
|
||
schema.is_default = True
|
||
if schema_update.is_enabled is not None:
|
||
schema.is_enabled = schema_update.is_enabled
|
||
|
||
schema.updated_at = datetime.utcnow()
|
||
await self._session.flush()
|
||
|
||
logger.info(
|
||
f"[MetadataSchemaService] Updated schema: tenant={tenant_id}, "
|
||
f"schema_id={schema_id}"
|
||
)
|
||
|
||
return schema
|
||
|
||
async def delete_schema(
|
||
self,
|
||
tenant_id: str,
|
||
schema_id: str,
|
||
) -> bool:
|
||
"""
|
||
删除元数据模式
|
||
|
||
Args:
|
||
tenant_id: 租户 ID
|
||
schema_id: 模式 ID
|
||
|
||
Returns:
|
||
是否删除成功
|
||
"""
|
||
stmt = select(MetadataSchema).where(
|
||
MetadataSchema.tenant_id == tenant_id,
|
||
MetadataSchema.id == uuid.UUID(schema_id),
|
||
)
|
||
result = await self._session.execute(stmt)
|
||
schema = result.scalar_one_or_none()
|
||
|
||
if not schema:
|
||
return False
|
||
|
||
if schema.is_default:
|
||
logger.warning(
|
||
f"[MetadataSchemaService] Cannot delete default schema: "
|
||
f"tenant={tenant_id}, schema_id={schema_id}"
|
||
)
|
||
return False
|
||
|
||
await self._session.delete(schema)
|
||
await self._session.flush()
|
||
|
||
logger.info(
|
||
f"[MetadataSchemaService] Deleted schema: tenant={tenant_id}, "
|
||
f"schema_id={schema_id}"
|
||
)
|
||
|
||
return True
|
||
|
||
async def _unset_other_defaults(
|
||
self,
|
||
tenant_id: str,
|
||
exclude_schema_id: str,
|
||
) -> None:
|
||
"""取消其他模式的默认状态"""
|
||
stmt = select(MetadataSchema).where(
|
||
MetadataSchema.tenant_id == tenant_id,
|
||
MetadataSchema.is_default == True,
|
||
MetadataSchema.id != uuid.UUID(exclude_schema_id),
|
||
)
|
||
result = await self._session.execute(stmt)
|
||
other_schemas = result.scalars().all()
|
||
|
||
for other in other_schemas:
|
||
other.is_default = False
|
||
|
||
async def get_field_definitions(
|
||
self,
|
||
tenant_id: str,
|
||
schema_id: str | None = None,
|
||
) -> dict[str, dict[str, Any]]:
|
||
"""
|
||
获取字段定义映射
|
||
|
||
Args:
|
||
tenant_id: 租户 ID
|
||
schema_id: 模式 ID(可选)
|
||
|
||
Returns:
|
||
字段名到字段定义的映射,如 {"grade": {"label": "年级", "type": "select", "options": [...]}}
|
||
"""
|
||
schema = await self.get_schema(tenant_id, schema_id)
|
||
if not schema:
|
||
return {}
|
||
|
||
field_map = {}
|
||
for field in schema.fields:
|
||
field_name = field.get("name") if isinstance(field, dict) else field.name
|
||
field_map[field_name] = field if isinstance(field, dict) else field.model_dump()
|
||
|
||
return field_map
|
||
|
||
async def validate_metadata(
|
||
self,
|
||
tenant_id: str,
|
||
metadata: dict[str, Any],
|
||
schema_id: str | None = None,
|
||
) -> tuple[bool, list[str]]:
|
||
"""
|
||
验证元数据是否符合模式定义
|
||
|
||
Args:
|
||
tenant_id: 租户 ID
|
||
metadata: 元数据字典
|
||
schema_id: 模式 ID(可选)
|
||
|
||
Returns:
|
||
(是否有效, 错误消息列表)
|
||
"""
|
||
field_defs = await self.get_field_definitions(tenant_id, schema_id)
|
||
|
||
if not field_defs:
|
||
return True, []
|
||
|
||
errors = []
|
||
|
||
for field_name, field_def in field_defs.items():
|
||
field_type = field_def.get("field_type", "string")
|
||
required = field_def.get("required", False)
|
||
options = field_def.get("options", [])
|
||
value = metadata.get(field_name)
|
||
|
||
if required and value is None:
|
||
errors.append(f"字段 '{field_def.get('label', field_name)}' 是必填的")
|
||
continue
|
||
|
||
if value is None:
|
||
continue
|
||
|
||
if field_type == MetadataFieldType.SELECT.value:
|
||
if value not in options:
|
||
errors.append(
|
||
f"字段 '{field_def.get('label', field_name)}' 的值 '{value}' 不在允许选项中"
|
||
)
|
||
|
||
elif field_type == MetadataFieldType.MULTI_SELECT.value:
|
||
if not isinstance(value, list):
|
||
errors.append(f"字段 '{field_def.get('label', field_name)}' 应该是多选值")
|
||
else:
|
||
for v in value:
|
||
if v not in options:
|
||
errors.append(
|
||
f"字段 '{field_def.get('label', field_name)}' 的值 '{v}' 不在允许选项中"
|
||
)
|
||
|
||
elif field_type == MetadataFieldType.NUMBER.value:
|
||
if not isinstance(value, (int, float)):
|
||
try:
|
||
float(value)
|
||
except (ValueError, TypeError):
|
||
errors.append(f"字段 '{field_def.get('label', field_name)}' 应该是数字")
|
||
|
||
elif field_type == MetadataFieldType.BOOLEAN.value:
|
||
if not isinstance(value, bool):
|
||
if value not in ["true", "false", "1", "0", 1, 0]:
|
||
errors.append(f"字段 '{field_def.get('label', field_name)}' 应该是布尔值")
|
||
|
||
return len(errors) == 0, errors
|