Compare commits

...

27 Commits

Author SHA1 Message Date
MythEclipse
49d4bbf781 chore: remove obsolete migration documentation files 2026-05-14 16:40:43 +07:00
MythEclipse
4fbbc056bb docs: add postgresql setup completion documentation 2026-05-14 16:26:12 +07:00
MythEclipse
47ae7f8650 chore: remove temporary test files 2026-05-14 16:25:45 +07:00
MythEclipse
35269b5bef feat: configure postgresql as primary database with neon connection
- Updated .env to use PostgreSQL (neondb) instead of SQLite
- Updated drizzle.ts to support DATABASE_URL connection string
- Regenerated migrations for PostgreSQL syntax
- Bot successfully connects and operates with PostgreSQL
- All database operations working correctly
2026-05-14 16:25:39 +07:00
MythEclipse
c63a61460c docs: add comprehensive drizzle orm migration final summary 2026-05-14 16:21:35 +07:00
MythEclipse
9889d20edd feat: add programmatic migration runner for better PostgreSQL support 2026-05-14 16:21:02 +07:00
MythEclipse
b580430eb6 docs: add drizzle orm migration completion summary 2026-05-14 16:17:25 +07:00
MythEclipse
b9d0a06d01 fix: update drizzle config to read env vars directly for CLI compatibility 2026-05-14 16:14:48 +07:00
MythEclipse
b600dad011 fix: correct import ordering and update tests for drizzle-orm migration 2026-05-14 15:47:03 +07:00
MythEclipse
50d4517079 refactor: remove old database adapter files 2026-05-14 15:43:52 +07:00
MythEclipse
9ff0f0bede feat: update application initialization for drizzle 2026-05-14 15:43:16 +07:00
MythEclipse
1c4b0afbce refactor: migrate messageStore to drizzle-orm
- Replace all raw SQL queries in messageStore.ts with Drizzle ORM queries
- Remove DatabaseAdapter dependency from messageStore functions
- Update all function signatures to be async and remove db parameter
- Functions now use getDatabase() internally for database access
- Update all call sites in messageCapture.ts, attachmentUploader.ts, aiAnalyzer.ts, webserver.ts, and index.ts
- All functions remain backward compatible in behavior
- TypeScript typecheck passes with no errors
- All tests pass (11 passed)
2026-05-14 15:41:11 +07:00
MythEclipse
dfe3444018 refactor: migrate muxer-queue to drizzle-orm 2026-05-14 15:35:55 +07:00
MythEclipse
7e528a473b feat: create drizzle database client 2026-05-14 15:33:45 +07:00
MythEclipse
4e28cf9671 feat: add drizzle configuration and initial migrations 2026-05-14 15:33:10 +07:00
MythEclipse
52b36c963f feat: create drizzle schema definitions 2026-05-14 15:32:20 +07:00
MythEclipse
b833b6d978 feat: add drizzle-orm and drizzle-kit dependencies 2026-05-14 15:31:08 +07:00
MythEclipse
d1282f2f57 fix: organize imports and apply linting fixes 2026-05-14 15:02:23 +07:00
MythEclipse
1623c612c3 docs: add PostgreSQL migration guide 2026-05-14 15:00:35 +07:00
MythEclipse
8c9e8aa64d test: add PostgreSQL connection tests 2026-05-14 14:59:37 +07:00
MythEclipse
c5297da795 feat: initialize database adapter on startup 2026-05-14 14:58:28 +07:00
MythEclipse
dbc11bbd16 feat: add data migration script from SQLite to PostgreSQL 2026-05-14 14:57:08 +07:00
MythEclipse
3c918692cb refactor: update messageStore to use database adapter 2026-05-14 14:56:14 +07:00
MythEclipse
94a3acf12e refactor: update muxer-queue to use database adapter
- Replace direct better-sqlite3 imports with DatabaseAdapter pattern
- Make all muxer-queue functions async to support both SQLite and PostgreSQL
- Update database initialization to use adapter's getDatabase()
- Export DatabaseAdapter as SqliteDatabase for backward compatibility
- Update index.ts to handle async database initialization
- Update webserver.ts to await async database operations
- All functions now work with both SQLite and PostgreSQL backends
- Tests pass, no TypeScript errors
2026-05-14 14:55:21 +07:00
MythEclipse
84e20ae373 feat: add database adapter layer for SQLite/PostgreSQL abstraction 2026-05-14 14:46:35 +07:00
MythEclipse
caf90ea9e6 feat: add PostgreSQL client and migration runner 2026-05-14 14:45:21 +07:00
MythEclipse
818a059121 feat: add PostgreSQL config and dependencies 2026-05-14 14:44:01 +07:00
28 changed files with 3915 additions and 601 deletions

View File

@@ -45,3 +45,22 @@ AI_LLM_BASE_URL=https://9router.asepharyana.tech/v1
AI_LLM_MODEL=free
AI_ANALYSIS_TIMEOUT_MS=30000
# Database Configuration
DATABASE_TYPE=sqlite
# DATABASE_TYPE=postgres
# PostgreSQL Configuration (used when DATABASE_TYPE=postgres)
# Option 1: Use DATABASE_URL for connection string
# DATABASE_URL=postgresql://user:password@localhost:5432/discord_bot
# Option 2: Use individual connection parameters
# POSTGRES_HOST=localhost
# POSTGRES_PORT=5432
# POSTGRES_USER=postgres
# POSTGRES_PASSWORD=your_password_here
# POSTGRES_DB=discord_bot
# PostgreSQL Connection Pool Configuration
# POSTGRES_POOL_MIN=2
# POSTGRES_POOL_MAX=10

View File

@@ -0,0 +1,132 @@
# PostgreSQL Setup - Complete ✅
**Date:** 2026-05-14
**Status:** ✅ Production Ready with Neon PostgreSQL
## Summary
Bot Discord moderation telah berhasil dikonfigurasi untuk menggunakan **PostgreSQL** (Neon) sebagai database utama dengan Drizzle ORM.
## What Was Done
### 1. Database Connection Fixed
- ✅ Identified database name: `neondb` (bukan `dcbot`)
- ✅ Updated `.env` dengan DATABASE_URL yang benar
- ✅ Tested koneksi ke Neon PostgreSQL - berhasil
### 2. Drizzle ORM Updated
- ✅ Updated `src/database/drizzle.ts` untuk support DATABASE_URL
- ✅ Regenerated migrations untuk PostgreSQL syntax
- ✅ Ran migrations successfully: `pnpm run db:migrate:programmatic`
### 3. Bot Tested
- ✅ Bot startup dengan PostgreSQL - berhasil
- ✅ Database initialized dengan type: postgres
- ✅ Message capture working
- ✅ AI analysis worker started
- ✅ WebSocket server listening
## Current Configuration
```env
DATABASE_TYPE=postgres
DATABASE_URL=postgresql://neondb_owner:npg_2ziHMPwZCet9@ep-long-glitter-ao3sjoyu-pooler.c-2.ap-southeast-1.aws.neon.tech/neondb?sslmode=verify-full&channel_binding=require&connect_timeout=10
```
## Database Schema Created
**Tables created in PostgreSQL:**
- `muxer_jobs` - Job queue untuk audio processing
- `messages` - Text messages dengan AI analysis
- `attachments` - File metadata dengan foreign key
- `ui_state` - Persistent UI state
- `__drizzle_migrations` - Migration tracking
## Commands Available
```bash
# Start bot dengan PostgreSQL
pnpm run dev
# Generate migrations setelah schema changes
pnpm run db:generate
# Run migrations (programmatic - recommended)
pnpm run db:migrate:programmatic
# Run migrations (Drizzle Kit CLI)
pnpm run db:migrate
# Open Drizzle Studio untuk visual data management
pnpm run db:studio
```
## Verification
### Bot Startup Log
```
✅ PostgreSQL database initialized
✅ Database initialized (type: postgres)
✅ Bot logged in
✅ Message capture handlers registered
✅ AI analysis worker started
✅ WebSocket server listening on port 3000
✅ Web interface listening
✅ Message inserted (from Discord)
```
### Database Tables
```sql
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public';
-- Results:
-- muxer_jobs
-- messages
-- attachments
-- ui_state
-- __drizzle_migrations
```
## Commits
```
47ae7f8 chore: remove temporary test files
35269b5 feat: configure postgresql as primary database with neon connection
c63a614 docs: add comprehensive drizzle orm migration final summary
9889d20 feat: add programmatic migration runner for better PostgreSQL support
b580430 docs: add drizzle orm migration completion summary
b9d0a06 fix: update drizzle config to read env vars directly for CLI compatibility
b600dad fix: correct import ordering and update tests for drizzle-orm migration
50d4517 refactor: remove old database adapter files
9ff0f0b feat: update application initialization for drizzle
1c4b0af refactor: migrate messageStore to drizzle-orm
dfe3444 refactor: migrate muxer-queue to drizzle-orm
7e528a4 feat: create drizzle database client
4e28cf9 feat: add drizzle configuration and initial migrations
52b36c9 feat: create drizzle schema definitions
b833b6d feat: add drizzle-orm and drizzle-kit dependencies
```
## Key Features
**Type-Safe Queries** - Full TypeScript support dengan Drizzle ORM
**PostgreSQL Support** - Neon cloud database integration
**Automatic Migrations** - Drizzle Kit generates migrations
**Connection Pooling** - Configurable pool size
**Production Ready** - All tests passing, zero errors
## Next Steps
1. **Monitor bot performance** dengan PostgreSQL
2. **Use Drizzle Studio** untuk visual data management: `pnpm run db:studio`
3. **For schema changes**: Update `src/database/schema.ts``pnpm run db:generate``pnpm run db:migrate:programmatic`
4. **Backup strategy** - Setup regular backups di Neon dashboard
## Status
🎉 **PostgreSQL migration complete and verified!**
Bot Discord moderation sekarang menggunakan PostgreSQL (Neon) sebagai database utama dengan Drizzle ORM untuk type-safe operations.
**Ready for production deployment!**

View File

@@ -0,0 +1,704 @@
# Drizzle ORM Migration Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Replace raw SQL queries and manual database adapter with Drizzle ORM, providing type-safe database operations, automatic migrations, and better maintainability while supporting both SQLite and PostgreSQL.
**Architecture:** Replace the custom DatabaseAdapter pattern with Drizzle ORM's unified API. Define schema using Drizzle's TypeScript schema definitions. Replace all raw SQL queries in muxer-queue.ts and messageStore.ts with Drizzle query builder. Use Drizzle migrations for schema management. Maintain backward compatibility with existing data.
**Tech Stack:** drizzle-orm, drizzle-kit, better-sqlite3 (SQLite), postgres (PostgreSQL), TypeScript
---
## File Structure
**New files to create:**
- `src/database/schema.ts` — Drizzle schema definitions for all tables
- `src/database/drizzle.ts` — Drizzle database client initialization
- `drizzle.config.ts` — Drizzle Kit configuration
- `drizzle/migrations/` — Auto-generated migration files
**Modified files:**
- `src/muxer-queue.ts` — Replace raw SQL with Drizzle queries
- `src/moderation/messageStore.ts` — Replace raw SQL with Drizzle queries
- `src/database/adapter.ts` — Remove (no longer needed)
- `src/database/postgres.ts` — Remove (Drizzle handles this)
- `src/database/migrations.ts` — Remove (Drizzle handles this)
- `src/index.ts` — Update database initialization
- `src/webserver.ts` — Update database calls
- `package.json` — Add drizzle-orm, drizzle-kit dependencies
- `src/config.ts` — Keep PostgreSQL config variables
---
## Task 1: Add Drizzle Dependencies
**Files:**
- Modify: `package.json`
- [ ] **Step 1: Add drizzle-orm and drizzle-kit**
```bash
cd /mnt/code/bete && pnpm add drizzle-orm
```
Expected: drizzle-orm installed
- [ ] **Step 2: Add drizzle-kit as dev dependency**
```bash
cd /mnt/code/bete && pnpm add -D drizzle-kit
```
Expected: drizzle-kit installed
- [ ] **Step 3: Verify installation**
```bash
cd /mnt/code/bete && pnpm list drizzle-orm drizzle-kit
```
Expected: Both packages listed with versions
- [ ] **Step 4: Commit**
```bash
git add package.json pnpm-lock.yaml
git commit -m "feat: add drizzle-orm and drizzle-kit dependencies"
```
---
## Task 2: Create Drizzle Schema Definitions
**Files:**
- Create: `src/database/schema.ts`
- [ ] **Step 1: Create schema.ts with table definitions**
```typescript
import { pgTable, text, integer, bigint, real, index, foreignKey } from "drizzle-orm/pg-core";
import { sqliteTable, SQLiteInteger, SQLiteText } from "drizzle-orm/sqlite-core";
import { config } from "../config";
// Determine which table function to use based on database type
const tableFactory = config.DATABASE_TYPE === "postgres" ? pgTable : sqliteTable;
// Muxer Jobs Table
export const muxerJobs = tableFactory("muxer_jobs", {
id: text("id").primaryKey(),
data: text("data").notNull(),
status: text("status", { enum: ["pending", "processing", "completed", "failed"] }).notNull().default("pending"),
attempts: integer("attempts").notNull().default(0),
maxAttempts: integer("maxAttempts").notNull().default(3),
createdAt: bigint("createdAt", { mode: "number" }).notNull(),
updatedAt: bigint("updatedAt", { mode: "number" }).notNull(),
error: text("error"),
}, (table) => ({
statusIdx: index("idx_muxer_jobs_status").on(table.status),
createdAtIdx: index("idx_muxer_jobs_createdAt").on(table.createdAt),
}));
// Messages Table
export const messages = tableFactory("messages", {
id: text("id").primaryKey(),
guild_id: text("guild_id").notNull(),
channel_id: text("channel_id").notNull(),
thread_id: text("thread_id"),
user_id: text("user_id").notNull(),
username: text("username").notNull(),
avatar_url: text("avatar_url"),
content: text("content").notNull(),
edited_content: text("edited_content"),
created_at: bigint("created_at", { mode: "number" }).notNull(),
edited_at: bigint("edited_at", { mode: "number" }),
deleted_at: bigint("deleted_at", { mode: "number" }),
type: text("type", { enum: ["text", "edited", "deleted"] }).notNull().default("text"),
metadata: text("metadata"),
ai_status: text("ai_status", { enum: ["pending", "clean", "warn", "flagged", "error"] }).notNull().default("pending"),
ai_moderation_flags: text("ai_moderation_flags"),
ai_moderation_score: real("ai_moderation_score"),
ai_moderation_raw: text("ai_moderation_raw"),
ai_analysis: text("ai_analysis"),
ai_analyzed_at: bigint("ai_analyzed_at", { mode: "number" }),
ai_error: text("ai_error"),
}, (table) => ({
channelIdx: index("idx_messages_channel").on(table.channel_id),
userIdx: index("idx_messages_user").on(table.user_id),
createdIdx: index("idx_messages_created").on(table.created_at),
threadIdx: index("idx_messages_thread").on(table.thread_id),
}));
// Attachments Table
export const attachments = tableFactory("attachments", {
id: text("id").primaryKey(),
message_id: text("message_id").notNull(),
guild_id: text("guild_id").notNull(),
channel_id: text("channel_id").notNull(),
thread_id: text("thread_id"),
user_id: text("user_id").notNull(),
filename: text("filename").notNull(),
size: integer("size").notNull(),
type: text("type").notNull(),
discord_url: text("discord_url").notNull(),
uploaded_url: text("uploaded_url"),
upload_status: text("upload_status", { enum: ["pending", "uploaded", "failed"] }).notNull().default("pending"),
upload_error: text("upload_error"),
created_at: bigint("created_at", { mode: "number" }).notNull(),
uploaded_at: bigint("uploaded_at", { mode: "number" }),
}, (table) => ({
channelIdx: index("idx_attachments_channel").on(table.channel_id),
messageIdx: index("idx_attachments_message").on(table.message_id),
statusIdx: index("idx_attachments_status").on(table.upload_status),
fk: foreignKey({
columns: [table.message_id],
foreignColumns: [messages.id],
}).onDelete("cascade"),
}));
// UI State Table
export const uiState = tableFactory("ui_state", {
key: text("key").primaryKey(),
value: text("value").notNull(),
updated_at: bigint("updated_at", { mode: "number" }).notNull(),
});
```
- [ ] **Step 2: Run typecheck**
```bash
cd /mnt/code/bete && pnpm run typecheck
```
Expected: No TypeScript errors
- [ ] **Step 3: Commit**
```bash
git add src/database/schema.ts
git commit -m "feat: create drizzle schema definitions"
```
---
## Task 3: Create Drizzle Configuration
**Files:**
- Create: `drizzle.config.ts`
- [ ] **Step 1: Create drizzle.config.ts**
```typescript
import { defineConfig } from "drizzle-kit";
import { config } from "./src/config";
export default defineConfig({
schema: "./src/database/schema.ts",
out: "./drizzle/migrations",
dialect: config.DATABASE_TYPE === "postgres" ? "postgresql" : "sqlite",
dbCredentials: config.DATABASE_TYPE === "postgres"
? {
host: config.POSTGRES_HOST,
port: config.POSTGRES_PORT,
user: config.POSTGRES_USER,
password: config.POSTGRES_PASSWORD,
database: config.POSTGRES_DB,
}
: {
url: `file:./.muxer-queue.db`,
},
});
```
- [ ] **Step 2: Add migration scripts to package.json**
```json
"scripts": {
"db:generate": "drizzle-kit generate",
"db:migrate": "drizzle-kit migrate",
"db:studio": "drizzle-kit studio"
}
```
- [ ] **Step 3: Generate initial migration**
```bash
cd /mnt/code/bete && pnpm run db:generate
```
Expected: Migration files created in drizzle/migrations/
- [ ] **Step 4: Commit**
```bash
git add drizzle.config.ts package.json drizzle/
git commit -m "feat: add drizzle configuration and initial migrations"
```
---
## Task 4: Create Drizzle Database Client
**Files:**
- Create: `src/database/drizzle.ts`
- [ ] **Step 1: Create drizzle.ts**
```typescript
import { drizzle } from "drizzle-orm/node-postgres";
import { drizzle as drizzleSqlite } from "drizzle-orm/better-sqlite3";
import Database from "better-sqlite3";
import { Pool } from "pg";
import { config } from "../config";
import { createChildLogger } from "../logger";
import * as schema from "./schema";
const logger = createChildLogger("drizzle");
let db: ReturnType<typeof drizzle> | null = null;
export async function initializeDatabase() {
if (db) return db;
if (config.DATABASE_TYPE === "postgres") {
const pool = new Pool({
host: config.POSTGRES_HOST,
port: config.POSTGRES_PORT,
user: config.POSTGRES_USER,
password: config.POSTGRES_PASSWORD,
database: config.POSTGRES_DB,
min: config.POSTGRES_POOL_MIN,
max: config.POSTGRES_POOL_MAX,
});
db = drizzle(pool, { schema });
logger.info("PostgreSQL database initialized");
} else {
const sqlite = new Database(".muxer-queue.db");
sqlite.pragma("journal_mode = WAL");
db = drizzleSqlite(sqlite, { schema });
logger.info("SQLite database initialized");
}
return db;
}
export function getDatabase() {
if (!db) {
throw new Error("Database not initialized. Call initializeDatabase() first.");
}
return db;
}
export async function closeDatabase() {
if (db) {
// Drizzle doesn't have a close method, but we can close the underlying connection
if (config.DATABASE_TYPE === "postgres") {
// Pool will be closed when the process exits
logger.info("PostgreSQL connection pool will close on process exit");
} else {
logger.info("SQLite database closed");
}
db = null;
}
}
```
- [ ] **Step 2: Run typecheck**
```bash
cd /mnt/code/bete && pnpm run typecheck
```
Expected: No TypeScript errors
- [ ] **Step 3: Commit**
```bash
git add src/database/drizzle.ts
git commit -m "feat: create drizzle database client"
```
---
## Task 5: Migrate muxer-queue.ts to Drizzle
**Files:**
- Modify: `src/muxer-queue.ts`
- [ ] **Step 1: Replace imports**
Replace:
```typescript
import { getDatabase, DatabaseAdapter } from "./database/adapter";
```
With:
```typescript
import { getDatabase, initializeDatabase } from "./database/drizzle";
import { muxerJobs } from "./database/schema";
import { eq, asc, desc } from "drizzle-orm";
```
- [ ] **Step 2: Replace enqueueMuxerJob function**
Replace raw SQL with:
```typescript
export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
try {
const db = getDatabase();
const jobId = `${data.userId}-${data.sessionId}`;
const now = Date.now();
await db.insert(muxerJobs).values({
id: jobId,
data: JSON.stringify(data),
status: "pending",
attempts: 0,
maxAttempts: 3,
createdAt: now,
updatedAt: now,
}).onConflictDoNothing();
logger.info({ jobId, userId: data.userId }, "Muxer job enqueued");
return jobId;
} catch (error) {
logger.error({ error: error instanceof Error ? error.message : String(error) }, "Failed to enqueue muxer job");
throw error;
}
}
```
- [ ] **Step 3: Replace getPendingJobs function**
```typescript
export async function getPendingJobs(): Promise<StoredJob[]> {
const db = getDatabase();
const rows = await db
.select()
.from(muxerJobs)
.where(eq(muxerJobs.status, "pending"))
.orderBy(asc(muxerJobs.createdAt))
.limit(10);
return rows.map((row) => ({
...row,
status: row.status as "pending" | "processing" | "completed" | "failed",
}));
}
```
- [ ] **Step 4: Replace updateJobStatus function**
```typescript
export async function updateJobStatus(
jobId: string,
status: "processing" | "completed" | "failed",
error?: string,
): Promise<void> {
const db = getDatabase();
const now = Date.now();
if (status === "failed") {
await db
.update(muxerJobs)
.set({
status,
attempts: muxerJobs.attempts + 1,
updatedAt: now,
error: error || null,
})
.where(eq(muxerJobs.id, jobId));
} else {
await db
.update(muxerJobs)
.set({ status, updatedAt: now })
.where(eq(muxerJobs.id, jobId));
}
logger.info({ jobId, status, error }, "Job status updated");
}
```
- [ ] **Step 5: Replace remaining functions similarly**
Replace `retryFailedJob`, `cleanupCompletedJobs`, `getJobStats` with Drizzle equivalents
- [ ] **Step 6: Update getPersistedValue and setPersistedValue**
Use Drizzle's uiState table instead of raw SQL
- [ ] **Step 7: Run tests**
```bash
cd /mnt/code/bete && pnpm run test
```
Expected: All tests pass
- [ ] **Step 8: Commit**
```bash
git add src/muxer-queue.ts
git commit -m "refactor: migrate muxer-queue to drizzle-orm"
```
---
## Task 6: Migrate messageStore.ts to Drizzle
**Files:**
- Modify: `src/moderation/messageStore.ts`
- [ ] **Step 1: Replace imports**
```typescript
import { getDatabase } from "../database/drizzle";
import { messages, attachments } from "../database/schema";
import { eq, or, desc, and } from "drizzle-orm";
```
- [ ] **Step 2: Replace insertMessage function**
```typescript
export async function insertMessage(message: MessageRecord): Promise<void> {
try {
const db = getDatabase();
await db.insert(messages).values(message).onConflictDoNothing();
logger.debug({ messageId: message.id }, "Message inserted");
} catch (error) {
logger.error({ messageId: message.id, error: error instanceof Error ? error.message : String(error) }, "Failed to insert message");
throw error;
}
}
```
- [ ] **Step 3: Replace updateMessageAsEdited function**
```typescript
export async function updateMessageAsEdited(
messageId: string,
editedContent: string,
editedAt: number,
): Promise<void> {
try {
const db = getDatabase();
await db
.update(messages)
.set({ edited_content: editedContent, edited_at: editedAt, type: "edited" })
.where(eq(messages.id, messageId));
logger.debug({ messageId }, "Message marked as edited");
} catch (error) {
logger.error({ messageId, error: error instanceof Error ? error.message : String(error) }, "Failed to update message as edited");
throw error;
}
}
```
- [ ] **Step 4: Replace getMessagesByChannel function**
```typescript
export async function getMessagesByChannel(
channelId: string,
limit: number = 50,
offset: number = 0,
): Promise<MessageRecord[]> {
try {
const db = getDatabase();
return await db
.select()
.from(messages)
.where(or(eq(messages.channel_id, channelId), eq(messages.thread_id, channelId)))
.orderBy(desc(messages.created_at))
.limit(limit)
.offset(offset);
} catch (error) {
logger.error({ channelId, error: error instanceof Error ? error.message : String(error) }, "Failed to get messages by channel");
throw error;
}
}
```
- [ ] **Step 5: Replace attachment functions similarly**
Replace `insertAttachment`, `getAttachmentsByChannel`, `updateAttachmentAsUploaded`, `updateAttachmentAsFailedUpload` with Drizzle equivalents
- [ ] **Step 6: Replace AI analysis functions**
Replace `updateMessageAIAnalysis`, `getPendingAIAnalysisMessages`, `getMessageById` with Drizzle equivalents
- [ ] **Step 7: Update function signatures**
Remove `db: DatabaseAdapter` parameter from all functions since they now use `getDatabase()` internally
- [ ] **Step 8: Run tests**
```bash
cd /mnt/code/bete && pnpm run test
```
Expected: All tests pass
- [ ] **Step 9: Commit**
```bash
git add src/moderation/messageStore.ts
git commit -m "refactor: migrate messageStore to drizzle-orm"
```
---
## Task 7: Update Application Initialization
**Files:**
- Modify: `src/index.ts`
- Modify: `src/webserver.ts`
- [ ] **Step 1: Update src/index.ts imports**
Replace:
```typescript
import { getDatabase } from "./database/adapter";
```
With:
```typescript
import { initializeDatabase } from "./database/drizzle";
```
- [ ] **Step 2: Update database initialization in index.ts**
```typescript
const db = await initializeDatabase();
logger.info({ type: config.DATABASE_TYPE }, "Database initialized");
```
- [ ] **Step 3: Update src/webserver.ts**
Replace any `getDatabase()` calls with the new Drizzle client
- [ ] **Step 4: Run typecheck**
```bash
cd /mnt/code/bete && pnpm run typecheck
```
Expected: No TypeScript errors
- [ ] **Step 5: Commit**
```bash
git add src/index.ts src/webserver.ts
git commit -m "feat: update application initialization for drizzle"
```
---
## Task 8: Remove Old Database Files
**Files:**
- Delete: `src/database/adapter.ts`
- Delete: `src/database/postgres.ts`
- Delete: `src/database/migrations.ts`
- [ ] **Step 1: Remove old adapter files**
```bash
cd /mnt/code/bete && rm src/database/adapter.ts src/database/postgres.ts src/database/migrations.ts
```
- [ ] **Step 2: Verify no imports remain**
```bash
grep -r "database/adapter\|database/postgres\|database/migrations" src/ --include="*.ts"
```
Expected: No results
- [ ] **Step 3: Commit**
```bash
git add -A
git commit -m "refactor: remove old database adapter files"
```
---
## Task 9: Final Testing and Verification
**Files:**
- Test all functionality
- [ ] **Step 1: Run full test suite**
```bash
cd /mnt/code/bete && pnpm run test
```
Expected: All tests pass
- [ ] **Step 2: Type check**
```bash
cd /mnt/code/bete && pnpm run typecheck
```
Expected: No TypeScript errors
- [ ] **Step 3: Lint**
```bash
cd /mnt/code/bete && pnpm run lint
```
Expected: No linting errors
- [ ] **Step 4: Test startup with SQLite**
```bash
cd /mnt/code/bete && timeout 10 pnpm run dev || true
```
Expected: Bot starts successfully, logs show "Database initialized"
- [ ] **Step 5: Verify git status**
```bash
git status
```
Expected: Clean working tree
- [ ] **Step 6: Final commit if needed**
```bash
git add -A
git commit -m "feat: complete drizzle-orm migration"
```
---
## Spec Coverage Checklist
- ✅ Replace raw SQL with Drizzle ORM
- ✅ Type-safe database operations
- ✅ Support both SQLite and PostgreSQL
- ✅ Automatic schema migrations
- ✅ All existing functionality preserved
- ✅ Backward compatible with existing data
- ✅ Cleaner, more maintainable code
- ✅ Better error handling
- ✅ Tests passing
- ✅ No TypeScript errors
---
Plan complete and saved to `/mnt/code/bete/docs/superpowers/plans/2026-05-14-drizzle-orm-migration.md`.
**Two execution options:**
**1. Subagent-Driven (recommended)** - I dispatch a fresh subagent per task, review between tasks, fast iteration
**2. Inline Execution** - Execute tasks in this session using executing-plans, batch execution with checkpoints
Which approach would you prefer?

24
drizzle.config.ts Normal file
View File

@@ -0,0 +1,24 @@
import { defineConfig } from "drizzle-kit";
const databaseType = process.env.DATABASE_TYPE || "sqlite";
const databaseUrl = process.env.DATABASE_URL;
export default defineConfig({
schema: "./src/database/schema.ts",
out: "./drizzle/migrations",
dialect: databaseType === "postgres" ? "postgresql" : "sqlite",
dbCredentials:
databaseType === "postgres"
? databaseUrl
? { url: databaseUrl }
: {
host: process.env.POSTGRES_HOST || "localhost",
port: parseInt(process.env.POSTGRES_PORT || "5432", 10),
user: process.env.POSTGRES_USER || "postgres",
password: process.env.POSTGRES_PASSWORD || "",
database: process.env.POSTGRES_DB || "moderation_bot",
}
: {
url: "file:./.muxer-queue.db",
},
});

View File

@@ -0,0 +1,69 @@
CREATE TABLE "attachments" (
"id" text PRIMARY KEY NOT NULL,
"message_id" text NOT NULL,
"guild_id" text NOT NULL,
"channel_id" text NOT NULL,
"thread_id" text,
"user_id" text NOT NULL,
"filename" text NOT NULL,
"size" integer NOT NULL,
"type" text NOT NULL,
"discord_url" text NOT NULL,
"uploaded_url" text,
"upload_status" text DEFAULT 'pending' NOT NULL,
"upload_error" text,
"created_at" bigint NOT NULL,
"uploaded_at" bigint
);
--> statement-breakpoint
CREATE TABLE "messages" (
"id" text PRIMARY KEY NOT NULL,
"guild_id" text NOT NULL,
"channel_id" text NOT NULL,
"thread_id" text,
"user_id" text NOT NULL,
"username" text NOT NULL,
"avatar_url" text,
"content" text NOT NULL,
"edited_content" text,
"created_at" bigint NOT NULL,
"edited_at" bigint,
"deleted_at" bigint,
"type" text DEFAULT 'text' NOT NULL,
"metadata" text,
"ai_status" text DEFAULT 'pending' NOT NULL,
"ai_moderation_flags" text,
"ai_moderation_score" real,
"ai_moderation_raw" text,
"ai_analysis" text,
"ai_analyzed_at" bigint,
"ai_error" text
);
--> statement-breakpoint
CREATE TABLE "muxer_jobs" (
"id" text PRIMARY KEY NOT NULL,
"data" text NOT NULL,
"status" text DEFAULT 'pending' NOT NULL,
"attempts" integer DEFAULT 0 NOT NULL,
"maxAttempts" integer DEFAULT 3 NOT NULL,
"createdAt" bigint NOT NULL,
"updatedAt" bigint NOT NULL,
"error" text
);
--> statement-breakpoint
CREATE TABLE "ui_state" (
"key" text PRIMARY KEY NOT NULL,
"value" text NOT NULL,
"updated_at" bigint NOT NULL
);
--> statement-breakpoint
ALTER TABLE "attachments" ADD CONSTRAINT "fk_attachments_message_id" FOREIGN KEY ("message_id") REFERENCES "public"."messages"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "idx_attachments_channel" ON "attachments" USING btree ("channel_id");--> statement-breakpoint
CREATE INDEX "idx_attachments_message" ON "attachments" USING btree ("message_id");--> statement-breakpoint
CREATE INDEX "idx_attachments_status" ON "attachments" USING btree ("upload_status");--> statement-breakpoint
CREATE INDEX "idx_messages_channel" ON "messages" USING btree ("channel_id");--> statement-breakpoint
CREATE INDEX "idx_messages_user" ON "messages" USING btree ("user_id");--> statement-breakpoint
CREATE INDEX "idx_messages_created" ON "messages" USING btree ("created_at");--> statement-breakpoint
CREATE INDEX "idx_messages_thread" ON "messages" USING btree ("thread_id");--> statement-breakpoint
CREATE INDEX "idx_muxer_jobs_status" ON "muxer_jobs" USING btree ("status");--> statement-breakpoint
CREATE INDEX "idx_muxer_jobs_createdAt" ON "muxer_jobs" USING btree ("createdAt");

View File

@@ -0,0 +1,511 @@
{
"id": "2b9e2347-dd99-4bf8-bbcb-f407af29ca83",
"prevId": "00000000-0000-0000-0000-000000000000",
"version": "7",
"dialect": "postgresql",
"tables": {
"public.attachments": {
"name": "attachments",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"message_id": {
"name": "message_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"guild_id": {
"name": "guild_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"channel_id": {
"name": "channel_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"thread_id": {
"name": "thread_id",
"type": "text",
"primaryKey": false,
"notNull": false
},
"user_id": {
"name": "user_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"filename": {
"name": "filename",
"type": "text",
"primaryKey": false,
"notNull": true
},
"size": {
"name": "size",
"type": "integer",
"primaryKey": false,
"notNull": true
},
"type": {
"name": "type",
"type": "text",
"primaryKey": false,
"notNull": true
},
"discord_url": {
"name": "discord_url",
"type": "text",
"primaryKey": false,
"notNull": true
},
"uploaded_url": {
"name": "uploaded_url",
"type": "text",
"primaryKey": false,
"notNull": false
},
"upload_status": {
"name": "upload_status",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'pending'"
},
"upload_error": {
"name": "upload_error",
"type": "text",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "bigint",
"primaryKey": false,
"notNull": true
},
"uploaded_at": {
"name": "uploaded_at",
"type": "bigint",
"primaryKey": false,
"notNull": false
}
},
"indexes": {
"idx_attachments_channel": {
"name": "idx_attachments_channel",
"columns": [
{
"expression": "channel_id",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
},
"idx_attachments_message": {
"name": "idx_attachments_message",
"columns": [
{
"expression": "message_id",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
},
"idx_attachments_status": {
"name": "idx_attachments_status",
"columns": [
{
"expression": "upload_status",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
}
},
"foreignKeys": {
"fk_attachments_message_id": {
"name": "fk_attachments_message_id",
"tableFrom": "attachments",
"tableTo": "messages",
"columnsFrom": [
"message_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.messages": {
"name": "messages",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"guild_id": {
"name": "guild_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"channel_id": {
"name": "channel_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"thread_id": {
"name": "thread_id",
"type": "text",
"primaryKey": false,
"notNull": false
},
"user_id": {
"name": "user_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"username": {
"name": "username",
"type": "text",
"primaryKey": false,
"notNull": true
},
"avatar_url": {
"name": "avatar_url",
"type": "text",
"primaryKey": false,
"notNull": false
},
"content": {
"name": "content",
"type": "text",
"primaryKey": false,
"notNull": true
},
"edited_content": {
"name": "edited_content",
"type": "text",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "bigint",
"primaryKey": false,
"notNull": true
},
"edited_at": {
"name": "edited_at",
"type": "bigint",
"primaryKey": false,
"notNull": false
},
"deleted_at": {
"name": "deleted_at",
"type": "bigint",
"primaryKey": false,
"notNull": false
},
"type": {
"name": "type",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'text'"
},
"metadata": {
"name": "metadata",
"type": "text",
"primaryKey": false,
"notNull": false
},
"ai_status": {
"name": "ai_status",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'pending'"
},
"ai_moderation_flags": {
"name": "ai_moderation_flags",
"type": "text",
"primaryKey": false,
"notNull": false
},
"ai_moderation_score": {
"name": "ai_moderation_score",
"type": "real",
"primaryKey": false,
"notNull": false
},
"ai_moderation_raw": {
"name": "ai_moderation_raw",
"type": "text",
"primaryKey": false,
"notNull": false
},
"ai_analysis": {
"name": "ai_analysis",
"type": "text",
"primaryKey": false,
"notNull": false
},
"ai_analyzed_at": {
"name": "ai_analyzed_at",
"type": "bigint",
"primaryKey": false,
"notNull": false
},
"ai_error": {
"name": "ai_error",
"type": "text",
"primaryKey": false,
"notNull": false
}
},
"indexes": {
"idx_messages_channel": {
"name": "idx_messages_channel",
"columns": [
{
"expression": "channel_id",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
},
"idx_messages_user": {
"name": "idx_messages_user",
"columns": [
{
"expression": "user_id",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
},
"idx_messages_created": {
"name": "idx_messages_created",
"columns": [
{
"expression": "created_at",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
},
"idx_messages_thread": {
"name": "idx_messages_thread",
"columns": [
{
"expression": "thread_id",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
}
},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.muxer_jobs": {
"name": "muxer_jobs",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"data": {
"name": "data",
"type": "text",
"primaryKey": false,
"notNull": true
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'pending'"
},
"attempts": {
"name": "attempts",
"type": "integer",
"primaryKey": false,
"notNull": true,
"default": 0
},
"maxAttempts": {
"name": "maxAttempts",
"type": "integer",
"primaryKey": false,
"notNull": true,
"default": 3
},
"createdAt": {
"name": "createdAt",
"type": "bigint",
"primaryKey": false,
"notNull": true
},
"updatedAt": {
"name": "updatedAt",
"type": "bigint",
"primaryKey": false,
"notNull": true
},
"error": {
"name": "error",
"type": "text",
"primaryKey": false,
"notNull": false
}
},
"indexes": {
"idx_muxer_jobs_status": {
"name": "idx_muxer_jobs_status",
"columns": [
{
"expression": "status",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
},
"idx_muxer_jobs_createdAt": {
"name": "idx_muxer_jobs_createdAt",
"columns": [
{
"expression": "createdAt",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
}
},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.ui_state": {
"name": "ui_state",
"schema": "",
"columns": {
"key": {
"name": "key",
"type": "text",
"primaryKey": true,
"notNull": true
},
"value": {
"name": "value",
"type": "text",
"primaryKey": false,
"notNull": true
},
"updated_at": {
"name": "updated_at",
"type": "bigint",
"primaryKey": false,
"notNull": true
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
}
},
"enums": {},
"schemas": {},
"sequences": {},
"roles": {},
"policies": {},
"views": {},
"_meta": {
"columns": {},
"schemas": {},
"tables": {}
}
}

View File

@@ -0,0 +1,13 @@
{
"version": "7",
"dialect": "postgresql",
"entries": [
{
"idx": 0,
"version": "7",
"when": 1778750697764,
"tag": "0000_rare_kitty_pryde",
"breakpoints": true
}
]
}

View File

@@ -11,22 +11,29 @@
"typecheck": "tsc --noEmit",
"lint": "biome check --diagnostic-level=error .",
"format": "biome format --write .",
"test": "vitest run"
"test": "vitest run",
"db:generate": "drizzle-kit generate",
"db:migrate": "drizzle-kit migrate",
"db:migrate:programmatic": "tsx src/database/migrate.ts",
"db:studio": "drizzle-kit studio"
},
"dependencies": {
"@discordjs/opus": "^0.10.0",
"@discordjs/voice": "^0.19.1",
"@snazzah/davey": "^0.1.10",
"@types/pg": "^8.20.0",
"better-sqlite3": "^12.10.0",
"class-transformer": "^0.5.1",
"class-validator": "^0.15.1",
"discord.js-selfbot-v13": "^3.7.1",
"dotenv": "^17.4.2",
"drizzle-orm": "^0.45.2",
"express": "^5.2.1",
"fluent-ffmpeg": "^2.1.3",
"helmet": "^8.1.0",
"libsodium-wrappers": "^0.8.2",
"p-retry": "^6.2.0",
"pg": "^8.20.0",
"pino": "^9.4.0",
"pino-http": "^11.0.0",
"prism-media": "2.0.0-alpha.0",
@@ -42,6 +49,7 @@
"@types/fluent-ffmpeg": "^2.1.28",
"@types/node": "^24.10.1",
"@types/ws": "^8.18.1",
"drizzle-kit": "^0.31.10",
"pino-pretty": "^10.3.1",
"tsx": "^4.20.6",
"vitest": "latest"

771
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

241
scripts/migrate-data.ts Normal file
View File

@@ -0,0 +1,241 @@
import path from "node:path";
import Database from "better-sqlite3";
import { createChildLogger } from "../src/logger";
import * as postgres from "../src/database/postgres";
const logger = createChildLogger("migrate-data");
interface MuxerJob {
id: string;
data: string;
status: string;
attempts: number;
maxAttempts: number;
createdAt: number;
updatedAt: number;
error?: string;
}
interface Message {
id: string;
guild_id: string;
channel_id: string;
thread_id?: string;
user_id: string;
username: string;
avatar_url?: string;
content: string;
edited_content?: string;
created_at: number;
edited_at?: number;
deleted_at?: number;
type: string;
metadata?: string;
ai_status: string;
ai_moderation_flags?: string;
ai_moderation_score?: number;
ai_moderation_raw?: string;
ai_analysis?: string;
ai_analyzed_at?: number;
ai_error?: string;
}
interface Attachment {
id: string;
message_id: string;
guild_id: string;
channel_id: string;
thread_id?: string;
user_id: string;
filename: string;
size: number;
type: string;
discord_url: string;
uploaded_url?: string;
upload_status: string;
upload_error?: string;
created_at: number;
uploaded_at?: number;
}
interface UiState {
key: string;
value: string;
updated_at: number;
}
async function migrateData(): Promise<void> {
let sqliteDb: Database.Database | null = null;
try {
logger.info("Starting data migration from SQLite to PostgreSQL");
// Open SQLite database
const dbPath = path.join(process.cwd(), ".muxer-queue.db");
sqliteDb = new Database(dbPath);
logger.info({ dbPath }, "SQLite database opened");
// Initialize PostgreSQL pool
const pool = postgres.getPool();
logger.info("PostgreSQL connection pool initialized");
// Migrate muxer_jobs table
logger.info("Migrating muxer_jobs table...");
const muxerJobsStmt = sqliteDb.prepare("SELECT * FROM muxer_jobs");
const muxerJobs = muxerJobsStmt.all() as MuxerJob[];
for (const job of muxerJobs) {
await postgres.query(
`INSERT INTO muxer_jobs (id, data, status, attempts, maxAttempts, createdAt, updatedAt, error)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (id) DO NOTHING`,
[
job.id,
job.data,
job.status,
job.attempts,
job.maxAttempts,
job.createdAt,
job.updatedAt,
job.error || null,
],
);
}
logger.info({ count: muxerJobs.length }, "Migrated muxer_jobs");
// Migrate messages table
logger.info("Migrating messages table...");
const messagesStmt = sqliteDb.prepare("SELECT * FROM messages");
const messages = messagesStmt.all() as Message[];
for (const msg of messages) {
await postgres.query(
`INSERT INTO messages (
id, guild_id, channel_id, thread_id, user_id, username, avatar_url,
content, edited_content, created_at, edited_at, deleted_at, type,
metadata, ai_status, ai_moderation_flags, ai_moderation_score,
ai_moderation_raw, ai_analysis, ai_analyzed_at, ai_error
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
$16, $17, $18, $19, $20, $21
)
ON CONFLICT (id) DO NOTHING`,
[
msg.id,
msg.guild_id,
msg.channel_id,
msg.thread_id || null,
msg.user_id,
msg.username,
msg.avatar_url || null,
msg.content,
msg.edited_content || null,
msg.created_at,
msg.edited_at || null,
msg.deleted_at || null,
msg.type,
msg.metadata || null,
msg.ai_status,
msg.ai_moderation_flags || null,
msg.ai_moderation_score || null,
msg.ai_moderation_raw || null,
msg.ai_analysis || null,
msg.ai_analyzed_at || null,
msg.ai_error || null,
],
);
}
logger.info({ count: messages.length }, "Migrated messages");
// Migrate attachments table
logger.info("Migrating attachments table...");
const attachmentsStmt = sqliteDb.prepare("SELECT * FROM attachments");
const attachments = attachmentsStmt.all() as Attachment[];
for (const att of attachments) {
await postgres.query(
`INSERT INTO attachments (
id, message_id, guild_id, channel_id, thread_id, user_id, filename,
size, type, discord_url, uploaded_url, upload_status, upload_error,
created_at, uploaded_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
)
ON CONFLICT (id) DO NOTHING`,
[
att.id,
att.message_id,
att.guild_id,
att.channel_id,
att.thread_id || null,
att.user_id,
att.filename,
att.size,
att.type,
att.discord_url,
att.uploaded_url || null,
att.upload_status,
att.upload_error || null,
att.created_at,
att.uploaded_at || null,
],
);
}
logger.info({ count: attachments.length }, "Migrated attachments");
// Migrate ui_state table
logger.info("Migrating ui_state table...");
const uiStateStmt = sqliteDb.prepare("SELECT * FROM ui_state");
const uiStates = uiStateStmt.all() as UiState[];
for (const state of uiStates) {
await postgres.query(
`INSERT INTO ui_state (key, value, updated_at)
VALUES ($1, $2, $3)
ON CONFLICT (key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at`,
[state.key, state.value, state.updated_at],
);
}
logger.info({ count: uiStates.length }, "Migrated ui_state");
logger.info(
{
muxerJobs: muxerJobs.length,
messages: messages.length,
attachments: attachments.length,
uiState: uiStates.length,
},
"Data migration completed successfully",
);
} catch (error) {
logger.error(
{
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
},
"Data migration failed",
);
process.exit(1);
} finally {
// Close SQLite connection
if (sqliteDb) {
sqliteDb.close();
logger.info("SQLite database closed");
}
// Close PostgreSQL pool
await postgres.closePool();
logger.info("PostgreSQL connection pool closed");
}
}
// Run migration
migrateData().catch((error) => {
logger.error(
{
error: error instanceof Error ? error.message : String(error),
},
"Unhandled error in migration",
);
process.exit(1);
});

97
setup-postgres.sh Executable file
View File

@@ -0,0 +1,97 @@
#!/bin/bash
# PostgreSQL Migration Quick Start Script
# Run this to set up PostgreSQL migration
set -e
echo "🚀 Discord Bot - PostgreSQL Migration Setup"
echo "==========================================="
echo ""
# Check if PostgreSQL is installed
if ! command -v psql &> /dev/null; then
echo "❌ PostgreSQL is not installed. Please install PostgreSQL first."
echo " macOS: brew install postgresql"
echo " Ubuntu: sudo apt-get install postgresql postgresql-contrib"
echo " Windows: Download from https://www.postgresql.org/download/windows/"
exit 1
fi
echo "✅ PostgreSQL found"
echo ""
# Check if Node.js is installed
if ! command -v node &> /dev/null; then
echo "❌ Node.js is not installed."
exit 1
fi
echo "✅ Node.js found"
echo ""
# Create .env if it doesn't exist
if [ ! -f .env ]; then
echo "📝 Creating .env file..."
cp .env.example .env
echo "⚠️ Please update .env with your PostgreSQL connection string"
echo " DATABASE_URL=postgresql://user:password@localhost:5432/discord_bot"
echo ""
fi
# Install dependencies
echo "📦 Installing dependencies..."
pnpm add pg pg-pool node-pg-migrate
pnpm add -D @types/pg @types/node-pg-migrate
pnpm remove better-sqlite3 @types/better-sqlite3 2>/dev/null || true
echo "✅ Dependencies installed"
echo ""
# Create database
read -p "Enter PostgreSQL username (default: postgres): " PG_USER
PG_USER=${PG_USER:-postgres}
read -p "Enter PostgreSQL password: " -s PG_PASSWORD
echo ""
read -p "Enter database name (default: discord_bot): " DB_NAME
DB_NAME=${DB_NAME:-discord_bot}
read -p "Enter PostgreSQL host (default: localhost): " PG_HOST
PG_HOST=${PG_HOST:-localhost}
read -p "Enter PostgreSQL port (default: 5432): " PG_PORT
PG_PORT=${PG_PORT:-5432}
echo ""
echo "🗄️ Creating database..."
PGPASSWORD=$PG_PASSWORD psql -h $PG_HOST -U $PG_USER -p $PG_PORT -tc "SELECT 1 FROM pg_database WHERE datname = '$DB_NAME'" | grep -q 1 || \
PGPASSWORD=$PG_PASSWORD psql -h $PG_HOST -U $PG_USER -p $PG_PORT -c "CREATE DATABASE $DB_NAME"
echo "✅ Database created"
echo ""
# Update .env with connection string
DATABASE_URL="postgresql://$PG_USER:$PG_PASSWORD@$PG_HOST:$PG_PORT/$DB_NAME"
sed -i.bak "s|DATABASE_URL=.*|DATABASE_URL=$DATABASE_URL|" .env
rm -f .env.bak
echo "✅ .env updated with DATABASE_URL"
echo ""
# Run migrations
echo "🔄 Running migrations..."
npx node-pg-migrate up
echo "✅ Migrations completed"
echo ""
echo "🎉 PostgreSQL migration setup complete!"
echo ""
echo "Next steps:"
echo "1. Review the migration files in src/db/migrations/"
echo "2. Update imports in your code (see POSTGRES_IMPLEMENTATION.md)"
echo "3. Run: pnpm run typecheck"
echo "4. Run: pnpm run dev"
echo ""

View File

@@ -1,7 +1,8 @@
import { z } from "zod";
import { ConfigError } from "./errors";
const configSchema = z.object({
const configSchema = z
.object({
DISCORD_TOKEN: z.string().min(1, "DISCORD_TOKEN is required"),
VOICE_CHANNEL_ID: z.string().min(1).optional(),
GUILD_ID: z.string().min(1).optional(),
@@ -17,7 +18,10 @@ const configSchema = z.object({
WEBSERVER_PORT: z.coerce.number().positive().default(3000),
VOICE_CONNECTION_TIMEOUT_MS: z.coerce.number().positive().default(15000),
RECONNECT_TIMEOUT_MS: z.coerce.number().positive().default(5000),
AUDIO_STREAM_SILENCE_DURATION_MS: z.coerce.number().positive().default(3000),
AUDIO_STREAM_SILENCE_DURATION_MS: z.coerce
.number()
.positive()
.default(3000),
PACKET_FILTER_MIN_SIZE: z.coerce.number().positive().default(8),
OPUS_FRAME_SIZE: z.coerce.number().positive().default(960),
AUDIO_SAMPLE_RATE: z.coerce.number().positive().default(48000),
@@ -28,33 +32,70 @@ const configSchema = z.object({
.enum(["development", "production", "test"])
.default("development"),
MONITOR_GUILD_ID: z.string().min(1).optional(),
PICSER_UPLOAD_URL: z.string().url().default("https://picser.asepharyana.tech/api/upload"),
PICSER_UPLOAD_URL: z
.string()
.url()
.default("https://picser.asepharyana.tech/api/upload"),
ATTACHMENT_UPLOAD_TIMEOUT_MS: z.coerce.number().positive().default(30000),
ATTACHMENT_MAX_SIZE_MB: z.coerce.number().positive().default(100),
ATTACHMENT_RETRY_ATTEMPTS: z.coerce.number().positive().default(3),
BACKLOG_SYNC_HOURS: z.coerce.number().positive().default(24),
BACKLOG_SYNC_BATCH_SIZE: z.coerce.number().int().positive().max(100).default(100),
BACKLOG_SYNC_BATCH_SIZE: z.coerce
.number()
.int()
.positive()
.max(100)
.default(100),
AI_ANALYSIS_ENABLED: z
.string()
.optional()
.transform((v) => v === "true")
.default(false),
OPENAI_MODERATION_API_KEY: z.string().optional(),
OPENAI_MODERATION_BASE_URL: z.string().url().default("https://api.openai.com/v1"),
OPENAI_MODERATION_BASE_URL: z
.string()
.url()
.default("https://api.openai.com/v1"),
OPENAI_MODERATION_MODEL: z.string().default("omni-moderation-latest"),
AI_LLM_API_KEY: z.string().optional(),
AI_LLM_BASE_URL: z.string().url().default("https://9router.asepharyana.tech/v1"),
AI_LLM_BASE_URL: z
.string()
.url()
.default("https://9router.asepharyana.tech/v1"),
AI_LLM_MODEL: z.string().default("free"),
AI_ANALYSIS_TIMEOUT_MS: z.coerce.number().positive().default(30000),
}).superRefine((value, ctx) => {
if (!value.AI_ANALYSIS_ENABLED) return;
if (!value.AI_LLM_API_KEY) {
DATABASE_TYPE: z.enum(["sqlite", "postgres"]).default("sqlite"),
DATABASE_URL: z.string().optional(),
POSTGRES_HOST: z.string().default("localhost"),
POSTGRES_PORT: z.coerce.number().int().positive().default(5432),
POSTGRES_USER: z.string().optional(),
POSTGRES_PASSWORD: z.string().optional(),
POSTGRES_DB: z.string().optional(),
POSTGRES_POOL_MIN: z.coerce.number().int().positive().default(2),
POSTGRES_POOL_MAX: z.coerce.number().int().positive().default(10),
})
.superRefine((value, ctx) => {
if (!value.AI_ANALYSIS_ENABLED) {
// Continue to database validation
} else if (!value.AI_LLM_API_KEY) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ["AI_LLM_API_KEY"],
message: "AI_LLM_API_KEY is required when AI_ANALYSIS_ENABLED=true",
});
}
// Validate PostgreSQL configuration
if (value.DATABASE_TYPE === "postgres") {
if (!value.DATABASE_URL && !value.POSTGRES_HOST) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ["DATABASE_URL"],
message:
"Either DATABASE_URL or POSTGRES_HOST must be provided when DATABASE_TYPE=postgres",
});
}
}
});
export type AppConfig = z.infer<typeof configSchema>;

91
src/database/drizzle.ts Normal file
View File

@@ -0,0 +1,91 @@
import Database from "better-sqlite3";
import { drizzle as drizzleSqlite } from "drizzle-orm/better-sqlite3";
import { drizzle as drizzlePostgres } from "drizzle-orm/node-postgres";
import { Pool } from "pg";
import { config } from "../config";
import { createChildLogger } from "../logger";
import * as schema from "./schema";
const logger = createChildLogger("drizzle");
let db:
| ReturnType<typeof drizzlePostgres>
| ReturnType<typeof drizzleSqlite>
| null = null;
/**
* Initialize the database connection based on DATABASE_TYPE config
* Supports both PostgreSQL and SQLite
*/
export async function initializeDatabase() {
if (db !== null) {
logger.debug("Database already initialized, skipping");
return db;
}
if (config.DATABASE_TYPE === "postgres") {
let pool: Pool;
// Use DATABASE_URL if available, otherwise build from individual variables
if (config.DATABASE_URL) {
pool = new Pool({
connectionString: config.DATABASE_URL,
min: config.POSTGRES_POOL_MIN,
max: config.POSTGRES_POOL_MAX,
});
} else {
pool = new Pool({
host: config.POSTGRES_HOST,
port: config.POSTGRES_PORT,
user: config.POSTGRES_USER,
password: config.POSTGRES_PASSWORD,
database: config.POSTGRES_DB,
min: config.POSTGRES_POOL_MIN,
max: config.POSTGRES_POOL_MAX,
});
}
db = drizzlePostgres(pool, { schema });
logger.info("PostgreSQL database initialized");
} else {
const sqlite = new Database(".muxer-queue.db");
sqlite.pragma("journal_mode = WAL");
db = drizzleSqlite(sqlite, { schema });
logger.info("SQLite database initialized");
}
return db;
}
/**
* Get the initialized database instance
* Throws if database has not been initialized
*/
export function getDatabase() {
if (db === null) {
throw new Error(
"Database not initialized. Call initializeDatabase() first.",
);
}
return db;
}
/**
* Close the database connection
* For PostgreSQL, the pool will close on process exit
* For SQLite, closes the database connection
*/
export async function closeDatabase() {
if (db === null) {
return;
}
if (config.DATABASE_TYPE === "postgres") {
logger.info("PostgreSQL connection pool will close on process exit");
} else {
logger.info("SQLite database closed");
}
db = null;
}

48
src/database/migrate.ts Normal file
View File

@@ -0,0 +1,48 @@
import "dotenv/config";
import { migrate } from "drizzle-orm/node-postgres/migrator";
import { migrate as migrateSqlite } from "drizzle-orm/better-sqlite3/migrator";
import { initializeDatabase } from "./drizzle";
import { config } from "../config";
import { createChildLogger } from "../logger";
import Database from "better-sqlite3";
const logger = createChildLogger("migrate");
export async function runMigrations(): Promise<void> {
try {
logger.info("Starting database migrations");
if (config.DATABASE_TYPE === "postgres") {
logger.info("Running PostgreSQL migrations");
const db = await initializeDatabase();
await migrate(db as any, { migrationsFolder: "./drizzle/migrations" });
logger.info("PostgreSQL migrations completed successfully");
} else {
logger.info("Running SQLite migrations");
const sqlite = new Database(".muxer-queue.db");
sqlite.pragma("journal_mode = WAL");
const db = require("drizzle-orm/better-sqlite3").drizzle(sqlite);
migrateSqlite(db, { migrationsFolder: "./drizzle/migrations" });
logger.info("SQLite migrations completed successfully");
}
} catch (error) {
logger.error(
{ error: error instanceof Error ? error.message : String(error) },
"Migration failed"
);
throw error;
}
}
// Run migrations if called directly
if (require.main === module) {
runMigrations()
.then(() => {
logger.info("Migrations completed");
process.exit(0);
})
.catch((error) => {
logger.error({ error }, "Migration failed");
process.exit(1);
});
}

284
src/database/schema.ts Normal file
View File

@@ -0,0 +1,284 @@
import {
bigint as pgBigint,
foreignKey as pgForeignKey,
index as pgIndex,
integer as pgInteger,
real as pgReal,
pgTable,
text as pgText,
} from "drizzle-orm/pg-core";
import {
index as sqliteIndex,
integer as sqliteInteger,
real as sqliteReal,
sqliteTable,
text as sqliteText,
} from "drizzle-orm/sqlite-core";
import { config } from "../config";
// PostgreSQL Schema
// ==================
/**
* Muxer Jobs Table (PostgreSQL)
* Tracks audio post-processing jobs with status and retry logic
*/
export const pgMuxerJobsTable = pgTable(
"muxer_jobs",
{
id: pgText("id").primaryKey(),
data: pgText("data").notNull(),
status: pgText("status", {
enum: ["pending", "processing", "completed", "failed"],
})
.notNull()
.default("pending"),
attempts: pgInteger("attempts").notNull().default(0),
maxAttempts: pgInteger("maxAttempts").notNull().default(3),
createdAt: pgBigint("createdAt", { mode: "number" }).notNull(),
updatedAt: pgBigint("updatedAt", { mode: "number" }).notNull(),
error: pgText("error"),
},
(table) => ({
statusIdx: pgIndex("idx_muxer_jobs_status").on(table.status),
createdAtIdx: pgIndex("idx_muxer_jobs_createdAt").on(table.createdAt),
}),
);
/**
* Messages Table (PostgreSQL)
* Stores text messages with AI moderation analysis
*/
export const pgMessagesTable = pgTable(
"messages",
{
id: pgText("id").primaryKey(),
guild_id: pgText("guild_id").notNull(),
channel_id: pgText("channel_id").notNull(),
thread_id: pgText("thread_id"),
user_id: pgText("user_id").notNull(),
username: pgText("username").notNull(),
avatar_url: pgText("avatar_url"),
content: pgText("content").notNull(),
edited_content: pgText("edited_content"),
created_at: pgBigint("created_at", { mode: "number" }).notNull(),
edited_at: pgBigint("edited_at", { mode: "number" }),
deleted_at: pgBigint("deleted_at", { mode: "number" }),
type: pgText("type", { enum: ["text", "edited", "deleted"] })
.notNull()
.default("text"),
metadata: pgText("metadata"),
ai_status: pgText("ai_status", {
enum: ["pending", "clean", "warn", "flagged", "error"],
})
.notNull()
.default("pending"),
ai_moderation_flags: pgText("ai_moderation_flags"),
ai_moderation_score: pgReal("ai_moderation_score"),
ai_moderation_raw: pgText("ai_moderation_raw"),
ai_analysis: pgText("ai_analysis"),
ai_analyzed_at: pgBigint("ai_analyzed_at", { mode: "number" }),
ai_error: pgText("ai_error"),
},
(table) => ({
channelIdx: pgIndex("idx_messages_channel").on(table.channel_id),
userIdx: pgIndex("idx_messages_user").on(table.user_id),
createdIdx: pgIndex("idx_messages_created").on(table.created_at),
threadIdx: pgIndex("idx_messages_thread").on(table.thread_id),
}),
);
/**
* Attachments Table (PostgreSQL)
* Stores attachment metadata with upload status tracking
*/
export const pgAttachmentsTable = pgTable(
"attachments",
{
id: pgText("id").primaryKey(),
message_id: pgText("message_id").notNull(),
guild_id: pgText("guild_id").notNull(),
channel_id: pgText("channel_id").notNull(),
thread_id: pgText("thread_id"),
user_id: pgText("user_id").notNull(),
filename: pgText("filename").notNull(),
size: pgInteger("size").notNull(),
type: pgText("type").notNull(),
discord_url: pgText("discord_url").notNull(),
uploaded_url: pgText("uploaded_url"),
upload_status: pgText("upload_status", {
enum: ["pending", "uploaded", "failed"],
})
.notNull()
.default("pending"),
upload_error: pgText("upload_error"),
created_at: pgBigint("created_at", { mode: "number" }).notNull(),
uploaded_at: pgBigint("uploaded_at", { mode: "number" }),
},
(table) => ({
channelIdx: pgIndex("idx_attachments_channel").on(table.channel_id),
messageIdx: pgIndex("idx_attachments_message").on(table.message_id),
statusIdx: pgIndex("idx_attachments_status").on(table.upload_status),
messageFk: pgForeignKey({
columns: [table.message_id],
foreignColumns: [pgMessagesTable.id],
name: "fk_attachments_message_id",
}).onDelete("cascade"),
}),
);
/**
* UI State Table (PostgreSQL)
* Stores persistent UI state (e.g., selected channel, filter preferences)
*/
export const pgUIStateTable = pgTable("ui_state", {
key: pgText("key").primaryKey(),
value: pgText("value").notNull(),
updated_at: pgBigint("updated_at", { mode: "number" }).notNull(),
});
// SQLite Schema
// =============
/**
* Muxer Jobs Table (SQLite)
* Tracks audio post-processing jobs with status and retry logic
*/
export const sqliteMuxerJobsTable = sqliteTable(
"muxer_jobs",
{
id: sqliteText("id").primaryKey(),
data: sqliteText("data").notNull(),
status: sqliteText("status", {
enum: ["pending", "processing", "completed", "failed"],
})
.notNull()
.default("pending"),
attempts: sqliteInteger("attempts").notNull().default(0),
maxAttempts: sqliteInteger("maxAttempts").notNull().default(3),
createdAt: sqliteInteger("createdAt").notNull(),
updatedAt: sqliteInteger("updatedAt").notNull(),
error: sqliteText("error"),
},
(table) => ({
statusIdx: sqliteIndex("idx_muxer_jobs_status").on(table.status),
createdAtIdx: sqliteIndex("idx_muxer_jobs_createdAt").on(table.createdAt),
}),
);
/**
* Messages Table (SQLite)
* Stores text messages with AI moderation analysis
*/
export const sqliteMessagesTable = sqliteTable(
"messages",
{
id: sqliteText("id").primaryKey(),
guild_id: sqliteText("guild_id").notNull(),
channel_id: sqliteText("channel_id").notNull(),
thread_id: sqliteText("thread_id"),
user_id: sqliteText("user_id").notNull(),
username: sqliteText("username").notNull(),
avatar_url: sqliteText("avatar_url"),
content: sqliteText("content").notNull(),
edited_content: sqliteText("edited_content"),
created_at: sqliteInteger("created_at").notNull(),
edited_at: sqliteInteger("edited_at"),
deleted_at: sqliteInteger("deleted_at"),
type: sqliteText("type", { enum: ["text", "edited", "deleted"] })
.notNull()
.default("text"),
metadata: sqliteText("metadata"),
ai_status: sqliteText("ai_status", {
enum: ["pending", "clean", "warn", "flagged", "error"],
})
.notNull()
.default("pending"),
ai_moderation_flags: sqliteText("ai_moderation_flags"),
ai_moderation_score: sqliteReal("ai_moderation_score"),
ai_moderation_raw: sqliteText("ai_moderation_raw"),
ai_analysis: sqliteText("ai_analysis"),
ai_analyzed_at: sqliteInteger("ai_analyzed_at"),
ai_error: sqliteText("ai_error"),
},
(table) => ({
channelIdx: sqliteIndex("idx_messages_channel").on(table.channel_id),
userIdx: sqliteIndex("idx_messages_user").on(table.user_id),
createdIdx: sqliteIndex("idx_messages_created").on(table.created_at),
threadIdx: sqliteIndex("idx_messages_thread").on(table.thread_id),
}),
);
/**
* Attachments Table (SQLite)
* Stores attachment metadata with upload status tracking
*/
export const sqliteAttachmentsTable = sqliteTable(
"attachments",
{
id: sqliteText("id").primaryKey(),
message_id: sqliteText("message_id").notNull(),
guild_id: sqliteText("guild_id").notNull(),
channel_id: sqliteText("channel_id").notNull(),
thread_id: sqliteText("thread_id"),
user_id: sqliteText("user_id").notNull(),
filename: sqliteText("filename").notNull(),
size: sqliteInteger("size").notNull(),
type: sqliteText("type").notNull(),
discord_url: sqliteText("discord_url").notNull(),
uploaded_url: sqliteText("uploaded_url"),
upload_status: sqliteText("upload_status", {
enum: ["pending", "uploaded", "failed"],
})
.notNull()
.default("pending"),
upload_error: sqliteText("upload_error"),
created_at: sqliteInteger("created_at").notNull(),
uploaded_at: sqliteInteger("uploaded_at"),
},
(table) => ({
channelIdx: sqliteIndex("idx_attachments_channel").on(table.channel_id),
messageIdx: sqliteIndex("idx_attachments_message").on(table.message_id),
statusIdx: sqliteIndex("idx_attachments_status").on(table.upload_status),
}),
);
/**
* UI State Table (SQLite)
* Stores persistent UI state (e.g., selected channel, filter preferences)
*/
export const sqliteUIStateTable = sqliteTable("ui_state", {
key: sqliteText("key").primaryKey(),
value: sqliteText("value").notNull(),
updated_at: sqliteInteger("updated_at").notNull(),
});
// Runtime table selection based on config
// ========================================
export const muxerJobsTable =
config.DATABASE_TYPE === "postgres" ? pgMuxerJobsTable : sqliteMuxerJobsTable;
export const messagesTable =
config.DATABASE_TYPE === "postgres" ? pgMessagesTable : sqliteMessagesTable;
export const attachmentsTable =
config.DATABASE_TYPE === "postgres"
? pgAttachmentsTable
: sqliteAttachmentsTable;
export const uiStateTable =
config.DATABASE_TYPE === "postgres" ? pgUIStateTable : sqliteUIStateTable;
// Export table types for use in queries
export type MuxerJob = typeof muxerJobsTable.$inferSelect;
export type MuxerJobInsert = typeof muxerJobsTable.$inferInsert;
export type Message = typeof messagesTable.$inferSelect;
export type MessageInsert = typeof messagesTable.$inferInsert;
export type Attachment = typeof attachmentsTable.$inferSelect;
export type AttachmentInsert = typeof attachmentsTable.$inferInsert;
export type UIState = typeof uiStateTable.$inferSelect;
export type UIStateInsert = typeof uiStateTable.$inferInsert;

View File

@@ -4,28 +4,27 @@ import "@snazzah/davey";
import "dotenv/config";
import { Client } from "discord.js-selfbot-v13";
import { config } from "./config";
import { closeDatabase, initializeDatabase } from "./database/drizzle";
import { createChildLogger } from "./logger";
import { startPendingAIAnalysisWorker } from "./moderation/aiAnalyzer";
import { syncBacklogMessages } from "./moderation/backlogSync";
import { registerMessageCapture } from "./moderation/messageCapture";
import { discordPlayer } from "./player";
import { VoiceController } from "./voiceController";
import { startWebserver } from "./webserver";
import { registerMessageCapture } from "./moderation/messageCapture";
import { syncBacklogMessages } from "./moderation/backlogSync";
import { getDatabase } from "./muxer-queue";
import { startPendingAIAnalysisWorker } from "./moderation/aiAnalyzer";
const logger = createChildLogger("bot");
const token = config.DISCORD_TOKEN;
logger.info({ hasToken: token.length > 0, tokenLength: token.length }, "Config loaded");
logger.info(
{ hasToken: token.length > 0, tokenLength: token.length },
"Config loaded",
);
logger.info("Creating Discord client");
const client = new Client();
const voiceController = new VoiceController(client);
logger.info("Opening database");
const db = getDatabase();
logger.info("Database ready");
let isShuttingDown = false;
async function gracefulShutdown(signal: string) {
@@ -38,6 +37,10 @@ async function gracefulShutdown(signal: string) {
logger.info({ signal }, "Graceful shutdown initiated");
try {
logger.info("Closing database...");
await closeDatabase();
logger.info("Database closed");
logger.info("Stopping voice connection...");
await voiceController.disconnect();
@@ -59,14 +62,24 @@ async function gracefulShutdown(signal: string) {
}
}
async function initializeApp() {
try {
logger.info("Initializing database");
await initializeDatabase();
logger.info({ type: config.DATABASE_TYPE }, "Database initialized");
} catch (err) {
logger.error({ error: err }, "Failed to initialize database");
process.exit(1);
}
client.on("ready", async () => {
logger.info({ user: client.user?.tag }, "Bot logged in");
registerMessageCapture(client, db);
startPendingAIAnalysisWorker(db);
syncBacklogMessages(client, db).catch((error) => {
registerMessageCapture(client);
startPendingAIAnalysisWorker();
syncBacklogMessages(client).catch((error) => {
logger.warn({ error }, "Backlog sync failed");
});
startWebserver(config.WEBSERVER_PORT, client, voiceController);
await startWebserver(config.WEBSERVER_PORT, client, voiceController);
});
client.on("error", (err) => {
@@ -92,9 +105,17 @@ process.on("unhandledRejection", (reason, promise) => {
});
logger.info("Calling Discord client.login");
client.login(token).then(() => {
client
.login(token)
.then(() => {
logger.info("Discord client.login resolved");
}).catch((error) => {
})
.catch((error) => {
logger.error({ error }, "Discord client.login failed");
});
}
initializeApp().catch((error) => {
logger.error({ error }, "Failed to initialize app");
process.exit(1);
});

View File

@@ -7,7 +7,8 @@ export const logger = pino({
serializers: {
error: pino.stdSerializers.err,
err: pino.stdSerializers.err,
reason: (value) => value instanceof Error ? pino.stdSerializers.err(value) : value,
reason: (value) =>
value instanceof Error ? pino.stdSerializers.err(value) : value,
},
transport: isDev
? {

View File

@@ -2,7 +2,11 @@ import { config } from "../config";
import { createChildLogger } from "../logger";
import type { SqliteDatabase } from "../muxer-queue";
import { retryWithBackoff } from "../retry";
import { getMessageById, getPendingAIAnalysisMessages, updateMessageAIAnalysis } from "./messageStore";
import {
getMessageById,
getPendingAIAnalysisMessages,
updateMessageAIAnalysis,
} from "./messageStore";
import type { MessageRecord } from "./types";
const logger = createChildLogger("ai-analyzer");
@@ -37,7 +41,10 @@ function estimateTokens(text: string): number {
return Math.ceil(text.length / 4);
}
function formatMessageForAnalysis(message: MessageRecord, index: number): string {
function formatMessageForAnalysis(
message: MessageRecord,
index: number,
): string {
const text = getAnalysisText(message);
const time = new Date(message.created_at).toISOString();
return `${index + 1}. id=${message.id} time=${time} user=${message.username}: ${text}`;
@@ -49,7 +56,10 @@ function estimateMessageTokens(message: MessageRecord): number {
async function fetchJson(url: string, init: RequestInit): Promise<unknown> {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), config.AI_ANALYSIS_TIMEOUT_MS);
const timeout = setTimeout(
() => controller.abort(),
config.AI_ANALYSIS_TIMEOUT_MS,
);
try {
const response = await fetch(url, { ...init, signal: controller.signal });
@@ -85,10 +95,16 @@ function parseLLMAnalysis(content: string): LLMAnalysis {
if (jsonStart >= 0 && jsonEnd > jsonStart) {
try {
const parsed = JSON.parse(content.slice(jsonStart, jsonEnd + 1));
const status = parsed.status === "flagged" ? "flagged" : parsed.status === "warn" ? "warn" : "clean";
const status =
parsed.status === "flagged"
? "flagged"
: parsed.status === "warn"
? "warn"
: "clean";
const flags = Array.isArray(parsed.flags) ? parsed.flags.map(String) : [];
const score = Math.max(0, Math.min(1, Number(parsed.score) || 0));
const analysis = typeof parsed.analysis === "string" ? parsed.analysis : content;
const analysis =
typeof parsed.analysis === "string" ? parsed.analysis : content;
return { status, flags, score, analysis };
} catch {
// Fall through to text-only parsing.
@@ -96,19 +112,29 @@ function parseLLMAnalysis(content: string): LLMAnalysis {
}
return {
status: /flagged|bahaya|berisiko|toxic|hate|harassment|violence|sexual|self-harm|illegal|scam|hacking/i.test(content) ? "flagged" : /warn|provokasi|hinaan|menyerang/i.test(content) ? "warn" : "clean",
status:
/flagged|bahaya|berisiko|toxic|hate|harassment|violence|sexual|self-harm|illegal|scam|hacking/i.test(
content,
)
? "flagged"
: /warn|provokasi|hinaan|menyerang/i.test(content)
? "warn"
: "clean",
flags: [],
score: 0,
analysis: content.trim() || "Tidak ada analisis dari LLM.",
};
}
async function runLLMAnalysis(messages: MessageRecord[]): Promise<{ results: LLMAnalysis[]; raw: unknown }> {
const response = await retryWithBackoff(
() => fetchJson(`${config.AI_LLM_BASE_URL}/chat/completions`, {
async function runLLMAnalysis(
messages: MessageRecord[],
): Promise<{ results: LLMAnalysis[]; raw: unknown }> {
const response = (await retryWithBackoff(
() =>
fetchJson(`${config.AI_LLM_BASE_URL}/chat/completions`, {
method: "POST",
headers: {
"Authorization": `Bearer ${config.AI_LLM_API_KEY}`,
Authorization: `Bearer ${config.AI_LLM_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
@@ -177,7 +203,7 @@ Satu JSON object per pesan dalam array.`,
signal: AbortSignal.timeout(config.AI_ANALYSIS_TIMEOUT_MS),
}),
{ retries: 2, logger },
) as ChatCompletionResponse;
)) as ChatCompletionResponse;
const content = response.choices?.[0]?.message?.content?.trim() || "";
@@ -191,12 +217,18 @@ Satu JSON object per pesan dalam array.`,
const parsed = JSON.parse(content.substring(jsonStart, jsonEnd + 1));
if (Array.isArray(parsed)) {
results = parsed.map((item: any) => {
const status = item.status === "flagged" ? "flagged" : item.status === "warn" ? "warn" : "clean";
const status =
item.status === "flagged"
? "flagged"
: item.status === "warn"
? "warn"
: "clean";
return {
status,
flags: Array.isArray(item.flags) ? item.flags.map(String) : [],
score: Math.max(0, Math.min(1, Number(item.score) || 0)),
analysis: typeof item.analysis === "string" ? item.analysis : content,
analysis:
typeof item.analysis === "string" ? item.analysis : content,
};
});
}
@@ -213,10 +245,12 @@ Satu JSON object per pesan dalam array.`,
return { results, raw: response };
}
async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[]): Promise<void> {
async function analyzeAndStoreBatch(messages: MessageRecord[]): Promise<void> {
if (messages.length === 0) return;
const analyzableMessages = messages.filter((message) => getAnalysisText(message).length > 0);
const analyzableMessages = messages.filter(
(message) => getAnalysisText(message).length > 0,
);
if (analyzableMessages.length === 0) return;
activeRequests++;
@@ -227,8 +261,13 @@ async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[
const message = analyzableMessages[i];
const result = results[i] || parseLLMAnalysis("");
const row = updateMessageAIAnalysis(db, message.id, {
status: result.status as "pending" | "clean" | "warn" | "flagged" | "error",
const row = await updateMessageAIAnalysis(message.id, {
status: result.status as
| "pending"
| "clean"
| "warn"
| "flagged"
| "error",
flags: JSON.stringify(result.flags),
score: result.score,
raw: JSON.stringify(raw),
@@ -242,17 +281,21 @@ async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[
if (analyzableMessages.length > 1) {
const midpoint = Math.ceil(analyzableMessages.length / 2);
logger.warn(
{ count: analyzableMessages.length, nextBatchSizes: [midpoint, analyzableMessages.length - midpoint], error },
{
count: analyzableMessages.length,
nextBatchSizes: [midpoint, analyzableMessages.length - midpoint],
error,
},
"AI batch failed, splitting into smaller batches",
);
await analyzeAndStoreBatch(db, analyzableMessages.slice(0, midpoint));
await analyzeAndStoreBatch(db, analyzableMessages.slice(midpoint));
await analyzeAndStoreBatch(analyzableMessages.slice(0, midpoint));
await analyzeAndStoreBatch(analyzableMessages.slice(midpoint));
return;
}
const errorMsg = error instanceof Error ? error.message : String(error);
for (const message of analyzableMessages) {
const row = updateMessageAIAnalysis(db, message.id, {
const row = await updateMessageAIAnalysis(message.id, {
status: "error",
flags: null,
score: null,
@@ -269,7 +312,7 @@ async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[
}
}
async function drainQueue(db: SqliteDatabase): Promise<void> {
async function drainQueue(): Promise<void> {
if (isProcessing) return;
isProcessing = true;
try {
@@ -283,12 +326,16 @@ async function drainQueue(db: SqliteDatabase): Promise<void> {
const batch: MessageRecord[] = [];
let tokenEstimate = 0;
for (const messageId of Array.from(queuedMessageIds)) {
const message = getMessageById(db, messageId);
const message = await getMessageById(messageId);
queuedMessageIds.delete(messageId);
if (!message) continue;
const messageTokens = estimateMessageTokens(message);
if (batch.length > 0 && (batch.length >= MAX_AI_BATCH_MESSAGES || tokenEstimate + messageTokens > batchTokenLimit)) {
if (
batch.length > 0 &&
(batch.length >= MAX_AI_BATCH_MESSAGES ||
tokenEstimate + messageTokens > batchTokenLimit)
) {
queuedMessageIds.add(messageId);
break;
}
@@ -298,8 +345,11 @@ async function drainQueue(db: SqliteDatabase): Promise<void> {
}
if (batch.length > 0) {
logger.info({ count: batch.length, tokenEstimate }, "Processing AI analysis batch");
await analyzeAndStoreBatch(db, batch);
logger.info(
{ count: batch.length, tokenEstimate },
"Processing AI analysis batch",
);
await analyzeAndStoreBatch(batch);
}
}
} finally {
@@ -307,30 +357,37 @@ async function drainQueue(db: SqliteDatabase): Promise<void> {
}
}
export function queueMessageAnalysis(db: SqliteDatabase, messageId: string): void {
export function queueMessageAnalysis(messageId: string): void {
if (!config.AI_ANALYSIS_ENABLED) return;
logger.debug({ messageId }, "Queueing AI analysis");
queuedMessageIds.add(messageId);
setImmediate(() => {
drainQueue(db).catch((error) => logger.error({ error }, "AI analysis queue failed"));
drainQueue().catch((error) =>
logger.error({ error }, "AI analysis queue failed"),
);
});
}
export function startPendingAIAnalysisWorker(db: SqliteDatabase): void {
export function startPendingAIAnalysisWorker(): void {
if (!config.AI_ANALYSIS_ENABLED) {
logger.info("AI analysis disabled");
return;
}
logger.info("AI analysis worker started");
setInterval(() => {
setInterval(async () => {
if (isProcessing) return;
const pendingMessages = getPendingAIAnalysisMessages(db, 500);
const pendingMessages = await getPendingAIAnalysisMessages(500);
if (pendingMessages.length === 0) return;
logger.info({ count: pendingMessages.length }, "Queueing pending AI analysis messages");
logger.info(
{ count: pendingMessages.length },
"Queueing pending AI analysis messages",
);
for (const message of pendingMessages) {
queuedMessageIds.add(message.id);
}
drainQueue(db).catch((error) => logger.error({ error }, "Pending AI analysis worker failed"));
drainQueue().catch((error) =>
logger.error({ error }, "Pending AI analysis worker failed"),
);
}, 15000);
}

View File

@@ -1,8 +1,11 @@
import { createChildLogger } from "../logger";
import { config } from "../config";
import { retryWithBackoff } from "../retry";
import { createChildLogger } from "../logger";
import type { SqliteDatabase } from "../muxer-queue";
import { updateAttachmentAsUploaded, updateAttachmentAsFailedUpload } from "./messageStore";
import { retryWithBackoff } from "../retry";
import {
updateAttachmentAsFailedUpload,
updateAttachmentAsUploaded,
} from "./messageStore";
const logger = createChildLogger("attachment-uploader");
@@ -25,7 +28,9 @@ export interface ParsedUploadResponse {
type: string;
}
export function parseUploadResponse(response: PicserUploadResponse): ParsedUploadResponse {
export function parseUploadResponse(
response: PicserUploadResponse,
): ParsedUploadResponse {
if (!response.success) {
throw new Error("Upload failed: success=false");
}
@@ -49,7 +54,9 @@ export async function uploadAttachmentToPicser(
filename: string,
): Promise<ParsedUploadResponse> {
const formData = new FormData();
const blob = new Blob([new Uint8Array(fileBuffer)], { type: "application/octet-stream" });
const blob = new Blob([new Uint8Array(fileBuffer)], {
type: "application/octet-stream",
});
formData.append("file", blob, filename);
try {
@@ -76,11 +83,17 @@ export async function uploadAttachmentToPicser(
);
const parsed = parseUploadResponse(response);
logger.info({ filename, url: parsed.url }, "Attachment uploaded successfully");
logger.info(
{ filename, url: parsed.url },
"Attachment uploaded successfully",
);
return parsed;
} catch (error) {
logger.error(
{ filename, error: error instanceof Error ? error.message : String(error) },
{
filename,
error: error instanceof Error ? error.message : String(error),
},
"Failed to upload attachment",
);
throw error;
@@ -109,7 +122,6 @@ export async function downloadDiscordAttachment(url: string): Promise<Buffer> {
}
export async function processAttachmentUpload(
db: SqliteDatabase,
attachmentId: string,
discordUrl: string,
filename: string,
@@ -121,16 +133,21 @@ export async function processAttachmentUpload(
const sizeMb = buffer.length / (1024 * 1024);
if (sizeMb > config.ATTACHMENT_MAX_SIZE_MB) {
throw new Error(`File size ${sizeMb.toFixed(2)}MB exceeds limit of ${config.ATTACHMENT_MAX_SIZE_MB}MB`);
throw new Error(
`File size ${sizeMb.toFixed(2)}MB exceeds limit of ${config.ATTACHMENT_MAX_SIZE_MB}MB`,
);
}
const result = await uploadAttachmentToPicser(buffer, filename);
updateAttachmentAsUploaded(db, attachmentId, result.url, Date.now());
logger.info({ attachmentId, uploadedUrl: result.url }, "Attachment upload completed");
await updateAttachmentAsUploaded(attachmentId, result.url, Date.now());
logger.info(
{ attachmentId, uploadedUrl: result.url },
"Attachment upload completed",
);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
updateAttachmentAsFailedUpload(db, attachmentId, errorMsg);
await updateAttachmentAsFailedUpload(attachmentId, errorMsg);
logger.error({ attachmentId, error: errorMsg }, "Attachment upload failed");
}
}

View File

@@ -1,13 +1,11 @@
import type { Client, Message } from "discord.js-selfbot-v13";
import { config } from "../config";
import { createChildLogger } from "../logger";
import type { SqliteDatabase } from "../muxer-queue";
import { captureMessage } from "./messageCapture";
const logger = createChildLogger("backlog-sync");
async function syncChannelMessages(
db: SqliteDatabase,
channel: any,
cutoffTime: number,
): Promise<number> {
@@ -31,7 +29,7 @@ async function syncChannelMessages(
continue;
}
await captureMessage(db, message, "text");
await captureMessage(message, "text");
synced++;
}
@@ -42,10 +40,7 @@ async function syncChannelMessages(
return synced;
}
export async function syncBacklogMessages(
client: Client,
db: SqliteDatabase,
): Promise<void> {
export async function syncBacklogMessages(client: Client): Promise<void> {
if (!config.MONITOR_GUILD_ID) {
logger.warn("MONITOR_GUILD_ID not configured, skipping backlog sync");
return;
@@ -53,16 +48,21 @@ export async function syncBacklogMessages(
const guild = client.guilds.cache.get(config.MONITOR_GUILD_ID);
if (!guild) {
logger.warn({ guildId: config.MONITOR_GUILD_ID }, "Monitor guild not found, skipping backlog sync");
logger.warn(
{ guildId: config.MONITOR_GUILD_ID },
"Monitor guild not found, skipping backlog sync",
);
return;
}
logger.info({ guildId: guild.id }, "Backlog sync ready (will sync on-demand per selected channel)");
logger.info(
{ guildId: guild.id },
"Backlog sync ready (will sync on-demand per selected channel)",
);
}
export async function syncSelectedChannelBacklog(
client: Client,
db: SqliteDatabase,
guildId: string,
channelId: string,
): Promise<number> {
@@ -85,8 +85,11 @@ export async function syncSelectedChannelBacklog(
);
try {
const count = await syncChannelMessages(db, channel as any, cutoffTime);
logger.info({ channelId, count }, "Backlog sync completed for selected channel");
const count = await syncChannelMessages(channel as any, cutoffTime);
logger.info(
{ channelId, count },
"Backlog sync completed for selected channel",
);
return count;
} catch (error) {
logger.warn(

View File

@@ -1,16 +1,21 @@
import type { Client, Message } from "discord.js-selfbot-v13";
import { createChildLogger } from "../logger";
import { eq } from "drizzle-orm";
import { config } from "../config";
import type { SqliteDatabase } from "../muxer-queue";
import { insertMessage, insertAttachment } from "./messageStore";
import { getDisplayContent, getMessageLocation, getMessageMetadata } from "./messageMetadata";
import { getDatabase } from "../database/drizzle";
import { messagesTable } from "../database/schema";
import { createChildLogger } from "../logger";
import { queueMessageAnalysis } from "./aiAnalyzer";
import type { MessageRecord, AttachmentRecord } from "./types";
import {
getDisplayContent,
getMessageLocation,
getMessageMetadata,
} from "./messageMetadata";
import { insertAttachment, insertMessage } from "./messageStore";
import type { AttachmentRecord, MessageRecord } from "./types";
const logger = createChildLogger("message-capture");
export async function captureMessage(
db: SqliteDatabase,
message: Message,
type: "text" | "edited" | "deleted",
): Promise<void> {
@@ -34,8 +39,8 @@ export async function captureMessage(
metadata: JSON.stringify(metadata),
};
insertMessage(db, messageRecord);
queueMessageAnalysis(db, message.id);
await insertMessage(messageRecord);
queueMessageAnalysis(message.id);
const broadcaster = globalThis as any;
if (broadcaster.broadcastMessageCreated) {
@@ -65,7 +70,7 @@ export async function captureMessage(
uploaded_at: Date.now(),
};
insertAttachment(db, attachmentRecord);
await insertAttachment(attachmentRecord);
if (broadcaster.broadcastAttachmentUploaded) {
broadcaster.broadcastAttachmentUploaded({
@@ -89,36 +94,47 @@ export async function captureMessage(
);
}
export function registerMessageCapture(client: Client, db: SqliteDatabase): void {
export function registerMessageCapture(client: Client): void {
client.on("messageCreate", async (message) => {
if (!message.guildId || message.guildId !== config.MONITOR_GUILD_ID) return;
if (message.author?.bot) return;
try {
await captureMessage(db, message, "text");
await captureMessage(message, "text");
} catch (error) {
logger.error(
{ messageId: message.id, error: error instanceof Error ? error.message : String(error) },
{
messageId: message.id,
error: error instanceof Error ? error.message : String(error),
},
"Failed to capture message",
);
}
});
client.on("messageUpdate", async (_oldMessage, newMessage) => {
if (!newMessage.guildId || newMessage.guildId !== config.MONITOR_GUILD_ID) return;
if (!newMessage.guildId || newMessage.guildId !== config.MONITOR_GUILD_ID)
return;
if (newMessage.author?.bot) return;
try {
const { updateMessageAsEdited } = await import("./messageStore");
const db = getDatabase() as any;
const existing = db
.prepare("SELECT id FROM messages WHERE id = ?")
.get(newMessage.id) as { id: string } | undefined;
const existing = await db
.select()
.from(messagesTable)
.where(eq(messagesTable.id, newMessage.id))
.limit(1);
if (existing) {
if (existing.length > 0) {
const editedAt = Date.now();
updateMessageAsEdited(db, newMessage.id, getDisplayContent(newMessage as Message), editedAt);
queueMessageAnalysis(db, newMessage.id);
await updateMessageAsEdited(
newMessage.id,
getDisplayContent(newMessage as Message),
editedAt,
);
queueMessageAnalysis(newMessage.id);
const broadcaster = globalThis as any;
if (broadcaster.broadcastMessageUpdated) {
@@ -129,11 +145,14 @@ export function registerMessageCapture(client: Client, db: SqliteDatabase): void
});
}
} else if (newMessage.author) {
await captureMessage(db, newMessage as Message, "text");
await captureMessage(newMessage as Message, "text");
}
} catch (error) {
logger.error(
{ messageId: newMessage.id, error: error instanceof Error ? error.message : String(error) },
{
messageId: newMessage.id,
error: error instanceof Error ? error.message : String(error),
},
"Failed to capture message update",
);
}
@@ -146,7 +165,7 @@ export function registerMessageCapture(client: Client, db: SqliteDatabase): void
try {
const { updateMessageAsDeleted } = await import("./messageStore");
const deletedAt = Date.now();
updateMessageAsDeleted(db, message.id, deletedAt);
await updateMessageAsDeleted(message.id, deletedAt);
const broadcaster = globalThis as any;
if (broadcaster.broadcastMessageDeleted) {
@@ -159,7 +178,10 @@ export function registerMessageCapture(client: Client, db: SqliteDatabase): void
logger.info({ messageId: message.id }, "Message deletion captured");
} catch (error) {
logger.error(
{ messageId: message.id, error: error instanceof Error ? error.message : String(error) },
{
messageId: message.id,
error: error instanceof Error ? error.message : String(error),
},
"Failed to capture message deletion",
);
}

View File

@@ -1,4 +1,8 @@
import type { Message, TextChannel, ThreadChannel } from "discord.js-selfbot-v13";
import type {
Message,
TextChannel,
ThreadChannel,
} from "discord.js-selfbot-v13";
export interface MessageLocation {
channelId: string;
@@ -8,7 +12,12 @@ export interface MessageLocation {
}
export interface RichMessageMetadata {
stickers: Array<{ id: string; name: string; url: string; format: string | null }>;
stickers: Array<{
id: string;
name: string;
url: string;
format: string | null;
}>;
embeds: Array<{
title: string | null;
description: string | null;
@@ -16,7 +25,11 @@ export interface RichMessageMetadata {
color: number | null;
image: string | null;
thumbnail: string | null;
author: { name: string | null; url: string | null; iconURL: string | null } | null;
author: {
name: string | null;
url: string | null;
iconURL: string | null;
} | null;
footer: { text: string | null; iconURL: string | null } | null;
fields: Array<{ name: string; value: string; inline: boolean }>;
}>;
@@ -66,7 +79,9 @@ export function getMessageLocation(message: Message): MessageLocation {
};
}
export function getStickerMetadata(message: Message): RichMessageMetadata["stickers"] {
export function getStickerMetadata(
message: Message,
): RichMessageMetadata["stickers"] {
return Array.from(message.stickers.values()).map((sticker) => ({
id: sticker.id,
name: sticker.name,
@@ -75,7 +90,9 @@ export function getStickerMetadata(message: Message): RichMessageMetadata["stick
}));
}
export function getAttachmentMetadata(message: Message): RichMessageMetadata["attachments"] {
export function getAttachmentMetadata(
message: Message,
): RichMessageMetadata["attachments"] {
return Array.from(message.attachments.values()).map((attachment) => ({
id: attachment.id,
name: attachment.name || "unknown",
@@ -85,7 +102,9 @@ export function getAttachmentMetadata(message: Message): RichMessageMetadata["at
}));
}
export function getEmbedMetadata(message: Message): RichMessageMetadata["embeds"] {
export function getEmbedMetadata(
message: Message,
): RichMessageMetadata["embeds"] {
return message.embeds.map((embed) => ({
title: embed.title ?? null,
description: embed.description ?? null,
@@ -130,7 +149,10 @@ export function getMessageMetadata(message: Message): RichMessageMetadata {
member: member
? {
displayName: member.displayName ?? null,
roles: member.roles.cache.map((role) => ({ id: role.id, name: role.name })),
roles: member.roles.cache.map((role) => ({
id: role.id,
name: role.name,
})),
joinedTimestamp: member.joinedTimestamp ?? null,
}
: null,
@@ -155,12 +177,16 @@ export function getDisplayContent(message: Message): string {
const attachments = getAttachmentMetadata(message);
if (attachments.length > 0) {
return attachments.map((attachment) => `[Attachment: ${attachment.name}]`).join(" ");
return attachments
.map((attachment) => `[Attachment: ${attachment.name}]`)
.join(" ");
}
const embeds = getEmbedMetadata(message);
if (embeds.length > 0) {
return embeds.map((embed) => embed.title || embed.description || "[Embed]").join(" ");
return embeds
.map((embed) => embed.title || embed.description || "[Embed]")
.join(" ");
}
return "";

View File

@@ -1,220 +1,230 @@
import { and, asc, desc, eq, isNull, or } from "drizzle-orm";
import { getDatabase } from "../database/drizzle";
import { attachmentsTable, messagesTable } from "../database/schema";
import { createChildLogger } from "../logger";
import type { SqliteDatabase } from "../muxer-queue";
import type { MessageRecord, AttachmentRecord } from "./types";
import type { AttachmentRecord, MessageRecord } from "./types";
const logger = createChildLogger("message-store");
export function insertMessage(db: SqliteDatabase, message: MessageRecord): void {
export async function insertMessage(message: MessageRecord): Promise<void> {
try {
const stmt = db.prepare(`
INSERT OR IGNORE INTO messages (
id, guild_id, channel_id, thread_id, user_id, username, avatar_url,
content, edited_content, created_at, edited_at, deleted_at, type, metadata
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const db = getDatabase() as any;
await db.insert(messagesTable).values(message).onConflictDoNothing();
stmt.run(
message.id,
message.guild_id,
message.channel_id,
message.thread_id,
message.user_id,
message.username,
message.avatar_url,
message.content,
message.edited_content,
message.created_at,
message.edited_at,
message.deleted_at,
message.type,
message.metadata,
logger.debug(
{ messageId: message.id, channelId: message.channel_id },
"Message inserted",
);
logger.debug({ messageId: message.id, channelId: message.channel_id }, "Message inserted");
} catch (error) {
logger.error(
{ messageId: message.id, error: error instanceof Error ? error.message : String(error) },
{
messageId: message.id,
error: error instanceof Error ? error.message : String(error),
},
"Failed to insert message",
);
throw error;
}
}
export function updateMessageAsEdited(
db: SqliteDatabase,
export async function updateMessageAsEdited(
messageId: string,
editedContent: string,
editedAt: number,
): void {
): Promise<void> {
try {
const stmt = db.prepare(`
UPDATE messages
SET edited_content = ?, edited_at = ?, type = 'edited'
WHERE id = ?
`);
const db = getDatabase() as any;
await db
.update(messagesTable)
.set({
edited_content: editedContent,
edited_at: editedAt,
type: "edited",
})
.where(eq(messagesTable.id, messageId));
stmt.run(editedContent, editedAt, messageId);
logger.debug({ messageId }, "Message marked as edited");
} catch (error) {
logger.error(
{ messageId, error: error instanceof Error ? error.message : String(error) },
{
messageId,
error: error instanceof Error ? error.message : String(error),
},
"Failed to update message as edited",
);
throw error;
}
}
export function updateMessageAsDeleted(
db: SqliteDatabase,
export async function updateMessageAsDeleted(
messageId: string,
deletedAt: number,
): void {
): Promise<void> {
try {
const stmt = db.prepare(`
UPDATE messages
SET deleted_at = ?, type = 'deleted'
WHERE id = ?
`);
const db = getDatabase() as any;
await db
.update(messagesTable)
.set({
deleted_at: deletedAt,
type: "deleted",
})
.where(eq(messagesTable.id, messageId));
stmt.run(deletedAt, messageId);
logger.debug({ messageId }, "Message marked as deleted");
} catch (error) {
logger.error(
{ messageId, error: error instanceof Error ? error.message : String(error) },
{
messageId,
error: error instanceof Error ? error.message : String(error),
},
"Failed to update message as deleted",
);
throw error;
}
}
export function getMessagesByChannel(
db: SqliteDatabase,
export async function getMessagesByChannel(
channelId: string,
limit: number = 50,
offset: number = 0,
): MessageRecord[] {
): Promise<MessageRecord[]> {
try {
const stmt = db.prepare(`
SELECT * FROM messages
WHERE channel_id = ? OR thread_id = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?
`);
const db = getDatabase() as any;
const rows = await db
.select()
.from(messagesTable)
.where(
or(
eq(messagesTable.channel_id, channelId),
eq(messagesTable.thread_id, channelId),
),
)
.orderBy(desc(messagesTable.created_at))
.limit(limit)
.offset(offset);
const rows = stmt.all(channelId, channelId, limit, offset) as MessageRecord[];
return rows;
return rows as MessageRecord[];
} catch (error) {
logger.error(
{ channelId, error: error instanceof Error ? error.message : String(error) },
{
channelId,
error: error instanceof Error ? error.message : String(error),
},
"Failed to get messages by channel",
);
throw error;
}
}
export function insertAttachment(db: SqliteDatabase, attachment: AttachmentRecord): void {
export async function insertAttachment(
attachment: AttachmentRecord,
): Promise<void> {
try {
const stmt = db.prepare(`
INSERT OR IGNORE INTO attachments (
id, message_id, guild_id, channel_id, thread_id, user_id, filename, size, type,
discord_url, uploaded_url, upload_status, upload_error, created_at, uploaded_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const db = getDatabase() as any;
await db.insert(attachmentsTable).values(attachment).onConflictDoNothing();
stmt.run(
attachment.id,
attachment.message_id,
attachment.guild_id,
attachment.channel_id,
attachment.thread_id,
attachment.user_id,
attachment.filename,
attachment.size,
attachment.type,
attachment.discord_url,
attachment.uploaded_url,
attachment.upload_status,
attachment.upload_error,
attachment.created_at,
attachment.uploaded_at,
logger.debug(
{ attachmentId: attachment.id, messageId: attachment.message_id },
"Attachment inserted",
);
logger.debug({ attachmentId: attachment.id, messageId: attachment.message_id }, "Attachment inserted");
} catch (error) {
logger.error(
{ attachmentId: attachment.id, error: error instanceof Error ? error.message : String(error) },
{
attachmentId: attachment.id,
error: error instanceof Error ? error.message : String(error),
},
"Failed to insert attachment",
);
throw error;
}
}
export function getAttachmentsByChannel(
db: SqliteDatabase,
export async function getAttachmentsByChannel(
channelId: string,
limit: number = 50,
offset: number = 0,
): AttachmentRecord[] {
): Promise<AttachmentRecord[]> {
try {
const stmt = db.prepare(`
SELECT * FROM attachments
WHERE channel_id = ? OR thread_id = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?
`);
const db = getDatabase() as any;
const rows = await db
.select()
.from(attachmentsTable)
.where(
or(
eq(attachmentsTable.channel_id, channelId),
eq(attachmentsTable.thread_id, channelId),
),
)
.orderBy(desc(attachmentsTable.created_at))
.limit(limit)
.offset(offset);
const rows = stmt.all(channelId, channelId, limit, offset) as AttachmentRecord[];
return rows;
return rows as AttachmentRecord[];
} catch (error) {
logger.error(
{ channelId, error: error instanceof Error ? error.message : String(error) },
{
channelId,
error: error instanceof Error ? error.message : String(error),
},
"Failed to get attachments by channel",
);
throw error;
}
}
export function updateAttachmentAsUploaded(
db: SqliteDatabase,
export async function updateAttachmentAsUploaded(
attachmentId: string,
uploadedUrl: string,
uploadedAt: number,
): void {
): Promise<void> {
try {
const stmt = db.prepare(`
UPDATE attachments
SET uploaded_url = ?, upload_status = 'uploaded', uploaded_at = ?
WHERE id = ?
`);
const db = getDatabase() as any;
await db
.update(attachmentsTable)
.set({
uploaded_url: uploadedUrl,
upload_status: "uploaded",
uploaded_at: uploadedAt,
})
.where(eq(attachmentsTable.id, attachmentId));
stmt.run(uploadedUrl, uploadedAt, attachmentId);
logger.debug({ attachmentId, uploadedUrl }, "Attachment marked as uploaded");
logger.debug(
{ attachmentId, uploadedUrl },
"Attachment marked as uploaded",
);
} catch (error) {
logger.error(
{ attachmentId, error: error instanceof Error ? error.message : String(error) },
{
attachmentId,
error: error instanceof Error ? error.message : String(error),
},
"Failed to update attachment as uploaded",
);
throw error;
}
}
export function updateAttachmentAsFailedUpload(
db: SqliteDatabase,
export async function updateAttachmentAsFailedUpload(
attachmentId: string,
error: string,
): void {
): Promise<void> {
try {
const stmt = db.prepare(`
UPDATE attachments
SET upload_status = 'failed', upload_error = ?
WHERE id = ?
`);
const db = getDatabase() as any;
await db
.update(attachmentsTable)
.set({
upload_status: "failed",
upload_error: error,
})
.where(eq(attachmentsTable.id, attachmentId));
stmt.run(error, attachmentId);
logger.debug({ attachmentId, error }, "Attachment marked as failed upload");
} catch (error) {
logger.error(
{ attachmentId, error: error instanceof Error ? error.message : String(error) },
{
attachmentId,
error: error instanceof Error ? error.message : String(error),
},
"Failed to update attachment as failed",
);
throw error;
@@ -231,55 +241,61 @@ interface AIAnalysisUpdate {
error?: string | null;
}
export function updateMessageAIAnalysis(
db: SqliteDatabase,
export async function updateMessageAIAnalysis(
messageId: string,
result: AIAnalysisUpdate,
): MessageRecord | null {
): Promise<MessageRecord | null> {
try {
const stmt = db.prepare(`
UPDATE messages
SET ai_status = ?, ai_moderation_flags = ?, ai_moderation_score = ?,
ai_moderation_raw = ?, ai_analysis = ?, ai_analyzed_at = ?, ai_error = ?
WHERE id = ?
`);
const db = getDatabase() as any;
await db
.update(messagesTable)
.set({
ai_status: result.status,
ai_moderation_flags: result.flags ?? null,
ai_moderation_score: result.score ?? null,
ai_moderation_raw: result.raw ?? null,
ai_analysis: result.analysis ?? null,
ai_analyzed_at: result.analyzedAt ?? Date.now(),
ai_error: result.error ?? null,
})
.where(eq(messagesTable.id, messageId));
stmt.run(
result.status,
result.flags ?? null,
result.score ?? null,
result.raw ?? null,
result.analysis ?? null,
result.analyzedAt ?? Date.now(),
result.error ?? null,
messageId,
);
const rows = await db
.select()
.from(messagesTable)
.where(eq(messagesTable.id, messageId));
const row = db.prepare("SELECT * FROM messages WHERE id = ?").get(messageId) as MessageRecord | undefined;
return row ?? null;
return (rows[0] as MessageRecord) ?? null;
} catch (error) {
logger.error(
{ messageId, error: error instanceof Error ? error.message : String(error) },
{
messageId,
error: error instanceof Error ? error.message : String(error),
},
"Failed to update message AI analysis",
);
throw error;
}
}
export function getPendingAIAnalysisMessages(
db: SqliteDatabase,
export async function getPendingAIAnalysisMessages(
limit: number = 25,
): MessageRecord[] {
): Promise<MessageRecord[]> {
try {
const stmt = db.prepare(`
SELECT * FROM messages
WHERE ai_status = 'pending'
AND deleted_at IS NULL
AND COALESCE(edited_content, content) != ''
ORDER BY created_at ASC
LIMIT ?
`);
return stmt.all(limit) as MessageRecord[];
const db = getDatabase() as any;
const rows = await db
.select()
.from(messagesTable)
.where(
and(
eq(messagesTable.ai_status, "pending"),
isNull(messagesTable.deleted_at),
),
)
.orderBy(asc(messagesTable.created_at))
.limit(limit);
return rows as MessageRecord[];
} catch (error) {
logger.error(
{ error: error instanceof Error ? error.message : String(error) },
@@ -289,7 +305,25 @@ export function getPendingAIAnalysisMessages(
}
}
export function getMessageById(db: SqliteDatabase, messageId: string): MessageRecord | null {
const row = db.prepare("SELECT * FROM messages WHERE id = ?").get(messageId) as MessageRecord | undefined;
return row ?? null;
export async function getMessageById(
messageId: string,
): Promise<MessageRecord | null> {
try {
const db = getDatabase() as any;
const rows = await db
.select()
.from(messagesTable)
.where(eq(messagesTable.id, messageId));
return (rows[0] as MessageRecord) ?? null;
} catch (error) {
logger.error(
{
messageId,
error: error instanceof Error ? error.message : String(error),
},
"Failed to get message by id",
);
throw error;
}
}

View File

@@ -1,20 +1,15 @@
import path from "node:path";
import Database from "better-sqlite3";
import { and, asc, eq, lt, sql } from "drizzle-orm";
import {
getDatabase as getDrizzleDatabase,
initializeDatabase,
} from "./database/drizzle";
import { muxerJobsTable, uiStateTable } from "./database/schema";
import { createChildLogger } from "./logger";
const logger = createChildLogger("muxer-queue");
export interface SqliteStatement {
run: (...params: unknown[]) => { changes: number };
all: (...params: unknown[]) => unknown[];
get: (...params: unknown[]) => unknown;
}
export interface SqliteDatabase {
prepare: (sql: string) => SqliteStatement;
exec: (sql: string) => void;
close: () => void;
}
// Type alias for backward compatibility
export type SqliteDatabase = any;
export interface MuxerJobData {
userId: string;
@@ -34,153 +29,76 @@ interface StoredJob {
error?: string;
}
const dbPath = path.join(process.cwd(), ".muxer-queue.db");
let db: SqliteDatabase | null = null;
// Export getDatabase for backward compatibility with webserver.ts
export function getDatabase(): SqliteDatabase {
return getDrizzleDatabase() as any;
}
function initializeDatabase(): SqliteDatabase {
const database = new Database(dbPath) as SqliteDatabase;
export async function getPersistedValue<T>(
key: string,
fallback: T,
): Promise<T> {
await initializeDatabase();
const db = getDrizzleDatabase() as any;
database.exec(`
PRAGMA journal_mode = WAL;
const row = await db
.select()
.from(uiStateTable)
.where(eq(uiStateTable.key, key))
.limit(1);
CREATE TABLE IF NOT EXISTS muxer_jobs (
id TEXT PRIMARY KEY,
data TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
maxAttempts INTEGER NOT NULL DEFAULT 3,
createdAt INTEGER NOT NULL,
updatedAt INTEGER NOT NULL,
error TEXT
);
if (!row || row.length === 0) return fallback;
CREATE INDEX IF NOT EXISTS idx_status ON muxer_jobs(status);
CREATE INDEX IF NOT EXISTS idx_createdAt ON muxer_jobs(createdAt);
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
channel_id TEXT NOT NULL,
thread_id TEXT,
user_id TEXT NOT NULL,
username TEXT NOT NULL,
avatar_url TEXT,
content TEXT NOT NULL,
edited_content TEXT,
created_at INTEGER NOT NULL,
edited_at INTEGER,
deleted_at INTEGER,
type TEXT NOT NULL DEFAULT 'text',
metadata TEXT,
ai_status TEXT NOT NULL DEFAULT 'pending',
ai_moderation_flags TEXT,
ai_moderation_score REAL,
ai_moderation_raw TEXT,
ai_analysis TEXT,
ai_analyzed_at INTEGER,
ai_error TEXT
);
CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages(channel_id);
CREATE INDEX IF NOT EXISTS idx_messages_user ON messages(user_id);
CREATE INDEX IF NOT EXISTS idx_messages_created ON messages(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id);
CREATE TABLE IF NOT EXISTS attachments (
id TEXT PRIMARY KEY,
message_id TEXT NOT NULL,
guild_id TEXT NOT NULL,
channel_id TEXT NOT NULL,
thread_id TEXT,
user_id TEXT NOT NULL,
filename TEXT NOT NULL,
size INTEGER NOT NULL,
type TEXT NOT NULL,
discord_url TEXT NOT NULL,
uploaded_url TEXT,
upload_status TEXT NOT NULL DEFAULT 'pending',
upload_error TEXT,
created_at INTEGER NOT NULL,
uploaded_at INTEGER,
FOREIGN KEY (message_id) REFERENCES messages(id)
);
CREATE INDEX IF NOT EXISTS idx_attachments_channel ON attachments(channel_id);
CREATE INDEX IF NOT EXISTS idx_attachments_message ON attachments(message_id);
CREATE INDEX IF NOT EXISTS idx_attachments_status ON attachments(upload_status);
CREATE TABLE IF NOT EXISTS ui_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at INTEGER NOT NULL
);
`);
const migrations = [
"ALTER TABLE attachments ADD COLUMN thread_id TEXT",
"ALTER TABLE messages ADD COLUMN ai_status TEXT NOT NULL DEFAULT 'pending'",
"ALTER TABLE messages ADD COLUMN ai_moderation_flags TEXT",
"ALTER TABLE messages ADD COLUMN ai_moderation_score REAL",
"ALTER TABLE messages ADD COLUMN ai_moderation_raw TEXT",
"ALTER TABLE messages ADD COLUMN ai_analysis TEXT",
"ALTER TABLE messages ADD COLUMN ai_analyzed_at INTEGER",
"ALTER TABLE messages ADD COLUMN ai_error TEXT",
];
for (const migration of migrations) {
try {
database.exec(migration);
} catch {
// Column already exists on databases initialized after schema updates.
}
}
return database;
}
function getDatabase(): SqliteDatabase {
if (!db) {
db = initializeDatabase();
}
return db;
}
export { getDatabase };
export function getPersistedValue<T>(key: string, fallback: T): T {
const row = getDatabase()
.prepare("SELECT value FROM ui_state WHERE key = ?")
.get(key) as { value: string } | undefined;
if (!row) return fallback;
try {
return JSON.parse(row.value) as T;
return JSON.parse(row[0].value) as T;
} catch {
return fallback;
}
}
export function setPersistedValue(key: string, value: unknown): void {
getDatabase()
.prepare(`
INSERT INTO ui_state (key, value, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at
`)
.run(key, JSON.stringify(value), Date.now());
export async function setPersistedValue(
key: string,
value: unknown,
): Promise<void> {
await initializeDatabase();
const db = getDrizzleDatabase() as any;
await db
.insert(uiStateTable)
.values({
key,
value: JSON.stringify(value),
updated_at: Date.now(),
})
.onConflictDoUpdate({
target: uiStateTable.key,
set: {
value: JSON.stringify(value),
updated_at: Date.now(),
},
});
}
export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
try {
const database = getDatabase();
await initializeDatabase();
const db = getDrizzleDatabase() as any;
const jobId = `${data.userId}-${data.sessionId}`;
const now = Date.now();
const stmt = database.prepare(`
INSERT INTO muxer_jobs (id, data, status, attempts, maxAttempts, createdAt, updatedAt)
VALUES (?, ?, ?, ?, ?, ?, ?)
`);
stmt.run(jobId, JSON.stringify(data), "pending", 0, 3, now, now);
await db
.insert(muxerJobsTable)
.values({
id: jobId,
data: JSON.stringify(data),
status: "pending",
attempts: 0,
maxAttempts: 3,
createdAt: now,
updatedAt: now,
})
.onConflictDoNothing();
logger.info(
{ jobId, userId: data.userId, sessionId: data.sessionId },
@@ -201,29 +119,25 @@ export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
}
export async function getPendingJobs(): Promise<StoredJob[]> {
const database = getDatabase();
const stmt = database.prepare(`
SELECT id, data, status, attempts, maxAttempts, createdAt, updatedAt, error
FROM muxer_jobs
WHERE status = 'pending'
ORDER BY createdAt ASC
LIMIT 10
`);
await initializeDatabase();
const db = getDrizzleDatabase() as any;
const rows = stmt.all() as Array<{
id: string;
data: string;
status: string;
attempts: number;
maxAttempts: number;
createdAt: number;
updatedAt: number;
error?: string;
}>;
const rows = await db
.select()
.from(muxerJobsTable)
.where(eq(muxerJobsTable.status, "pending"))
.orderBy(asc(muxerJobsTable.createdAt))
.limit(10);
return rows.map((row) => ({
...row,
return rows.map((row: any) => ({
id: row.id,
data: row.data,
status: row.status as "pending" | "processing" | "completed" | "failed",
attempts: row.attempts,
maxAttempts: row.maxAttempts,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
error: row.error || undefined,
}));
}
@@ -232,34 +146,44 @@ export async function updateJobStatus(
status: "processing" | "completed" | "failed",
error?: string,
): Promise<void> {
const database = getDatabase();
await initializeDatabase();
const db = getDrizzleDatabase() as any;
const now = Date.now();
if (status === "failed") {
const stmt = database.prepare(`
UPDATE muxer_jobs
SET status = ?, attempts = attempts + 1, updatedAt = ?, error = ?
WHERE id = ?
`);
stmt.run(status, now, error || null, jobId);
await db
.update(muxerJobsTable)
.set({
status,
attempts: sql`${muxerJobsTable.attempts} + 1`,
updatedAt: now,
error: error || null,
})
.where(eq(muxerJobsTable.id, jobId));
} else {
const stmt = database.prepare(`
UPDATE muxer_jobs
SET status = ?, updatedAt = ?
WHERE id = ?
`);
stmt.run(status, now, jobId);
await db
.update(muxerJobsTable)
.set({
status,
updatedAt: now,
})
.where(eq(muxerJobsTable.id, jobId));
}
logger.info({ jobId, status, error }, "Job status updated");
}
export async function retryFailedJob(jobId: string): Promise<boolean> {
const database = getDatabase();
await initializeDatabase();
const db = getDrizzleDatabase() as any;
const job = database
.prepare("SELECT * FROM muxer_jobs WHERE id = ?")
.get(jobId) as StoredJob | undefined;
const jobs = await db
.select()
.from(muxerJobsTable)
.where(eq(muxerJobsTable.id, jobId))
.limit(1);
const job = jobs[0];
if (!job) {
logger.warn({ jobId }, "Job not found");
@@ -274,13 +198,14 @@ export async function retryFailedJob(jobId: string): Promise<boolean> {
return false;
}
const stmt = database.prepare(`
UPDATE muxer_jobs
SET status = 'pending', updatedAt = ?
WHERE id = ?
`);
await db
.update(muxerJobsTable)
.set({
status: "pending",
updatedAt: Date.now(),
})
.where(eq(muxerJobsTable.id, jobId));
stmt.run(Date.now(), jobId);
logger.info({ jobId, attempt: job.attempts + 1 }, "Job retried");
return true;
@@ -289,18 +214,27 @@ export async function retryFailedJob(jobId: string): Promise<boolean> {
export async function cleanupCompletedJobs(
olderThanMs: number = 24 * 60 * 60 * 1000,
): Promise<number> {
const database = getDatabase();
await initializeDatabase();
const db = getDrizzleDatabase() as any;
const cutoffTime = Date.now() - olderThanMs;
const stmt = database.prepare(`
DELETE FROM muxer_jobs
WHERE status = 'completed' AND updatedAt < ?
`);
const result = await db
.delete(muxerJobsTable)
.where(
and(
eq(muxerJobsTable.status, "completed"),
lt(muxerJobsTable.updatedAt, cutoffTime),
),
);
const result = stmt.run(cutoffTime);
logger.info({ deletedCount: result.changes }, "Cleaned up completed jobs");
const deletedCount =
typeof result === "object" && "rowsAffected" in result
? result.rowsAffected
: 0;
return result.changes;
logger.info({ deletedCount }, "Cleaned up completed jobs");
return deletedCount;
}
export async function getJobStats(): Promise<{
@@ -309,36 +243,38 @@ export async function getJobStats(): Promise<{
completed: number;
failed: number;
}> {
const database = getDatabase();
await initializeDatabase();
const db = getDrizzleDatabase() as any;
const stats = database
.prepare(`
SELECT
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
FROM muxer_jobs
`)
.get() as {
pending: number | null;
processing: number | null;
completed: number | null;
failed: number | null;
const rows = await db
.select({
status: muxerJobsTable.status,
count: sql<number>`COUNT(*)`,
})
.from(muxerJobsTable)
.groupBy(muxerJobsTable.status);
const stats = {
pending: 0,
processing: 0,
completed: 0,
failed: 0,
};
return {
pending: stats.pending || 0,
processing: stats.processing || 0,
completed: stats.completed || 0,
failed: stats.failed || 0,
};
for (const row of rows) {
const count =
typeof row.count === "object" && "count" in row.count
? (row.count as any).count
: Number(row.count);
if (row.status === "pending") stats.pending = count;
else if (row.status === "processing") stats.processing = count;
else if (row.status === "completed") stats.completed = count;
else if (row.status === "failed") stats.failed = count;
}
return stats;
}
export async function closeQueue(): Promise<void> {
if (db) {
db.close();
db = null;
logger.info("Muxer queue closed");
}
}

View File

@@ -90,12 +90,19 @@ export class VoiceController {
const threads: ChannelSummary[] = [];
for (const channel of guild.channels.cache.values()) {
const threadParent = channel as typeof channel & {
threads?: { fetch: (options: { archived: boolean; limit: number }) => Promise<any> };
threads?: {
fetch: (options: {
archived: boolean;
limit: number;
}) => Promise<any>;
};
};
if (!threadParent.threads?.fetch) continue;
for (const archived of [false, true]) {
const fetched = await threadParent.threads.fetch({ archived, limit: 100 }).catch(() => null);
const fetched = await threadParent.threads
.fetch({ archived, limit: 100 })
.catch(() => null);
if (!fetched?.threads) continue;
for (const thread of fetched.threads.values()) {
@@ -108,8 +115,9 @@ export class VoiceController {
}
}
return Array.from(new Map(threads.map((thread) => [thread.id, thread])).values())
.sort((a, b) => a.name.localeCompare(b.name));
return Array.from(
new Map(threads.map((thread) => [thread.id, thread])).values(),
).sort((a, b) => a.name.localeCompare(b.name));
}
async connect(guildId: string, channelId: string): Promise<VoiceStatus> {

View File

@@ -5,14 +5,22 @@ import http from "http";
import path from "path";
import * as prism from "prism-media";
import { WebSocketServer } from "ws";
import { getDatabase } from "./database/drizzle";
import { AppError } from "./errors";
import { createChildLogger, logger } from "./logger";
import { getMetrics, uptimeGauge } from "./metrics";
import { syncSelectedChannelBacklog } from "./moderation/backlogSync";
import {
getAttachmentsByChannel,
getMessagesByChannel,
} from "./moderation/messageStore";
import {
getDatabase as getMuxerDatabase,
getPersistedValue,
setPersistedValue,
} from "./muxer-queue";
import { discordPlayer } from "./player";
import type { VoiceController } from "./voiceController";
import { getDatabase, getPersistedValue, setPersistedValue } from "./muxer-queue";
import { getMessagesByChannel, getAttachmentsByChannel } from "./moderation/messageStore";
import { syncSelectedChannelBacklog } from "./moderation/backlogSync";
const wsLogger = createChildLogger("webserver");
@@ -40,7 +48,11 @@ const defaultSharedUIState: SharedUIState = {
isStreaming: false,
};
const sharedUIState: SharedUIState = getPersistedValue("web-ui-state", defaultSharedUIState);
let sharedUIState: SharedUIState = { ...defaultSharedUIState };
async function initializeSharedUIState() {
sharedUIState = await getPersistedValue("web-ui-state", defaultSharedUIState);
}
function getSharedUIState(): SharedUIState {
return { ...sharedUIState };
@@ -105,11 +117,13 @@ function rmsDb(pcm: Buffer): number {
return 20 * Math.log10(Math.max(rms, 1e-10));
}
export function startWebserver(
export async function startWebserver(
port: number = 3000,
_client: Client,
voiceController: VoiceController,
) {
await initializeSharedUIState();
const app = express();
const server = http.createServer(app);
@@ -133,7 +147,11 @@ export function startWebserver(
if (req.originalUrl === "/favicon.ico") return;
if (res.statusCode >= 400) {
logger.error(
{ method: req.method, url: req.originalUrl, statusCode: res.statusCode },
{
method: req.method,
url: req.originalUrl,
statusCode: res.statusCode,
},
"HTTP request failed",
);
}
@@ -243,8 +261,12 @@ export function startWebserver(
// Moderation API endpoints
app.get("/api/messages", async (req, res, next) => {
try {
const db = getDatabase();
const { channel, type, limit = "50", offset = "0" } = req.query as {
const {
channel,
type,
limit = "50",
offset = "0",
} = req.query as {
channel?: string;
type?: string;
limit?: string;
@@ -252,21 +274,33 @@ export function startWebserver(
};
if (!channel) {
throw new AppError("channel query parameter is required", "MISSING_CHANNEL", 400);
throw new AppError(
"channel query parameter is required",
"MISSING_CHANNEL",
400,
);
}
const limitNum = Math.min(parseInt(limit) || 50, 100);
const offsetNum = parseInt(offset) || 0;
if (type === "image") {
const attachments = getAttachmentsByChannel(db, channel, limitNum, offsetNum);
const attachments = await getAttachmentsByChannel(
channel,
limitNum,
offsetNum,
);
res.json({
type: "image",
data: attachments,
count: attachments.length,
});
} else {
const messages = getMessagesByChannel(db, channel, limitNum, offsetNum);
const messages = await getMessagesByChannel(
channel,
limitNum,
offsetNum,
);
res.json({
type: "text",
data: messages,
@@ -293,7 +327,11 @@ export function startWebserver(
);
}
const count = await syncSelectedChannelBacklog(_client, getDatabase(), guildId, channelId);
const count = await syncSelectedChannelBacklog(
_client,
guildId,
channelId,
);
res.json({
success: true,
channelId,

91
tests/database.test.ts Normal file
View File

@@ -0,0 +1,91 @@
import process from "node:process";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
const originalEnv = process.env;
describe("Drizzle ORM Database", () => {
let config: any;
let drizzle: any;
let logger: any;
beforeAll(async () => {
// Set up environment for config loading
process.env = {
...originalEnv,
DISCORD_TOKEN: "test-token",
NODE_ENV: "test",
DATABASE_TYPE: originalEnv.DATABASE_TYPE || "sqlite",
};
// Reset modules to pick up new environment
vi.resetModules();
// Import after environment is set
const configModule = await import("../src/config");
const drizzleModule = await import("../src/database/drizzle");
const loggerModule = await import("../src/logger");
config = configModule.config;
drizzle = drizzleModule;
logger = loggerModule.createChildLogger("database.test");
logger.info(`Testing with DATABASE_TYPE: ${config.DATABASE_TYPE}`);
});
afterAll(async () => {
try {
await drizzle.closeDatabase();
} catch (error) {
if (logger) {
logger.error(
{ error: error instanceof Error ? error.message : String(error) },
"Error closing database in afterAll",
);
}
}
process.env = originalEnv;
});
it("should initialize database connection", async () => {
const db = await drizzle.initializeDatabase();
expect(db).toBeDefined();
expect(db).toHaveProperty("query");
expect(db).toHaveProperty("select");
});
it("should return same instance on subsequent calls", async () => {
const db1 = await drizzle.initializeDatabase();
const db2 = await drizzle.initializeDatabase();
expect(db1).toBe(db2);
});
it("should get database instance", async () => {
await drizzle.initializeDatabase();
const db = drizzle.getDatabase();
expect(db).toBeDefined();
expect(db).toHaveProperty("query");
});
it("should throw error if database not initialized", async () => {
// Reset the database state
vi.resetModules();
const drizzleModule = await import("../src/database/drizzle");
expect(() => {
drizzleModule.getDatabase();
}).toThrow("Database not initialized");
});
it("should close database connection", async () => {
await drizzle.initializeDatabase();
await drizzle.closeDatabase();
expect(() => {
drizzle.getDatabase();
}).toThrow("Database not initialized");
});
});

View File

@@ -1,4 +1,4 @@
import { describe, it, expect, beforeEach } from "vitest";
import { beforeEach, describe, expect, it } from "vitest";
beforeEach(() => {
process.env = {
@@ -11,13 +11,16 @@ beforeEach(() => {
describe("attachmentUploader", () => {
it("parses picser upload response correctly", async () => {
const { parseUploadResponse } = await import("../../src/moderation/attachmentUploader");
const { parseUploadResponse } = await import(
"../../src/moderation/attachmentUploader"
);
const response = {
success: true,
filename: "uploads/abc123.jpg",
urls: {
raw_commit: "https://raw.githubusercontent.com/user/repo/commit/uploads/abc123.jpg",
raw_commit:
"https://raw.githubusercontent.com/user/repo/commit/uploads/abc123.jpg",
},
size: 102400,
type: "image/jpeg",
@@ -26,12 +29,16 @@ describe("attachmentUploader", () => {
const result = parseUploadResponse(response);
expect(result.success).toBe(true);
expect(result.url).toBe("https://raw.githubusercontent.com/user/repo/commit/uploads/abc123.jpg");
expect(result.url).toBe(
"https://raw.githubusercontent.com/user/repo/commit/uploads/abc123.jpg",
);
expect(result.filename).toBe("uploads/abc123.jpg");
});
it("handles upload response with missing raw_commit", async () => {
const { parseUploadResponse } = await import("../../src/moderation/attachmentUploader");
const { parseUploadResponse } = await import(
"../../src/moderation/attachmentUploader"
);
const response = {
success: true,