""" Script to cleanup Qdrant collections and data. """ import asyncio import logging import sys sys.path.insert(0, "Q:\\agentProject\\ai-robot-core\\ai-service") from app.core.config import get_settings from app.core.qdrant_client import get_qdrant_client logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) async def list_collections(): """List all collections in Qdrant.""" client = await get_qdrant_client() qdrant = await client.get_client() collections = await qdrant.get_collections() return [c.name for c in collections.collections] async def delete_collection(collection_name: str): """Delete a specific collection.""" client = await get_qdrant_client() qdrant = await client.get_client() try: await qdrant.delete_collection(collection_name) logger.info(f"Deleted collection: {collection_name}") return True except Exception as e: logger.error(f"Failed to delete collection {collection_name}: {e}") return False async def delete_all_collections(): """Delete all collections.""" collections = await list_collections() logger.info(f"Found {len(collections)} collections: {collections}") for name in collections: await delete_collection(name) logger.info("All collections deleted") async def delete_tenant_collection(tenant_id: str): """Delete collection for a specific tenant.""" client = await get_qdrant_client() collection_name = client.get_collection_name(tenant_id) success = await delete_collection(collection_name) if success: logger.info(f"Deleted collection for tenant: {tenant_id}") return success if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Cleanup Qdrant data") parser.add_argument("--all", action="store_true", help="Delete all collections") parser.add_argument("--tenant", type=str, help="Delete collection for specific tenant") parser.add_argument("--list", action="store_true", help="List all collections") args = parser.parse_args() if args.list: collections = asyncio.run(list_collections()) print(f"Collections: {collections}") elif args.all: confirm = input("Are you sure you want to delete ALL collections? (yes/no): ") if confirm.lower() == "yes": asyncio.run(delete_all_collections()) else: print("Cancelled") elif args.tenant: confirm = input(f"Delete collection for tenant '{args.tenant}'? (yes/no): ") if confirm.lower() == "yes": asyncio.run(delete_tenant_collection(args.tenant)) else: print("Cancelled") else: parser.print_help()