""" Knowledge base indexing service with optimized embedding. Reference: rag-optimization/spec.md Section 5.1 """ import asyncio import logging import uuid from dataclasses import dataclass, field from datetime import datetime from typing import Any from app.core.config import get_settings from app.core.qdrant_client import QdrantClient, get_qdrant_client from app.services.embedding.nomic_provider import NomicEmbeddingProvider, NomicEmbeddingResult from app.services.retrieval.metadata import ChunkMetadata, KnowledgeChunk logger = logging.getLogger(__name__) settings = get_settings() @dataclass class IndexingProgress: """Progress tracking for indexing jobs.""" total_chunks: int = 0 processed_chunks: int = 0 failed_chunks: int = 0 current_document: str = "" started_at: datetime = field(default_factory=datetime.utcnow) @property def progress_percent(self) -> int: if self.total_chunks == 0: return 0 return int((self.processed_chunks / self.total_chunks) * 100) @property def elapsed_seconds(self) -> float: return (datetime.utcnow() - self.started_at).total_seconds() @dataclass class IndexingResult: """Result of an indexing operation.""" success: bool total_chunks: int indexed_chunks: int failed_chunks: int elapsed_seconds: float error_message: str | None = None class KnowledgeIndexer: """ Knowledge base indexer with optimized embedding. 
Features: - Task prefixes (search_document:) for document embedding - Multi-dimensional vectors (256/512/768) - Metadata support - Batch processing """ def __init__( self, qdrant_client: QdrantClient | None = None, embedding_provider: NomicEmbeddingProvider | None = None, chunk_size: int = 500, chunk_overlap: int = 50, batch_size: int = 10, ): self._qdrant_client = qdrant_client self._embedding_provider = embedding_provider self._chunk_size = chunk_size self._chunk_overlap = chunk_overlap self._batch_size = batch_size self._progress: IndexingProgress | None = None async def _get_client(self) -> QdrantClient: if self._qdrant_client is None: self._qdrant_client = await get_qdrant_client() return self._qdrant_client async def _get_embedding_provider(self) -> NomicEmbeddingProvider: if self._embedding_provider is None: self._embedding_provider = NomicEmbeddingProvider( base_url=settings.ollama_base_url, model=settings.ollama_embedding_model, dimension=settings.qdrant_vector_size, ) return self._embedding_provider def chunk_text(self, text: str, metadata: ChunkMetadata | None = None) -> list[KnowledgeChunk]: """ Split text into chunks for indexing. Each line becomes a separate chunk for better retrieval granularity. Args: text: Full text to chunk metadata: Metadata to attach to each chunk Returns: List of KnowledgeChunk objects """ chunks = [] doc_id = str(uuid.uuid4()) lines = text.split('\n') for i, line in enumerate(lines): line = line.strip() if len(line) < 10: continue chunk = KnowledgeChunk( chunk_id=f"{doc_id}_{i}", document_id=doc_id, content=line, metadata=metadata or ChunkMetadata(), ) chunks.append(chunk) return chunks def chunk_text_by_lines( self, text: str, metadata: ChunkMetadata | None = None, min_line_length: int = 10, merge_short_lines: bool = False, ) -> list[KnowledgeChunk]: """ Split text by lines, each line is a separate chunk. 
Args: text: Full text to chunk metadata: Metadata to attach to each chunk min_line_length: Minimum line length to be indexed merge_short_lines: Whether to merge consecutive short lines Returns: List of KnowledgeChunk objects """ chunks = [] doc_id = str(uuid.uuid4()) lines = text.split('\n') if merge_short_lines: merged_lines = [] current_line = "" for line in lines: line = line.strip() if not line: if current_line: merged_lines.append(current_line) current_line = "" continue if current_line: current_line += " " + line else: current_line = line if len(current_line) >= min_line_length * 2: merged_lines.append(current_line) current_line = "" if current_line: merged_lines.append(current_line) lines = merged_lines for i, line in enumerate(lines): line = line.strip() if len(line) < min_line_length: continue chunk = KnowledgeChunk( chunk_id=f"{doc_id}_{i}", document_id=doc_id, content=line, metadata=metadata or ChunkMetadata(), ) chunks.append(chunk) return chunks async def index_document( self, tenant_id: str, document_id: str, text: str, metadata: ChunkMetadata | None = None, ) -> IndexingResult: """ Index a single document with optimized embedding. 
        Args:
            tenant_id: Tenant identifier
            document_id: Document identifier
            text: Document text content
            metadata: Optional metadata for the document

        Returns:
            IndexingResult with status and statistics
        """
        start_time = datetime.utcnow()

        try:
            client = await self._get_client()
            provider = await self._get_embedding_provider()

            # Ensure the tenant collection exists with the named multi-vector
            # layout (full / dim_256 / dim_512) used by the points below.
            await client.ensure_collection_exists(tenant_id, use_multi_vector=True)

            chunks = self.chunk_text(text, metadata)

            # Fresh progress object per document; exposed via get_progress().
            self._progress = IndexingProgress(
                total_chunks=len(chunks),
                current_document=document_id,
            )

            points = []
            for i, chunk in enumerate(chunks):
                try:
                    # Chunks are embedded one at a time.
                    # NOTE(review): self._batch_size is never consulted here —
                    # confirm whether batched embedding was intended.
                    embedding_result = await provider.embed_document(chunk.content)
                    chunk.embedding_full = embedding_result.embedding_full
                    chunk.embedding_256 = embedding_result.embedding_256
                    chunk.embedding_512 = embedding_result.embedding_512

                    point = {
                        # Generate a valid UUID for Qdrant.
                        # NOTE(review): random ids mean re-indexing the same
                        # document adds new points instead of overwriting —
                        # confirm deduplication happens elsewhere.
                        "id": str(uuid.uuid4()),
                        "vector": {
                            "full": chunk.embedding_full,
                            "dim_256": chunk.embedding_256,
                            "dim_512": chunk.embedding_512,
                        },
                        "payload": {
                            "chunk_id": chunk.chunk_id,
                            "document_id": document_id,
                            "text": chunk.content,
                            "metadata": chunk.metadata.to_dict(),
                            "created_at": chunk.created_at.isoformat(),
                        }
                    }
                    points.append(point)
                    self._progress.processed_chunks += 1

                    logger.debug(
                        f"[RAG-OPT] Indexed chunk {i+1}/{len(chunks)} for doc={document_id}"
                    )

                except Exception as e:
                    # Best-effort: a failed chunk is logged and counted; the
                    # remaining chunks of the document are still indexed.
                    logger.warning(f"[RAG-OPT] Failed to index chunk {i}: {e}")
                    self._progress.failed_chunks += 1

            # Upsert all successfully embedded chunks in a single call.
            if points:
                await client.upsert_multi_vector(tenant_id, points)

            elapsed = (datetime.utcnow() - start_time).total_seconds()

            logger.info(
                f"[RAG-OPT] Indexed document {document_id}: "
                f"{len(points)} chunks in {elapsed:.2f}s"
            )

            return IndexingResult(
                success=True,
                total_chunks=len(chunks),
                indexed_chunks=len(points),
                failed_chunks=self._progress.failed_chunks,
                elapsed_seconds=elapsed,
            )

        except Exception as e:
            # Whole-document failure (client/collection/unexpected errors):
            # report failure with zeroed counts and the error message.
            elapsed = (datetime.utcnow() - start_time).total_seconds()
            logger.error(f"[RAG-OPT] Failed to index document {document_id}: {e}")
            return IndexingResult(
                success=False,
total_chunks=0, indexed_chunks=0, failed_chunks=0, elapsed_seconds=elapsed, error_message=str(e), ) async def index_documents_batch( self, tenant_id: str, documents: list[dict[str, Any]], ) -> list[IndexingResult]: """ Index multiple documents in batch. Args: tenant_id: Tenant identifier documents: List of documents with format: { "document_id": str, "text": str, "metadata": ChunkMetadata (optional) } Returns: List of IndexingResult for each document """ results = [] for doc in documents: result = await self.index_document( tenant_id=tenant_id, document_id=doc["document_id"], text=doc["text"], metadata=doc.get("metadata"), ) results.append(result) return results def get_progress(self) -> IndexingProgress | None: """Get current indexing progress.""" return self._progress _knowledge_indexer: KnowledgeIndexer | None = None def get_knowledge_indexer() -> KnowledgeIndexer: """Get or create KnowledgeIndexer instance.""" global _knowledge_indexer if _knowledge_indexer is None: _knowledge_indexer = KnowledgeIndexer() return _knowledge_indexer