"""
|
||
|
|
Knowledge base indexing service with optimized embedding.
|
||
|
|
Reference: rag-optimization/spec.md Section 5.1
|
||
|
|
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import logging
|
||
|
|
import uuid
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from datetime import datetime
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
from app.core.config import get_settings
|
||
|
|
from app.core.qdrant_client import QdrantClient, get_qdrant_client
|
||
|
|
from app.services.embedding.nomic_provider import NomicEmbeddingProvider, NomicEmbeddingResult
|
||
|
|
from app.services.retrieval.metadata import ChunkMetadata, KnowledgeChunk
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
settings = get_settings()
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class IndexingProgress:
|
||
|
|
"""Progress tracking for indexing jobs."""
|
||
|
|
total_chunks: int = 0
|
||
|
|
processed_chunks: int = 0
|
||
|
|
failed_chunks: int = 0
|
||
|
|
current_document: str = ""
|
||
|
|
started_at: datetime = field(default_factory=datetime.utcnow)
|
||
|
|
|
||
|
|
@property
|
||
|
|
def progress_percent(self) -> int:
|
||
|
|
if self.total_chunks == 0:
|
||
|
|
return 0
|
||
|
|
return int((self.processed_chunks / self.total_chunks) * 100)
|
||
|
|
|
||
|
|
@property
|
||
|
|
def elapsed_seconds(self) -> float:
|
||
|
|
return (datetime.utcnow() - self.started_at).total_seconds()
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class IndexingResult:
|
||
|
|
"""Result of an indexing operation."""
|
||
|
|
success: bool
|
||
|
|
total_chunks: int
|
||
|
|
indexed_chunks: int
|
||
|
|
failed_chunks: int
|
||
|
|
elapsed_seconds: float
|
||
|
|
error_message: str | None = None
|
||
|
|
|
||
|
|
|
||
|
|
class KnowledgeIndexer:
|
||
|
|
"""
|
||
|
|
Knowledge base indexer with optimized embedding.
|
||
|
|
|
||
|
|
Features:
|
||
|
|
- Task prefixes (search_document:) for document embedding
|
||
|
|
- Multi-dimensional vectors (256/512/768)
|
||
|
|
- Metadata support
|
||
|
|
- Batch processing
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(
|
||
|
|
self,
|
||
|
|
qdrant_client: QdrantClient | None = None,
|
||
|
|
embedding_provider: NomicEmbeddingProvider | None = None,
|
||
|
|
chunk_size: int = 500,
|
||
|
|
chunk_overlap: int = 50,
|
||
|
|
batch_size: int = 10,
|
||
|
|
):
|
||
|
|
self._qdrant_client = qdrant_client
|
||
|
|
self._embedding_provider = embedding_provider
|
||
|
|
self._chunk_size = chunk_size
|
||
|
|
self._chunk_overlap = chunk_overlap
|
||
|
|
self._batch_size = batch_size
|
||
|
|
self._progress: IndexingProgress | None = None
|
||
|
|
|
||
|
|
async def _get_client(self) -> QdrantClient:
|
||
|
|
if self._qdrant_client is None:
|
||
|
|
self._qdrant_client = await get_qdrant_client()
|
||
|
|
return self._qdrant_client
|
||
|
|
|
||
|
|
async def _get_embedding_provider(self) -> NomicEmbeddingProvider:
|
||
|
|
if self._embedding_provider is None:
|
||
|
|
self._embedding_provider = NomicEmbeddingProvider(
|
||
|
|
base_url=settings.ollama_base_url,
|
||
|
|
model=settings.ollama_embedding_model,
|
||
|
|
dimension=settings.qdrant_vector_size,
|
||
|
|
)
|
||
|
|
return self._embedding_provider
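
    # NOTE: the rest of this module relies on embed_document() returning a
    # NomicEmbeddingResult exposing .embedding_full, .embedding_256 and
    # .embedding_512 (see index_document below); per the class docstring, the
    # provider is expected to apply the "search_document:" task prefix.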

    def chunk_text(self, text: str, metadata: ChunkMetadata | None = None) -> list[KnowledgeChunk]:
        """
        Split text into chunks for indexing.

        Each line becomes a separate chunk for better retrieval granularity.

        Args:
            text: Full text to chunk
            metadata: Metadata to attach to each chunk

        Returns:
            List of KnowledgeChunk objects
        """
        chunks = []
        doc_id = str(uuid.uuid4())

        lines = text.split('\n')

        for i, line in enumerate(lines):
            line = line.strip()

            # Skip very short lines; they carry little retrieval signal.
            if len(line) < 10:
                continue

            chunk = KnowledgeChunk(
                chunk_id=f"{doc_id}_{i}",
                document_id=doc_id,
                content=line,
                metadata=metadata or ChunkMetadata(),
            )
            chunks.append(chunk)

        return chunks
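
    # Illustrative example of chunk_text's behavior (line-based splitting with
    # a 10-character floor), a sketch rather than a doctest:
    #
    #     indexer.chunk_text("Title\nok\nThe first substantive line.\n")
    #
    # produces a single chunk whose content is "The first substantive line.";
    # "Title" and "ok" fall under the length floor and are dropped. All chunks
    # from one call share a freshly generated document_id.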

    def chunk_text_by_lines(
        self,
        text: str,
        metadata: ChunkMetadata | None = None,
        min_line_length: int = 10,
        merge_short_lines: bool = False,
    ) -> list[KnowledgeChunk]:
        """
        Split text by lines; each line becomes a separate chunk.

        Args:
            text: Full text to chunk
            metadata: Metadata to attach to each chunk
            min_line_length: Minimum length (in characters) for a line to be indexed
            merge_short_lines: Whether to merge consecutive short lines

        Returns:
            List of KnowledgeChunk objects
        """
        chunks = []
        doc_id = str(uuid.uuid4())

        lines = text.split('\n')

        if merge_short_lines:
            merged_lines = []
            current_line = ""

            for line in lines:
                line = line.strip()
                if not line:
                    # A blank line flushes whatever has accumulated so far.
                    if current_line:
                        merged_lines.append(current_line)
                        current_line = ""
                    continue

                if current_line:
                    current_line += " " + line
                else:
                    current_line = line

                # Flush once the merged run is comfortably above the floor.
                if len(current_line) >= min_line_length * 2:
                    merged_lines.append(current_line)
                    current_line = ""

            if current_line:
                merged_lines.append(current_line)

            lines = merged_lines

        for i, line in enumerate(lines):
            line = line.strip()

            if len(line) < min_line_length:
                continue

            chunk = KnowledgeChunk(
                chunk_id=f"{doc_id}_{i}",
                document_id=doc_id,
                content=line,
                metadata=metadata or ChunkMetadata(),
            )
            chunks.append(chunk)

        return chunks
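
    # Sketch of merge_short_lines with the defaults (min_line_length=10,
    # flush threshold 2 * 10 = 20 characters):
    #
    #     indexer.chunk_text_by_lines("alpha\nbeta\ngamma delta\n\nepsilon\n",
    #                                 merge_short_lines=True)
    #
    # accumulates "alpha beta gamma delta" (22 chars >= 20, flushed); the
    # blank line then resets the buffer, and "epsilon" (7 chars) is later
    # dropped by the min_line_length filter, leaving one chunk.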

    async def index_document(
        self,
        tenant_id: str,
        document_id: str,
        text: str,
        metadata: ChunkMetadata | None = None,
    ) -> IndexingResult:
        """
        Index a single document with optimized embedding.

        Args:
            tenant_id: Tenant identifier
            document_id: Document identifier
            text: Document text content
            metadata: Optional metadata for the document

        Returns:
            IndexingResult with status and statistics
        """
        start_time = datetime.utcnow()

        try:
            client = await self._get_client()
            provider = await self._get_embedding_provider()

            await client.ensure_collection_exists(tenant_id, use_multi_vector=True)

            chunks = self.chunk_text(text, metadata)

            self._progress = IndexingProgress(
                total_chunks=len(chunks),
                current_document=document_id,
            )

            points = []
            for i, chunk in enumerate(chunks):
                try:
                    embedding_result = await provider.embed_document(chunk.content)

                    # Keep all three vector sizes on the chunk.
                    chunk.embedding_full = embedding_result.embedding_full
                    chunk.embedding_256 = embedding_result.embedding_256
                    chunk.embedding_512 = embedding_result.embedding_512

                    point = {
                        "id": str(uuid.uuid4()),  # Generate a valid UUID for Qdrant
                        "vector": {
                            "full": chunk.embedding_full,
                            "dim_256": chunk.embedding_256,
                            "dim_512": chunk.embedding_512,
                        },
                        "payload": {
                            "chunk_id": chunk.chunk_id,
                            "document_id": document_id,
                            "text": chunk.content,
                            "metadata": chunk.metadata.to_dict(),
                            "created_at": chunk.created_at.isoformat(),
                        },
                    }
                    points.append(point)

                    self._progress.processed_chunks += 1

                    logger.debug(
                        f"[RAG-OPT] Indexed chunk {i + 1}/{len(chunks)} for doc={document_id}"
                    )

                except Exception as e:
                    # A failed chunk is counted but does not abort the document.
                    logger.warning(f"[RAG-OPT] Failed to index chunk {i}: {e}")
                    self._progress.failed_chunks += 1

            if points:
                await client.upsert_multi_vector(tenant_id, points)

            elapsed = (datetime.utcnow() - start_time).total_seconds()

            logger.info(
                f"[RAG-OPT] Indexed document {document_id}: "
                f"{len(points)} chunks in {elapsed:.2f}s"
            )

            return IndexingResult(
                success=True,
                total_chunks=len(chunks),
                indexed_chunks=len(points),
                failed_chunks=self._progress.failed_chunks,
                elapsed_seconds=elapsed,
            )

        except Exception as e:
            elapsed = (datetime.utcnow() - start_time).total_seconds()
            logger.error(f"[RAG-OPT] Failed to index document {document_id}: {e}")

            return IndexingResult(
                success=False,
                total_chunks=0,
                indexed_chunks=0,
                failed_chunks=0,
                elapsed_seconds=elapsed,
                error_message=str(e),
            )
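
    # The points above use Qdrant's named-vector layout: one point carries the
    # vectors "full", "dim_256" and "dim_512", so retrieval can query whichever
    # dimension fits its latency budget. The names must match the schema that
    # ensure_collection_exists(..., use_multi_vector=True) creates; both
    # helpers live on the project's QdrantClient wrapper
    # (app.core.qdrant_client).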

    async def index_documents_batch(
        self,
        tenant_id: str,
        documents: list[dict[str, Any]],
    ) -> list[IndexingResult]:
        """
        Index multiple documents in batch.

        Args:
            tenant_id: Tenant identifier
            documents: List of documents, each shaped as:
                {
                    "document_id": str,
                    "text": str,
                    "metadata": ChunkMetadata (optional)
                }

        Returns:
            List of IndexingResult, one per document
        """
        results = []

        # Documents are indexed sequentially; each await completes before the
        # next document starts.
        for doc in documents:
            result = await self.index_document(
                tenant_id=tenant_id,
                document_id=doc["document_id"],
                text=doc["text"],
                metadata=doc.get("metadata"),
            )
            results.append(result)

        return results

    def get_progress(self) -> IndexingProgress | None:
        """Get current indexing progress."""
        return self._progress


_knowledge_indexer: KnowledgeIndexer | None = None


def get_knowledge_indexer() -> KnowledgeIndexer:
    """Get or create the singleton KnowledgeIndexer instance."""
    global _knowledge_indexer
    if _knowledge_indexer is None:
        _knowledge_indexer = KnowledgeIndexer()
    return _knowledge_indexer
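

# Illustrative usage sketch, not invoked by the service itself: it assumes a
# reachable Qdrant instance and an Ollama embedding endpoint configured via
# app settings; the tenant and document identifiers are made up for the demo.
if __name__ == "__main__":
    async def _demo() -> None:
        indexer = get_knowledge_indexer()
        result = await indexer.index_document(
            tenant_id="tenant-demo",  # hypothetical tenant id
            document_id="doc-001",  # hypothetical document id
            text=(
                "First fact about the product, long enough to index.\n"
                "Second fact about the product, also long enough.\n"
            ),
        )
        print(
            f"indexed={result.indexed_chunks}/{result.total_chunks} "
            f"chunks in {result.elapsed_seconds:.2f}s"
        )

    asyncio.run(_demo())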