"""
Script to clean up vector data for a specific knowledge base.

Clears the Qdrant collection for the given KB ID, allowing re-indexing.
"""

import argparse
import asyncio
import logging
import sys
from datetime import datetime

# NOTE(review): hard-coded absolute path to the service root; it must be on
# sys.path before the `app.*` imports below can resolve.
sys.path.insert(0, "Q:\\agentProject\\ai-robot-core\\ai-service")

from app.core.config import get_settings
from app.core.qdrant_client import get_qdrant_client
from app.core.database import get_session
from app.models.entities import KnowledgeBase, Document
from sqlalchemy import select

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


async def get_knowledge_base_info(kb_id: str) -> dict | None:
    """Get knowledge base information from database.

    Args:
        kb_id: Knowledge base ID to look up.

    Returns:
        A dict with the keys ``id``, ``tenant_id``, ``name``, ``doc_count``
        and ``document_ids``, or ``None`` when no knowledge base with that ID
        exists.
    """
    async for session in get_session():
        kb_query = select(KnowledgeBase).where(KnowledgeBase.id == kb_id)
        kb = (await session.execute(kb_query)).scalar_one_or_none()
        if not kb:
            return None

        docs_query = select(Document).where(Document.kb_id == kb_id)
        documents = (await session.execute(docs_query)).scalars().all()

        return {
            "id": str(kb.id),
            "tenant_id": kb.tenant_id,
            "name": kb.name,
            "doc_count": len(documents),
            "document_ids": [str(doc.id) for doc in documents],
        }

    # get_session() yielded nothing, so there is nothing to report.
    return None


async def list_kb_collections(tenant_id: str, kb_id: str) -> list[str]:
    """List all collections that might be related to the KB.

    Matching is a loose substring search on the first eight characters of
    the KB ID, so the result is informational only.

    Args:
        tenant_id: Tenant identifier (currently not used for matching).
        kb_id: Knowledge base ID.

    Returns:
        Names of the Qdrant collections whose name contains the KB ID prefix.
    """
    client = await get_qdrant_client()
    qdrant = await client.get_client()

    response = await qdrant.get_collections()

    # Two spellings of the prefix: hyphens turned into underscores, and
    # hyphens dropped (compared against the name with underscores dropped).
    underscored_prefix = kb_id.replace('-', '_')[:8]
    compact_prefix = kb_id.replace('-', '')[:8]

    return [
        collection.name
        for collection in response.collections
        if underscored_prefix in collection.name
        or compact_prefix in collection.name.replace('_', '')
    ]


async def clear_kb_vector_data(tenant_id: str, kb_id: str, delete_docs: bool = False) -> bool:
    """
    Clear vector data for a specific knowledge base.

    Deletes the KB's Qdrant collection if it exists. With ``delete_docs``
    set, also deletes the KB's document rows and resets the KB's
    ``doc_count`` to 0.

    Args:
        tenant_id: Tenant identifier
        kb_id: Knowledge base ID
        delete_docs: Whether to also delete document records from database

    Returns:
        True if successful, False if any step raised an exception.
    """
    client = await get_qdrant_client()
    qdrant = await client.get_client()

    collection_name = client.get_kb_collection_name(tenant_id, kb_id)

    try:
        if await qdrant.collection_exists(collection_name):
            await qdrant.delete_collection(collection_name=collection_name)
            logger.info(f"Deleted Qdrant collection: {collection_name}")
        else:
            logger.info(f"Collection {collection_name} does not exist")

        if delete_docs:
            async for session in get_session():
                doc_query = select(Document).where(Document.kb_id == kb_id)
                documents = (await session.execute(doc_query)).scalars().all()

                for doc in documents:
                    await session.delete(doc)

                kb_query = select(KnowledgeBase).where(KnowledgeBase.id == kb_id)
                kb = (await session.execute(kb_query)).scalar_one_or_none()
                if kb:
                    kb.doc_count = 0
                    # `datetime` is imported at module level so this also
                    # works when the module is imported rather than run as a
                    # script. Kept as a naive UTC timestamp, as before.
                    kb.updated_at = datetime.utcnow()

                await session.commit()
                logger.info(f"Deleted {len(documents)} document records from database")
                # Only the first yielded session is used.
                break

        return True

    except Exception as e:
        # Boundary handler: report failure to the caller, but keep the
        # traceback in the log so the cause is not lost.
        logger.exception(f"Failed to clear KB vector data: {e}")
        return False


async def main(kb_id: str, delete_docs: bool = False):
    """Main function to clear KB vector data.

    Looks up the knowledge base, shows what will be deleted, asks for
    interactive confirmation and then performs the cleanup.

    Args:
        kb_id: Knowledge base ID to clear.
        delete_docs: Whether to also delete document records from database.

    Returns:
        True if the cleanup succeeded; False if the KB was not found, the
        user cancelled, or the cleanup failed.
    """
    logger.info(f"Starting cleanup for knowledge base: {kb_id}")

    kb_info = await get_knowledge_base_info(kb_id)

    if not kb_info:
        logger.error(f"Knowledge base not found: {kb_id}")
        return False

    tenant_id = kb_info['tenant_id']

    logger.info("Found knowledge base:")
    logger.info(f"  - ID: {kb_info['id']}")
    logger.info(f"  - Name: {kb_info['name']}")
    logger.info(f"  - Tenant: {tenant_id}")
    logger.info(f"  - Document count: {kb_info['doc_count']}")

    matching_collections = await list_kb_collections(tenant_id, kb_id)
    if matching_collections:
        logger.info(f"  - Related collections: {matching_collections}")

    # Show the exact name clear_kb_vector_data() will delete instead of
    # rebuilding it by hand, so the prompt cannot disagree with the action.
    client = await get_qdrant_client()
    collection_name = client.get_kb_collection_name(tenant_id, kb_id)

    print()
    print("=" * 60)
    print("WARNING: This will delete all vector data for this knowledge base!")
    print(f"Collection to delete: {collection_name}")
    if delete_docs:
        print("Document records in database will also be deleted!")
    print("=" * 60)
    print()

    confirm = input("Continue? (yes/no): ")
    if confirm.lower() != "yes":
        print("Cancelled")
        return False

    success = await clear_kb_vector_data(
        tenant_id=tenant_id,
        kb_id=kb_id,
        delete_docs=delete_docs
    )

    if success:
        logger.info(f"Successfully cleared vector data for KB: {kb_id}")
        logger.info("You can now re-index the knowledge base documents.")
    else:
        logger.error(f"Failed to clear vector data for KB: {kb_id}")

    return success


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Clear vector data for a knowledge base")
    parser.add_argument("kb_id", help="Knowledge base ID to clear")
    parser.add_argument("--delete-docs", action="store_true",
                        help="Also delete document records from database")

    args = parser.parse_args()

    succeeded = asyncio.run(main(args.kb_id, args.delete_docs))
    # Propagate the outcome so shells and wrappers can detect a failed or
    # cancelled run.
    sys.exit(0 if succeeded else 1)