ai-robot-core/ai-service/check_kb_collections.py

48 lines
1.4 KiB
Python

"""
检查知识库特定集合中的元数据
"""
import asyncio
from app.core.qdrant_client import get_qdrant_client
async def check_kb_collections():
client = await get_qdrant_client()
qdrant = await client.get_client()
print("=" * 60)
print("检查知识库集合中的元数据")
print("=" * 60)
# 检查所有集合
collections = await qdrant.get_collections()
for col in collections.collections:
print(f"\n{'='*60}")
print(f"集合: {col.name}")
print('='*60)
info = await qdrant.get_collection(col.name)
print(f"向量数: {info.points_count}")
if info.points_count > 0:
results = await qdrant.scroll(
collection_name=col.name,
limit=3,
with_payload=True,
)
for i, point in enumerate(results[0], 1):
payload = point.payload or {}
metadata = payload.get("metadata", {})
text = payload.get("text", "")[:80]
kb_id = payload.get("kb_id", "N/A")
print(f"\n [{i}] ID: {str(point.id)[:8]}...")
print(f" KB ID: {kb_id}")
print(f" 元数据: {metadata if metadata else '(空)'}")
print(f" 内容: {text}...")
if __name__ == "__main__":
asyncio.run(check_kb_collections())