"""
Verify the collections stored in the Qdrant vector database.

Used to inspect the knowledge-base collections of the szmp@ash@2026 tenant.
"""
import asyncio
import sys
from pathlib import Path

# Put the parent of this script's directory on the import path so the
# `app` package imported below can be found when the file is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from qdrant_client import AsyncQdrantClient
from app.core.config import get_settings

# Case-insensitive substring that marks a collection as tenant-related.
TENANT_KEYWORD = "szmp"
# Number of tenant collections the verification step expects to find.
EXPECTED_TENANT_COLLECTIONS = 2
# Maximum number of payload-text characters shown for each sample point.
SAMPLE_TEXT_PREVIEW_LEN = 50


def _is_tenant_collection(name: str) -> bool:
    """Return True when *name* contains the tenant keyword (case-insensitive)."""
    return TENANT_KEYWORD in name.lower()


def _vector_configs(vectors) -> list:
    """Normalize a collection's vector configuration into ``(name, params)`` pairs.

    Args:
        vectors: The ``vectors`` value of the collection params. It may be
            ``None``, a single params object, or a dict mapping vector names
            to params objects (named vectors).

    Returns:
        A list of ``(name, params)`` tuples. ``name`` is ``None`` for a
        single unnamed vector configuration; the list is empty for ``None``.
    """
    if vectors is None:
        return []
    if isinstance(vectors, dict):
        return list(vectors.items())
    return [(None, vectors)]


def _text_preview(text, limit: int = SAMPLE_TEXT_PREVIEW_LEN) -> str:
    """Return a short, printable preview of a payload ``text`` value.

    Falsy values yield ``'N/A'``. Anything else is converted with ``str`` so
    that non-string payloads do not raise, and an ellipsis is appended only
    when the text was actually truncated.
    """
    if not text:
        return 'N/A'
    text = str(text)
    if len(text) > limit:
        return text[:limit] + '...'
    return text


async def list_collections() -> None:
    """List every collection, split into tenant-related and other ones.

    Prints the connection settings, per-collection point counts (plus vector
    size and distance for tenant collections), a summary, and whether the
    number of tenant collections equals ``EXPECTED_TENANT_COLLECTIONS``.
    Any exception is reported on stdout rather than propagated; the client
    is always closed.
    """
    settings = get_settings()
    client = AsyncQdrantClient(url=settings.qdrant_url)

    print(f"🔗 Qdrant URL: {settings.qdrant_url}")
    print(f"📦 Collection Prefix: {settings.qdrant_collection_prefix}")
    print("-" * 60)

    try:
        collections = await client.get_collections()

        if not collections.collections:
            print("⚠️ 没有找到任何 collections")
            return

        print(f"✅ 找到 {len(collections.collections)} 个 collections:\n")

        # Partition the collection names by tenant membership.
        all_names = [collection.name for collection in collections.collections]
        szmp_collections = [n for n in all_names if _is_tenant_collection(n)]
        other_collections = [n for n in all_names if not _is_tenant_collection(n)]

        # Tenant-related collections, with vector details.
        if szmp_collections:
            print(f"🎯 szmp@ash@2026 租户相关的 collections ({len(szmp_collections)} 个):")
            print("-" * 60)
            for name in sorted(szmp_collections):
                try:
                    info = await client.get_collection(name)
                    points_count = getattr(info, 'points_count', 'N/A')
                    print(f" 📁 {name}")
                    print(f" └─ 向量数量: {points_count}")

                    # Vector size/distance; named vectors are listed one by
                    # one with their name as a label.
                    params = getattr(getattr(info, 'config', None), 'params', None)
                    for vec_name, vector_params in _vector_configs(
                        getattr(params, 'vectors', None)
                    ):
                        label = "" if vec_name is None else f" [{vec_name}]"
                        if hasattr(vector_params, 'size'):
                            print(f" └─ 向量维度{label}: {vector_params.size}")
                        if hasattr(vector_params, 'distance'):
                            print(f" └─ 距离函数{label}: {vector_params.distance}")
                    print()
                except Exception as e:
                    # Best effort: one failing collection must not stop the listing.
                    print(f" 📁 {name}")
                    print(f" └─ 获取信息失败: {e}\n")
        else:
            print("⚠️ 没有找到 szmp@ash@2026 租户相关的 collections\n")

        # Remaining collections, point count only.
        if other_collections:
            print(f"📂 其他 collections ({len(other_collections)} 个):")
            print("-" * 60)
            for name in sorted(other_collections):
                try:
                    info = await client.get_collection(name)
                    points_count = getattr(info, 'points_count', 'N/A')
                    print(f" 📁 {name} (向量数: {points_count})")
                except Exception as e:
                    print(f" 📁 {name} (获取信息失败: {e})")

        print("\n" + "=" * 60)
        print("📊 总结:")
        print(f" - Collections 总数: {len(collections.collections)}")
        print(f" - szmp 相关: {len(szmp_collections)} 个")
        print(f" - 其他: {len(other_collections)} 个")

        # Check the tenant collection count against the expectation.
        print("\n✅ 验证:")
        if len(szmp_collections) == EXPECTED_TENANT_COLLECTIONS:
            print(f" ✓ szmp 租户的 collection 数量符合预期 ({EXPECTED_TENANT_COLLECTIONS}个)")
        else:
            print(
                f" ⚠️ szmp 租户的 collection 数量不符合预期 "
                f"(实际: {len(szmp_collections)} 个, 预期: {EXPECTED_TENANT_COLLECTIONS}个)"
            )

    except Exception as e:
        print(f"❌ 连接 Qdrant 失败: {e}")
        print(f" 请检查 Qdrant 是否运行在 {settings.qdrant_url}")
    finally:
        await client.close()


async def check_collection_details(collection_name: str) -> None:
    """Print detailed information about a single collection.

    Shows the point count, vector configuration (size, distance, on-disk
    flag; one entry per named vector when the collection uses them), shard
    and replication settings, and up to three sample points.

    Args:
        collection_name: Name of the collection to inspect.

    Any exception is reported on stdout rather than propagated; the client
    is always closed.
    """
    settings = get_settings()
    client = AsyncQdrantClient(url=settings.qdrant_url)

    try:
        print(f"\n📋 Collection '{collection_name}' 详细信息:")
        print("-" * 60)

        info = await client.get_collection(collection_name)

        print(f" 名称: {collection_name}")
        print(f" 向量数量: {info.points_count}")

        if hasattr(info, 'config') and hasattr(info.config, 'params'):
            params = info.config.params
            if hasattr(params, 'vectors'):
                print(" 向量配置:")
                for vec_name, vector_params in _vector_configs(params.vectors):
                    if vec_name is not None:
                        # Named vector: show which configuration follows.
                        print(f" - 名称: {vec_name}")
                    if hasattr(vector_params, 'size'):
                        print(f" - 维度: {vector_params.size}")
                    if hasattr(vector_params, 'distance'):
                        print(f" - 距离函数: {vector_params.distance}")
                    if hasattr(vector_params, 'on_disk'):
                        print(f" - 磁盘存储: {vector_params.on_disk}")

            if hasattr(params, 'shard_number'):
                print(f" 分片数: {params.shard_number}")
            if hasattr(params, 'replication_factor'):
                print(f" 副本数: {params.replication_factor}")

        # Fetch a few sample points (payload only, no vectors).
        try:
            scroll_result = await client.scroll(
                collection_name=collection_name,
                limit=3,
                with_payload=True,
                with_vectors=False,
            )

            # scroll() returns a pair; its first element holds the points.
            sample_points = scroll_result[0]
            if sample_points:
                print("\n 样本数据 (前3条):")
                for i, point in enumerate(sample_points, 1):
                    payload = point.payload or {}
                    text = _text_preview(payload.get('text'))
                    kb_id = payload.get('kb_id', 'N/A')
                    print(f" {i}. ID: {point.id}")
                    print(f" KB ID: {kb_id}")
                    print(f" 文本: {text}")
        except Exception as e:
            # Samples are optional; report the failure and keep going.
            print(f" 获取样本数据失败: {e}")

    except Exception as e:
        print(f"❌ 获取 collection 信息失败: {e}")
    finally:
        await client.close()


async def main() -> None:
    """Entry point: list all collections, then detail each tenant collection."""
    print("=" * 60)
    print("🔍 Qdrant 向量数据库 Collections 验证工具")
    print("=" * 60)
    print()

    # Overview of every collection.
    await list_collections()

    # Detailed view of each tenant-related collection.
    settings = get_settings()
    client = AsyncQdrantClient(url=settings.qdrant_url)

    try:
        collections = await client.get_collections()
        szmp_collections = [
            c.name for c in collections.collections if _is_tenant_collection(c.name)
        ]

        for name in sorted(szmp_collections):
            await check_collection_details(name)

    except Exception as e:
        print(f"❌ 错误: {e}")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())