""" Database initialization script for AI Service. Run this script to create the database and all required tables. Usage: python scripts/init_db.py [--create-db] Options: --create-db Create the database if it doesn't exist """ import asyncio import argparse from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy import text from app.core.config import get_settings CREATE_TABLES_SQL = [ """ CREATE TABLE IF NOT EXISTS chat_sessions ( id UUID NOT NULL PRIMARY KEY, tenant_id VARCHAR NOT NULL, session_id VARCHAR NOT NULL, channel_type VARCHAR, metadata JSON, created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL ) """, """ CREATE TABLE IF NOT EXISTS chat_messages ( id UUID NOT NULL PRIMARY KEY, tenant_id VARCHAR NOT NULL, session_id VARCHAR NOT NULL, role VARCHAR NOT NULL, content TEXT NOT NULL, created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL ) """, """ CREATE TABLE IF NOT EXISTS knowledge_bases ( id UUID NOT NULL PRIMARY KEY, tenant_id VARCHAR NOT NULL, name VARCHAR NOT NULL, description VARCHAR, created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL ) """, """ CREATE TABLE IF NOT EXISTS documents ( id UUID NOT NULL PRIMARY KEY, tenant_id VARCHAR NOT NULL, kb_id VARCHAR NOT NULL, file_name VARCHAR NOT NULL, file_path VARCHAR, file_size INTEGER, file_type VARCHAR, status VARCHAR NOT NULL DEFAULT 'pending', error_msg VARCHAR, created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL ) """, """ CREATE TABLE IF NOT EXISTS index_jobs ( id UUID NOT NULL PRIMARY KEY, tenant_id VARCHAR NOT NULL, doc_id UUID NOT NULL, status VARCHAR NOT NULL DEFAULT 'pending', progress INTEGER NOT NULL DEFAULT 0, error_msg VARCHAR, created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL ) """, ] CREATE_INDEXES_SQL = [ "CREATE INDEX IF NOT EXISTS ix_chat_sessions_tenant_id ON chat_sessions (tenant_id)", "CREATE UNIQUE INDEX IF NOT EXISTS ix_chat_sessions_tenant_session ON chat_sessions (tenant_id, session_id)", "CREATE INDEX IF NOT EXISTS ix_chat_messages_tenant_id ON chat_messages (tenant_id)", "CREATE INDEX IF NOT EXISTS ix_chat_messages_tenant_session ON chat_messages (tenant_id, session_id)", "CREATE INDEX IF NOT EXISTS ix_chat_messages_tenant_session_created ON chat_messages (tenant_id, session_id, created_at)", "CREATE INDEX IF NOT EXISTS ix_knowledge_bases_tenant_id ON knowledge_bases (tenant_id)", "CREATE INDEX IF NOT EXISTS ix_documents_tenant_id ON documents (tenant_id)", "CREATE INDEX IF NOT EXISTS ix_documents_tenant_kb ON documents (tenant_id, kb_id)", "CREATE INDEX IF NOT EXISTS ix_documents_tenant_status ON documents (tenant_id, status)", "CREATE INDEX IF NOT EXISTS ix_index_jobs_tenant_id ON index_jobs (tenant_id)", "CREATE INDEX IF NOT EXISTS ix_index_jobs_tenant_doc ON index_jobs (tenant_id, doc_id)", "CREATE INDEX IF NOT EXISTS ix_index_jobs_tenant_status ON index_jobs (tenant_id, status)", ] async def create_database_if_not_exists(settings): """Create database if it doesn't exist.""" db_url = settings.database_url postgres_url = db_url.rsplit("/", 1)[0] + "/postgres" engine = create_async_engine( postgres_url, isolation_level="AUTOCOMMIT", pool_size=1, ) db_name = db_url.rsplit("/", 1)[-1].split("?")[0] try: async with engine.connect() as conn: result = await conn.execute( text(f"SELECT datname FROM pg_database WHERE datname = '{db_name}'") ) exists = result.fetchone() if not exists: print(f"Creating database '{db_name}'...") await conn.execute(text(f'CREATE DATABASE "{db_name}"')) print(f"Database '{db_name}' created successfully!") else: print(f"Database '{db_name}' already exists.") except Exception as e: print(f"Error creating database: {e}") raise finally: await engine.dispose() async def create_tables(settings): """Create all tables (idempotent).""" engine = create_async_engine(settings.database_url) try: async with engine.begin() as conn: for stmt in CREATE_TABLES_SQL: await conn.execute(text(stmt.strip())) for stmt in CREATE_INDEXES_SQL: try: await conn.execute(text(stmt)) except Exception as e: if "already exists" in str(e).lower() or "已经存在" in str(e): continue raise print("Tables and indexes created/verified successfully!") async with engine.connect() as conn: result = await conn.execute( text("SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename") ) tables = [row[0] for row in result] print(f"Tables in database: {tables}") except Exception as e: print(f"Error creating tables: {e}") raise finally: await engine.dispose() async def main(): parser = argparse.ArgumentParser(description="Initialize AI Service database") parser.add_argument("--create-db", action="store_true", help="Create database if it doesn't exist") args = parser.parse_args() settings = get_settings() print(f"Database URL: {settings.database_url.split('@')[1] if '@' in settings.database_url else settings.database_url}") if args.create_db: await create_database_if_not_exists(settings) await create_tables(settings) print("\nDatabase initialization complete!") if __name__ == "__main__": asyncio.run(main())