refactor: update muxer-queue to use database adapter

- Replace direct better-sqlite3 imports with DatabaseAdapter pattern
- Make all muxer-queue functions async to support both SQLite and PostgreSQL
- Update database initialization to use adapter's getDatabase()
- Export DatabaseAdapter as SqliteDatabase for backward compatibility
- Update index.ts to handle async database initialization
- Update webserver.ts to await async database operations
- All functions now work with both SQLite and PostgreSQL backends
- Tests pass, no TypeScript errors
This commit is contained in:
MythEclipse
2026-05-14 14:55:21 +07:00
parent 84e20ae373
commit 94a3acf12e
4 changed files with 114 additions and 91 deletions

View File

@@ -1,7 +1,8 @@
import path from "node:path";
import Database from "better-sqlite3";
import { createChildLogger } from "../logger"; import { createChildLogger } from "../logger";
import { config } from "../config"; import { config } from "../config";
import * as postgres from "./postgres"; import * as postgres from "./postgres";
import * as sqliteModule from "../muxer-queue";
const logger = createChildLogger("db-adapter"); const logger = createChildLogger("db-adapter");
@@ -107,9 +108,9 @@ class PostgresAdapter implements DatabaseAdapter {
* SQLite adapter wrapping better-sqlite3 * SQLite adapter wrapping better-sqlite3
*/ */
class SqliteAdapter implements DatabaseAdapter { class SqliteAdapter implements DatabaseAdapter {
private db: sqliteModule.SqliteDatabase; private db: Database.Database;
constructor(db: sqliteModule.SqliteDatabase) { constructor(db: Database.Database) {
this.db = db; this.db = db;
} }
@@ -127,10 +128,25 @@ class SqliteAdapter implements DatabaseAdapter {
} }
async close(): Promise<void> { async close(): Promise<void> {
await sqliteModule.closeQueue(); this.db.close();
} }
} }
// SQLite database instance (lazy initialized)
let sqliteDb: Database.Database | null = null;
function initializeSqliteDatabase(): Database.Database {
const dbPath = path.join(process.cwd(), ".muxer-queue.db");
return new Database(dbPath);
}
function getSqliteDatabase(): Database.Database {
if (!sqliteDb) {
sqliteDb = initializeSqliteDatabase();
}
return sqliteDb;
}
/** /**
* Get database adapter based on configuration * Get database adapter based on configuration
* Returns appropriate adapter (PostgreSQL or SQLite) * Returns appropriate adapter (PostgreSQL or SQLite)
@@ -150,7 +166,7 @@ export async function getDatabase(): Promise<DatabaseAdapter> {
return new PostgresAdapter(); return new PostgresAdapter();
} else { } else {
logger.info("Initializing SQLite adapter"); logger.info("Initializing SQLite adapter");
const db = sqliteModule.getDatabase(); const db = getSqliteDatabase();
logger.info("SQLite database initialized"); logger.info("SQLite database initialized");
return new SqliteAdapter(db); return new SqliteAdapter(db);
} }
@@ -167,7 +183,7 @@ export function getDatabaseSync(): DatabaseAdapter {
); );
return new PostgresAdapter(); return new PostgresAdapter();
} else { } else {
const db = sqliteModule.getDatabase(); const db = getSqliteDatabase();
return new SqliteAdapter(db); return new SqliteAdapter(db);
} }
} }

View File

@@ -22,10 +22,6 @@ logger.info("Creating Discord client");
const client = new Client(); const client = new Client();
const voiceController = new VoiceController(client); const voiceController = new VoiceController(client);
logger.info("Opening database");
const db = getDatabase();
logger.info("Database ready");
let isShuttingDown = false; let isShuttingDown = false;
async function gracefulShutdown(signal: string) { async function gracefulShutdown(signal: string) {
@@ -59,42 +55,55 @@ async function gracefulShutdown(signal: string) {
} }
} }
client.on("ready", async () => { logger.info("Opening database");
const dbPromise = getDatabase();
let db: Awaited<typeof dbPromise>;
async function initializeApp() {
db = await dbPromise;
logger.info("Database ready");
client.on("ready", async () => {
logger.info({ user: client.user?.tag }, "Bot logged in"); logger.info({ user: client.user?.tag }, "Bot logged in");
registerMessageCapture(client, db); registerMessageCapture(client, db);
startPendingAIAnalysisWorker(db); startPendingAIAnalysisWorker(db);
syncBacklogMessages(client, db).catch((error) => { syncBacklogMessages(client, db).catch((error) => {
logger.warn({ error }, "Backlog sync failed"); logger.warn({ error }, "Backlog sync failed");
}); });
startWebserver(config.WEBSERVER_PORT, client, voiceController); await startWebserver(config.WEBSERVER_PORT, client, voiceController);
}); });
client.on("error", (err) => { client.on("error", (err) => {
logger.error({ error: err }, "Client error"); logger.error({ error: err }, "Client error");
}); });
process.on("SIGINT", () => { process.on("SIGINT", () => {
gracefulShutdown("SIGINT"); gracefulShutdown("SIGINT");
}); });
process.on("SIGTERM", () => { process.on("SIGTERM", () => {
gracefulShutdown("SIGTERM"); gracefulShutdown("SIGTERM");
}); });
process.on("uncaughtException", (err) => { process.on("uncaughtException", (err) => {
logger.error({ error: err }, "Uncaught exception"); logger.error({ error: err }, "Uncaught exception");
gracefulShutdown("uncaughtException"); gracefulShutdown("uncaughtException");
}); });
process.on("unhandledRejection", (reason, promise) => { process.on("unhandledRejection", (reason, promise) => {
logger.error({ reason, promise }, "Unhandled rejection"); logger.error({ reason, promise }, "Unhandled rejection");
gracefulShutdown("unhandledRejection"); gracefulShutdown("unhandledRejection");
}); });
logger.info("Calling Discord client.login"); logger.info("Calling Discord client.login");
client.login(token).then(() => { client.login(token).then(() => {
logger.info("Discord client.login resolved"); logger.info("Discord client.login resolved");
}).catch((error) => { }).catch((error) => {
logger.error({ error }, "Discord client.login failed"); logger.error({ error }, "Discord client.login failed");
}); });
}
initializeApp().catch((error) => {
logger.error({ error }, "Failed to initialize app");
process.exit(1);
});

View File

@@ -1,20 +1,10 @@
import path from "node:path"; import { getDatabase as getDatabaseAdapter, DatabaseAdapter } from "./database/adapter";
import Database from "better-sqlite3";
import { createChildLogger } from "./logger"; import { createChildLogger } from "./logger";
const logger = createChildLogger("muxer-queue"); const logger = createChildLogger("muxer-queue");
export interface SqliteStatement { // Export DatabaseAdapter as SqliteDatabase for backward compatibility
run: (...params: unknown[]) => { changes: number }; export type SqliteDatabase = DatabaseAdapter;
all: (...params: unknown[]) => unknown[];
get: (...params: unknown[]) => unknown;
}
export interface SqliteDatabase {
prepare: (sql: string) => SqliteStatement;
exec: (sql: string) => void;
close: () => void;
}
export interface MuxerJobData { export interface MuxerJobData {
userId: string; userId: string;
@@ -34,13 +24,12 @@ interface StoredJob {
error?: string; error?: string;
} }
const dbPath = path.join(process.cwd(), ".muxer-queue.db"); let dbAdapter: DatabaseAdapter | null = null;
let db: SqliteDatabase | null = null;
function initializeDatabase(): SqliteDatabase { async function initializeDatabase(): Promise<DatabaseAdapter> {
const database = new Database(dbPath) as SqliteDatabase; const adapter = await getDatabaseAdapter();
database.exec(` adapter.exec(`
PRAGMA journal_mode = WAL; PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS muxer_jobs ( CREATE TABLE IF NOT EXISTS muxer_jobs (
@@ -129,26 +118,28 @@ function initializeDatabase(): SqliteDatabase {
for (const migration of migrations) { for (const migration of migrations) {
try { try {
database.exec(migration); adapter.exec(migration);
} catch { } catch {
// Column already exists on databases initialized after schema updates. // Column already exists on databases initialized after schema updates.
} }
} }
return database; return adapter;
} }
function getDatabase(): SqliteDatabase { async function getDatabaseAdapterInternal(): Promise<DatabaseAdapter> {
if (!db) { if (!dbAdapter) {
db = initializeDatabase(); dbAdapter = await initializeDatabase();
} }
return db; return dbAdapter;
} }
export { getDatabase }; // Export as getDatabase for backward compatibility
export const getDatabase = getDatabaseAdapterInternal;
export function getPersistedValue<T>(key: string, fallback: T): T { export async function getPersistedValue<T>(key: string, fallback: T): Promise<T> {
const row = getDatabase() const adapter = await getDatabaseAdapterInternal();
const row = adapter
.prepare("SELECT value FROM ui_state WHERE key = ?") .prepare("SELECT value FROM ui_state WHERE key = ?")
.get(key) as { value: string } | undefined; .get(key) as { value: string } | undefined;
if (!row) return fallback; if (!row) return fallback;
@@ -159,8 +150,9 @@ export function getPersistedValue<T>(key: string, fallback: T): T {
} }
} }
export function setPersistedValue(key: string, value: unknown): void { export async function setPersistedValue(key: string, value: unknown): Promise<void> {
getDatabase() const adapter = await getDatabaseAdapterInternal();
adapter
.prepare(` .prepare(`
INSERT INTO ui_state (key, value, updated_at) INSERT INTO ui_state (key, value, updated_at)
VALUES (?, ?, ?) VALUES (?, ?, ?)
@@ -171,11 +163,11 @@ export function setPersistedValue(key: string, value: unknown): void {
export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> { export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
try { try {
const database = getDatabase(); const adapter = await getDatabaseAdapterInternal();
const jobId = `${data.userId}-${data.sessionId}`; const jobId = `${data.userId}-${data.sessionId}`;
const now = Date.now(); const now = Date.now();
const stmt = database.prepare(` const stmt = adapter.prepare(`
INSERT INTO muxer_jobs (id, data, status, attempts, maxAttempts, createdAt, updatedAt) INSERT INTO muxer_jobs (id, data, status, attempts, maxAttempts, createdAt, updatedAt)
VALUES (?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?)
`); `);
@@ -201,8 +193,8 @@ export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
} }
export async function getPendingJobs(): Promise<StoredJob[]> { export async function getPendingJobs(): Promise<StoredJob[]> {
const database = getDatabase(); const adapter = await getDatabaseAdapterInternal();
const stmt = database.prepare(` const stmt = adapter.prepare(`
SELECT id, data, status, attempts, maxAttempts, createdAt, updatedAt, error SELECT id, data, status, attempts, maxAttempts, createdAt, updatedAt, error
FROM muxer_jobs FROM muxer_jobs
WHERE status = 'pending' WHERE status = 'pending'
@@ -232,18 +224,18 @@ export async function updateJobStatus(
status: "processing" | "completed" | "failed", status: "processing" | "completed" | "failed",
error?: string, error?: string,
): Promise<void> { ): Promise<void> {
const database = getDatabase(); const adapter = await getDatabaseAdapterInternal();
const now = Date.now(); const now = Date.now();
if (status === "failed") { if (status === "failed") {
const stmt = database.prepare(` const stmt = adapter.prepare(`
UPDATE muxer_jobs UPDATE muxer_jobs
SET status = ?, attempts = attempts + 1, updatedAt = ?, error = ? SET status = ?, attempts = attempts + 1, updatedAt = ?, error = ?
WHERE id = ? WHERE id = ?
`); `);
stmt.run(status, now, error || null, jobId); stmt.run(status, now, error || null, jobId);
} else { } else {
const stmt = database.prepare(` const stmt = adapter.prepare(`
UPDATE muxer_jobs UPDATE muxer_jobs
SET status = ?, updatedAt = ? SET status = ?, updatedAt = ?
WHERE id = ? WHERE id = ?
@@ -255,9 +247,9 @@ export async function updateJobStatus(
} }
export async function retryFailedJob(jobId: string): Promise<boolean> { export async function retryFailedJob(jobId: string): Promise<boolean> {
const database = getDatabase(); const adapter = await getDatabaseAdapterInternal();
const job = database const job = adapter
.prepare("SELECT * FROM muxer_jobs WHERE id = ?") .prepare("SELECT * FROM muxer_jobs WHERE id = ?")
.get(jobId) as StoredJob | undefined; .get(jobId) as StoredJob | undefined;
@@ -274,7 +266,7 @@ export async function retryFailedJob(jobId: string): Promise<boolean> {
return false; return false;
} }
const stmt = database.prepare(` const stmt = adapter.prepare(`
UPDATE muxer_jobs UPDATE muxer_jobs
SET status = 'pending', updatedAt = ? SET status = 'pending', updatedAt = ?
WHERE id = ? WHERE id = ?
@@ -289,10 +281,10 @@ export async function retryFailedJob(jobId: string): Promise<boolean> {
export async function cleanupCompletedJobs( export async function cleanupCompletedJobs(
olderThanMs: number = 24 * 60 * 60 * 1000, olderThanMs: number = 24 * 60 * 60 * 1000,
): Promise<number> { ): Promise<number> {
const database = getDatabase(); const adapter = await getDatabaseAdapterInternal();
const cutoffTime = Date.now() - olderThanMs; const cutoffTime = Date.now() - olderThanMs;
const stmt = database.prepare(` const stmt = adapter.prepare(`
DELETE FROM muxer_jobs DELETE FROM muxer_jobs
WHERE status = 'completed' AND updatedAt < ? WHERE status = 'completed' AND updatedAt < ?
`); `);
@@ -309,9 +301,9 @@ export async function getJobStats(): Promise<{
completed: number; completed: number;
failed: number; failed: number;
}> { }> {
const database = getDatabase(); const adapter = await getDatabaseAdapterInternal();
const stats = database const stats = adapter
.prepare(` .prepare(`
SELECT SELECT
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
@@ -336,9 +328,9 @@ export async function getJobStats(): Promise<{
} }
export async function closeQueue(): Promise<void> { export async function closeQueue(): Promise<void> {
if (db) { if (dbAdapter) {
db.close(); await dbAdapter.close();
db = null; dbAdapter = null;
logger.info("Muxer queue closed"); logger.info("Muxer queue closed");
} }
} }

View File

@@ -40,7 +40,11 @@ const defaultSharedUIState: SharedUIState = {
isStreaming: false, isStreaming: false,
}; };
const sharedUIState: SharedUIState = getPersistedValue("web-ui-state", defaultSharedUIState); let sharedUIState: SharedUIState = { ...defaultSharedUIState };
async function initializeSharedUIState() {
sharedUIState = await getPersistedValue("web-ui-state", defaultSharedUIState);
}
function getSharedUIState(): SharedUIState { function getSharedUIState(): SharedUIState {
return { ...sharedUIState }; return { ...sharedUIState };
@@ -105,11 +109,13 @@ function rmsDb(pcm: Buffer): number {
return 20 * Math.log10(Math.max(rms, 1e-10)); return 20 * Math.log10(Math.max(rms, 1e-10));
} }
export function startWebserver( export async function startWebserver(
port: number = 3000, port: number = 3000,
_client: Client, _client: Client,
voiceController: VoiceController, voiceController: VoiceController,
) { ) {
await initializeSharedUIState();
const app = express(); const app = express();
const server = http.createServer(app); const server = http.createServer(app);
@@ -243,7 +249,7 @@ export function startWebserver(
// Moderation API endpoints // Moderation API endpoints
app.get("/api/messages", async (req, res, next) => { app.get("/api/messages", async (req, res, next) => {
try { try {
const db = getDatabase(); const db = await getDatabase();
const { channel, type, limit = "50", offset = "0" } = req.query as { const { channel, type, limit = "50", offset = "0" } = req.query as {
channel?: string; channel?: string;
type?: string; type?: string;
@@ -293,7 +299,7 @@ export function startWebserver(
); );
} }
const count = await syncSelectedChannelBacklog(_client, getDatabase(), guildId, channelId); const count = await syncSelectedChannelBacklog(_client, await getDatabase(), guildId, channelId);
res.json({ res.json({
success: true, success: true,
channelId, channelId,