diff --git a/ai-service/app/services/knowledge_base_service.py b/ai-service/app/services/knowledge_base_service.py new file mode 100644 index 0000000..d31ae8f --- /dev/null +++ b/ai-service/app/services/knowledge_base_service.py @@ -0,0 +1,303 @@ +""" +Knowledge Base CRUD service for AI Service. +[AC-AISVC-59~AC-AISVC-64] Multi-knowledge-base management with Qdrant Collection integration. +""" + +import logging +import uuid +from collections.abc import Sequence +from datetime import datetime + +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlmodel import col + +from app.core.qdrant_client import get_qdrant_client +from app.models.entities import ( + Document, + KBType, + KnowledgeBase, + KnowledgeBaseCreate, + KnowledgeBaseUpdate, +) + +logger = logging.getLogger(__name__) + + +class KnowledgeBaseService: + """ + [AC-AISVC-59~AC-AISVC-64] Knowledge Base CRUD service. + Handles KB creation with Qdrant Collection initialization, + KB updates, deletion with Collection cleanup, and listing. + """ + + def __init__(self, session: AsyncSession): + self._session = session + + async def create_knowledge_base( + self, + tenant_id: str, + kb_create: KnowledgeBaseCreate, + ) -> KnowledgeBase: + """ + [AC-AISVC-59] Create a new knowledge base. + Initializes corresponding Qdrant Collection. + + Args: + tenant_id: Tenant identifier + kb_create: Knowledge base creation data + + Returns: + Created KnowledgeBase entity + """ + kb = KnowledgeBase( + tenant_id=tenant_id, + name=kb_create.name, + kb_type=kb_create.kb_type, + description=kb_create.description, + priority=kb_create.priority, + is_enabled=True, + doc_count=0, + ) + self._session.add(kb) + await self._session.flush() + + qdrant = await get_qdrant_client() + await qdrant.ensure_kb_collection_exists(tenant_id, str(kb.id)) + + logger.info( + f"[AC-AISVC-59] Created knowledge base: tenant={tenant_id}, " + f"kb_id={kb.id}, name={kb.name}, type={kb.kb_type}" + ) + + return kb + + async def get_knowledge_base( + self, + tenant_id: str, + kb_id: str, + ) -> KnowledgeBase | None: + """ + Get a knowledge base by ID. + + Args: + tenant_id: Tenant identifier + kb_id: Knowledge base ID + + Returns: + KnowledgeBase entity or None + """ + try: + stmt = select(KnowledgeBase).where( + KnowledgeBase.tenant_id == tenant_id, + KnowledgeBase.id == uuid.UUID(kb_id), + ) + result = await self._session.execute(stmt) + return result.scalar_one_or_none() + except ValueError: + return None + + async def list_knowledge_bases( + self, + tenant_id: str, + kb_type: str | None = None, + is_enabled: bool | None = None, + ) -> Sequence[KnowledgeBase]: + """ + [AC-AISVC-60] List knowledge bases for a tenant. + + Args: + tenant_id: Tenant identifier + kb_type: Filter by knowledge base type (optional) + is_enabled: Filter by enabled status (optional) + + Returns: + List of KnowledgeBase entities + """ + stmt = select(KnowledgeBase).where( + KnowledgeBase.tenant_id == tenant_id + ) + + if kb_type: + stmt = stmt.where(KnowledgeBase.kb_type == kb_type) + if is_enabled is not None: + stmt = stmt.where(KnowledgeBase.is_enabled == is_enabled) + + stmt = stmt.order_by( + col(KnowledgeBase.priority).desc(), + col(KnowledgeBase.created_at).desc() + ) + + result = await self._session.execute(stmt) + return result.scalars().all() + + async def update_knowledge_base( + self, + tenant_id: str, + kb_id: str, + kb_update: KnowledgeBaseUpdate, + ) -> KnowledgeBase | None: + """ + [AC-AISVC-61] Update a knowledge base. + + Args: + tenant_id: Tenant identifier + kb_id: Knowledge base ID + kb_update: Update data + + Returns: + Updated KnowledgeBase entity or None + """ + kb = await self.get_knowledge_base(tenant_id, kb_id) + if not kb: + return None + + update_data = kb_update.model_dump(exclude_unset=True) + for key, value in update_data.items(): + setattr(kb, key, value) + + kb.updated_at = datetime.utcnow() + await self._session.flush() + + logger.info( + f"[AC-AISVC-61] Updated knowledge base: tenant={tenant_id}, " + f"kb_id={kb_id}, fields={list(update_data.keys())}" + ) + + return kb + + async def delete_knowledge_base( + self, + tenant_id: str, + kb_id: str, + ) -> bool: + """ + [AC-AISVC-62] Delete a knowledge base. + Also deletes associated documents and Qdrant Collection. + + Args: + tenant_id: Tenant identifier + kb_id: Knowledge base ID + + Returns: + True if deleted successfully + """ + kb = await self.get_knowledge_base(tenant_id, kb_id) + if not kb: + return False + + doc_stmt = select(Document).where( + Document.tenant_id == tenant_id, + Document.kb_id == kb_id, + ) + doc_result = await self._session.execute(doc_stmt) + documents = doc_result.scalars().all() + + for doc in documents: + await self._session.delete(doc) + + await self._session.delete(kb) + await self._session.flush() + + qdrant = await get_qdrant_client() + await qdrant.delete_kb_collection(tenant_id, kb_id) + + logger.info( + f"[AC-AISVC-62] Deleted knowledge base: tenant={tenant_id}, " + f"kb_id={kb_id}, docs_deleted={len(documents)}" + ) + + return True + + async def update_doc_count( + self, + tenant_id: str, + kb_id: str, + delta: int = 1, + ) -> bool: + """ + Update document count for a knowledge base. + + Args: + tenant_id: Tenant identifier + kb_id: Knowledge base ID + delta: Change in document count (positive or negative) + + Returns: + True if updated successfully + """ + try: + kb = await self.get_knowledge_base(tenant_id, kb_id) + if kb: + kb.doc_count = max(0, kb.doc_count + delta) + kb.updated_at = datetime.utcnow() + await self._session.flush() + return True + except Exception as e: + logger.error(f"Error updating doc count: {e}") + return False + + async def recalculate_doc_counts( + self, + tenant_id: str, + ) -> dict[str, int]: + """ + Recalculate document counts for all knowledge bases of a tenant. + + Args: + tenant_id: Tenant identifier + + Returns: + Dictionary mapping kb_id to doc_count + """ + count_stmt = ( + select(Document.kb_id, func.count(Document.id).label("count")) + .where(Document.tenant_id == tenant_id) + .group_by(Document.kb_id) + ) + result = await self._session.execute(count_stmt) + counts = {row.kb_id: row.count for row in result} + + kb_stmt = select(KnowledgeBase).where(KnowledgeBase.tenant_id == tenant_id) + kb_result = await self._session.execute(kb_stmt) + knowledge_bases = kb_result.scalars().all() + + for kb in knowledge_bases: + kb_id_str = str(kb.id) + kb.doc_count = counts.get(kb_id_str, 0) + kb.updated_at = datetime.utcnow() + + await self._session.flush() + + return {str(kb.id): kb.doc_count for kb in knowledge_bases} + + async def get_or_create_default_kb( + self, + tenant_id: str, + ) -> KnowledgeBase: + """ + Get or create the default knowledge base for a tenant. + This is used for backward compatibility with existing data. + + Args: + tenant_id: Tenant identifier + + Returns: + Default KnowledgeBase entity + """ + stmt = select(KnowledgeBase).where( + KnowledgeBase.tenant_id == tenant_id, + ).limit(1) + result = await self._session.execute(stmt) + existing_kb = result.scalar_one_or_none() + + if existing_kb: + return existing_kb + + kb_create = KnowledgeBaseCreate( + name="Default Knowledge Base", + kb_type=KBType.GENERAL.value, + description="Default knowledge base for backward compatibility", + priority=0, + ) + return await self.create_knowledge_base(tenant_id, kb_create) diff --git a/ai-service/scripts/migrations/001_add_kb_fields.sql b/ai-service/scripts/migrations/001_add_kb_fields.sql new file mode 100644 index 0000000..420b29d --- /dev/null +++ b/ai-service/scripts/migrations/001_add_kb_fields.sql @@ -0,0 +1,31 @@ +-- Migration: Add kb_type, priority, is_enabled, doc_count to knowledge_bases +-- Date: 2026-02-27 +-- Issue: [AC-AISVC-59] Multi-KB management fields missing + +-- Add kb_type column +ALTER TABLE knowledge_bases +ADD COLUMN IF NOT EXISTS kb_type VARCHAR DEFAULT 'general'; + +-- Add priority column +ALTER TABLE knowledge_bases +ADD COLUMN IF NOT EXISTS priority INTEGER DEFAULT 0; + +-- Add is_enabled column +ALTER TABLE knowledge_bases +ADD COLUMN IF NOT EXISTS is_enabled BOOLEAN DEFAULT TRUE; + +-- Add doc_count column +ALTER TABLE knowledge_bases +ADD COLUMN IF NOT EXISTS doc_count INTEGER DEFAULT 0; + +-- Add index for tenant + kb_type +CREATE INDEX IF NOT EXISTS ix_knowledge_bases_tenant_kb_type +ON knowledge_bases (tenant_id, kb_type); + +-- Update existing records to have default values +UPDATE knowledge_bases +SET kb_type = 'general', + priority = 0, + is_enabled = TRUE, + doc_count = 0 +WHERE kb_type IS NULL OR priority IS NULL OR is_enabled IS NULL OR doc_count IS NULL; diff --git a/ai-service/scripts/migrations/run_migration.py b/ai-service/scripts/migrations/run_migration.py new file mode 100644 index 0000000..1639eb0 --- /dev/null +++ b/ai-service/scripts/migrations/run_migration.py @@ -0,0 +1,64 @@ +""" +Migration script to add kb_type, priority, is_enabled, doc_count to knowledge_bases. +Run: python scripts/migrations/run_migration.py +""" + +import asyncio +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy.orm import sessionmaker +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.core.config import get_settings + + +async def run_migration(): + """Run the migration to add new columns to knowledge_bases table.""" + settings = get_settings() + engine = create_async_engine(settings.database_url, echo=True) + async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + migration_sql = """ + -- Add kb_type column + ALTER TABLE knowledge_bases + ADD COLUMN IF NOT EXISTS kb_type VARCHAR DEFAULT 'general'; + + -- Add priority column + ALTER TABLE knowledge_bases + ADD COLUMN IF NOT EXISTS priority INTEGER DEFAULT 0; + + -- Add is_enabled column + ALTER TABLE knowledge_bases + ADD COLUMN IF NOT EXISTS is_enabled BOOLEAN DEFAULT TRUE; + + -- Add doc_count column + ALTER TABLE knowledge_bases + ADD COLUMN IF NOT EXISTS doc_count INTEGER DEFAULT 0; + """ + + async with async_session_maker() as session: + for statement in migration_sql.strip().split(';'): + statement = statement.strip() + if statement and not statement.startswith('--'): + try: + await session.execute(text(statement)) + print(f"Executed: {statement[:50]}...") + except Exception as e: + if "already exists" in str(e).lower() or "duplicate" in str(e).lower(): + print(f"Skipped (already exists): {statement[:50]}...") + else: + raise + + await session.commit() + print("\nMigration completed successfully!") + + await engine.dispose() + + +if __name__ == "__main__": + asyncio.run(run_migration())