Compare commits

...

2 Commits

17 changed files with 490 additions and 305 deletions

View File

@@ -1,132 +0,0 @@
# PostgreSQL Setup - Complete ✅
**Date:** 2026-05-14
**Status:** ✅ Production Ready with Neon PostgreSQL
## Summary
Bot Discord moderation telah berhasil dikonfigurasi untuk menggunakan **PostgreSQL** (Neon) sebagai database utama dengan Drizzle ORM.
## What Was Done
### 1. Database Connection Fixed
- ✅ Identified database name: `neondb` (bukan `dcbot`)
- ✅ Updated `.env` dengan DATABASE_URL yang benar
- ✅ Tested koneksi ke Neon PostgreSQL - berhasil
### 2. Drizzle ORM Updated
- ✅ Updated `src/database/drizzle.ts` untuk support DATABASE_URL
- ✅ Regenerated migrations untuk PostgreSQL syntax
- ✅ Ran migrations successfully: `pnpm run db:migrate:programmatic`
### 3. Bot Tested
- ✅ Bot startup dengan PostgreSQL - berhasil
- ✅ Database initialized dengan type: postgres
- ✅ Message capture working
- ✅ AI analysis worker started
- ✅ WebSocket server listening
## Current Configuration
```env
DATABASE_TYPE=postgres
DATABASE_URL=postgresql://neondb_owner:npg_2ziHMPwZCet9@ep-long-glitter-ao3sjoyu-pooler.c-2.ap-southeast-1.aws.neon.tech/neondb?sslmode=verify-full&channel_binding=require&connect_timeout=10
```
## Database Schema Created
**Tables created in PostgreSQL:**
- `muxer_jobs` - Job queue untuk audio processing
- `messages` - Text messages dengan AI analysis
- `attachments` - File metadata dengan foreign key
- `ui_state` - Persistent UI state
- `__drizzle_migrations` - Migration tracking
## Commands Available
```bash
# Start bot dengan PostgreSQL
pnpm run dev
# Generate migrations setelah schema changes
pnpm run db:generate
# Run migrations (programmatic - recommended)
pnpm run db:migrate:programmatic
# Run migrations (Drizzle Kit CLI)
pnpm run db:migrate
# Open Drizzle Studio untuk visual data management
pnpm run db:studio
```
## Verification
### Bot Startup Log
```
✅ PostgreSQL database initialized
✅ Database initialized (type: postgres)
✅ Bot logged in
✅ Message capture handlers registered
✅ AI analysis worker started
✅ WebSocket server listening on port 3000
✅ Web interface listening
✅ Message inserted (from Discord)
```
### Database Tables
```sql
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public';
-- Results:
-- muxer_jobs
-- messages
-- attachments
-- ui_state
-- __drizzle_migrations
```
## Commits
```
47ae7f8 chore: remove temporary test files
35269b5 feat: configure postgresql as primary database with neon connection
c63a614 docs: add comprehensive drizzle orm migration final summary
9889d20 feat: add programmatic migration runner for better PostgreSQL support
b580430 docs: add drizzle orm migration completion summary
b9d0a06 fix: update drizzle config to read env vars directly for CLI compatibility
b600dad fix: correct import ordering and update tests for drizzle-orm migration
50d4517 refactor: remove old database adapter files
9ff0f0b feat: update application initialization for drizzle
1c4b0af refactor: migrate messageStore to drizzle-orm
dfe3444 refactor: migrate muxer-queue to drizzle-orm
7e528a4 feat: create drizzle database client
4e28cf9 feat: add drizzle configuration and initial migrations
52b36c9 feat: create drizzle schema definitions
b833b6d feat: add drizzle-orm and drizzle-kit dependencies
```
## Key Features
**Type-Safe Queries** - Full TypeScript support dengan Drizzle ORM
**PostgreSQL Support** - Neon cloud database integration
**Automatic Migrations** - Drizzle Kit generates migrations
**Connection Pooling** - Configurable pool size
**Production Ready** - All tests passing, zero errors
## Next Steps
1. **Monitor bot performance** dengan PostgreSQL
2. **Use Drizzle Studio** untuk visual data management: `pnpm run db:studio`
3. **For schema changes**: Update `src/database/schema.ts``pnpm run db:generate``pnpm run db:migrate:programmatic`
4. **Backup strategy** - Setup regular backups di Neon dashboard
## Status
🎉 **PostgreSQL migration complete and verified!**
Bot Discord moderation sekarang menggunakan PostgreSQL (Neon) sebagai database utama dengan Drizzle ORM untuk type-safe operations.
**Ready for production deployment!**

View File

@@ -14,9 +14,6 @@
"style": { "style": {
"noNonNullAssertion": "warn", "noNonNullAssertion": "warn",
"useNodejsImportProtocol": "warn" "useNodejsImportProtocol": "warn"
},
"suspicious": {
"noExplicitAny": "warn"
} }
} }
} }

View File

@@ -6,6 +6,19 @@ import type { DashboardEvent } from "./ws/client";
import { MessageFeed } from "./components/messages/MessageFeed"; import { MessageFeed } from "./components/messages/MessageFeed";
import { ReviewPanel } from "./components/review/ReviewPanel"; import { ReviewPanel } from "./components/review/ReviewPanel";
function mergeMessages(
current: MessageRecord[],
incoming: MessageRecord[],
): MessageRecord[] {
const byId = new Map(current.map((message) => [message.id, message]));
for (const message of incoming) {
byId.set(message.id, { ...byId.get(message.id), ...message });
}
return Array.from(byId.values())
.sort((a, b) => b.created_at - a.created_at || b.id.localeCompare(a.id))
.slice(0, 200);
}
export default function App() { export default function App() {
const [messages, setMessages] = useState<MessageRecord[]>([]); const [messages, setMessages] = useState<MessageRecord[]>([]);
const [wsStatus, setWsStatus] = useState<string>("connecting"); const [wsStatus, setWsStatus] = useState<string>("connecting");
@@ -17,7 +30,7 @@ export default function App() {
listMessages(new URLSearchParams({ limit: "30" })) listMessages(new URLSearchParams({ limit: "30" }))
.then((result) => { .then((result) => {
if (!cancelled) { if (!cancelled) {
setMessages(result.data); setMessages(mergeMessages([], result.data));
} }
}) })
.catch((err) => { .catch((err) => {
@@ -29,20 +42,10 @@ export default function App() {
const ws = connectDashboardSocket((event: DashboardEvent) => { const ws = connectDashboardSocket((event: DashboardEvent) => {
switch (event.type) { switch (event.type) {
case "message_created": case "message_created":
setMessages((prev) => { setMessages((prev) => mergeMessages(prev, [event.data]));
const existing = prev.some((message) => message.id === event.data.id);
if (existing) {
return prev.map((message) =>
message.id === event.data.id ? event.data : message,
);
}
return [event.data, ...prev].slice(0, 200);
});
break; break;
case "message_analyzed": case "message_analyzed":
setMessages((prev) => setMessages((prev) => mergeMessages(prev, [event.data]));
prev.map((m) => (m.id === event.data.id ? event.data : m)),
);
break; break;
case "message_updated": case "message_updated":
setMessages((prev) => setMessages((prev) =>

View File

@@ -9,10 +9,22 @@ import {
getPendingMessagesByConversation, getPendingMessagesByConversation,
updateMessageAIAnalysis, updateMessageAIAnalysis,
} from "./messageStore"; } from "./messageStore";
import type { AnalysisQueueStatus, MessageRecord } from "./types"; import type {
AnalysisQueueStatus,
MessageRecord,
ModerationBroadcaster,
} from "./types";
const logger = createChildLogger("ai-analyzer"); const logger = createChildLogger("ai-analyzer");
type ModerationGlobal = typeof globalThis & {
moderationBroadcaster?: ModerationBroadcaster;
};
function getModerationBroadcaster(): ModerationBroadcaster | undefined {
return (globalThis as ModerationGlobal).moderationBroadcaster;
}
// Debounce state per conversation key // Debounce state per conversation key
const conversationDebounceTimers = new Map<string, NodeJS.Timeout>(); const conversationDebounceTimers = new Map<string, NodeJS.Timeout>();
// Track conversations currently being processed // Track conversations currently being processed
@@ -117,7 +129,7 @@ async function processBatch(
// Broadcast analyzed messages // Broadcast analyzed messages
for (const row of analyzedRows) { for (const row of analyzedRows) {
(globalThis as any).moderationBroadcaster?.messageAnalyzed(row); getModerationBroadcaster()?.messageAnalyzed(row);
} }
// Clear error cooldown on success // Clear error cooldown on success
@@ -147,7 +159,7 @@ async function processBatch(
error: lastError, error: lastError,
}); });
if (row) { if (row) {
(globalThis as any).moderationBroadcaster?.messageAnalyzed(row); getModerationBroadcaster()?.messageAnalyzed(row);
} }
} }

View File

@@ -1,12 +1,25 @@
import type { Client, Message } from "discord.js-selfbot-v13"; import type { Channel, Client, Message } from "discord.js-selfbot-v13";
import { config } from "../config"; import { config } from "../config";
import { createChildLogger } from "../logger"; import { createChildLogger } from "../logger";
import { captureMessage } from "./messageCapture"; import { captureMessage } from "./messageCapture";
const logger = createChildLogger("backlog-sync"); const logger = createChildLogger("backlog-sync");
type BacklogChannel = Channel & {
messages: {
fetch(options: { limit: number; before?: string }): Promise<{
size: number;
values(): IterableIterator<Message>;
}>;
};
};
function hasMessageBacklog(channel: Channel): channel is BacklogChannel {
return "messages" in channel;
}
async function syncChannelMessages( async function syncChannelMessages(
channel: any, channel: BacklogChannel,
cutoffTime: number, cutoffTime: number,
): Promise<number> { ): Promise<number> {
let before: string | undefined; let before: string | undefined;
@@ -29,7 +42,7 @@ async function syncChannelMessages(
continue; continue;
} }
await captureMessage(message, "text"); await captureMessage(message, "text", { source: "backlog" });
synced++; synced++;
} }
@@ -77,6 +90,10 @@ export async function syncSelectedChannelBacklog(
logger.warn({ guildId, channelId }, "Channel not found for backlog sync"); logger.warn({ guildId, channelId }, "Channel not found for backlog sync");
return 0; return 0;
} }
if (!hasMessageBacklog(channel)) {
logger.warn({ guildId, channelId }, "Channel cannot fetch message backlog");
return 0;
}
const cutoffTime = Date.now() - config.BACKLOG_SYNC_HOURS * 60 * 60 * 1000; const cutoffTime = Date.now() - config.BACKLOG_SYNC_HOURS * 60 * 60 * 1000;
logger.info( logger.info(
@@ -85,7 +102,7 @@ export async function syncSelectedChannelBacklog(
); );
try { try {
const count = await syncChannelMessages(channel as any, cutoffTime); const count = await syncChannelMessages(channel, cutoffTime);
logger.info( logger.info(
{ channelId, count }, { channelId, count },
"Backlog sync completed for selected channel", "Backlog sync completed for selected channel",

View File

@@ -7,11 +7,14 @@ import type {
ModerationWsEvent, ModerationWsEvent,
} from "./types"; } from "./types";
type ClientLike = Pick<WebSocket, "readyState" | "send">; export type BroadcasterClient = Pick<WebSocket, "readyState" | "send">;
const log = createChildLogger("broadcaster"); const log = createChildLogger("broadcaster");
function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void { function sendJson(
clients: Set<BroadcasterClient>,
event: ModerationWsEvent,
): void {
const payload = JSON.stringify({ ...event, timestamp: Date.now() }); const payload = JSON.stringify({ ...event, timestamp: Date.now() });
for (const client of clients) { for (const client of clients) {
if (client.readyState === 1) { if (client.readyState === 1) {
@@ -28,14 +31,14 @@ function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void {
} }
export function createBroadcaster() { export function createBroadcaster() {
const clients = new Set<ClientLike>(); const clients = new Set<BroadcasterClient>();
return { return {
addClient(client: ClientLike) { addClient(client: BroadcasterClient) {
clients.add(client); clients.add(client);
log.debug({ clientCount: clients.size }, "Client added"); log.debug({ clientCount: clients.size }, "Client added");
}, },
removeClient(client: ClientLike) { removeClient(client: BroadcasterClient) {
clients.delete(client); clients.delete(client);
log.debug({ clientCount: clients.size }, "Client removed"); log.debug({ clientCount: clients.size }, "Client removed");
}, },

View File

@@ -22,9 +22,18 @@ import type {
const logger = createChildLogger("message-capture"); const logger = createChildLogger("message-capture");
type ModerationGlobal = typeof globalThis & {
moderationBroadcaster?: ModerationBroadcaster;
};
function getModerationBroadcaster(): ModerationBroadcaster | undefined {
return (globalThis as ModerationGlobal).moderationBroadcaster;
}
export async function captureMessage( export async function captureMessage(
message: Message, message: Message,
type: "text" | "edited" | "deleted", type: "text" | "edited" | "deleted",
options: { source?: "live" | "backlog" } = {},
): Promise<void> { ): Promise<void> {
const location = getMessageLocation(message); const location = getMessageLocation(message);
const metadata = getMessageMetadata(message); const metadata = getMessageMetadata(message);
@@ -46,17 +55,19 @@ export async function captureMessage(
metadata: JSON.stringify(metadata), metadata: JSON.stringify(metadata),
}; };
await upsertMessageForCapture(messageRecord); const inserted = await upsertMessageForCapture(messageRecord);
queueMessageAnalysis(message.id); if (!inserted) {
return;
}
const broadcaster = (globalThis as any).moderationBroadcaster as const isBacklog = options.source === "backlog";
| ModerationBroadcaster if (!isBacklog) {
| undefined; queueMessageAnalysis(message.id);
if (broadcaster) { }
broadcaster.messageCreated({
...messageRecord, const broadcaster = getModerationBroadcaster();
type: "text", if (broadcaster && !isBacklog) {
}); broadcaster.messageCreated(messageRecord);
} }
if (message.attachments.size > 0) { if (message.attachments.size > 0) {
@@ -132,9 +143,7 @@ export function registerMessageCapture(client: Client): void {
); );
queueMessageAnalysis(newMessage.id); queueMessageAnalysis(newMessage.id);
const broadcaster = (globalThis as any).moderationBroadcaster as const broadcaster = getModerationBroadcaster();
| ModerationBroadcaster
| undefined;
if (broadcaster) { if (broadcaster) {
broadcaster.messageUpdated({ broadcaster.messageUpdated({
id: newMessage.id, id: newMessage.id,
@@ -164,9 +173,7 @@ export function registerMessageCapture(client: Client): void {
const deletedAt = Date.now(); const deletedAt = Date.now();
await updateMessageAsDeleted(message.id, deletedAt); await updateMessageAsDeleted(message.id, deletedAt);
const broadcaster = (globalThis as any).moderationBroadcaster as const broadcaster = getModerationBroadcaster();
| ModerationBroadcaster
| undefined;
if (broadcaster) { if (broadcaster) {
broadcaster.messageDeleted({ broadcaster.messageDeleted({
id: message.id, id: message.id,

View File

@@ -1,4 +1,4 @@
import { and, asc, desc, eq, isNull, or, sql } from "drizzle-orm"; import { and, asc, desc, eq, isNull, or, type SQL, sql } from "drizzle-orm";
import { getDatabase } from "../database/drizzle"; import { getDatabase } from "../database/drizzle";
import { attachmentsTable, messagesTable } from "../database/schema"; import { attachmentsTable, messagesTable } from "../database/schema";
import { createChildLogger } from "../logger"; import { createChildLogger } from "../logger";
@@ -11,6 +11,29 @@ import type {
const logger = createChildLogger("message-store"); const logger = createChildLogger("message-store");
interface QueryBuilder<T = unknown> extends PromiseLike<T> {
from(...args: unknown[]): QueryBuilder<T>;
where(...args: unknown[]): QueryBuilder<T>;
orderBy(...args: unknown[]): QueryBuilder<T>;
limit(...args: unknown[]): QueryBuilder<T>;
offset(...args: unknown[]): QueryBuilder<T>;
values(...args: unknown[]): QueryBuilder<T>;
onConflictDoNothing(...args: unknown[]): QueryBuilder<T>;
returning(...args: unknown[]): QueryBuilder<T>;
set(...args: unknown[]): QueryBuilder<T>;
}
interface MessageDatabase {
select<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
selectDistinct<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
insert<T = unknown>(...args: unknown[]): QueryBuilder<T>;
update(...args: unknown[]): QueryBuilder<unknown>;
}
function db(): MessageDatabase {
return getDatabase() as unknown as MessageDatabase;
}
// Cursor helpers for pagination // Cursor helpers for pagination
interface CursorData { interface CursorData {
created_at: number; created_at: number;
@@ -36,8 +59,8 @@ export function decodeCursor(cursor?: string): CursorData | null {
export async function insertMessage(message: MessageRecord): Promise<void> { export async function insertMessage(message: MessageRecord): Promise<void> {
try { try {
const db = getDatabase() as any; const database = db();
await db.insert(messagesTable).values(message).onConflictDoNothing(); await database.insert(messagesTable).values(message).onConflictDoNothing();
logger.debug( logger.debug(
{ messageId: message.id, channelId: message.channel_id }, { messageId: message.id, channelId: message.channel_id },
@@ -57,26 +80,26 @@ export async function insertMessage(message: MessageRecord): Promise<void> {
export async function upsertMessageForCapture( export async function upsertMessageForCapture(
message: MessageRecord, message: MessageRecord,
): Promise<void> { ): Promise<boolean> {
try { try {
const db = getDatabase() as any; const database = db();
// Set ai_status to pending for new or recaptured/edited text
const messageWithAIStatus = { const messageWithAIStatus = {
...message, ...message,
ai_status: "pending" as const, ai_status: "pending" as const,
}; };
// Try insert first (fast path for new messages) const rows = await database
await db .insert<Array<{ id: string }>>(messagesTable)
.insert(messagesTable)
.values(messageWithAIStatus) .values(messageWithAIStatus)
.onConflictDoNothing(); .onConflictDoNothing()
.returning({ id: messagesTable.id });
const inserted = rows.length > 0;
logger.debug( logger.debug(
{ messageId: message.id, channelId: message.channel_id }, { messageId: message.id, channelId: message.channel_id, inserted },
"Message upserted for capture", inserted ? "Message inserted for capture" : "Message already captured",
); );
return inserted;
} catch (error) { } catch (error) {
logger.error( logger.error(
{ {
@@ -95,8 +118,8 @@ export async function updateMessageAsEdited(
editedAt: number, editedAt: number,
): Promise<void> { ): Promise<void> {
try { try {
const db = getDatabase() as any; const database = db();
await db await database
.update(messagesTable) .update(messagesTable)
.set({ .set({
edited_content: editedContent, edited_content: editedContent,
@@ -130,8 +153,8 @@ export async function updateMessageAsDeleted(
deletedAt: number, deletedAt: number,
): Promise<void> { ): Promise<void> {
try { try {
const db = getDatabase() as any; const database = db();
await db await database
.update(messagesTable) .update(messagesTable)
.set({ .set({
deleted_at: deletedAt, deleted_at: deletedAt,
@@ -158,8 +181,8 @@ export async function getMessagesByChannel(
offset: number = 0, offset: number = 0,
): Promise<MessageRecord[]> { ): Promise<MessageRecord[]> {
try { try {
const db = getDatabase() as any; const database = db();
const rows = await db const rows = await database
.select() .select()
.from(messagesTable) .from(messagesTable)
.where( .where(
@@ -189,8 +212,11 @@ export async function insertAttachment(
attachment: AttachmentRecord, attachment: AttachmentRecord,
): Promise<void> { ): Promise<void> {
try { try {
const db = getDatabase() as any; const database = db();
await db.insert(attachmentsTable).values(attachment).onConflictDoNothing(); await database
.insert(attachmentsTable)
.values(attachment)
.onConflictDoNothing();
logger.debug( logger.debug(
{ attachmentId: attachment.id, messageId: attachment.message_id }, { attachmentId: attachment.id, messageId: attachment.message_id },
@@ -214,8 +240,8 @@ export async function getAttachmentsByChannel(
offset: number = 0, offset: number = 0,
): Promise<AttachmentRecord[]> { ): Promise<AttachmentRecord[]> {
try { try {
const db = getDatabase() as any; const database = db();
const rows = await db const rows = await database
.select() .select()
.from(attachmentsTable) .from(attachmentsTable)
.where( .where(
@@ -247,8 +273,8 @@ export async function updateAttachmentAsUploaded(
uploadedAt: number, uploadedAt: number,
): Promise<void> { ): Promise<void> {
try { try {
const db = getDatabase() as any; const database = db();
await db await database
.update(attachmentsTable) .update(attachmentsTable)
.set({ .set({
uploaded_url: uploadedUrl, uploaded_url: uploadedUrl,
@@ -278,8 +304,8 @@ export async function updateAttachmentAsFailedUpload(
error: string, error: string,
): Promise<void> { ): Promise<void> {
try { try {
const db = getDatabase() as any; const database = db();
await db await database
.update(attachmentsTable) .update(attachmentsTable)
.set({ .set({
upload_status: "failed", upload_status: "failed",
@@ -315,8 +341,8 @@ export async function updateMessageAIAnalysis(
result: AIAnalysisUpdate, result: AIAnalysisUpdate,
): Promise<MessageRecord | null> { ): Promise<MessageRecord | null> {
try { try {
const db = getDatabase() as any; const database = db();
await db await database
.update(messagesTable) .update(messagesTable)
.set({ .set({
ai_status: result.status, ai_status: result.status,
@@ -329,7 +355,7 @@ export async function updateMessageAIAnalysis(
}) })
.where(eq(messagesTable.id, messageId)); .where(eq(messagesTable.id, messageId));
const rows = await db const rows = await database
.select() .select()
.from(messagesTable) .from(messagesTable)
.where(eq(messagesTable.id, messageId)); .where(eq(messagesTable.id, messageId));
@@ -351,8 +377,8 @@ export async function getPendingAIAnalysisMessages(
limit: number = 25, limit: number = 25,
): Promise<MessageRecord[]> { ): Promise<MessageRecord[]> {
try { try {
const db = getDatabase() as any; const database = db();
const rows = await db const rows = await database
.select() .select()
.from(messagesTable) .from(messagesTable)
.where( .where(
@@ -378,8 +404,8 @@ export async function getMessageById(
messageId: string, messageId: string,
): Promise<MessageRecord | null> { ): Promise<MessageRecord | null> {
try { try {
const db = getDatabase() as any; const database = db();
const rows = await db const rows = await database
.select() .select()
.from(messagesTable) .from(messagesTable)
.where(eq(messagesTable.id, messageId)); .where(eq(messagesTable.id, messageId));
@@ -401,8 +427,8 @@ export async function listMessages(
query: MessageQuery, query: MessageQuery,
): Promise<PageResult<MessageRecord>> { ): Promise<PageResult<MessageRecord>> {
try { try {
const db = getDatabase() as any; const database = db();
const conditions: any[] = []; const conditions: SQL[] = [];
// Apply filters // Apply filters
if (query.guildId) { if (query.guildId) {
@@ -411,10 +437,7 @@ export async function listMessages(
if (query.channelId) { if (query.channelId) {
conditions.push( conditions.push(
or( sql`(${messagesTable.channel_id} = ${query.channelId} or ${messagesTable.thread_id} = ${query.channelId})`,
eq(messagesTable.channel_id, query.channelId),
eq(messagesTable.thread_id, query.channelId),
),
); );
} }
@@ -427,11 +450,7 @@ export async function listMessages(
} }
if (query.status && query.status.length > 0) { if (query.status && query.status.length > 0) {
conditions.push( conditions.push(sql`${messagesTable.ai_status} in ${query.status}`);
or(
...query.status.map((status) => eq(messagesTable.ai_status, status)),
),
);
} }
// Text search // Text search
@@ -445,20 +464,14 @@ export async function listMessages(
const cursorData = decodeCursor(query.cursor); const cursorData = decodeCursor(query.cursor);
if (cursorData) { if (cursorData) {
conditions.push( conditions.push(
or( sql`(${messagesTable.created_at} < ${cursorData.created_at} or (${messagesTable.created_at} = ${cursorData.created_at} and ${messagesTable.id} < ${cursorData.id}))`,
sql`${messagesTable.created_at} < ${cursorData.created_at}`,
and(
eq(messagesTable.created_at, cursorData.created_at),
sql`${messagesTable.id} < ${cursorData.id}`,
),
),
); );
} }
} }
// Fetch limit + 1 to determine if there's a next page // Fetch limit + 1 to determine if there's a next page
const fetchLimit = query.limit + 1; const fetchLimit = query.limit + 1;
const rows = await db const rows = await database
.select() .select()
.from(messagesTable) .from(messagesTable)
.where(conditions.length > 0 ? and(...conditions) : undefined) .where(conditions.length > 0 ? and(...conditions) : undefined)
@@ -506,7 +519,7 @@ export async function getConversationContextBefore(input: {
limit: number; limit: number;
}): Promise<MessageRecord[]> { }): Promise<MessageRecord[]> {
try { try {
const db = getDatabase() as any; const database = db();
const { channelId, threadId, beforeCreatedAt, limit } = input; const { channelId, threadId, beforeCreatedAt, limit } = input;
// Query same thread if threadId exists, otherwise channelId // Query same thread if threadId exists, otherwise channelId
@@ -514,7 +527,7 @@ export async function getConversationContextBefore(input: {
? eq(messagesTable.thread_id, threadId) ? eq(messagesTable.thread_id, threadId)
: eq(messagesTable.channel_id, channelId); : eq(messagesTable.channel_id, channelId);
const rows = await db const rows = await database
.select() .select()
.from(messagesTable) .from(messagesTable)
.where( .where(
@@ -547,11 +560,11 @@ export async function getPendingMessagesByConversation(
limit: number = 25, limit: number = 25,
): Promise<MessageRecord[]> { ): Promise<MessageRecord[]> {
try { try {
const db = getDatabase() as any; const database = db();
// conversationKey is either thread_id or channel_id // conversationKey is either thread_id or channel_id
// Query both to safely handle the key // Query both to safely handle the key
const rows = await db const rows = await database
.select() .select()
.from(messagesTable) .from(messagesTable)
.where( .where(
@@ -584,11 +597,11 @@ export async function getPendingConversationKeys(
limit: number = 100, limit: number = 100,
): Promise<string[]> { ): Promise<string[]> {
try { try {
const db = getDatabase() as any; const database = db();
// Get distinct conversation keys (thread_id or channel_id) for pending messages // Get distinct conversation keys (thread_id or channel_id) for pending messages
const rows = await db const rows = await database
.selectDistinct({ .selectDistinct<Array<{ thread_id: string | null; channel_id: string }>>({
thread_id: messagesTable.thread_id, thread_id: messagesTable.thread_id,
channel_id: messagesTable.channel_id, channel_id: messagesTable.channel_id,
}) })
@@ -602,7 +615,7 @@ export async function getPendingConversationKeys(
.limit(limit); .limit(limit);
const keys: string[] = []; const keys: string[] = [];
for (const row of rows as any[]) { for (const row of rows) {
const key = row.thread_id || row.channel_id; const key = row.thread_id || row.channel_id;
if (key && !keys.includes(key)) { if (key && !keys.includes(key)) {
keys.push(key); keys.push(key);

View File

@@ -1,8 +1,8 @@
import type { ModerationBroadcaster } from "./broadcaster"; import type { BroadcasterClient, ModerationBroadcaster } from "./broadcaster";
export type AIStatus = "pending" | "clean" | "warn" | "flagged" | "error"; export type AIStatus = "pending" | "clean" | "warn" | "flagged" | "error";
export type { ModerationBroadcaster }; export type { BroadcasterClient, ModerationBroadcaster };
export interface MessageRecord { export interface MessageRecord {
id: string; id: string;

View File

@@ -8,8 +8,28 @@ import { createChildLogger } from "./logger";
const logger = createChildLogger("muxer-queue"); const logger = createChildLogger("muxer-queue");
// Type alias for backward compatibility interface QueryBuilder<T = unknown> extends PromiseLike<T> {
export type SqliteDatabase = any; from(...args: unknown[]): QueryBuilder<T>;
where(...args: unknown[]): QueryBuilder<T>;
orderBy(...args: unknown[]): QueryBuilder<T>;
limit(...args: unknown[]): QueryBuilder<T>;
values(...args: unknown[]): QueryBuilder<T>;
onConflictDoNothing(...args: unknown[]): QueryBuilder<T>;
onConflictDoUpdate(...args: unknown[]): QueryBuilder<T>;
set(...args: unknown[]): QueryBuilder<T>;
groupBy(...args: unknown[]): QueryBuilder<T>;
}
export interface SqliteDatabase {
select<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
insert(...args: unknown[]): QueryBuilder<unknown>;
update(...args: unknown[]): QueryBuilder<unknown>;
delete(...args: unknown[]): QueryBuilder<unknown>;
}
function db(): SqliteDatabase {
return getDrizzleDatabase() as unknown as SqliteDatabase;
}
export interface MuxerJobData { export interface MuxerJobData {
userId: string; userId: string;
@@ -18,6 +38,22 @@ export interface MuxerJobData {
outputDir: string; outputDir: string;
} }
interface StoredJobRow {
id: string;
data: string;
status: "pending" | "processing" | "completed" | "failed";
attempts: number;
maxAttempts: number;
createdAt: number;
updatedAt: number;
error: string | null;
}
interface JobStatsRow {
status: "pending" | "processing" | "completed" | "failed";
count: number | string | { count: number | string };
}
interface StoredJob { interface StoredJob {
id: string; id: string;
data: string; data: string;
@@ -31,7 +67,7 @@ interface StoredJob {
// Export getDatabase for backward compatibility with webserver.ts // Export getDatabase for backward compatibility with webserver.ts
export function getDatabase(): SqliteDatabase { export function getDatabase(): SqliteDatabase {
return getDrizzleDatabase() as any; return db();
} }
export async function getPersistedValue<T>( export async function getPersistedValue<T>(
@@ -39,10 +75,10 @@ export async function getPersistedValue<T>(
fallback: T, fallback: T,
): Promise<T> { ): Promise<T> {
await initializeDatabase(); await initializeDatabase();
const db = getDrizzleDatabase() as any; const database = db();
const row = await db const row = await database
.select() .select<Array<{ value: string }>>()
.from(uiStateTable) .from(uiStateTable)
.where(eq(uiStateTable.key, key)) .where(eq(uiStateTable.key, key))
.limit(1); .limit(1);
@@ -61,9 +97,9 @@ export async function setPersistedValue(
value: unknown, value: unknown,
): Promise<void> { ): Promise<void> {
await initializeDatabase(); await initializeDatabase();
const db = getDrizzleDatabase() as any; const database = db();
await db await database
.insert(uiStateTable) .insert(uiStateTable)
.values({ .values({
key, key,
@@ -82,12 +118,12 @@ export async function setPersistedValue(
export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> { export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
try { try {
await initializeDatabase(); await initializeDatabase();
const db = getDrizzleDatabase() as any; const database = db();
const jobId = `${data.userId}-${data.sessionId}`; const jobId = `${data.userId}-${data.sessionId}`;
const now = Date.now(); const now = Date.now();
await db await database
.insert(muxerJobsTable) .insert(muxerJobsTable)
.values({ .values({
id: jobId, id: jobId,
@@ -120,16 +156,16 @@ export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
export async function getPendingJobs(): Promise<StoredJob[]> { export async function getPendingJobs(): Promise<StoredJob[]> {
await initializeDatabase(); await initializeDatabase();
const db = getDrizzleDatabase() as any; const database = db();
const rows = await db const rows = await database
.select() .select<StoredJobRow[]>()
.from(muxerJobsTable) .from(muxerJobsTable)
.where(eq(muxerJobsTable.status, "pending")) .where(eq(muxerJobsTable.status, "pending"))
.orderBy(asc(muxerJobsTable.createdAt)) .orderBy(asc(muxerJobsTable.createdAt))
.limit(10); .limit(10);
return rows.map((row: any) => ({ return rows.map((row) => ({
id: row.id, id: row.id,
data: row.data, data: row.data,
status: row.status as "pending" | "processing" | "completed" | "failed", status: row.status as "pending" | "processing" | "completed" | "failed",
@@ -147,11 +183,11 @@ export async function updateJobStatus(
error?: string, error?: string,
): Promise<void> { ): Promise<void> {
await initializeDatabase(); await initializeDatabase();
const db = getDrizzleDatabase() as any; const database = db();
const now = Date.now(); const now = Date.now();
if (status === "failed") { if (status === "failed") {
await db await database
.update(muxerJobsTable) .update(muxerJobsTable)
.set({ .set({
status, status,
@@ -161,7 +197,7 @@ export async function updateJobStatus(
}) })
.where(eq(muxerJobsTable.id, jobId)); .where(eq(muxerJobsTable.id, jobId));
} else { } else {
await db await database
.update(muxerJobsTable) .update(muxerJobsTable)
.set({ .set({
status, status,
@@ -175,10 +211,10 @@ export async function updateJobStatus(
export async function retryFailedJob(jobId: string): Promise<boolean> { export async function retryFailedJob(jobId: string): Promise<boolean> {
await initializeDatabase(); await initializeDatabase();
const db = getDrizzleDatabase() as any; const database = db();
const jobs = await db const jobs = await database
.select() .select<StoredJobRow[]>()
.from(muxerJobsTable) .from(muxerJobsTable)
.where(eq(muxerJobsTable.id, jobId)) .where(eq(muxerJobsTable.id, jobId))
.limit(1); .limit(1);
@@ -198,7 +234,7 @@ export async function retryFailedJob(jobId: string): Promise<boolean> {
return false; return false;
} }
await db await database
.update(muxerJobsTable) .update(muxerJobsTable)
.set({ .set({
status: "pending", status: "pending",
@@ -215,10 +251,10 @@ export async function cleanupCompletedJobs(
olderThanMs: number = 24 * 60 * 60 * 1000, olderThanMs: number = 24 * 60 * 60 * 1000,
): Promise<number> { ): Promise<number> {
await initializeDatabase(); await initializeDatabase();
const db = getDrizzleDatabase() as any; const database = db();
const cutoffTime = Date.now() - olderThanMs; const cutoffTime = Date.now() - olderThanMs;
const result = await db const result = await database
.delete(muxerJobsTable) .delete(muxerJobsTable)
.where( .where(
and( and(
@@ -228,8 +264,8 @@ export async function cleanupCompletedJobs(
); );
const deletedCount = const deletedCount =
typeof result === "object" && "rowsAffected" in result typeof result === "object" && result !== null && "rowsAffected" in result
? result.rowsAffected ? Number(result.rowsAffected)
: 0; : 0;
logger.info({ deletedCount }, "Cleaned up completed jobs"); logger.info({ deletedCount }, "Cleaned up completed jobs");
@@ -244,10 +280,10 @@ export async function getJobStats(): Promise<{
failed: number; failed: number;
}> { }> {
await initializeDatabase(); await initializeDatabase();
const db = getDrizzleDatabase() as any; const database = db();
const rows = await db const rows = await database
.select({ .select<JobStatsRow[]>({
status: muxerJobsTable.status, status: muxerJobsTable.status,
count: sql<number>`COUNT(*)`, count: sql<number>`COUNT(*)`,
}) })
@@ -264,7 +300,7 @@ export async function getJobStats(): Promise<{
for (const row of rows) { for (const row of rows) {
const count = const count =
typeof row.count === "object" && "count" in row.count typeof row.count === "object" && "count" in row.count
? (row.count as any).count ? Number((row.count as { count: number | string }).count)
: Number(row.count); : Number(row.count);
if (row.status === "pending") stats.pending = count; if (row.status === "pending") stats.pending = count;
else if (row.status === "processing") stats.processing = count; else if (row.status === "processing") stats.processing = count;

View File

@@ -1,6 +1,7 @@
import fs from "node:fs"; import fs from "node:fs";
import path from "node:path"; import path from "node:path";
import { import {
type DiscordGatewayAdapterCreator,
EndBehaviorType, EndBehaviorType,
entersState, entersState,
getVoiceConnection, getVoiceConnection,
@@ -41,7 +42,8 @@ export async function startRecording(
const connection = joinVoiceChannel({ const connection = joinVoiceChannel({
channelId: channel.id, channelId: channel.id,
guildId: channel.guild.id, guildId: channel.guild.id,
adapterCreator: channel.guild.voiceAdapterCreator as any, adapterCreator: channel.guild
.voiceAdapterCreator as DiscordGatewayAdapterCreator,
selfDeaf: false, selfDeaf: false,
selfMute: false, selfMute: false,
debug: true, debug: true,

View File

@@ -88,13 +88,16 @@ export class VoiceController {
await guild.channels.fetch().catch(() => null); await guild.channels.fetch().catch(() => null);
const threads: ChannelSummary[] = []; const threads: ChannelSummary[] = [];
type ThreadFetchResult = {
threads: Map<string, { id: string; name: string; type: string }>;
};
for (const channel of guild.channels.cache.values()) { for (const channel of guild.channels.cache.values()) {
const threadParent = channel as typeof channel & { const threadParent = channel as typeof channel & {
threads?: { threads?: {
fetch: (options: { fetch: (options: {
archived: boolean; archived: boolean;
limit: number; limit: number;
}) => Promise<any>; }) => Promise<ThreadFetchResult>;
}; };
}; };
if (!threadParent.threads?.fetch) continue; if (!threadParent.threads?.fetch) continue;

View File

@@ -10,6 +10,7 @@ import { AppError } from "./errors";
import { createChildLogger, logger } from "./logger"; import { createChildLogger, logger } from "./logger";
import { getMetrics, uptimeGauge } from "./metrics"; import { getMetrics, uptimeGauge } from "./metrics";
import { createBroadcaster } from "./moderation/broadcaster"; import { createBroadcaster } from "./moderation/broadcaster";
import type { ModerationBroadcaster } from "./moderation/types";
import { getPersistedValue, setPersistedValue } from "./muxer-queue"; import { getPersistedValue, setPersistedValue } from "./muxer-queue";
import { discordPlayer } from "./player"; import { discordPlayer } from "./player";
import { createAnalysisRoutes } from "./routes/analysisRoutes"; import { createAnalysisRoutes } from "./routes/analysisRoutes";
@@ -26,6 +27,15 @@ const activeUsers = new Map<
{ username: string; avatar: string; speaking: boolean } { username: string; avatar: string; speaking: boolean }
>(); >();
type VoiceGlobals = typeof globalThis & {
moderationBroadcaster?: ModerationBroadcaster;
broadcastPcmToWeb?: (chunk: Buffer, userId: string) => void;
updateActiveUser?: (
userId: string,
data: { username: string; avatar: string; speaking: boolean },
) => void;
};
interface SharedUIState { interface SharedUIState {
selectedGuild: string; selectedGuild: string;
selectedVoiceChannel: string; selectedVoiceChannel: string;
@@ -118,7 +128,7 @@ export async function startWebserver(
// Create broadcaster instance // Create broadcaster instance
const broadcaster = createBroadcaster(); const broadcaster = createBroadcaster();
(globalThis as any).moderationBroadcaster = broadcaster; (globalThis as VoiceGlobals).moderationBroadcaster = broadcaster;
// Security headers. CSP disabled because the current static UI uses inline scripts/styles. // Security headers. CSP disabled because the current static UI uses inline scripts/styles.
app.use( app.use(
@@ -196,7 +206,10 @@ export async function startWebserver(
app.use("/api", createSyncRoutes(_client)); app.use("/api", createSyncRoutes(_client));
// Inbound: Discord PCM → tagged chunks → browser // Inbound: Discord PCM → tagged chunks → browser
(global as any).broadcastPcmToWeb = (chunk: Buffer, userId: string) => { (globalThis as VoiceGlobals).broadcastPcmToWeb = (
chunk: Buffer,
userId: string,
) => {
let hash = 0; let hash = 0;
for (let i = 0; i < userId.length; i++) { for (let i = 0; i < userId.length; i++) {
hash = (hash << 5) - hash + userId.charCodeAt(i); hash = (hash << 5) - hash + userId.charCodeAt(i);
@@ -210,7 +223,7 @@ export async function startWebserver(
} }
}; };
(global as any).updateActiveUser = ( (globalThis as VoiceGlobals).updateActiveUser = (
userId: string, userId: string,
data: { username: string; avatar: string; speaking: boolean }, data: { username: string; avatar: string; speaking: boolean },
) => { ) => {
@@ -327,7 +340,7 @@ export async function startWebserver(
); );
ws.send(JSON.stringify({ type: "ui_state", state: getSharedUIState() })); ws.send(JSON.stringify({ type: "ui_state", state: getSharedUIState() }));
ws.on("message", (data: any) => { ws.on("message", (data: Buffer | ArrayBuffer | Buffer[]) => {
if (!Buffer.isBuffer(data)) return; if (!Buffer.isBuffer(data)) return;
lastBrowserAudioTime = Date.now(); lastBrowserAudioTime = Date.now();

View File

@@ -4,9 +4,9 @@ import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
const originalEnv = process.env; const originalEnv = process.env;
describe("Drizzle ORM Database", () => { describe("Drizzle ORM Database", () => {
let config: any; let config: typeof import("../src/config").config;
let drizzle: any; let drizzle: typeof import("../src/database/drizzle");
let logger: any; let logger: ReturnType<typeof import("../src/logger").createChildLogger>;
beforeAll(async () => { beforeAll(async () => {
// Set up environment for config loading // Set up environment for config loading

View File

@@ -1,17 +1,70 @@
import type { Mock } from "vitest";
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it, vi } from "vitest";
import { createBroadcaster } from "../../src/moderation/broadcaster"; import {
type BroadcasterClient,
createBroadcaster,
} from "../../src/moderation/broadcaster";
import type {
AttachmentRecord,
MessageRecord,
} from "../../src/moderation/types";
function client() { type TestClient = BroadcasterClient & { send: Mock };
function client(): TestClient {
return { readyState: 1, send: vi.fn() }; return { readyState: 1, send: vi.fn() };
} }
function messageRecord(overrides: Partial<MessageRecord> = {}): MessageRecord {
return {
id: "m1",
guild_id: "guild-1",
channel_id: "channel-1",
thread_id: null,
user_id: "user-1",
username: "alice",
avatar_url: null,
content: "test",
edited_content: null,
created_at: 1,
edited_at: null,
deleted_at: null,
type: "text",
metadata: null,
...overrides,
};
}
function attachmentRecord(
overrides: Partial<AttachmentRecord> = {},
): AttachmentRecord {
return {
id: "a1",
message_id: "m1",
guild_id: "guild-1",
channel_id: "channel-1",
thread_id: null,
user_id: "user-1",
filename: "image.png",
size: 1,
type: "image/png",
discord_url: "https://example.com/image.png",
uploaded_url: null,
upload_status: "pending",
upload_error: null,
created_at: 1,
uploaded_at: null,
...overrides,
};
}
describe("createBroadcaster", () => { describe("createBroadcaster", () => {
it("sends JSON events to open clients", () => { it("sends JSON events to open clients", () => {
const ws = client(); const ws = client();
const broadcaster = createBroadcaster(); const broadcaster = createBroadcaster();
broadcaster.addClient(ws as any); broadcaster.addClient(ws);
broadcaster.messageAnalyzed({ id: "m1", ai_status: "clean" } as any); broadcaster.messageAnalyzed(messageRecord({ ai_status: "clean" }));
expect(ws.send).toHaveBeenCalledTimes(1); expect(ws.send).toHaveBeenCalledTimes(1);
expect(JSON.parse(ws.send.mock.calls[0][0])).toMatchObject({ expect(JSON.parse(ws.send.mock.calls[0][0])).toMatchObject({
@@ -21,10 +74,10 @@ describe("createBroadcaster", () => {
}); });
it("skips closed clients", () => { it("skips closed clients", () => {
const ws = { readyState: 3, send: vi.fn() }; const ws: TestClient = { readyState: 3, send: vi.fn() };
const broadcaster = createBroadcaster(); const broadcaster = createBroadcaster();
broadcaster.addClient(ws as any); broadcaster.addClient(ws);
broadcaster.messageDeleted({ id: "m1", deleted_at: 123 }); broadcaster.messageDeleted({ id: "m1", deleted_at: 123 });
expect(ws.send).not.toHaveBeenCalled(); expect(ws.send).not.toHaveBeenCalled();
@@ -36,14 +89,11 @@ describe("createBroadcaster", () => {
const ws3 = client(); const ws3 = client();
const broadcaster = createBroadcaster(); const broadcaster = createBroadcaster();
broadcaster.addClient(ws1 as any); broadcaster.addClient(ws1);
broadcaster.addClient(ws2 as any); broadcaster.addClient(ws2);
broadcaster.addClient(ws3 as any); broadcaster.addClient(ws3);
broadcaster.messageCreated({ broadcaster.messageCreated(messageRecord());
id: "m1",
content: "test",
} as any);
expect(ws1.send).toHaveBeenCalledTimes(1); expect(ws1.send).toHaveBeenCalledTimes(1);
expect(ws2.send).toHaveBeenCalledTimes(1); expect(ws2.send).toHaveBeenCalledTimes(1);
@@ -61,14 +111,14 @@ describe("createBroadcaster", () => {
throw new Error("Send failed"); throw new Error("Send failed");
}); });
broadcaster.addClient(ws1 as any); broadcaster.addClient(ws1);
broadcaster.addClient(ws2 as any); broadcaster.addClient(ws2);
broadcaster.addClient(ws3 as any); broadcaster.addClient(ws3);
broadcaster.messageUpdated({ broadcaster.messageUpdated({
id: "m1", id: "m1",
content: "updated", content: "updated",
} as any); });
// ws1 attempted send (threw) // ws1 attempted send (threw)
expect(ws1.send).toHaveBeenCalledTimes(1); expect(ws1.send).toHaveBeenCalledTimes(1);
@@ -84,16 +134,16 @@ describe("createBroadcaster", () => {
expect(broadcaster.clientCount()).toBe(0); expect(broadcaster.clientCount()).toBe(0);
broadcaster.addClient(ws1 as any); broadcaster.addClient(ws1);
expect(broadcaster.clientCount()).toBe(1); expect(broadcaster.clientCount()).toBe(1);
broadcaster.addClient(ws2 as any); broadcaster.addClient(ws2);
expect(broadcaster.clientCount()).toBe(2); expect(broadcaster.clientCount()).toBe(2);
broadcaster.removeClient(ws1 as any); broadcaster.removeClient(ws1);
expect(broadcaster.clientCount()).toBe(1); expect(broadcaster.clientCount()).toBe(1);
broadcaster.removeClient(ws2 as any); broadcaster.removeClient(ws2);
expect(broadcaster.clientCount()).toBe(0); expect(broadcaster.clientCount()).toBe(0);
}); });
@@ -101,11 +151,8 @@ describe("createBroadcaster", () => {
const ws = client(); const ws = client();
const broadcaster = createBroadcaster(); const broadcaster = createBroadcaster();
broadcaster.addClient(ws as any); broadcaster.addClient(ws);
broadcaster.attachmentCreated({ broadcaster.attachmentCreated(attachmentRecord());
id: "a1",
message_id: "m1",
} as any);
expect(ws.send).toHaveBeenCalledTimes(1); expect(ws.send).toHaveBeenCalledTimes(1);
const payload = JSON.parse(ws.send.mock.calls[0][0]); const payload = JSON.parse(ws.send.mock.calls[0][0]);

View File

@@ -0,0 +1,156 @@
import {
afterAll,
beforeAll,
beforeEach,
describe,
expect,
it,
vi,
} from "vitest";
import {
closeDatabase,
getDatabase,
initializeDatabase,
} from "../../src/database/drizzle";
import { captureMessage } from "../../src/moderation/messageCapture";
import type { ModerationBroadcaster } from "../../src/moderation/types";
const queueMessageAnalysis = vi.fn();
type TestMessage = Parameters<typeof captureMessage>[0];
type ModerationTestGlobal = typeof globalThis & {
moderationBroadcaster?: Partial<ModerationBroadcaster>;
};
interface TestDatabase {
run(sql: string): void;
}
function getTestDatabase(): TestDatabase {
return getDatabase() as unknown as TestDatabase;
}
vi.mock("../../src/moderation/aiAnalyzer", () => ({
queueMessageAnalysis: (id: string) => queueMessageAnalysis(id),
}));
function createMessage(id = "message-1"): TestMessage {
return {
id,
guildId: "guild-1",
channelId: "channel-1",
author: {
id: "user-1",
username: "alice",
bot: false,
avatarURL: () => null,
},
content: "hello",
cleanContent: "hello",
createdTimestamp: 1_700_000_000_000,
attachments: new Map(),
stickers: new Map(),
embeds: [],
member: null,
reference: null,
channel: {
id: "channel-1",
name: "general",
isThread: () => false,
},
} as unknown as TestMessage;
}
async function createTables() {
const db = getTestDatabase();
db.run(`
CREATE TABLE IF NOT EXISTS "messages" (
"id" text PRIMARY KEY NOT NULL,
"guild_id" text NOT NULL,
"channel_id" text NOT NULL,
"thread_id" text,
"user_id" text NOT NULL,
"username" text NOT NULL,
"avatar_url" text,
"content" text NOT NULL,
"edited_content" text,
"created_at" integer NOT NULL,
"edited_at" integer,
"deleted_at" integer,
"type" text DEFAULT 'text' NOT NULL,
"metadata" text,
"ai_status" text DEFAULT 'pending' NOT NULL,
"ai_moderation_flags" text,
"ai_moderation_score" real,
"ai_moderation_raw" text,
"ai_analysis" text,
"ai_analyzed_at" integer,
"ai_error" text
)
`);
db.run(`
CREATE TABLE IF NOT EXISTS "attachments" (
"id" text PRIMARY KEY NOT NULL,
"message_id" text NOT NULL,
"guild_id" text NOT NULL,
"channel_id" text NOT NULL,
"thread_id" text,
"user_id" text NOT NULL,
"filename" text NOT NULL,
"size" integer NOT NULL,
"type" text NOT NULL,
"discord_url" text NOT NULL,
"uploaded_url" text,
"upload_status" text DEFAULT 'pending' NOT NULL,
"upload_error" text,
"created_at" integer NOT NULL,
"uploaded_at" integer
)
`);
}
describe("captureMessage", () => {
beforeAll(async () => {
await initializeDatabase();
await createTables();
});
beforeEach(async () => {
queueMessageAnalysis.mockClear();
const db = getTestDatabase();
db.run(`DELETE FROM "attachments"`);
db.run(`DELETE FROM "messages"`);
delete (globalThis as ModerationTestGlobal).moderationBroadcaster;
});
afterAll(async () => {
await closeDatabase();
});
it("does not requeue or rebroadcast a duplicate captured message", async () => {
const message = createMessage();
const messageCreated = vi.fn();
(globalThis as ModerationTestGlobal).moderationBroadcaster = {
messageCreated,
};
await captureMessage(message, "text");
await captureMessage(message, "text");
expect(queueMessageAnalysis).toHaveBeenCalledTimes(1);
expect(messageCreated).toHaveBeenCalledTimes(1);
});
it("does not queue or broadcast backlog captures one message at a time", async () => {
const message = createMessage("backlog-message-1");
const messageCreated = vi.fn();
(globalThis as ModerationTestGlobal).moderationBroadcaster = {
messageCreated,
};
await captureMessage(message, "text", { source: "backlog" });
expect(queueMessageAnalysis).not.toHaveBeenCalled();
expect(messageCreated).not.toHaveBeenCalled();
});
});

View File

@@ -16,6 +16,14 @@ import {
} from "../../src/moderation/messageStore"; } from "../../src/moderation/messageStore";
import type { MessageRecord } from "../../src/moderation/types"; import type { MessageRecord } from "../../src/moderation/types";
interface TestDatabase {
run(sql: string): void;
}
function getTestDatabase(): TestDatabase {
return getDatabase() as unknown as TestDatabase;
}
const logger = createChildLogger("messageStoreQueries.test"); const logger = createChildLogger("messageStoreQueries.test");
describe("message cursor helpers", () => { describe("message cursor helpers", () => {
@@ -36,7 +44,7 @@ describe("message query integration tests", () => {
beforeAll(async () => { beforeAll(async () => {
await initializeDatabase(); await initializeDatabase();
// Create tables using Drizzle schema (SQLite doesn't support migrations with PostgreSQL syntax) // Create tables using Drizzle schema (SQLite doesn't support migrations with PostgreSQL syntax)
const db = getDatabase() as any; const db = getTestDatabase();
try { try {
// Create messages table // Create messages table
await db.run(` await db.run(`
@@ -75,7 +83,7 @@ describe("message query integration tests", () => {
beforeEach(async () => { beforeEach(async () => {
// Clear messages table before each test // Clear messages table before each test
try { try {
const db = getDatabase() as any; const db = getTestDatabase();
await db.run(`DELETE FROM "messages"`); await db.run(`DELETE FROM "messages"`);
} catch (error) { } catch (error) {
logger.debug({ error }, "Could not clear messages table"); logger.debug({ error }, "Could not clear messages table");