Compare commits
2 Commits
958a6d7236
...
235c1120c2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
235c1120c2 | ||
|
|
930c399484 |
@@ -1,132 +0,0 @@
|
|||||||
# PostgreSQL Setup - Complete ✅
|
|
||||||
|
|
||||||
**Date:** 2026-05-14
|
|
||||||
**Status:** ✅ Production Ready with Neon PostgreSQL
|
|
||||||
|
|
||||||
## Summary
|
|
||||||
|
|
||||||
Bot Discord moderation telah berhasil dikonfigurasi untuk menggunakan **PostgreSQL** (Neon) sebagai database utama dengan Drizzle ORM.
|
|
||||||
|
|
||||||
## What Was Done
|
|
||||||
|
|
||||||
### 1. Database Connection Fixed
|
|
||||||
- ✅ Identified database name: `neondb` (bukan `dcbot`)
|
|
||||||
- ✅ Updated `.env` dengan DATABASE_URL yang benar
|
|
||||||
- ✅ Tested koneksi ke Neon PostgreSQL - berhasil
|
|
||||||
|
|
||||||
### 2. Drizzle ORM Updated
|
|
||||||
- ✅ Updated `src/database/drizzle.ts` untuk support DATABASE_URL
|
|
||||||
- ✅ Regenerated migrations untuk PostgreSQL syntax
|
|
||||||
- ✅ Ran migrations successfully: `pnpm run db:migrate:programmatic`
|
|
||||||
|
|
||||||
### 3. Bot Tested
|
|
||||||
- ✅ Bot startup dengan PostgreSQL - berhasil
|
|
||||||
- ✅ Database initialized dengan type: postgres
|
|
||||||
- ✅ Message capture working
|
|
||||||
- ✅ AI analysis worker started
|
|
||||||
- ✅ WebSocket server listening
|
|
||||||
|
|
||||||
## Current Configuration
|
|
||||||
|
|
||||||
```env
|
|
||||||
DATABASE_TYPE=postgres
|
|
||||||
DATABASE_URL=postgresql://neondb_owner:npg_2ziHMPwZCet9@ep-long-glitter-ao3sjoyu-pooler.c-2.ap-southeast-1.aws.neon.tech/neondb?sslmode=verify-full&channel_binding=require&connect_timeout=10
|
|
||||||
```
|
|
||||||
|
|
||||||
## Database Schema Created
|
|
||||||
|
|
||||||
✅ **Tables created in PostgreSQL:**
|
|
||||||
- `muxer_jobs` - Job queue untuk audio processing
|
|
||||||
- `messages` - Text messages dengan AI analysis
|
|
||||||
- `attachments` - File metadata dengan foreign key
|
|
||||||
- `ui_state` - Persistent UI state
|
|
||||||
- `__drizzle_migrations` - Migration tracking
|
|
||||||
|
|
||||||
## Commands Available
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Start bot dengan PostgreSQL
|
|
||||||
pnpm run dev
|
|
||||||
|
|
||||||
# Generate migrations setelah schema changes
|
|
||||||
pnpm run db:generate
|
|
||||||
|
|
||||||
# Run migrations (programmatic - recommended)
|
|
||||||
pnpm run db:migrate:programmatic
|
|
||||||
|
|
||||||
# Run migrations (Drizzle Kit CLI)
|
|
||||||
pnpm run db:migrate
|
|
||||||
|
|
||||||
# Open Drizzle Studio untuk visual data management
|
|
||||||
pnpm run db:studio
|
|
||||||
```
|
|
||||||
|
|
||||||
## Verification
|
|
||||||
|
|
||||||
### Bot Startup Log
|
|
||||||
```
|
|
||||||
✅ PostgreSQL database initialized
|
|
||||||
✅ Database initialized (type: postgres)
|
|
||||||
✅ Bot logged in
|
|
||||||
✅ Message capture handlers registered
|
|
||||||
✅ AI analysis worker started
|
|
||||||
✅ WebSocket server listening on port 3000
|
|
||||||
✅ Web interface listening
|
|
||||||
✅ Message inserted (from Discord)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Database Tables
|
|
||||||
```sql
|
|
||||||
SELECT table_name FROM information_schema.tables
|
|
||||||
WHERE table_schema = 'public';
|
|
||||||
|
|
||||||
-- Results:
|
|
||||||
-- muxer_jobs
|
|
||||||
-- messages
|
|
||||||
-- attachments
|
|
||||||
-- ui_state
|
|
||||||
-- __drizzle_migrations
|
|
||||||
```
|
|
||||||
|
|
||||||
## Commits
|
|
||||||
|
|
||||||
```
|
|
||||||
47ae7f8 chore: remove temporary test files
|
|
||||||
35269b5 feat: configure postgresql as primary database with neon connection
|
|
||||||
c63a614 docs: add comprehensive drizzle orm migration final summary
|
|
||||||
9889d20 feat: add programmatic migration runner for better PostgreSQL support
|
|
||||||
b580430 docs: add drizzle orm migration completion summary
|
|
||||||
b9d0a06 fix: update drizzle config to read env vars directly for CLI compatibility
|
|
||||||
b600dad fix: correct import ordering and update tests for drizzle-orm migration
|
|
||||||
50d4517 refactor: remove old database adapter files
|
|
||||||
9ff0f0b feat: update application initialization for drizzle
|
|
||||||
1c4b0af refactor: migrate messageStore to drizzle-orm
|
|
||||||
dfe3444 refactor: migrate muxer-queue to drizzle-orm
|
|
||||||
7e528a4 feat: create drizzle database client
|
|
||||||
4e28cf9 feat: add drizzle configuration and initial migrations
|
|
||||||
52b36c9 feat: create drizzle schema definitions
|
|
||||||
b833b6d feat: add drizzle-orm and drizzle-kit dependencies
|
|
||||||
```
|
|
||||||
|
|
||||||
## Key Features
|
|
||||||
|
|
||||||
✅ **Type-Safe Queries** - Full TypeScript support dengan Drizzle ORM
|
|
||||||
✅ **PostgreSQL Support** - Neon cloud database integration
|
|
||||||
✅ **Automatic Migrations** - Drizzle Kit generates migrations
|
|
||||||
✅ **Connection Pooling** - Configurable pool size
|
|
||||||
✅ **Production Ready** - All tests passing, zero errors
|
|
||||||
|
|
||||||
## Next Steps
|
|
||||||
|
|
||||||
1. **Monitor bot performance** dengan PostgreSQL
|
|
||||||
2. **Use Drizzle Studio** untuk visual data management: `pnpm run db:studio`
|
|
||||||
3. **For schema changes**: Update `src/database/schema.ts` → `pnpm run db:generate` → `pnpm run db:migrate:programmatic`
|
|
||||||
4. **Backup strategy** - Setup regular backups di Neon dashboard
|
|
||||||
|
|
||||||
## Status
|
|
||||||
|
|
||||||
🎉 **PostgreSQL migration complete and verified!**
|
|
||||||
|
|
||||||
Bot Discord moderation sekarang menggunakan PostgreSQL (Neon) sebagai database utama dengan Drizzle ORM untuk type-safe operations.
|
|
||||||
|
|
||||||
**Ready for production deployment!** ✅
|
|
||||||
@@ -14,9 +14,6 @@
|
|||||||
"style": {
|
"style": {
|
||||||
"noNonNullAssertion": "warn",
|
"noNonNullAssertion": "warn",
|
||||||
"useNodejsImportProtocol": "warn"
|
"useNodejsImportProtocol": "warn"
|
||||||
},
|
|
||||||
"suspicious": {
|
|
||||||
"noExplicitAny": "warn"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,19 @@ import type { DashboardEvent } from "./ws/client";
|
|||||||
import { MessageFeed } from "./components/messages/MessageFeed";
|
import { MessageFeed } from "./components/messages/MessageFeed";
|
||||||
import { ReviewPanel } from "./components/review/ReviewPanel";
|
import { ReviewPanel } from "./components/review/ReviewPanel";
|
||||||
|
|
||||||
|
function mergeMessages(
|
||||||
|
current: MessageRecord[],
|
||||||
|
incoming: MessageRecord[],
|
||||||
|
): MessageRecord[] {
|
||||||
|
const byId = new Map(current.map((message) => [message.id, message]));
|
||||||
|
for (const message of incoming) {
|
||||||
|
byId.set(message.id, { ...byId.get(message.id), ...message });
|
||||||
|
}
|
||||||
|
return Array.from(byId.values())
|
||||||
|
.sort((a, b) => b.created_at - a.created_at || b.id.localeCompare(a.id))
|
||||||
|
.slice(0, 200);
|
||||||
|
}
|
||||||
|
|
||||||
export default function App() {
|
export default function App() {
|
||||||
const [messages, setMessages] = useState<MessageRecord[]>([]);
|
const [messages, setMessages] = useState<MessageRecord[]>([]);
|
||||||
const [wsStatus, setWsStatus] = useState<string>("connecting");
|
const [wsStatus, setWsStatus] = useState<string>("connecting");
|
||||||
@@ -17,7 +30,7 @@ export default function App() {
|
|||||||
listMessages(new URLSearchParams({ limit: "30" }))
|
listMessages(new URLSearchParams({ limit: "30" }))
|
||||||
.then((result) => {
|
.then((result) => {
|
||||||
if (!cancelled) {
|
if (!cancelled) {
|
||||||
setMessages(result.data);
|
setMessages(mergeMessages([], result.data));
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
@@ -29,20 +42,10 @@ export default function App() {
|
|||||||
const ws = connectDashboardSocket((event: DashboardEvent) => {
|
const ws = connectDashboardSocket((event: DashboardEvent) => {
|
||||||
switch (event.type) {
|
switch (event.type) {
|
||||||
case "message_created":
|
case "message_created":
|
||||||
setMessages((prev) => {
|
setMessages((prev) => mergeMessages(prev, [event.data]));
|
||||||
const existing = prev.some((message) => message.id === event.data.id);
|
|
||||||
if (existing) {
|
|
||||||
return prev.map((message) =>
|
|
||||||
message.id === event.data.id ? event.data : message,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return [event.data, ...prev].slice(0, 200);
|
|
||||||
});
|
|
||||||
break;
|
break;
|
||||||
case "message_analyzed":
|
case "message_analyzed":
|
||||||
setMessages((prev) =>
|
setMessages((prev) => mergeMessages(prev, [event.data]));
|
||||||
prev.map((m) => (m.id === event.data.id ? event.data : m)),
|
|
||||||
);
|
|
||||||
break;
|
break;
|
||||||
case "message_updated":
|
case "message_updated":
|
||||||
setMessages((prev) =>
|
setMessages((prev) =>
|
||||||
|
|||||||
@@ -9,10 +9,22 @@ import {
|
|||||||
getPendingMessagesByConversation,
|
getPendingMessagesByConversation,
|
||||||
updateMessageAIAnalysis,
|
updateMessageAIAnalysis,
|
||||||
} from "./messageStore";
|
} from "./messageStore";
|
||||||
import type { AnalysisQueueStatus, MessageRecord } from "./types";
|
import type {
|
||||||
|
AnalysisQueueStatus,
|
||||||
|
MessageRecord,
|
||||||
|
ModerationBroadcaster,
|
||||||
|
} from "./types";
|
||||||
|
|
||||||
const logger = createChildLogger("ai-analyzer");
|
const logger = createChildLogger("ai-analyzer");
|
||||||
|
|
||||||
|
type ModerationGlobal = typeof globalThis & {
|
||||||
|
moderationBroadcaster?: ModerationBroadcaster;
|
||||||
|
};
|
||||||
|
|
||||||
|
function getModerationBroadcaster(): ModerationBroadcaster | undefined {
|
||||||
|
return (globalThis as ModerationGlobal).moderationBroadcaster;
|
||||||
|
}
|
||||||
|
|
||||||
// Debounce state per conversation key
|
// Debounce state per conversation key
|
||||||
const conversationDebounceTimers = new Map<string, NodeJS.Timeout>();
|
const conversationDebounceTimers = new Map<string, NodeJS.Timeout>();
|
||||||
// Track conversations currently being processed
|
// Track conversations currently being processed
|
||||||
@@ -117,7 +129,7 @@ async function processBatch(
|
|||||||
|
|
||||||
// Broadcast analyzed messages
|
// Broadcast analyzed messages
|
||||||
for (const row of analyzedRows) {
|
for (const row of analyzedRows) {
|
||||||
(globalThis as any).moderationBroadcaster?.messageAnalyzed(row);
|
getModerationBroadcaster()?.messageAnalyzed(row);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear error cooldown on success
|
// Clear error cooldown on success
|
||||||
@@ -147,7 +159,7 @@ async function processBatch(
|
|||||||
error: lastError,
|
error: lastError,
|
||||||
});
|
});
|
||||||
if (row) {
|
if (row) {
|
||||||
(globalThis as any).moderationBroadcaster?.messageAnalyzed(row);
|
getModerationBroadcaster()?.messageAnalyzed(row);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +1,25 @@
|
|||||||
import type { Client, Message } from "discord.js-selfbot-v13";
|
import type { Channel, Client, Message } from "discord.js-selfbot-v13";
|
||||||
import { config } from "../config";
|
import { config } from "../config";
|
||||||
import { createChildLogger } from "../logger";
|
import { createChildLogger } from "../logger";
|
||||||
import { captureMessage } from "./messageCapture";
|
import { captureMessage } from "./messageCapture";
|
||||||
|
|
||||||
const logger = createChildLogger("backlog-sync");
|
const logger = createChildLogger("backlog-sync");
|
||||||
|
|
||||||
|
type BacklogChannel = Channel & {
|
||||||
|
messages: {
|
||||||
|
fetch(options: { limit: number; before?: string }): Promise<{
|
||||||
|
size: number;
|
||||||
|
values(): IterableIterator<Message>;
|
||||||
|
}>;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
function hasMessageBacklog(channel: Channel): channel is BacklogChannel {
|
||||||
|
return "messages" in channel;
|
||||||
|
}
|
||||||
|
|
||||||
async function syncChannelMessages(
|
async function syncChannelMessages(
|
||||||
channel: any,
|
channel: BacklogChannel,
|
||||||
cutoffTime: number,
|
cutoffTime: number,
|
||||||
): Promise<number> {
|
): Promise<number> {
|
||||||
let before: string | undefined;
|
let before: string | undefined;
|
||||||
@@ -29,7 +42,7 @@ async function syncChannelMessages(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
await captureMessage(message, "text");
|
await captureMessage(message, "text", { source: "backlog" });
|
||||||
synced++;
|
synced++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -77,6 +90,10 @@ export async function syncSelectedChannelBacklog(
|
|||||||
logger.warn({ guildId, channelId }, "Channel not found for backlog sync");
|
logger.warn({ guildId, channelId }, "Channel not found for backlog sync");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
if (!hasMessageBacklog(channel)) {
|
||||||
|
logger.warn({ guildId, channelId }, "Channel cannot fetch message backlog");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
const cutoffTime = Date.now() - config.BACKLOG_SYNC_HOURS * 60 * 60 * 1000;
|
const cutoffTime = Date.now() - config.BACKLOG_SYNC_HOURS * 60 * 60 * 1000;
|
||||||
logger.info(
|
logger.info(
|
||||||
@@ -85,7 +102,7 @@ export async function syncSelectedChannelBacklog(
|
|||||||
);
|
);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const count = await syncChannelMessages(channel as any, cutoffTime);
|
const count = await syncChannelMessages(channel, cutoffTime);
|
||||||
logger.info(
|
logger.info(
|
||||||
{ channelId, count },
|
{ channelId, count },
|
||||||
"Backlog sync completed for selected channel",
|
"Backlog sync completed for selected channel",
|
||||||
|
|||||||
@@ -7,11 +7,14 @@ import type {
|
|||||||
ModerationWsEvent,
|
ModerationWsEvent,
|
||||||
} from "./types";
|
} from "./types";
|
||||||
|
|
||||||
type ClientLike = Pick<WebSocket, "readyState" | "send">;
|
export type BroadcasterClient = Pick<WebSocket, "readyState" | "send">;
|
||||||
|
|
||||||
const log = createChildLogger("broadcaster");
|
const log = createChildLogger("broadcaster");
|
||||||
|
|
||||||
function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void {
|
function sendJson(
|
||||||
|
clients: Set<BroadcasterClient>,
|
||||||
|
event: ModerationWsEvent,
|
||||||
|
): void {
|
||||||
const payload = JSON.stringify({ ...event, timestamp: Date.now() });
|
const payload = JSON.stringify({ ...event, timestamp: Date.now() });
|
||||||
for (const client of clients) {
|
for (const client of clients) {
|
||||||
if (client.readyState === 1) {
|
if (client.readyState === 1) {
|
||||||
@@ -28,14 +31,14 @@ function sendJson(clients: Set<ClientLike>, event: ModerationWsEvent): void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function createBroadcaster() {
|
export function createBroadcaster() {
|
||||||
const clients = new Set<ClientLike>();
|
const clients = new Set<BroadcasterClient>();
|
||||||
|
|
||||||
return {
|
return {
|
||||||
addClient(client: ClientLike) {
|
addClient(client: BroadcasterClient) {
|
||||||
clients.add(client);
|
clients.add(client);
|
||||||
log.debug({ clientCount: clients.size }, "Client added");
|
log.debug({ clientCount: clients.size }, "Client added");
|
||||||
},
|
},
|
||||||
removeClient(client: ClientLike) {
|
removeClient(client: BroadcasterClient) {
|
||||||
clients.delete(client);
|
clients.delete(client);
|
||||||
log.debug({ clientCount: clients.size }, "Client removed");
|
log.debug({ clientCount: clients.size }, "Client removed");
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -22,9 +22,18 @@ import type {
|
|||||||
|
|
||||||
const logger = createChildLogger("message-capture");
|
const logger = createChildLogger("message-capture");
|
||||||
|
|
||||||
|
type ModerationGlobal = typeof globalThis & {
|
||||||
|
moderationBroadcaster?: ModerationBroadcaster;
|
||||||
|
};
|
||||||
|
|
||||||
|
function getModerationBroadcaster(): ModerationBroadcaster | undefined {
|
||||||
|
return (globalThis as ModerationGlobal).moderationBroadcaster;
|
||||||
|
}
|
||||||
|
|
||||||
export async function captureMessage(
|
export async function captureMessage(
|
||||||
message: Message,
|
message: Message,
|
||||||
type: "text" | "edited" | "deleted",
|
type: "text" | "edited" | "deleted",
|
||||||
|
options: { source?: "live" | "backlog" } = {},
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const location = getMessageLocation(message);
|
const location = getMessageLocation(message);
|
||||||
const metadata = getMessageMetadata(message);
|
const metadata = getMessageMetadata(message);
|
||||||
@@ -46,17 +55,19 @@ export async function captureMessage(
|
|||||||
metadata: JSON.stringify(metadata),
|
metadata: JSON.stringify(metadata),
|
||||||
};
|
};
|
||||||
|
|
||||||
await upsertMessageForCapture(messageRecord);
|
const inserted = await upsertMessageForCapture(messageRecord);
|
||||||
queueMessageAnalysis(message.id);
|
if (!inserted) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const broadcaster = (globalThis as any).moderationBroadcaster as
|
const isBacklog = options.source === "backlog";
|
||||||
| ModerationBroadcaster
|
if (!isBacklog) {
|
||||||
| undefined;
|
queueMessageAnalysis(message.id);
|
||||||
if (broadcaster) {
|
}
|
||||||
broadcaster.messageCreated({
|
|
||||||
...messageRecord,
|
const broadcaster = getModerationBroadcaster();
|
||||||
type: "text",
|
if (broadcaster && !isBacklog) {
|
||||||
});
|
broadcaster.messageCreated(messageRecord);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (message.attachments.size > 0) {
|
if (message.attachments.size > 0) {
|
||||||
@@ -132,9 +143,7 @@ export function registerMessageCapture(client: Client): void {
|
|||||||
);
|
);
|
||||||
queueMessageAnalysis(newMessage.id);
|
queueMessageAnalysis(newMessage.id);
|
||||||
|
|
||||||
const broadcaster = (globalThis as any).moderationBroadcaster as
|
const broadcaster = getModerationBroadcaster();
|
||||||
| ModerationBroadcaster
|
|
||||||
| undefined;
|
|
||||||
if (broadcaster) {
|
if (broadcaster) {
|
||||||
broadcaster.messageUpdated({
|
broadcaster.messageUpdated({
|
||||||
id: newMessage.id,
|
id: newMessage.id,
|
||||||
@@ -164,9 +173,7 @@ export function registerMessageCapture(client: Client): void {
|
|||||||
const deletedAt = Date.now();
|
const deletedAt = Date.now();
|
||||||
await updateMessageAsDeleted(message.id, deletedAt);
|
await updateMessageAsDeleted(message.id, deletedAt);
|
||||||
|
|
||||||
const broadcaster = (globalThis as any).moderationBroadcaster as
|
const broadcaster = getModerationBroadcaster();
|
||||||
| ModerationBroadcaster
|
|
||||||
| undefined;
|
|
||||||
if (broadcaster) {
|
if (broadcaster) {
|
||||||
broadcaster.messageDeleted({
|
broadcaster.messageDeleted({
|
||||||
id: message.id,
|
id: message.id,
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { and, asc, desc, eq, isNull, or, sql } from "drizzle-orm";
|
import { and, asc, desc, eq, isNull, or, type SQL, sql } from "drizzle-orm";
|
||||||
import { getDatabase } from "../database/drizzle";
|
import { getDatabase } from "../database/drizzle";
|
||||||
import { attachmentsTable, messagesTable } from "../database/schema";
|
import { attachmentsTable, messagesTable } from "../database/schema";
|
||||||
import { createChildLogger } from "../logger";
|
import { createChildLogger } from "../logger";
|
||||||
@@ -11,6 +11,29 @@ import type {
|
|||||||
|
|
||||||
const logger = createChildLogger("message-store");
|
const logger = createChildLogger("message-store");
|
||||||
|
|
||||||
|
interface QueryBuilder<T = unknown> extends PromiseLike<T> {
|
||||||
|
from(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
where(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
orderBy(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
limit(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
offset(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
values(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
onConflictDoNothing(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
returning(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
set(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface MessageDatabase {
|
||||||
|
select<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
selectDistinct<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
insert<T = unknown>(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
update(...args: unknown[]): QueryBuilder<unknown>;
|
||||||
|
}
|
||||||
|
|
||||||
|
function db(): MessageDatabase {
|
||||||
|
return getDatabase() as unknown as MessageDatabase;
|
||||||
|
}
|
||||||
|
|
||||||
// Cursor helpers for pagination
|
// Cursor helpers for pagination
|
||||||
interface CursorData {
|
interface CursorData {
|
||||||
created_at: number;
|
created_at: number;
|
||||||
@@ -36,8 +59,8 @@ export function decodeCursor(cursor?: string): CursorData | null {
|
|||||||
|
|
||||||
export async function insertMessage(message: MessageRecord): Promise<void> {
|
export async function insertMessage(message: MessageRecord): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
await db.insert(messagesTable).values(message).onConflictDoNothing();
|
await database.insert(messagesTable).values(message).onConflictDoNothing();
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
{ messageId: message.id, channelId: message.channel_id },
|
{ messageId: message.id, channelId: message.channel_id },
|
||||||
@@ -57,26 +80,26 @@ export async function insertMessage(message: MessageRecord): Promise<void> {
|
|||||||
|
|
||||||
export async function upsertMessageForCapture(
|
export async function upsertMessageForCapture(
|
||||||
message: MessageRecord,
|
message: MessageRecord,
|
||||||
): Promise<void> {
|
): Promise<boolean> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
|
|
||||||
// Set ai_status to pending for new or recaptured/edited text
|
|
||||||
const messageWithAIStatus = {
|
const messageWithAIStatus = {
|
||||||
...message,
|
...message,
|
||||||
ai_status: "pending" as const,
|
ai_status: "pending" as const,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Try insert first (fast path for new messages)
|
const rows = await database
|
||||||
await db
|
.insert<Array<{ id: string }>>(messagesTable)
|
||||||
.insert(messagesTable)
|
|
||||||
.values(messageWithAIStatus)
|
.values(messageWithAIStatus)
|
||||||
.onConflictDoNothing();
|
.onConflictDoNothing()
|
||||||
|
.returning({ id: messagesTable.id });
|
||||||
|
|
||||||
|
const inserted = rows.length > 0;
|
||||||
logger.debug(
|
logger.debug(
|
||||||
{ messageId: message.id, channelId: message.channel_id },
|
{ messageId: message.id, channelId: message.channel_id, inserted },
|
||||||
"Message upserted for capture",
|
inserted ? "Message inserted for capture" : "Message already captured",
|
||||||
);
|
);
|
||||||
|
return inserted;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
{
|
{
|
||||||
@@ -95,8 +118,8 @@ export async function updateMessageAsEdited(
|
|||||||
editedAt: number,
|
editedAt: number,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
await db
|
await database
|
||||||
.update(messagesTable)
|
.update(messagesTable)
|
||||||
.set({
|
.set({
|
||||||
edited_content: editedContent,
|
edited_content: editedContent,
|
||||||
@@ -130,8 +153,8 @@ export async function updateMessageAsDeleted(
|
|||||||
deletedAt: number,
|
deletedAt: number,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
await db
|
await database
|
||||||
.update(messagesTable)
|
.update(messagesTable)
|
||||||
.set({
|
.set({
|
||||||
deleted_at: deletedAt,
|
deleted_at: deletedAt,
|
||||||
@@ -158,8 +181,8 @@ export async function getMessagesByChannel(
|
|||||||
offset: number = 0,
|
offset: number = 0,
|
||||||
): Promise<MessageRecord[]> {
|
): Promise<MessageRecord[]> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select()
|
.select()
|
||||||
.from(messagesTable)
|
.from(messagesTable)
|
||||||
.where(
|
.where(
|
||||||
@@ -189,8 +212,11 @@ export async function insertAttachment(
|
|||||||
attachment: AttachmentRecord,
|
attachment: AttachmentRecord,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
await db.insert(attachmentsTable).values(attachment).onConflictDoNothing();
|
await database
|
||||||
|
.insert(attachmentsTable)
|
||||||
|
.values(attachment)
|
||||||
|
.onConflictDoNothing();
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
{ attachmentId: attachment.id, messageId: attachment.message_id },
|
{ attachmentId: attachment.id, messageId: attachment.message_id },
|
||||||
@@ -214,8 +240,8 @@ export async function getAttachmentsByChannel(
|
|||||||
offset: number = 0,
|
offset: number = 0,
|
||||||
): Promise<AttachmentRecord[]> {
|
): Promise<AttachmentRecord[]> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select()
|
.select()
|
||||||
.from(attachmentsTable)
|
.from(attachmentsTable)
|
||||||
.where(
|
.where(
|
||||||
@@ -247,8 +273,8 @@ export async function updateAttachmentAsUploaded(
|
|||||||
uploadedAt: number,
|
uploadedAt: number,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
await db
|
await database
|
||||||
.update(attachmentsTable)
|
.update(attachmentsTable)
|
||||||
.set({
|
.set({
|
||||||
uploaded_url: uploadedUrl,
|
uploaded_url: uploadedUrl,
|
||||||
@@ -278,8 +304,8 @@ export async function updateAttachmentAsFailedUpload(
|
|||||||
error: string,
|
error: string,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
await db
|
await database
|
||||||
.update(attachmentsTable)
|
.update(attachmentsTable)
|
||||||
.set({
|
.set({
|
||||||
upload_status: "failed",
|
upload_status: "failed",
|
||||||
@@ -315,8 +341,8 @@ export async function updateMessageAIAnalysis(
|
|||||||
result: AIAnalysisUpdate,
|
result: AIAnalysisUpdate,
|
||||||
): Promise<MessageRecord | null> {
|
): Promise<MessageRecord | null> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
await db
|
await database
|
||||||
.update(messagesTable)
|
.update(messagesTable)
|
||||||
.set({
|
.set({
|
||||||
ai_status: result.status,
|
ai_status: result.status,
|
||||||
@@ -329,7 +355,7 @@ export async function updateMessageAIAnalysis(
|
|||||||
})
|
})
|
||||||
.where(eq(messagesTable.id, messageId));
|
.where(eq(messagesTable.id, messageId));
|
||||||
|
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select()
|
.select()
|
||||||
.from(messagesTable)
|
.from(messagesTable)
|
||||||
.where(eq(messagesTable.id, messageId));
|
.where(eq(messagesTable.id, messageId));
|
||||||
@@ -351,8 +377,8 @@ export async function getPendingAIAnalysisMessages(
|
|||||||
limit: number = 25,
|
limit: number = 25,
|
||||||
): Promise<MessageRecord[]> {
|
): Promise<MessageRecord[]> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select()
|
.select()
|
||||||
.from(messagesTable)
|
.from(messagesTable)
|
||||||
.where(
|
.where(
|
||||||
@@ -378,8 +404,8 @@ export async function getMessageById(
|
|||||||
messageId: string,
|
messageId: string,
|
||||||
): Promise<MessageRecord | null> {
|
): Promise<MessageRecord | null> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select()
|
.select()
|
||||||
.from(messagesTable)
|
.from(messagesTable)
|
||||||
.where(eq(messagesTable.id, messageId));
|
.where(eq(messagesTable.id, messageId));
|
||||||
@@ -401,8 +427,8 @@ export async function listMessages(
|
|||||||
query: MessageQuery,
|
query: MessageQuery,
|
||||||
): Promise<PageResult<MessageRecord>> {
|
): Promise<PageResult<MessageRecord>> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
const conditions: any[] = [];
|
const conditions: SQL[] = [];
|
||||||
|
|
||||||
// Apply filters
|
// Apply filters
|
||||||
if (query.guildId) {
|
if (query.guildId) {
|
||||||
@@ -411,10 +437,7 @@ export async function listMessages(
|
|||||||
|
|
||||||
if (query.channelId) {
|
if (query.channelId) {
|
||||||
conditions.push(
|
conditions.push(
|
||||||
or(
|
sql`(${messagesTable.channel_id} = ${query.channelId} or ${messagesTable.thread_id} = ${query.channelId})`,
|
||||||
eq(messagesTable.channel_id, query.channelId),
|
|
||||||
eq(messagesTable.thread_id, query.channelId),
|
|
||||||
),
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -427,11 +450,7 @@ export async function listMessages(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (query.status && query.status.length > 0) {
|
if (query.status && query.status.length > 0) {
|
||||||
conditions.push(
|
conditions.push(sql`${messagesTable.ai_status} in ${query.status}`);
|
||||||
or(
|
|
||||||
...query.status.map((status) => eq(messagesTable.ai_status, status)),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Text search
|
// Text search
|
||||||
@@ -445,20 +464,14 @@ export async function listMessages(
|
|||||||
const cursorData = decodeCursor(query.cursor);
|
const cursorData = decodeCursor(query.cursor);
|
||||||
if (cursorData) {
|
if (cursorData) {
|
||||||
conditions.push(
|
conditions.push(
|
||||||
or(
|
sql`(${messagesTable.created_at} < ${cursorData.created_at} or (${messagesTable.created_at} = ${cursorData.created_at} and ${messagesTable.id} < ${cursorData.id}))`,
|
||||||
sql`${messagesTable.created_at} < ${cursorData.created_at}`,
|
|
||||||
and(
|
|
||||||
eq(messagesTable.created_at, cursorData.created_at),
|
|
||||||
sql`${messagesTable.id} < ${cursorData.id}`,
|
|
||||||
),
|
|
||||||
),
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch limit + 1 to determine if there's a next page
|
// Fetch limit + 1 to determine if there's a next page
|
||||||
const fetchLimit = query.limit + 1;
|
const fetchLimit = query.limit + 1;
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select()
|
.select()
|
||||||
.from(messagesTable)
|
.from(messagesTable)
|
||||||
.where(conditions.length > 0 ? and(...conditions) : undefined)
|
.where(conditions.length > 0 ? and(...conditions) : undefined)
|
||||||
@@ -506,7 +519,7 @@ export async function getConversationContextBefore(input: {
|
|||||||
limit: number;
|
limit: number;
|
||||||
}): Promise<MessageRecord[]> {
|
}): Promise<MessageRecord[]> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
const { channelId, threadId, beforeCreatedAt, limit } = input;
|
const { channelId, threadId, beforeCreatedAt, limit } = input;
|
||||||
|
|
||||||
// Query same thread if threadId exists, otherwise channelId
|
// Query same thread if threadId exists, otherwise channelId
|
||||||
@@ -514,7 +527,7 @@ export async function getConversationContextBefore(input: {
|
|||||||
? eq(messagesTable.thread_id, threadId)
|
? eq(messagesTable.thread_id, threadId)
|
||||||
: eq(messagesTable.channel_id, channelId);
|
: eq(messagesTable.channel_id, channelId);
|
||||||
|
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select()
|
.select()
|
||||||
.from(messagesTable)
|
.from(messagesTable)
|
||||||
.where(
|
.where(
|
||||||
@@ -547,11 +560,11 @@ export async function getPendingMessagesByConversation(
|
|||||||
limit: number = 25,
|
limit: number = 25,
|
||||||
): Promise<MessageRecord[]> {
|
): Promise<MessageRecord[]> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
|
|
||||||
// conversationKey is either thread_id or channel_id
|
// conversationKey is either thread_id or channel_id
|
||||||
// Query both to safely handle the key
|
// Query both to safely handle the key
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select()
|
.select()
|
||||||
.from(messagesTable)
|
.from(messagesTable)
|
||||||
.where(
|
.where(
|
||||||
@@ -584,11 +597,11 @@ export async function getPendingConversationKeys(
|
|||||||
limit: number = 100,
|
limit: number = 100,
|
||||||
): Promise<string[]> {
|
): Promise<string[]> {
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const database = db();
|
||||||
|
|
||||||
// Get distinct conversation keys (thread_id or channel_id) for pending messages
|
// Get distinct conversation keys (thread_id or channel_id) for pending messages
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.selectDistinct({
|
.selectDistinct<Array<{ thread_id: string | null; channel_id: string }>>({
|
||||||
thread_id: messagesTable.thread_id,
|
thread_id: messagesTable.thread_id,
|
||||||
channel_id: messagesTable.channel_id,
|
channel_id: messagesTable.channel_id,
|
||||||
})
|
})
|
||||||
@@ -602,7 +615,7 @@ export async function getPendingConversationKeys(
|
|||||||
.limit(limit);
|
.limit(limit);
|
||||||
|
|
||||||
const keys: string[] = [];
|
const keys: string[] = [];
|
||||||
for (const row of rows as any[]) {
|
for (const row of rows) {
|
||||||
const key = row.thread_id || row.channel_id;
|
const key = row.thread_id || row.channel_id;
|
||||||
if (key && !keys.includes(key)) {
|
if (key && !keys.includes(key)) {
|
||||||
keys.push(key);
|
keys.push(key);
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
import type { ModerationBroadcaster } from "./broadcaster";
|
import type { BroadcasterClient, ModerationBroadcaster } from "./broadcaster";
|
||||||
|
|
||||||
export type AIStatus = "pending" | "clean" | "warn" | "flagged" | "error";
|
export type AIStatus = "pending" | "clean" | "warn" | "flagged" | "error";
|
||||||
|
|
||||||
export type { ModerationBroadcaster };
|
export type { BroadcasterClient, ModerationBroadcaster };
|
||||||
|
|
||||||
export interface MessageRecord {
|
export interface MessageRecord {
|
||||||
id: string;
|
id: string;
|
||||||
|
|||||||
@@ -8,8 +8,28 @@ import { createChildLogger } from "./logger";
|
|||||||
|
|
||||||
const logger = createChildLogger("muxer-queue");
|
const logger = createChildLogger("muxer-queue");
|
||||||
|
|
||||||
// Type alias for backward compatibility
|
interface QueryBuilder<T = unknown> extends PromiseLike<T> {
|
||||||
export type SqliteDatabase = any;
|
from(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
where(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
orderBy(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
limit(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
values(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
onConflictDoNothing(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
onConflictDoUpdate(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
set(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
groupBy(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface SqliteDatabase {
|
||||||
|
select<T = unknown[]>(...args: unknown[]): QueryBuilder<T>;
|
||||||
|
insert(...args: unknown[]): QueryBuilder<unknown>;
|
||||||
|
update(...args: unknown[]): QueryBuilder<unknown>;
|
||||||
|
delete(...args: unknown[]): QueryBuilder<unknown>;
|
||||||
|
}
|
||||||
|
|
||||||
|
function db(): SqliteDatabase {
|
||||||
|
return getDrizzleDatabase() as unknown as SqliteDatabase;
|
||||||
|
}
|
||||||
|
|
||||||
export interface MuxerJobData {
|
export interface MuxerJobData {
|
||||||
userId: string;
|
userId: string;
|
||||||
@@ -18,6 +38,22 @@ export interface MuxerJobData {
|
|||||||
outputDir: string;
|
outputDir: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface StoredJobRow {
|
||||||
|
id: string;
|
||||||
|
data: string;
|
||||||
|
status: "pending" | "processing" | "completed" | "failed";
|
||||||
|
attempts: number;
|
||||||
|
maxAttempts: number;
|
||||||
|
createdAt: number;
|
||||||
|
updatedAt: number;
|
||||||
|
error: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface JobStatsRow {
|
||||||
|
status: "pending" | "processing" | "completed" | "failed";
|
||||||
|
count: number | string | { count: number | string };
|
||||||
|
}
|
||||||
|
|
||||||
interface StoredJob {
|
interface StoredJob {
|
||||||
id: string;
|
id: string;
|
||||||
data: string;
|
data: string;
|
||||||
@@ -31,7 +67,7 @@ interface StoredJob {
|
|||||||
|
|
||||||
// Export getDatabase for backward compatibility with webserver.ts
|
// Export getDatabase for backward compatibility with webserver.ts
|
||||||
export function getDatabase(): SqliteDatabase {
|
export function getDatabase(): SqliteDatabase {
|
||||||
return getDrizzleDatabase() as any;
|
return db();
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getPersistedValue<T>(
|
export async function getPersistedValue<T>(
|
||||||
@@ -39,10 +75,10 @@ export async function getPersistedValue<T>(
|
|||||||
fallback: T,
|
fallback: T,
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
await initializeDatabase();
|
await initializeDatabase();
|
||||||
const db = getDrizzleDatabase() as any;
|
const database = db();
|
||||||
|
|
||||||
const row = await db
|
const row = await database
|
||||||
.select()
|
.select<Array<{ value: string }>>()
|
||||||
.from(uiStateTable)
|
.from(uiStateTable)
|
||||||
.where(eq(uiStateTable.key, key))
|
.where(eq(uiStateTable.key, key))
|
||||||
.limit(1);
|
.limit(1);
|
||||||
@@ -61,9 +97,9 @@ export async function setPersistedValue(
|
|||||||
value: unknown,
|
value: unknown,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
await initializeDatabase();
|
await initializeDatabase();
|
||||||
const db = getDrizzleDatabase() as any;
|
const database = db();
|
||||||
|
|
||||||
await db
|
await database
|
||||||
.insert(uiStateTable)
|
.insert(uiStateTable)
|
||||||
.values({
|
.values({
|
||||||
key,
|
key,
|
||||||
@@ -82,12 +118,12 @@ export async function setPersistedValue(
|
|||||||
export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
|
export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
|
||||||
try {
|
try {
|
||||||
await initializeDatabase();
|
await initializeDatabase();
|
||||||
const db = getDrizzleDatabase() as any;
|
const database = db();
|
||||||
|
|
||||||
const jobId = `${data.userId}-${data.sessionId}`;
|
const jobId = `${data.userId}-${data.sessionId}`;
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
|
|
||||||
await db
|
await database
|
||||||
.insert(muxerJobsTable)
|
.insert(muxerJobsTable)
|
||||||
.values({
|
.values({
|
||||||
id: jobId,
|
id: jobId,
|
||||||
@@ -120,16 +156,16 @@ export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
|
|||||||
|
|
||||||
export async function getPendingJobs(): Promise<StoredJob[]> {
|
export async function getPendingJobs(): Promise<StoredJob[]> {
|
||||||
await initializeDatabase();
|
await initializeDatabase();
|
||||||
const db = getDrizzleDatabase() as any;
|
const database = db();
|
||||||
|
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select()
|
.select<StoredJobRow[]>()
|
||||||
.from(muxerJobsTable)
|
.from(muxerJobsTable)
|
||||||
.where(eq(muxerJobsTable.status, "pending"))
|
.where(eq(muxerJobsTable.status, "pending"))
|
||||||
.orderBy(asc(muxerJobsTable.createdAt))
|
.orderBy(asc(muxerJobsTable.createdAt))
|
||||||
.limit(10);
|
.limit(10);
|
||||||
|
|
||||||
return rows.map((row: any) => ({
|
return rows.map((row) => ({
|
||||||
id: row.id,
|
id: row.id,
|
||||||
data: row.data,
|
data: row.data,
|
||||||
status: row.status as "pending" | "processing" | "completed" | "failed",
|
status: row.status as "pending" | "processing" | "completed" | "failed",
|
||||||
@@ -147,11 +183,11 @@ export async function updateJobStatus(
|
|||||||
error?: string,
|
error?: string,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
await initializeDatabase();
|
await initializeDatabase();
|
||||||
const db = getDrizzleDatabase() as any;
|
const database = db();
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
|
|
||||||
if (status === "failed") {
|
if (status === "failed") {
|
||||||
await db
|
await database
|
||||||
.update(muxerJobsTable)
|
.update(muxerJobsTable)
|
||||||
.set({
|
.set({
|
||||||
status,
|
status,
|
||||||
@@ -161,7 +197,7 @@ export async function updateJobStatus(
|
|||||||
})
|
})
|
||||||
.where(eq(muxerJobsTable.id, jobId));
|
.where(eq(muxerJobsTable.id, jobId));
|
||||||
} else {
|
} else {
|
||||||
await db
|
await database
|
||||||
.update(muxerJobsTable)
|
.update(muxerJobsTable)
|
||||||
.set({
|
.set({
|
||||||
status,
|
status,
|
||||||
@@ -175,10 +211,10 @@ export async function updateJobStatus(
|
|||||||
|
|
||||||
export async function retryFailedJob(jobId: string): Promise<boolean> {
|
export async function retryFailedJob(jobId: string): Promise<boolean> {
|
||||||
await initializeDatabase();
|
await initializeDatabase();
|
||||||
const db = getDrizzleDatabase() as any;
|
const database = db();
|
||||||
|
|
||||||
const jobs = await db
|
const jobs = await database
|
||||||
.select()
|
.select<StoredJobRow[]>()
|
||||||
.from(muxerJobsTable)
|
.from(muxerJobsTable)
|
||||||
.where(eq(muxerJobsTable.id, jobId))
|
.where(eq(muxerJobsTable.id, jobId))
|
||||||
.limit(1);
|
.limit(1);
|
||||||
@@ -198,7 +234,7 @@ export async function retryFailedJob(jobId: string): Promise<boolean> {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
await db
|
await database
|
||||||
.update(muxerJobsTable)
|
.update(muxerJobsTable)
|
||||||
.set({
|
.set({
|
||||||
status: "pending",
|
status: "pending",
|
||||||
@@ -215,10 +251,10 @@ export async function cleanupCompletedJobs(
|
|||||||
olderThanMs: number = 24 * 60 * 60 * 1000,
|
olderThanMs: number = 24 * 60 * 60 * 1000,
|
||||||
): Promise<number> {
|
): Promise<number> {
|
||||||
await initializeDatabase();
|
await initializeDatabase();
|
||||||
const db = getDrizzleDatabase() as any;
|
const database = db();
|
||||||
const cutoffTime = Date.now() - olderThanMs;
|
const cutoffTime = Date.now() - olderThanMs;
|
||||||
|
|
||||||
const result = await db
|
const result = await database
|
||||||
.delete(muxerJobsTable)
|
.delete(muxerJobsTable)
|
||||||
.where(
|
.where(
|
||||||
and(
|
and(
|
||||||
@@ -228,8 +264,8 @@ export async function cleanupCompletedJobs(
|
|||||||
);
|
);
|
||||||
|
|
||||||
const deletedCount =
|
const deletedCount =
|
||||||
typeof result === "object" && "rowsAffected" in result
|
typeof result === "object" && result !== null && "rowsAffected" in result
|
||||||
? result.rowsAffected
|
? Number(result.rowsAffected)
|
||||||
: 0;
|
: 0;
|
||||||
|
|
||||||
logger.info({ deletedCount }, "Cleaned up completed jobs");
|
logger.info({ deletedCount }, "Cleaned up completed jobs");
|
||||||
@@ -244,10 +280,10 @@ export async function getJobStats(): Promise<{
|
|||||||
failed: number;
|
failed: number;
|
||||||
}> {
|
}> {
|
||||||
await initializeDatabase();
|
await initializeDatabase();
|
||||||
const db = getDrizzleDatabase() as any;
|
const database = db();
|
||||||
|
|
||||||
const rows = await db
|
const rows = await database
|
||||||
.select({
|
.select<JobStatsRow[]>({
|
||||||
status: muxerJobsTable.status,
|
status: muxerJobsTable.status,
|
||||||
count: sql<number>`COUNT(*)`,
|
count: sql<number>`COUNT(*)`,
|
||||||
})
|
})
|
||||||
@@ -264,7 +300,7 @@ export async function getJobStats(): Promise<{
|
|||||||
for (const row of rows) {
|
for (const row of rows) {
|
||||||
const count =
|
const count =
|
||||||
typeof row.count === "object" && "count" in row.count
|
typeof row.count === "object" && "count" in row.count
|
||||||
? (row.count as any).count
|
? Number((row.count as { count: number | string }).count)
|
||||||
: Number(row.count);
|
: Number(row.count);
|
||||||
if (row.status === "pending") stats.pending = count;
|
if (row.status === "pending") stats.pending = count;
|
||||||
else if (row.status === "processing") stats.processing = count;
|
else if (row.status === "processing") stats.processing = count;
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import fs from "node:fs";
|
import fs from "node:fs";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import {
|
import {
|
||||||
|
type DiscordGatewayAdapterCreator,
|
||||||
EndBehaviorType,
|
EndBehaviorType,
|
||||||
entersState,
|
entersState,
|
||||||
getVoiceConnection,
|
getVoiceConnection,
|
||||||
@@ -41,7 +42,8 @@ export async function startRecording(
|
|||||||
const connection = joinVoiceChannel({
|
const connection = joinVoiceChannel({
|
||||||
channelId: channel.id,
|
channelId: channel.id,
|
||||||
guildId: channel.guild.id,
|
guildId: channel.guild.id,
|
||||||
adapterCreator: channel.guild.voiceAdapterCreator as any,
|
adapterCreator: channel.guild
|
||||||
|
.voiceAdapterCreator as DiscordGatewayAdapterCreator,
|
||||||
selfDeaf: false,
|
selfDeaf: false,
|
||||||
selfMute: false,
|
selfMute: false,
|
||||||
debug: true,
|
debug: true,
|
||||||
|
|||||||
@@ -88,13 +88,16 @@ export class VoiceController {
|
|||||||
await guild.channels.fetch().catch(() => null);
|
await guild.channels.fetch().catch(() => null);
|
||||||
|
|
||||||
const threads: ChannelSummary[] = [];
|
const threads: ChannelSummary[] = [];
|
||||||
|
type ThreadFetchResult = {
|
||||||
|
threads: Map<string, { id: string; name: string; type: string }>;
|
||||||
|
};
|
||||||
for (const channel of guild.channels.cache.values()) {
|
for (const channel of guild.channels.cache.values()) {
|
||||||
const threadParent = channel as typeof channel & {
|
const threadParent = channel as typeof channel & {
|
||||||
threads?: {
|
threads?: {
|
||||||
fetch: (options: {
|
fetch: (options: {
|
||||||
archived: boolean;
|
archived: boolean;
|
||||||
limit: number;
|
limit: number;
|
||||||
}) => Promise<any>;
|
}) => Promise<ThreadFetchResult>;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
if (!threadParent.threads?.fetch) continue;
|
if (!threadParent.threads?.fetch) continue;
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import { AppError } from "./errors";
|
|||||||
import { createChildLogger, logger } from "./logger";
|
import { createChildLogger, logger } from "./logger";
|
||||||
import { getMetrics, uptimeGauge } from "./metrics";
|
import { getMetrics, uptimeGauge } from "./metrics";
|
||||||
import { createBroadcaster } from "./moderation/broadcaster";
|
import { createBroadcaster } from "./moderation/broadcaster";
|
||||||
|
import type { ModerationBroadcaster } from "./moderation/types";
|
||||||
import { getPersistedValue, setPersistedValue } from "./muxer-queue";
|
import { getPersistedValue, setPersistedValue } from "./muxer-queue";
|
||||||
import { discordPlayer } from "./player";
|
import { discordPlayer } from "./player";
|
||||||
import { createAnalysisRoutes } from "./routes/analysisRoutes";
|
import { createAnalysisRoutes } from "./routes/analysisRoutes";
|
||||||
@@ -26,6 +27,15 @@ const activeUsers = new Map<
|
|||||||
{ username: string; avatar: string; speaking: boolean }
|
{ username: string; avatar: string; speaking: boolean }
|
||||||
>();
|
>();
|
||||||
|
|
||||||
|
type VoiceGlobals = typeof globalThis & {
|
||||||
|
moderationBroadcaster?: ModerationBroadcaster;
|
||||||
|
broadcastPcmToWeb?: (chunk: Buffer, userId: string) => void;
|
||||||
|
updateActiveUser?: (
|
||||||
|
userId: string,
|
||||||
|
data: { username: string; avatar: string; speaking: boolean },
|
||||||
|
) => void;
|
||||||
|
};
|
||||||
|
|
||||||
interface SharedUIState {
|
interface SharedUIState {
|
||||||
selectedGuild: string;
|
selectedGuild: string;
|
||||||
selectedVoiceChannel: string;
|
selectedVoiceChannel: string;
|
||||||
@@ -118,7 +128,7 @@ export async function startWebserver(
|
|||||||
|
|
||||||
// Create broadcaster instance
|
// Create broadcaster instance
|
||||||
const broadcaster = createBroadcaster();
|
const broadcaster = createBroadcaster();
|
||||||
(globalThis as any).moderationBroadcaster = broadcaster;
|
(globalThis as VoiceGlobals).moderationBroadcaster = broadcaster;
|
||||||
|
|
||||||
// Security headers. CSP disabled because the current static UI uses inline scripts/styles.
|
// Security headers. CSP disabled because the current static UI uses inline scripts/styles.
|
||||||
app.use(
|
app.use(
|
||||||
@@ -196,7 +206,10 @@ export async function startWebserver(
|
|||||||
app.use("/api", createSyncRoutes(_client));
|
app.use("/api", createSyncRoutes(_client));
|
||||||
|
|
||||||
// Inbound: Discord PCM → tagged chunks → browser
|
// Inbound: Discord PCM → tagged chunks → browser
|
||||||
(global as any).broadcastPcmToWeb = (chunk: Buffer, userId: string) => {
|
(globalThis as VoiceGlobals).broadcastPcmToWeb = (
|
||||||
|
chunk: Buffer,
|
||||||
|
userId: string,
|
||||||
|
) => {
|
||||||
let hash = 0;
|
let hash = 0;
|
||||||
for (let i = 0; i < userId.length; i++) {
|
for (let i = 0; i < userId.length; i++) {
|
||||||
hash = (hash << 5) - hash + userId.charCodeAt(i);
|
hash = (hash << 5) - hash + userId.charCodeAt(i);
|
||||||
@@ -210,7 +223,7 @@ export async function startWebserver(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
(global as any).updateActiveUser = (
|
(globalThis as VoiceGlobals).updateActiveUser = (
|
||||||
userId: string,
|
userId: string,
|
||||||
data: { username: string; avatar: string; speaking: boolean },
|
data: { username: string; avatar: string; speaking: boolean },
|
||||||
) => {
|
) => {
|
||||||
@@ -327,7 +340,7 @@ export async function startWebserver(
|
|||||||
);
|
);
|
||||||
ws.send(JSON.stringify({ type: "ui_state", state: getSharedUIState() }));
|
ws.send(JSON.stringify({ type: "ui_state", state: getSharedUIState() }));
|
||||||
|
|
||||||
ws.on("message", (data: any) => {
|
ws.on("message", (data: Buffer | ArrayBuffer | Buffer[]) => {
|
||||||
if (!Buffer.isBuffer(data)) return;
|
if (!Buffer.isBuffer(data)) return;
|
||||||
lastBrowserAudioTime = Date.now();
|
lastBrowserAudioTime = Date.now();
|
||||||
|
|
||||||
|
|||||||
@@ -4,9 +4,9 @@ import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
|
|||||||
const originalEnv = process.env;
|
const originalEnv = process.env;
|
||||||
|
|
||||||
describe("Drizzle ORM Database", () => {
|
describe("Drizzle ORM Database", () => {
|
||||||
let config: any;
|
let config: typeof import("../src/config").config;
|
||||||
let drizzle: any;
|
let drizzle: typeof import("../src/database/drizzle");
|
||||||
let logger: any;
|
let logger: ReturnType<typeof import("../src/logger").createChildLogger>;
|
||||||
|
|
||||||
beforeAll(async () => {
|
beforeAll(async () => {
|
||||||
// Set up environment for config loading
|
// Set up environment for config loading
|
||||||
|
|||||||
@@ -1,17 +1,70 @@
|
|||||||
|
import type { Mock } from "vitest";
|
||||||
import { describe, expect, it, vi } from "vitest";
|
import { describe, expect, it, vi } from "vitest";
|
||||||
import { createBroadcaster } from "../../src/moderation/broadcaster";
|
import {
|
||||||
|
type BroadcasterClient,
|
||||||
|
createBroadcaster,
|
||||||
|
} from "../../src/moderation/broadcaster";
|
||||||
|
import type {
|
||||||
|
AttachmentRecord,
|
||||||
|
MessageRecord,
|
||||||
|
} from "../../src/moderation/types";
|
||||||
|
|
||||||
function client() {
|
type TestClient = BroadcasterClient & { send: Mock };
|
||||||
|
|
||||||
|
function client(): TestClient {
|
||||||
return { readyState: 1, send: vi.fn() };
|
return { readyState: 1, send: vi.fn() };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function messageRecord(overrides: Partial<MessageRecord> = {}): MessageRecord {
|
||||||
|
return {
|
||||||
|
id: "m1",
|
||||||
|
guild_id: "guild-1",
|
||||||
|
channel_id: "channel-1",
|
||||||
|
thread_id: null,
|
||||||
|
user_id: "user-1",
|
||||||
|
username: "alice",
|
||||||
|
avatar_url: null,
|
||||||
|
content: "test",
|
||||||
|
edited_content: null,
|
||||||
|
created_at: 1,
|
||||||
|
edited_at: null,
|
||||||
|
deleted_at: null,
|
||||||
|
type: "text",
|
||||||
|
metadata: null,
|
||||||
|
...overrides,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function attachmentRecord(
|
||||||
|
overrides: Partial<AttachmentRecord> = {},
|
||||||
|
): AttachmentRecord {
|
||||||
|
return {
|
||||||
|
id: "a1",
|
||||||
|
message_id: "m1",
|
||||||
|
guild_id: "guild-1",
|
||||||
|
channel_id: "channel-1",
|
||||||
|
thread_id: null,
|
||||||
|
user_id: "user-1",
|
||||||
|
filename: "image.png",
|
||||||
|
size: 1,
|
||||||
|
type: "image/png",
|
||||||
|
discord_url: "https://example.com/image.png",
|
||||||
|
uploaded_url: null,
|
||||||
|
upload_status: "pending",
|
||||||
|
upload_error: null,
|
||||||
|
created_at: 1,
|
||||||
|
uploaded_at: null,
|
||||||
|
...overrides,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
describe("createBroadcaster", () => {
|
describe("createBroadcaster", () => {
|
||||||
it("sends JSON events to open clients", () => {
|
it("sends JSON events to open clients", () => {
|
||||||
const ws = client();
|
const ws = client();
|
||||||
const broadcaster = createBroadcaster();
|
const broadcaster = createBroadcaster();
|
||||||
|
|
||||||
broadcaster.addClient(ws as any);
|
broadcaster.addClient(ws);
|
||||||
broadcaster.messageAnalyzed({ id: "m1", ai_status: "clean" } as any);
|
broadcaster.messageAnalyzed(messageRecord({ ai_status: "clean" }));
|
||||||
|
|
||||||
expect(ws.send).toHaveBeenCalledTimes(1);
|
expect(ws.send).toHaveBeenCalledTimes(1);
|
||||||
expect(JSON.parse(ws.send.mock.calls[0][0])).toMatchObject({
|
expect(JSON.parse(ws.send.mock.calls[0][0])).toMatchObject({
|
||||||
@@ -21,10 +74,10 @@ describe("createBroadcaster", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it("skips closed clients", () => {
|
it("skips closed clients", () => {
|
||||||
const ws = { readyState: 3, send: vi.fn() };
|
const ws: TestClient = { readyState: 3, send: vi.fn() };
|
||||||
const broadcaster = createBroadcaster();
|
const broadcaster = createBroadcaster();
|
||||||
|
|
||||||
broadcaster.addClient(ws as any);
|
broadcaster.addClient(ws);
|
||||||
broadcaster.messageDeleted({ id: "m1", deleted_at: 123 });
|
broadcaster.messageDeleted({ id: "m1", deleted_at: 123 });
|
||||||
|
|
||||||
expect(ws.send).not.toHaveBeenCalled();
|
expect(ws.send).not.toHaveBeenCalled();
|
||||||
@@ -36,14 +89,11 @@ describe("createBroadcaster", () => {
|
|||||||
const ws3 = client();
|
const ws3 = client();
|
||||||
const broadcaster = createBroadcaster();
|
const broadcaster = createBroadcaster();
|
||||||
|
|
||||||
broadcaster.addClient(ws1 as any);
|
broadcaster.addClient(ws1);
|
||||||
broadcaster.addClient(ws2 as any);
|
broadcaster.addClient(ws2);
|
||||||
broadcaster.addClient(ws3 as any);
|
broadcaster.addClient(ws3);
|
||||||
|
|
||||||
broadcaster.messageCreated({
|
broadcaster.messageCreated(messageRecord());
|
||||||
id: "m1",
|
|
||||||
content: "test",
|
|
||||||
} as any);
|
|
||||||
|
|
||||||
expect(ws1.send).toHaveBeenCalledTimes(1);
|
expect(ws1.send).toHaveBeenCalledTimes(1);
|
||||||
expect(ws2.send).toHaveBeenCalledTimes(1);
|
expect(ws2.send).toHaveBeenCalledTimes(1);
|
||||||
@@ -61,14 +111,14 @@ describe("createBroadcaster", () => {
|
|||||||
throw new Error("Send failed");
|
throw new Error("Send failed");
|
||||||
});
|
});
|
||||||
|
|
||||||
broadcaster.addClient(ws1 as any);
|
broadcaster.addClient(ws1);
|
||||||
broadcaster.addClient(ws2 as any);
|
broadcaster.addClient(ws2);
|
||||||
broadcaster.addClient(ws3 as any);
|
broadcaster.addClient(ws3);
|
||||||
|
|
||||||
broadcaster.messageUpdated({
|
broadcaster.messageUpdated({
|
||||||
id: "m1",
|
id: "m1",
|
||||||
content: "updated",
|
content: "updated",
|
||||||
} as any);
|
});
|
||||||
|
|
||||||
// ws1 attempted send (threw)
|
// ws1 attempted send (threw)
|
||||||
expect(ws1.send).toHaveBeenCalledTimes(1);
|
expect(ws1.send).toHaveBeenCalledTimes(1);
|
||||||
@@ -84,16 +134,16 @@ describe("createBroadcaster", () => {
|
|||||||
|
|
||||||
expect(broadcaster.clientCount()).toBe(0);
|
expect(broadcaster.clientCount()).toBe(0);
|
||||||
|
|
||||||
broadcaster.addClient(ws1 as any);
|
broadcaster.addClient(ws1);
|
||||||
expect(broadcaster.clientCount()).toBe(1);
|
expect(broadcaster.clientCount()).toBe(1);
|
||||||
|
|
||||||
broadcaster.addClient(ws2 as any);
|
broadcaster.addClient(ws2);
|
||||||
expect(broadcaster.clientCount()).toBe(2);
|
expect(broadcaster.clientCount()).toBe(2);
|
||||||
|
|
||||||
broadcaster.removeClient(ws1 as any);
|
broadcaster.removeClient(ws1);
|
||||||
expect(broadcaster.clientCount()).toBe(1);
|
expect(broadcaster.clientCount()).toBe(1);
|
||||||
|
|
||||||
broadcaster.removeClient(ws2 as any);
|
broadcaster.removeClient(ws2);
|
||||||
expect(broadcaster.clientCount()).toBe(0);
|
expect(broadcaster.clientCount()).toBe(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -101,11 +151,8 @@ describe("createBroadcaster", () => {
|
|||||||
const ws = client();
|
const ws = client();
|
||||||
const broadcaster = createBroadcaster();
|
const broadcaster = createBroadcaster();
|
||||||
|
|
||||||
broadcaster.addClient(ws as any);
|
broadcaster.addClient(ws);
|
||||||
broadcaster.attachmentCreated({
|
broadcaster.attachmentCreated(attachmentRecord());
|
||||||
id: "a1",
|
|
||||||
message_id: "m1",
|
|
||||||
} as any);
|
|
||||||
|
|
||||||
expect(ws.send).toHaveBeenCalledTimes(1);
|
expect(ws.send).toHaveBeenCalledTimes(1);
|
||||||
const payload = JSON.parse(ws.send.mock.calls[0][0]);
|
const payload = JSON.parse(ws.send.mock.calls[0][0]);
|
||||||
|
|||||||
156
tests/moderation/messageCapture.test.ts
Normal file
156
tests/moderation/messageCapture.test.ts
Normal file
@@ -0,0 +1,156 @@
|
|||||||
|
import {
|
||||||
|
afterAll,
|
||||||
|
beforeAll,
|
||||||
|
beforeEach,
|
||||||
|
describe,
|
||||||
|
expect,
|
||||||
|
it,
|
||||||
|
vi,
|
||||||
|
} from "vitest";
|
||||||
|
import {
|
||||||
|
closeDatabase,
|
||||||
|
getDatabase,
|
||||||
|
initializeDatabase,
|
||||||
|
} from "../../src/database/drizzle";
|
||||||
|
import { captureMessage } from "../../src/moderation/messageCapture";
|
||||||
|
import type { ModerationBroadcaster } from "../../src/moderation/types";
|
||||||
|
|
||||||
|
const queueMessageAnalysis = vi.fn();
|
||||||
|
|
||||||
|
type TestMessage = Parameters<typeof captureMessage>[0];
|
||||||
|
type ModerationTestGlobal = typeof globalThis & {
|
||||||
|
moderationBroadcaster?: Partial<ModerationBroadcaster>;
|
||||||
|
};
|
||||||
|
|
||||||
|
interface TestDatabase {
|
||||||
|
run(sql: string): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getTestDatabase(): TestDatabase {
|
||||||
|
return getDatabase() as unknown as TestDatabase;
|
||||||
|
}
|
||||||
|
|
||||||
|
vi.mock("../../src/moderation/aiAnalyzer", () => ({
|
||||||
|
queueMessageAnalysis: (id: string) => queueMessageAnalysis(id),
|
||||||
|
}));
|
||||||
|
|
||||||
|
function createMessage(id = "message-1"): TestMessage {
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
guildId: "guild-1",
|
||||||
|
channelId: "channel-1",
|
||||||
|
author: {
|
||||||
|
id: "user-1",
|
||||||
|
username: "alice",
|
||||||
|
bot: false,
|
||||||
|
avatarURL: () => null,
|
||||||
|
},
|
||||||
|
content: "hello",
|
||||||
|
cleanContent: "hello",
|
||||||
|
createdTimestamp: 1_700_000_000_000,
|
||||||
|
attachments: new Map(),
|
||||||
|
stickers: new Map(),
|
||||||
|
embeds: [],
|
||||||
|
member: null,
|
||||||
|
reference: null,
|
||||||
|
channel: {
|
||||||
|
id: "channel-1",
|
||||||
|
name: "general",
|
||||||
|
isThread: () => false,
|
||||||
|
},
|
||||||
|
} as unknown as TestMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function createTables() {
|
||||||
|
const db = getTestDatabase();
|
||||||
|
db.run(`
|
||||||
|
CREATE TABLE IF NOT EXISTS "messages" (
|
||||||
|
"id" text PRIMARY KEY NOT NULL,
|
||||||
|
"guild_id" text NOT NULL,
|
||||||
|
"channel_id" text NOT NULL,
|
||||||
|
"thread_id" text,
|
||||||
|
"user_id" text NOT NULL,
|
||||||
|
"username" text NOT NULL,
|
||||||
|
"avatar_url" text,
|
||||||
|
"content" text NOT NULL,
|
||||||
|
"edited_content" text,
|
||||||
|
"created_at" integer NOT NULL,
|
||||||
|
"edited_at" integer,
|
||||||
|
"deleted_at" integer,
|
||||||
|
"type" text DEFAULT 'text' NOT NULL,
|
||||||
|
"metadata" text,
|
||||||
|
"ai_status" text DEFAULT 'pending' NOT NULL,
|
||||||
|
"ai_moderation_flags" text,
|
||||||
|
"ai_moderation_score" real,
|
||||||
|
"ai_moderation_raw" text,
|
||||||
|
"ai_analysis" text,
|
||||||
|
"ai_analyzed_at" integer,
|
||||||
|
"ai_error" text
|
||||||
|
)
|
||||||
|
`);
|
||||||
|
db.run(`
|
||||||
|
CREATE TABLE IF NOT EXISTS "attachments" (
|
||||||
|
"id" text PRIMARY KEY NOT NULL,
|
||||||
|
"message_id" text NOT NULL,
|
||||||
|
"guild_id" text NOT NULL,
|
||||||
|
"channel_id" text NOT NULL,
|
||||||
|
"thread_id" text,
|
||||||
|
"user_id" text NOT NULL,
|
||||||
|
"filename" text NOT NULL,
|
||||||
|
"size" integer NOT NULL,
|
||||||
|
"type" text NOT NULL,
|
||||||
|
"discord_url" text NOT NULL,
|
||||||
|
"uploaded_url" text,
|
||||||
|
"upload_status" text DEFAULT 'pending' NOT NULL,
|
||||||
|
"upload_error" text,
|
||||||
|
"created_at" integer NOT NULL,
|
||||||
|
"uploaded_at" integer
|
||||||
|
)
|
||||||
|
`);
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("captureMessage", () => {
|
||||||
|
beforeAll(async () => {
|
||||||
|
await initializeDatabase();
|
||||||
|
await createTables();
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
queueMessageAnalysis.mockClear();
|
||||||
|
const db = getTestDatabase();
|
||||||
|
db.run(`DELETE FROM "attachments"`);
|
||||||
|
db.run(`DELETE FROM "messages"`);
|
||||||
|
delete (globalThis as ModerationTestGlobal).moderationBroadcaster;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await closeDatabase();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not requeue or rebroadcast a duplicate captured message", async () => {
|
||||||
|
const message = createMessage();
|
||||||
|
const messageCreated = vi.fn();
|
||||||
|
(globalThis as ModerationTestGlobal).moderationBroadcaster = {
|
||||||
|
messageCreated,
|
||||||
|
};
|
||||||
|
|
||||||
|
await captureMessage(message, "text");
|
||||||
|
await captureMessage(message, "text");
|
||||||
|
|
||||||
|
expect(queueMessageAnalysis).toHaveBeenCalledTimes(1);
|
||||||
|
expect(messageCreated).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not queue or broadcast backlog captures one message at a time", async () => {
|
||||||
|
const message = createMessage("backlog-message-1");
|
||||||
|
const messageCreated = vi.fn();
|
||||||
|
(globalThis as ModerationTestGlobal).moderationBroadcaster = {
|
||||||
|
messageCreated,
|
||||||
|
};
|
||||||
|
|
||||||
|
await captureMessage(message, "text", { source: "backlog" });
|
||||||
|
|
||||||
|
expect(queueMessageAnalysis).not.toHaveBeenCalled();
|
||||||
|
expect(messageCreated).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -16,6 +16,14 @@ import {
|
|||||||
} from "../../src/moderation/messageStore";
|
} from "../../src/moderation/messageStore";
|
||||||
import type { MessageRecord } from "../../src/moderation/types";
|
import type { MessageRecord } from "../../src/moderation/types";
|
||||||
|
|
||||||
|
interface TestDatabase {
|
||||||
|
run(sql: string): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getTestDatabase(): TestDatabase {
|
||||||
|
return getDatabase() as unknown as TestDatabase;
|
||||||
|
}
|
||||||
|
|
||||||
const logger = createChildLogger("messageStoreQueries.test");
|
const logger = createChildLogger("messageStoreQueries.test");
|
||||||
|
|
||||||
describe("message cursor helpers", () => {
|
describe("message cursor helpers", () => {
|
||||||
@@ -36,7 +44,7 @@ describe("message query integration tests", () => {
|
|||||||
beforeAll(async () => {
|
beforeAll(async () => {
|
||||||
await initializeDatabase();
|
await initializeDatabase();
|
||||||
// Create tables using Drizzle schema (SQLite doesn't support migrations with PostgreSQL syntax)
|
// Create tables using Drizzle schema (SQLite doesn't support migrations with PostgreSQL syntax)
|
||||||
const db = getDatabase() as any;
|
const db = getTestDatabase();
|
||||||
try {
|
try {
|
||||||
// Create messages table
|
// Create messages table
|
||||||
await db.run(`
|
await db.run(`
|
||||||
@@ -75,7 +83,7 @@ describe("message query integration tests", () => {
|
|||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
// Clear messages table before each test
|
// Clear messages table before each test
|
||||||
try {
|
try {
|
||||||
const db = getDatabase() as any;
|
const db = getTestDatabase();
|
||||||
await db.run(`DELETE FROM "messages"`);
|
await db.run(`DELETE FROM "messages"`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.debug({ error }, "Could not clear messages table");
|
logger.debug({ error }, "Could not clear messages table");
|
||||||
|
|||||||
Reference in New Issue
Block a user