feat: add muxer job queue with SQLite integration and metrics tracking

This commit is contained in:
MythEclipse
2026-05-13 17:06:22 +07:00
parent 978c2c468d
commit f9a4b4a92d
7 changed files with 321 additions and 1 deletions

View File

@@ -31,3 +31,5 @@ RECONNECT_TIMEOUT_MS=5000
LOG_LEVEL=info LOG_LEVEL=info
NODE_ENV=development NODE_ENV=development

BIN
bun.lockb

Binary file not shown.

View File

@@ -16,6 +16,7 @@
"@discordjs/opus": "^0.10.0", "@discordjs/opus": "^0.10.0",
"@discordjs/voice": "^0.19.1", "@discordjs/voice": "^0.19.1",
"@snazzah/davey": "^0.1.10", "@snazzah/davey": "^0.1.10",
"better-sqlite3": "^12.10.0",
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"class-validator": "^0.15.1", "class-validator": "^0.15.1",
"discord.js-selfbot-v13": "^3.7.1", "discord.js-selfbot-v13": "^3.7.1",
@@ -28,12 +29,14 @@
"pino": "^9.4.0", "pino": "^9.4.0",
"pino-http": "^11.0.0", "pino-http": "^11.0.0",
"prism-media": "2.0.0-alpha.0", "prism-media": "2.0.0-alpha.0",
"prom-client": "^15.1.3",
"sodium-native": "^4.3.2", "sodium-native": "^4.3.2",
"ws": "^8.20.1", "ws": "^8.20.1",
"zod": "^4.4.3" "zod": "^4.4.3"
}, },
"devDependencies": { "devDependencies": {
"@biomejs/biome": "latest", "@biomejs/biome": "latest",
"@types/better-sqlite3": "^7.6.13",
"@types/bun": "latest", "@types/bun": "latest",
"@types/express": "^5.0.6", "@types/express": "^5.0.6",
"@types/fluent-ffmpeg": "^2.1.28", "@types/fluent-ffmpeg": "^2.1.28",

View File

@@ -5,7 +5,6 @@ import "dotenv/config";
import { getVoiceConnection } from "@discordjs/voice"; import { getVoiceConnection } from "@discordjs/voice";
import { Client } from "discord.js-selfbot-v13"; import { Client } from "discord.js-selfbot-v13";
import { config } from "./config"; import { config } from "./config";
import { AppError } from "./errors";
import { createChildLogger } from "./logger"; import { createChildLogger } from "./logger";
import { discordPlayer } from "./player"; import { discordPlayer } from "./player";
import { startRecording, stopRecording } from "./recorder"; import { startRecording, stopRecording } from "./recorder";

78
src/metrics.ts Normal file
View File

@@ -0,0 +1,78 @@
import { Counter, Gauge, Histogram, register } from "prom-client";
// Audio metrics
export const audioLevelGauge = new Gauge({
name: "audio_level_db",
help: "Current audio level in dB",
labelNames: ["user_id"],
});
export const recordingDurationCounter = new Counter({
name: "recording_duration_seconds_total",
help: "Total recording duration in seconds",
labelNames: ["user_id"],
});
export const activeRecordingsGauge = new Gauge({
name: "active_recordings",
help: "Number of active recordings",
});
export const recordedSegmentsCounter = new Counter({
name: "recorded_segments_total",
help: "Total number of recorded segments",
labelNames: ["user_id"],
});
// Connection metrics
export const voiceConnectionsGauge = new Gauge({
name: "voice_connections_active",
help: "Number of active voice connections",
});
export const connectionErrorsCounter = new Counter({
name: "connection_errors_total",
help: "Total number of connection errors",
labelNames: ["error_type"],
});
export const reconnectAttemptsCounter = new Counter({
name: "reconnect_attempts_total",
help: "Total number of reconnection attempts",
});
// WebSocket metrics
export const wsClientsGauge = new Gauge({
name: "websocket_clients_connected",
help: "Number of connected WebSocket clients",
});
export const wsMessagesCounter = new Counter({
name: "websocket_messages_total",
help: "Total WebSocket messages sent",
labelNames: ["message_type"],
});
// HTTP metrics
export const httpRequestDurationHistogram = new Histogram({
name: "http_request_duration_seconds",
help: "HTTP request duration in seconds",
labelNames: ["method", "route", "status"],
buckets: [0.001, 0.01, 0.1, 0.5, 1, 2, 5],
});
export const httpRequestsCounter = new Counter({
name: "http_requests_total",
help: "Total HTTP requests",
labelNames: ["method", "route", "status"],
});
// System metrics
export const uptimeGauge = new Gauge({
name: "process_uptime_seconds",
help: "Process uptime in seconds",
});
export async function getMetrics(): Promise<string> {
return register.metrics();
}

230
src/muxer-queue.ts Normal file
View File

@@ -0,0 +1,230 @@
import path from "node:path";
import Database from "better-sqlite3";
import { createChildLogger } from "./logger";
const logger = createChildLogger("muxer-queue");
export interface MuxerJobData {
userId: string;
sessionId: string;
recordingsDir: string;
outputDir: string;
}
interface StoredJob {
id: string;
data: string;
status: "pending" | "processing" | "completed" | "failed";
attempts: number;
maxAttempts: number;
createdAt: number;
updatedAt: number;
error?: string;
}
const dbPath = path.join(process.cwd(), ".muxer-queue.db");
let db: Database.Database | null = null;
function initializeDatabase(): Database.Database {
const database = new Database(dbPath);
database.pragma("journal_mode = WAL");
database.exec(`
CREATE TABLE IF NOT EXISTS muxer_jobs (
id TEXT PRIMARY KEY,
data TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
maxAttempts INTEGER NOT NULL DEFAULT 3,
createdAt INTEGER NOT NULL,
updatedAt INTEGER NOT NULL,
error TEXT
);
CREATE INDEX IF NOT EXISTS idx_status ON muxer_jobs(status);
CREATE INDEX IF NOT EXISTS idx_createdAt ON muxer_jobs(createdAt);
`);
return database;
}
function getDatabase(): Database.Database {
if (!db) {
db = initializeDatabase();
}
return db;
}
export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
try {
const database = getDatabase();
const jobId = `${data.userId}-${data.sessionId}`;
const now = Date.now();
const stmt = database.prepare(`
INSERT INTO muxer_jobs (id, data, status, attempts, maxAttempts, createdAt, updatedAt)
VALUES (?, ?, ?, ?, ?, ?, ?)
`);
stmt.run(jobId, JSON.stringify(data), "pending", 0, 3, now, now);
logger.info(
{ jobId, userId: data.userId, sessionId: data.sessionId },
"Muxer job enqueued",
);
return jobId;
} catch (error) {
logger.error(
{
userId: data.userId,
error: error instanceof Error ? error.message : String(error),
},
"Failed to enqueue muxer job",
);
throw error;
}
}
export async function getPendingJobs(): Promise<StoredJob[]> {
const database = getDatabase();
const stmt = database.prepare(`
SELECT id, data, status, attempts, maxAttempts, createdAt, updatedAt, error
FROM muxer_jobs
WHERE status = 'pending'
ORDER BY createdAt ASC
LIMIT 10
`);
const rows = stmt.all() as Array<{
id: string;
data: string;
status: string;
attempts: number;
maxAttempts: number;
createdAt: number;
updatedAt: number;
error?: string;
}>;
return rows.map((row) => ({
...row,
status: row.status as "pending" | "processing" | "completed" | "failed",
}));
}
export async function updateJobStatus(
jobId: string,
status: "processing" | "completed" | "failed",
error?: string,
): Promise<void> {
const database = getDatabase();
const now = Date.now();
if (status === "failed") {
const stmt = database.prepare(`
UPDATE muxer_jobs
SET status = ?, attempts = attempts + 1, updatedAt = ?, error = ?
WHERE id = ?
`);
stmt.run(status, now, error || null, jobId);
} else {
const stmt = database.prepare(`
UPDATE muxer_jobs
SET status = ?, updatedAt = ?
WHERE id = ?
`);
stmt.run(status, now, jobId);
}
logger.info({ jobId, status, error }, "Job status updated");
}
export async function retryFailedJob(jobId: string): Promise<boolean> {
const database = getDatabase();
const job = database
.prepare("SELECT * FROM muxer_jobs WHERE id = ?")
.get(jobId) as StoredJob | undefined;
if (!job) {
logger.warn({ jobId }, "Job not found");
return false;
}
if (job.attempts >= job.maxAttempts) {
logger.warn(
{ jobId, attempts: job.attempts, maxAttempts: job.maxAttempts },
"Max retry attempts reached",
);
return false;
}
const stmt = database.prepare(`
UPDATE muxer_jobs
SET status = 'pending', updatedAt = ?
WHERE id = ?
`);
stmt.run(Date.now(), jobId);
logger.info({ jobId, attempt: job.attempts + 1 }, "Job retried");
return true;
}
export async function cleanupCompletedJobs(
olderThanMs: number = 24 * 60 * 60 * 1000,
): Promise<number> {
const database = getDatabase();
const cutoffTime = Date.now() - olderThanMs;
const stmt = database.prepare(`
DELETE FROM muxer_jobs
WHERE status = 'completed' AND updatedAt < ?
`);
const result = stmt.run(cutoffTime);
logger.info({ deletedCount: result.changes }, "Cleaned up completed jobs");
return result.changes;
}
export async function getJobStats(): Promise<{
pending: number;
processing: number;
completed: number;
failed: number;
}> {
const database = getDatabase();
const stats = database
.prepare(`
SELECT
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
FROM muxer_jobs
`)
.get() as {
pending: number | null;
processing: number | null;
completed: number | null;
failed: number | null;
};
return {
pending: stats.pending || 0,
processing: stats.processing || 0,
completed: stats.completed || 0,
failed: stats.failed || 0,
};
}
export async function closeQueue(): Promise<void> {
if (db) {
db.close();
db = null;
logger.info("Muxer queue closed");
}
}

View File

@@ -6,6 +6,7 @@ import pinoHttp from "pino-http";
import prism from "prism-media"; import prism from "prism-media";
import { WebSocketServer } from "ws"; import { WebSocketServer } from "ws";
import { createChildLogger, logger } from "./logger"; import { createChildLogger, logger } from "./logger";
import { getMetrics, uptimeGauge } from "./metrics";
import { discordPlayer } from "./player"; import { discordPlayer } from "./player";
const wsLogger = createChildLogger("webserver"); const wsLogger = createChildLogger("webserver");
@@ -68,6 +69,13 @@ export function startWebserver(port: number = 3000) {
}); });
}); });
// Metrics endpoint
app.get("/metrics", async (_req, res) => {
res.set("Content-Type", "text/plain");
uptimeGauge.set(process.uptime());
res.send(await getMetrics());
});
// Inbound: Discord PCM → tagged chunks → browser // Inbound: Discord PCM → tagged chunks → browser
(global as any).broadcastPcmToWeb = (chunk: Buffer, userId: string) => { (global as any).broadcastPcmToWeb = (chunk: Buffer, userId: string) => {
let hash = 0; let hash = 0;