feat: add PostgreSQL client and migration runner

This commit is contained in:
MythEclipse
2026-05-14 14:45:21 +07:00
parent 818a059121
commit caf90ea9e6
2 changed files with 301 additions and 0 deletions

130
src/database/migrations.ts Normal file
View File

@@ -0,0 +1,130 @@
import { createChildLogger } from "../logger";
import { query } from "./postgres";
const logger = createChildLogger("migrations");
/**
* Run all database migrations to create schema
*/
export async function runMigrations(): Promise<void> {
logger.info("Starting database migrations");
try {
// Create muxer_jobs table
await query(`
CREATE TABLE IF NOT EXISTS muxer_jobs (
id TEXT PRIMARY KEY,
data TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
maxAttempts INTEGER NOT NULL DEFAULT 3,
createdAt BIGINT NOT NULL,
updatedAt BIGINT NOT NULL,
error TEXT
)
`);
logger.debug("Created muxer_jobs table");
await query(`
CREATE INDEX IF NOT EXISTS idx_muxer_jobs_status ON muxer_jobs(status)
`);
await query(`
CREATE INDEX IF NOT EXISTS idx_muxer_jobs_createdAt ON muxer_jobs(createdAt)
`);
logger.debug("Created muxer_jobs indexes");
// Create messages table
await query(`
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
channel_id TEXT NOT NULL,
thread_id TEXT,
user_id TEXT NOT NULL,
username TEXT NOT NULL,
avatar_url TEXT,
content TEXT NOT NULL,
edited_content TEXT,
created_at BIGINT NOT NULL,
edited_at BIGINT,
deleted_at BIGINT,
type TEXT NOT NULL DEFAULT 'text',
metadata TEXT,
ai_status TEXT NOT NULL DEFAULT 'pending',
ai_moderation_flags TEXT,
ai_moderation_score REAL,
ai_moderation_raw TEXT,
ai_analysis TEXT,
ai_analyzed_at BIGINT,
ai_error TEXT
)
`);
logger.debug("Created messages table");
await query(`
CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages(channel_id)
`);
await query(`
CREATE INDEX IF NOT EXISTS idx_messages_user ON messages(user_id)
`);
await query(`
CREATE INDEX IF NOT EXISTS idx_messages_created ON messages(created_at DESC)
`);
await query(`
CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id)
`);
logger.debug("Created messages indexes");
// Create attachments table
await query(`
CREATE TABLE IF NOT EXISTS attachments (
id TEXT PRIMARY KEY,
message_id TEXT NOT NULL,
guild_id TEXT NOT NULL,
channel_id TEXT NOT NULL,
thread_id TEXT,
user_id TEXT NOT NULL,
filename TEXT NOT NULL,
size INTEGER NOT NULL,
type TEXT NOT NULL,
discord_url TEXT NOT NULL,
uploaded_url TEXT,
upload_status TEXT NOT NULL DEFAULT 'pending',
upload_error TEXT,
created_at BIGINT NOT NULL,
uploaded_at BIGINT,
FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
)
`);
logger.debug("Created attachments table");
await query(`
CREATE INDEX IF NOT EXISTS idx_attachments_channel ON attachments(channel_id)
`);
await query(`
CREATE INDEX IF NOT EXISTS idx_attachments_message ON attachments(message_id)
`);
await query(`
CREATE INDEX IF NOT EXISTS idx_attachments_status ON attachments(upload_status)
`);
logger.debug("Created attachments indexes");
// Create ui_state table
await query(`
CREATE TABLE IF NOT EXISTS ui_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at BIGINT NOT NULL
)
`);
logger.debug("Created ui_state table");
logger.info("Database migrations completed successfully");
} catch (error) {
logger.error(
{ error: error instanceof Error ? error.message : String(error) },
"Database migrations failed",
);
throw error;
}
}

171
src/database/postgres.ts Normal file
View File

@@ -0,0 +1,171 @@
import { Pool, PoolClient, QueryResult, QueryResultRow } from "pg";
import { createChildLogger } from "../logger";
import { config } from "../config";
const logger = createChildLogger("postgres");
export interface PostgresConfig {
host: string;
port: number;
user?: string;
password?: string;
database?: string;
min: number;
max: number;
}
let pool: Pool | null = null;
/**
* Parse DATABASE_URL or build config from individual POSTGRES_* variables
*/
export function buildConfig(): PostgresConfig {
if (config.DATABASE_URL) {
try {
const url = new URL(config.DATABASE_URL);
return {
host: url.hostname || "localhost",
port: url.port ? parseInt(url.port, 10) : 5432,
user: url.username || undefined,
password: url.password || undefined,
database: url.pathname?.slice(1) || undefined,
min: config.POSTGRES_POOL_MIN,
max: config.POSTGRES_POOL_MAX,
};
} catch (error) {
logger.warn(
{ error: error instanceof Error ? error.message : String(error) },
"Failed to parse DATABASE_URL, falling back to individual variables",
);
}
}
return {
host: config.POSTGRES_HOST,
port: config.POSTGRES_PORT,
user: config.POSTGRES_USER,
password: config.POSTGRES_PASSWORD,
database: config.POSTGRES_DB,
min: config.POSTGRES_POOL_MIN,
max: config.POSTGRES_POOL_MAX,
};
}
/**
* Initialize PostgreSQL connection pool
*/
export function initializePool(): Pool {
const pgConfig = buildConfig();
logger.info(
{
host: pgConfig.host,
port: pgConfig.port,
database: pgConfig.database,
min: pgConfig.min,
max: pgConfig.max,
},
"Initializing PostgreSQL connection pool",
);
const newPool = new Pool({
host: pgConfig.host,
port: pgConfig.port,
user: pgConfig.user,
password: pgConfig.password,
database: pgConfig.database,
min: pgConfig.min,
max: pgConfig.max,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
});
newPool.on("error", (error) => {
logger.error(
{ error: error instanceof Error ? error.message : String(error) },
"Unexpected error on idle client in pool",
);
});
newPool.on("connect", () => {
logger.debug("New client connected to pool");
});
newPool.on("remove", () => {
logger.debug("Client removed from pool");
});
return newPool;
}
/**
* Get or initialize the connection pool
*/
export function getPool(): Pool {
if (!pool) {
pool = initializePool();
}
return pool;
}
/**
* Execute a query with type safety
*/
export async function query<T extends QueryResultRow = QueryResultRow>(
text: string,
values?: unknown[],
): Promise<QueryResult<T>> {
const client = await getPool().connect();
try {
logger.debug({ text, values: values?.length || 0 }, "Executing query");
return await client.query<T>(text, values);
} catch (error) {
logger.error(
{
text,
error: error instanceof Error ? error.message : String(error),
},
"Query execution failed",
);
throw error;
} finally {
client.release();
}
}
/**
* Get a client from the pool for transaction or batch operations
*/
export async function getClient(): Promise<PoolClient> {
try {
const client = await getPool().connect();
logger.debug("Client acquired from pool");
return client;
} catch (error) {
logger.error(
{ error: error instanceof Error ? error.message : String(error) },
"Failed to acquire client from pool",
);
throw error;
}
}
/**
* Close the connection pool
*/
export async function closePool(): Promise<void> {
if (pool) {
try {
logger.info("Closing PostgreSQL connection pool");
await pool.end();
pool = null;
logger.info("PostgreSQL connection pool closed");
} catch (error) {
logger.error(
{ error: error instanceof Error ? error.message : String(error) },
"Error closing connection pool",
);
throw error;
}
}
}