# ai-robot-core/ai-service/scripts/init_db.py

"""
Database initialization script for AI Service.
Run this script to create the database and all required tables.
Usage:
python scripts/init_db.py [--create-db]
Options:
--create-db Create the database if it doesn't exist
"""
import asyncio
import argparse
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
from app.core.config import get_settings
# DDL for every table this script manages, one statement per list entry.
# create_tables() executes the entries in list order after stripping the
# surrounding whitespace. Every statement uses CREATE TABLE IF NOT EXISTS, so
# re-running the script leaves existing tables untouched (it does not alter
# them to match a changed definition).
#
# No statement declares a DEFAULT for created_at / updated_at, so rows must be
# inserted with explicit timestamps. No foreign keys are declared either.
CREATE_TABLES_SQL = [
# chat_sessions: one row per session; uniqueness of (tenant_id, session_id)
# is enforced by an index in CREATE_INDEXES_SQL, not by this statement.
"""
CREATE TABLE IF NOT EXISTS chat_sessions (
id UUID NOT NULL PRIMARY KEY,
tenant_id VARCHAR NOT NULL,
session_id VARCHAR NOT NULL,
channel_type VARCHAR,
metadata JSON,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
)
""",
# chat_messages: carries session_id as a plain VARCHAR column.
"""
CREATE TABLE IF NOT EXISTS chat_messages (
id UUID NOT NULL PRIMARY KEY,
tenant_id VARCHAR NOT NULL,
session_id VARCHAR NOT NULL,
role VARCHAR NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
)
""",
# knowledge_bases: per-tenant named knowledge bases.
"""
CREATE TABLE IF NOT EXISTS knowledge_bases (
id UUID NOT NULL PRIMARY KEY,
tenant_id VARCHAR NOT NULL,
name VARCHAR NOT NULL,
description VARCHAR,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
)
""",
# documents: file records; status defaults to 'pending'.
# NOTE(review): kb_id is VARCHAR here while knowledge_bases.id is UUID --
# presumably kb_id refers to a knowledge base; confirm the intended type.
"""
CREATE TABLE IF NOT EXISTS documents (
id UUID NOT NULL PRIMARY KEY,
tenant_id VARCHAR NOT NULL,
kb_id VARCHAR NOT NULL,
file_name VARCHAR NOT NULL,
file_path VARCHAR,
file_size INTEGER,
file_type VARCHAR,
status VARCHAR NOT NULL DEFAULT 'pending',
error_msg VARCHAR,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
)
""",
# index_jobs: job records keyed by doc_id (UUID); status defaults to
# 'pending' and progress to 0.
"""
CREATE TABLE IF NOT EXISTS index_jobs (
id UUID NOT NULL PRIMARY KEY,
tenant_id VARCHAR NOT NULL,
doc_id UUID NOT NULL,
status VARCHAR NOT NULL DEFAULT 'pending',
progress INTEGER NOT NULL DEFAULT 0,
error_msg VARCHAR,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
)
""",
]
# Index DDL, executed by create_tables() after the tables exist. Every
# statement uses IF NOT EXISTS. Each table gets a single-column tenant_id
# index plus composite indexes that all lead with tenant_id.
# ix_chat_sessions_tenant_session is the only UNIQUE index: it enforces one
# chat_sessions row per (tenant_id, session_id).
CREATE_INDEXES_SQL = [
# chat_sessions
"CREATE INDEX IF NOT EXISTS ix_chat_sessions_tenant_id ON chat_sessions (tenant_id)",
"CREATE UNIQUE INDEX IF NOT EXISTS ix_chat_sessions_tenant_session ON chat_sessions (tenant_id, session_id)",
# chat_messages
"CREATE INDEX IF NOT EXISTS ix_chat_messages_tenant_id ON chat_messages (tenant_id)",
"CREATE INDEX IF NOT EXISTS ix_chat_messages_tenant_session ON chat_messages (tenant_id, session_id)",
"CREATE INDEX IF NOT EXISTS ix_chat_messages_tenant_session_created ON chat_messages (tenant_id, session_id, created_at)",
# knowledge_bases
"CREATE INDEX IF NOT EXISTS ix_knowledge_bases_tenant_id ON knowledge_bases (tenant_id)",
# documents
"CREATE INDEX IF NOT EXISTS ix_documents_tenant_id ON documents (tenant_id)",
"CREATE INDEX IF NOT EXISTS ix_documents_tenant_kb ON documents (tenant_id, kb_id)",
"CREATE INDEX IF NOT EXISTS ix_documents_tenant_status ON documents (tenant_id, status)",
# index_jobs
"CREATE INDEX IF NOT EXISTS ix_index_jobs_tenant_id ON index_jobs (tenant_id)",
"CREATE INDEX IF NOT EXISTS ix_index_jobs_tenant_doc ON index_jobs (tenant_id, doc_id)",
"CREATE INDEX IF NOT EXISTS ix_index_jobs_tenant_status ON index_jobs (tenant_id, status)",
]
async def create_database_if_not_exists(settings):
    """Create the database named in ``settings.database_url`` if it is missing.

    Connects to the server's ``postgres`` maintenance database, reusing the
    driver, credentials, host and query options of ``settings.database_url``,
    and issues ``CREATE DATABASE`` when ``pg_database`` has no row for the
    target name. The connection runs in AUTOCOMMIT mode because PostgreSQL
    refuses ``CREATE DATABASE`` inside a transaction block.

    Args:
        settings: Object exposing ``database_url``, the SQLAlchemy URL string
            of the database to create.

    Raises:
        ValueError: If ``settings.database_url`` does not name a database.
        Exception: Any connection or SQL error is printed and re-raised.
    """
    # Imported locally so the module-level import section stays untouched.
    from sqlalchemy.engine import make_url

    # Parse the URL instead of splitting on "/": string splitting drops the
    # query string for the admin connection and picks the wrong segment when
    # a query value itself contains "/" (e.g. ?sslrootcert=/etc/ca.pem).
    target_url = make_url(settings.database_url)
    db_name = target_url.database
    if not db_name:
        raise ValueError("database_url does not specify a database name")

    engine = create_async_engine(
        target_url.set(database="postgres"),
        isolation_level="AUTOCOMMIT",
        pool_size=1,
    )
    try:
        async with engine.connect() as conn:
            # Bound parameter: the name is compared as data, never spliced
            # into the SQL text.
            result = await conn.execute(
                text("SELECT datname FROM pg_database WHERE datname = :name"),
                {"name": db_name},
            )
            if result.first() is None:
                print(f"Creating database '{db_name}'...")
                # Identifiers cannot be bound as parameters; double any
                # embedded double quote so the quoted identifier stays intact.
                quoted_name = '"' + db_name.replace('"', '""') + '"'
                await conn.execute(text(f"CREATE DATABASE {quoted_name}"))
                print(f"Database '{db_name}' created successfully!")
            else:
                print(f"Database '{db_name}' already exists.")
    except Exception as e:
        print(f"Error creating database: {e}")
        raise
    finally:
        await engine.dispose()
async def create_tables(settings):
    """Create all tables and indexes (idempotent), then list the public tables.

    Runs every statement in ``CREATE_TABLES_SQL`` followed by every statement
    in ``CREATE_INDEXES_SQL`` inside one transaction against
    ``settings.database_url``. After the transaction commits, the names of the
    tables in the ``public`` schema are printed.

    Args:
        settings: Object exposing ``database_url``, the SQLAlchemy URL string
            of the database to initialize.

    Raises:
        Exception: Any connection or SQL error, other than an "already exists"
            error on an index statement, is printed and re-raised.
    """
    engine = create_async_engine(settings.database_url)
    try:
        async with engine.begin() as conn:
            for stmt in CREATE_TABLES_SQL:
                await conn.execute(text(stmt.strip()))

            for stmt in CREATE_INDEXES_SQL:
                try:
                    # Run each index statement under its own SAVEPOINT.
                    # PostgreSQL aborts the whole transaction when a statement
                    # fails, so without the savepoint the "already exists"
                    # tolerance below could never work: every later statement
                    # would fail with "current transaction is aborted".
                    async with conn.begin_nested():
                        await conn.execute(text(stmt))
                except Exception as e:
                    message = str(e)
                    # Tolerate the error in both the English and the Chinese
                    # server locale; anything else is a real failure.
                    if "already exists" in message.lower() or "已经存在" in message:
                        continue
                    raise

        # Reached only after the transaction above has committed.
        print("Tables and indexes created/verified successfully!")

        async with engine.connect() as conn:
            result = await conn.execute(
                text("SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename")
            )
            tables = [row[0] for row in result]
            print(f"Tables in database: {tables}")
    except Exception as e:
        print(f"Error creating tables: {e}")
        raise
    finally:
        await engine.dispose()
async def main():
    """Parse the command line and run the initialization steps.

    With ``--create-db`` the database is created first (if missing); tables
    and indexes are then created in every case.
    """
    cli = argparse.ArgumentParser(description="Initialize AI Service database")
    cli.add_argument(
        "--create-db",
        action="store_true",
        help="Create database if it doesn't exist",
    )
    options = cli.parse_args()

    settings = get_settings()

    # Print the URL without its leading "scheme://user:password" part: when an
    # "@" is present, only the segment right after the first "@" is shown.
    url = settings.database_url
    if "@" in url:
        shown = url.split("@")[1]
    else:
        shown = url
    print(f"Database URL: {shown}")

    if options.create_db:
        await create_database_if_not_exists(settings)
    await create_tables(settings)

    print("\nDatabase initialization complete!")


if __name__ == "__main__":
    asyncio.run(main())