370 lines
11 KiB
Python
370 lines
11 KiB
Python
"""
|
|
Knowledge Base management endpoints.
|
|
[AC-ASA-01, AC-ASA-02, AC-ASA-08] Document upload, list, and index job status.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import uuid
|
|
from typing import Annotated, Optional
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, Query, UploadFile, File, Form
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.database import get_session
|
|
from app.core.exceptions import MissingTenantIdException
|
|
from app.core.tenant import get_tenant_id
|
|
from app.models import ErrorResponse
|
|
from app.models.entities import DocumentStatus, IndexJob, IndexJobStatus
|
|
from app.services.kb import KBService
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router for the KB management endpoints; every route declared on it is
# served under the /admin/kb prefix.
router = APIRouter(
    prefix="/admin/kb",
    tags=["KB Management"],
)
|
|
|
|
|
|
def get_current_tenant_id() -> str:
    """Resolve the tenant ID for the current request.

    Used as a FastAPI dependency by the endpoints in this module.

    Returns:
        The value returned by ``get_tenant_id()``.

    Raises:
        MissingTenantIdException: If ``get_tenant_id()`` returns a falsy
            value (for example ``None`` or an empty string).
    """
    if tenant_id := get_tenant_id():
        return tenant_id
    raise MissingTenantIdException()
|
|
|
|
|
|
@router.get(
    "/knowledge-bases",
    operation_id="listKnowledgeBases",
    summary="Query knowledge base list",
    description="Get list of knowledge bases for the current tenant.",
    responses={
        200: {"description": "Knowledge base list"},
        401: {"description": "Unauthorized", "model": ErrorResponse},
        403: {"description": "Forbidden", "model": ErrorResponse},
    },
)
async def list_knowledge_bases(
    tenant_id: Annotated[str, Depends(get_current_tenant_id)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> JSONResponse:
    """Return every knowledge base of the current tenant with its document count.

    Args:
        tenant_id: Tenant resolved by ``get_current_tenant_id``.
        session: Database session injected by ``get_session``.

    Returns:
        A JSON response of the form ``{"data": [...]}`` where each entry has
        the keys ``id``, ``name``, ``documentCount`` and ``createdAt``.
    """
    logger.info(f"Listing knowledge bases: tenant={tenant_id}")

    knowledge_bases = await KBService(session).list_knowledge_bases(tenant_id)
    kb_ids = [str(kb.id) for kb in knowledge_bases]

    # Document totals per knowledge base, fetched with one grouped query.
    # Knowledge bases that have no documents produce no row and therefore
    # fall back to 0 when the response is assembled below.
    doc_counts = {}
    if kb_ids:
        from sqlalchemy import func
        from app.models.entities import Document

        per_kb_total = func.count(Document.id).label("count")
        count_stmt = (
            select(Document.kb_id, per_kb_total)
            .where(Document.tenant_id == tenant_id, Document.kb_id.in_(kb_ids))
            .group_by(Document.kb_id)
        )
        count_rows = await session.execute(count_stmt)
        doc_counts = {kb_id: total for kb_id, total in count_rows}

    # NOTE(review): "Z" is appended to isoformat(), which presumably assumes
    # created_at is a naive UTC datetime -- confirm against the model.
    data = [
        {
            "id": kb_id,
            "name": kb.name,
            "documentCount": doc_counts.get(kb_id, 0),
            "createdAt": kb.created_at.isoformat() + "Z",
        }
        for kb, kb_id in zip(knowledge_bases, kb_ids)
    ]

    return JSONResponse(content={"data": data})
|
|
|
|
|
|
@router.get(
    "/documents",
    operation_id="listDocuments",
    summary="Query document list",
    description="[AC-ASA-08] Get list of documents with pagination and filtering.",
    responses={
        200: {"description": "Document list with pagination"},
        401: {"description": "Unauthorized", "model": ErrorResponse},
        403: {"description": "Forbidden", "model": ErrorResponse},
    },
)
async def list_documents(
    tenant_id: Annotated[str, Depends(get_current_tenant_id)],
    session: Annotated[AsyncSession, Depends(get_session)],
    kb_id: Annotated[Optional[str], Query()] = None,
    status: Annotated[Optional[str], Query()] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
) -> JSONResponse:
    """[AC-ASA-08] List documents with filtering and pagination.

    Args:
        tenant_id: Tenant resolved by ``get_current_tenant_id``.
        session: Database session injected by ``get_session``.
        kb_id: Optional knowledge base filter, passed through to the service.
        status: Optional document status filter, passed through to the service.
        page: 1-based page number (validated ``>= 1``).
        page_size: Items per page (validated to ``1..100``).

    Returns:
        A JSON response with ``data`` (one entry per document, including the
        ID of its most recent index job or ``None``) and ``pagination``
        (``page``, ``pageSize``, ``total``, ``totalPages``).
    """
    logger.info(
        f"[AC-ASA-08] Listing documents: tenant={tenant_id}, kb_id={kb_id}, "
        f"status={status}, page={page}, page_size={page_size}"
    )

    kb_service = KBService(session)
    documents, total = await kb_service.list_documents(
        tenant_id=tenant_id,
        kb_id=kb_id,
        status=status,
        page=page,
        page_size=page_size,
    )

    # Ceiling division; an empty result set reports zero pages.
    total_pages = (total + page_size - 1) // page_size if total > 0 else 0

    data = []
    for doc in documents:
        # One lookup per document on the page (at most page_size queries).
        job_stmt = (
            select(IndexJob)
            .where(
                IndexJob.tenant_id == tenant_id,
                IndexJob.doc_id == doc.id,
            )
            .order_by(IndexJob.created_at.desc())
            # Only the newest job is wanted. Without the limit,
            # scalar_one_or_none() raises MultipleResultsFound as soon as a
            # document has more than one index job.
            .limit(1)
        )
        job_result = await session.execute(job_stmt)
        latest_job = job_result.scalar_one_or_none()

        data.append({
            "docId": str(doc.id),
            "kbId": doc.kb_id,
            "fileName": doc.file_name,
            "status": doc.status,
            "jobId": str(latest_job.id) if latest_job else None,
            "createdAt": doc.created_at.isoformat() + "Z",
            "updatedAt": doc.updated_at.isoformat() + "Z",
        })

    return JSONResponse(
        content={
            "data": data,
            "pagination": {
                "page": page,
                "pageSize": page_size,
                "total": total,
                "totalPages": total_pages,
            },
        }
    )
|
|
|
|
|
|
@router.post(
    "/documents",
    operation_id="uploadDocument",
    summary="Upload/import document",
    description="[AC-ASA-01] Upload document and trigger indexing job.",
    responses={
        202: {"description": "Accepted - async indexing job started"},
        401: {"description": "Unauthorized", "model": ErrorResponse},
        403: {"description": "Forbidden", "model": ErrorResponse},
    },
)
async def upload_document(
    tenant_id: Annotated[str, Depends(get_current_tenant_id)],
    session: Annotated[AsyncSession, Depends(get_session)],
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    kb_id: str = Form(...),
) -> JSONResponse:
    """[AC-ASA-01] Store an uploaded document and schedule its indexing.

    Args:
        tenant_id: Tenant resolved by ``get_current_tenant_id``.
        session: Database session injected by ``get_session``.
        background_tasks: FastAPI task queue that receives the indexing task.
        file: The uploaded file; it is read fully into memory.
        kb_id: Form field naming the target knowledge base, which is fetched
            or created through ``KBService.get_or_create_kb``.

    Returns:
        A 202 JSON response with ``jobId``, ``docId`` and the job ``status``.
    """
    logger.info(
        f"[AC-ASA-01] Uploading document: tenant={tenant_id}, "
        f"kb_id={kb_id}, filename={file.filename}"
    )

    service = KBService(session)
    knowledge_base = await service.get_or_create_kb(tenant_id, kb_id)

    raw_bytes = await file.read()
    stored_doc, index_job = await service.upload_document(
        tenant_id=tenant_id,
        kb_id=str(knowledge_base.id),
        file_name=file.filename or "unknown",
        file_content=raw_bytes,
        file_type=file.content_type,
    )

    # Persist the document and job rows before the background task is queued.
    await session.commit()

    # NOTE(review): the ORM attributes below are read after commit; this
    # presumably relies on the session not expiring objects on commit --
    # confirm against the session factory configuration.
    background_tasks.add_task(
        _index_document,
        tenant_id,
        str(index_job.id),
        str(stored_doc.id),
        raw_bytes,
    )

    accepted_payload = {
        "jobId": str(index_job.id),
        "docId": str(stored_doc.id),
        "status": index_job.status,
    }
    return JSONResponse(status_code=202, content=accepted_payload)
|
|
|
|
|
|
async def _index_document(
    tenant_id: str,
    job_id: str,
    doc_id: str,
    content: bytes,
    *,
    chunk_size: int = 500,
) -> None:
    """Background task: chunk a document, embed the chunks, store the vectors.

    The job is moved to PROCESSING (progress 10), the content is decoded as
    UTF-8 (undecodable bytes are dropped), split into fixed-size character
    chunks, each chunk is embedded via ``get_embedding`` and the resulting
    points are upserted into the tenant's Qdrant collection. On success the
    job is marked COMPLETED (progress 100); on any exception it is marked
    FAILED with the exception text as the error message.

    The original note on this function says embeddings come from Ollama's
    nomic-embed-text; the model is selected inside ``get_embedding``, not here.

    Args:
        tenant_id: Tenant that owns the document and the vector collection.
        job_id: ID of the index job whose status is updated.
        doc_id: Document ID, stored as ``source`` in every point payload.
        content: Raw bytes of the uploaded file.
        chunk_size: Number of characters per chunk (default 500, the value
            that was previously hard-coded).
    """
    import asyncio

    from app.core.database import async_session_maker
    from app.services.kb import KBService
    from app.core.qdrant_client import get_qdrant_client
    from app.services.embedding.ollama_embedding import get_embedding
    from qdrant_client.models import PointStruct

    # NOTE(review): the purpose of this fixed one-second delay is not evident
    # from this file -- confirm whether it is still required.
    await asyncio.sleep(1)

    async with async_session_maker() as session:
        kb_service = KBService(session)
        try:
            await kb_service.update_job_status(
                tenant_id, job_id, IndexJobStatus.PROCESSING.value, progress=10
            )
            await session.commit()

            text = content.decode("utf-8", errors="ignore")
            chunks = [
                text[start:start + chunk_size]
                for start in range(0, len(text), chunk_size)
            ]

            qdrant = await get_qdrant_client()
            await qdrant.ensure_collection_exists(tenant_id)

            # Chunks are embedded one at a time, in document order.
            points = []
            for i, chunk in enumerate(chunks):
                embedding = await get_embedding(chunk)
                points.append(
                    PointStruct(
                        id=str(uuid.uuid4()),
                        vector=embedding,
                        payload={
                            "text": chunk,
                            "source": doc_id,
                            "chunk_index": i,
                        },
                    )
                )

            # Empty documents produce no points; skip the upsert in that case.
            if points:
                await qdrant.upsert_vectors(tenant_id, points)

            await kb_service.update_job_status(
                tenant_id, job_id, IndexJobStatus.COMPLETED.value, progress=100
            )
            await session.commit()

            logger.info(
                f"[AC-ASA-01] Indexing completed: tenant={tenant_id}, "
                f"job_id={job_id}, chunks={len(chunks)}"
            )

        except Exception as e:
            # logger.exception keeps the traceback, and the identifiers make
            # the failing job traceable from the log line alone.
            logger.exception(
                f"[AC-ASA-01] Indexing failed: tenant={tenant_id}, "
                f"job_id={job_id}: {e}"
            )
            await session.rollback()
            # Record the failure through a fresh session so the status update
            # does not depend on the state of the rolled-back one.
            async with async_session_maker() as error_session:
                kb_service = KBService(error_session)
                await kb_service.update_job_status(
                    tenant_id, job_id, IndexJobStatus.FAILED.value,
                    progress=0, error_msg=str(e)
                )
                await error_session.commit()
|
|
|
|
|
|
@router.get(
    "/index/jobs/{job_id}",
    operation_id="getIndexJob",
    summary="Query index job status",
    description="[AC-ASA-02] Get indexing job status and progress.",
    responses={
        200: {"description": "Job status details"},
        # The handler returns 404 for unknown jobs, so declare it in the
        # OpenAPI schema as the delete-document route does.
        404: {"description": "Job not found"},
        401: {"description": "Unauthorized", "model": ErrorResponse},
        403: {"description": "Forbidden", "model": ErrorResponse},
    },
)
async def get_index_job(
    tenant_id: Annotated[str, Depends(get_current_tenant_id)],
    session: Annotated[AsyncSession, Depends(get_session)],
    job_id: str,
) -> JSONResponse:
    """[AC-ASA-02] Get indexing job status with progress.

    Args:
        tenant_id: Tenant resolved by ``get_current_tenant_id``.
        session: Database session injected by ``get_session``.
        job_id: Path parameter identifying the index job.

    Returns:
        A JSON response with ``jobId``, ``docId``, ``status``, ``progress``
        and ``errorMsg``; or a 404 response with code ``JOB_NOT_FOUND`` when
        the service returns no job for this tenant and ID.
    """
    logger.info(
        f"[AC-ASA-02] Getting job status: tenant={tenant_id}, job_id={job_id}"
    )

    kb_service = KBService(session)
    job = await kb_service.get_index_job(tenant_id, job_id)

    if not job:
        return JSONResponse(
            status_code=404,
            content={
                "code": "JOB_NOT_FOUND",
                "message": f"Job {job_id} not found",
            },
        )

    return JSONResponse(
        content={
            "jobId": str(job.id),
            "docId": str(job.doc_id),
            "status": job.status,
            "progress": job.progress,
            "errorMsg": job.error_msg,
        }
    )
|
|
|
|
|
|
@router.delete(
    "/documents/{doc_id}",
    operation_id="deleteDocument",
    summary="Delete document",
    description="[AC-ASA-08] Delete a document and its associated files.",
    responses={
        200: {"description": "Document deleted"},
        404: {"description": "Document not found"},
        401: {"description": "Unauthorized", "model": ErrorResponse},
        403: {"description": "Forbidden", "model": ErrorResponse},
    },
)
async def delete_document(
    tenant_id: Annotated[str, Depends(get_current_tenant_id)],
    session: Annotated[AsyncSession, Depends(get_session)],
    doc_id: str,
) -> JSONResponse:
    """[AC-ASA-08] Delete a document.

    Args:
        tenant_id: Tenant resolved by ``get_current_tenant_id``.
        session: Database session injected by ``get_session``.
        doc_id: Path parameter identifying the document to delete.

    Returns:
        ``{"success": true, "message": "Document deleted"}`` when the service
        reports a deletion; otherwise a 404 response with code
        ``DOCUMENT_NOT_FOUND``.
    """
    logger.info(
        f"[AC-ASA-08] Deleting document: tenant={tenant_id}, doc_id={doc_id}"
    )

    kb_service = KBService(session)
    deleted = await kb_service.delete_document(tenant_id, doc_id)

    if not deleted:
        return JSONResponse(
            status_code=404,
            content={
                "code": "DOCUMENT_NOT_FOUND",
                "message": f"Document {doc_id} not found",
            },
        )

    # The upload endpoint commits explicitly after its service call on a
    # session from the same dependency; do the same here so the deletion is
    # persisted. If the service has already committed, this is a no-op.
    await session.commit()

    return JSONResponse(
        content={
            "success": True,
            "message": "Document deleted",
        }
    )
|