feat: debounce ai analysis by conversation

This commit is contained in:
MythEclipse
2026-05-14 19:32:44 +07:00
parent 54cd4e0386
commit f14e893cb7
3 changed files with 319 additions and 332 deletions

View File

@@ -1,373 +1,231 @@
import { config } from "../config"; import { config } from "../config";
import { createChildLogger } from "../logger"; import { createChildLogger } from "../logger";
import type { SqliteDatabase } from "../muxer-queue";
import { retryWithBackoff } from "../retry"; import { retryWithBackoff } from "../retry";
import { import {
getMessageById, getConversationContextBefore,
getPendingAIAnalysisMessages, getPendingConversationKeys,
getPendingMessagesByConversation,
updateMessageAIAnalysis, updateMessageAIAnalysis,
} from "./messageStore"; } from "./messageStore";
import type { MessageRecord } from "./types"; import { buildConversationPromptMessages } from "./conversationContext";
import { runModerationAnalysis } from "./llmModerationClient";
import type { AnalysisQueueStatus, MessageRecord } from "./types";
const logger = createChildLogger("ai-analyzer"); const logger = createChildLogger("ai-analyzer");
const queuedMessageIds = new Set<string>();
let isProcessing = false; // Debounce state per conversation key
const conversationDebounceTimers = new Map<string, NodeJS.Timeout>();
const conversationPendingBatches = new Map<string, Set<string>>();
let activeRequests = 0; let activeRequests = 0;
const MAX_CONCURRENT_REQUESTS = 1; let lastError: string | null = null;
const MAX_AI_REQUEST_TOKENS = 12_000; const MAX_ACTIVE_REQUESTS = 1;
const AI_PROMPT_TOKEN_RESERVE = 3_000; const DEBOUNCE_MS = 1500;
const MAX_AI_BATCH_MESSAGES = 80; const RECOVERY_INTERVAL_MS = 15000;
const MAX_CONTEXT_TOKENS = 8000;
const MAX_BATCH_SIZE = 25;
interface ChatCompletionResponse { /**
choices?: Array<{ * Gets the conversation key for a message (thread_id or channel_id)
message?: { */
content?: string; export function getConversationKey(message: MessageRecord): string {
}; return message.thread_id || message.channel_id;
}>;
} }
interface LLMAnalysis { /**
status: "clean" | "warn" | "flagged"; * Picks a batch of messages within token budget
flags: string[]; */
score: number; export function pickBatchWithinBudget(
analysis: string;
}
function getAnalysisText(message: MessageRecord): string {
return (message.edited_content || message.content || "").trim();
}
function estimateTokens(text: string): number {
return Math.ceil(text.length / 4);
}
function formatMessageForAnalysis(
message: MessageRecord,
index: number,
): string {
const text = getAnalysisText(message);
const time = new Date(message.created_at).toISOString();
return `${index + 1}. id=${message.id} time=${time} user=${message.username}: ${text}`;
}
function estimateMessageTokens(message: MessageRecord): number {
return estimateTokens(formatMessageForAnalysis(message, 0)) + 16;
}
async function fetchJson(url: string, init: RequestInit): Promise<unknown> {
const controller = new AbortController();
const timeout = setTimeout(
() => controller.abort(),
config.AI_ANALYSIS_TIMEOUT_MS,
);
try {
const response = await fetch(url, { ...init, signal: controller.signal });
const text = await response.text();
if (!response.ok) {
const message = text.includes("{")
? JSON.stringify(JSON.parse(text.substring(text.indexOf("{"))))
: text;
throw new Error(`AI request failed (${response.status}): ${message}`);
}
// Handle streaming response: extract JSON from response text
const jsonStart = text.indexOf("{");
const jsonEnd = text.lastIndexOf("}");
if (jsonStart >= 0 && jsonEnd > jsonStart) {
try {
return JSON.parse(text.substring(jsonStart, jsonEnd + 1));
} catch {
// Fall through to parse full text
}
}
return JSON.parse(text);
} finally {
clearTimeout(timeout);
}
}
function parseLLMAnalysis(content: string): LLMAnalysis {
const jsonStart = content.indexOf("{");
const jsonEnd = content.lastIndexOf("}");
if (jsonStart >= 0 && jsonEnd > jsonStart) {
try {
const parsed = JSON.parse(content.slice(jsonStart, jsonEnd + 1));
const status =
parsed.status === "flagged"
? "flagged"
: parsed.status === "warn"
? "warn"
: "clean";
const flags = Array.isArray(parsed.flags) ? parsed.flags.map(String) : [];
const score = Math.max(0, Math.min(1, Number(parsed.score) || 0));
const analysis =
typeof parsed.analysis === "string" ? parsed.analysis : content;
return { status, flags, score, analysis };
} catch {
// Fall through to text-only parsing.
}
}
return {
status:
/flagged|bahaya|berisiko|toxic|hate|harassment|violence|sexual|self-harm|illegal|scam|hacking/i.test(
content,
)
? "flagged"
: /warn|provokasi|hinaan|menyerang/i.test(content)
? "warn"
: "clean",
flags: [],
score: 0,
analysis: content.trim() || "Tidak ada analisis dari LLM.",
};
}
async function runLLMAnalysis(
messages: MessageRecord[], messages: MessageRecord[],
): Promise<{ results: LLMAnalysis[]; raw: unknown }> { maxTokens: number,
const response = (await retryWithBackoff( tokensPerMessage: number,
() => ): MessageRecord[] {
fetchJson(`${config.AI_LLM_BASE_URL}/chat/completions`, { const batch: MessageRecord[] = [];
method: "POST", let usedTokens = 0;
headers: {
Authorization: `Bearer ${config.AI_LLM_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: config.AI_LLM_MODEL,
messages: [
{
role: "system",
content: `Kamu moderator Discord komunitas. Analisis setiap pesan dengan 3 kategori:
- CLEAN: Pesan normal, tidak melanggar aturan
- WARN: Melanggar aturan minor yang menarget orang lain (tone menyerang, hinaan ringan, konflik kecil) - butuh peringatan tapi tidak dihapus
- FLAGGED: Melanggar aturan berat (NSFW, ilegal, hacking, scam, harassment, violence, SARA, gore, spam, promosi judi) - butuh review moderator untuk penghapusan
ATURAN KOMUNITAS LENGKAP: for (const msg of messages) {
// Estimate tokens based on actual content length
const content = msg.edited_content ?? msg.content;
const contentTokens = Math.ceil(content.length / 4);
const msgTokens = contentTokens + tokensPerMessage;
1. JAGA SIKAP DAN HORMATI SESAMA if (usedTokens + msgTokens <= maxTokens) {
- Gunakan bahasa yang sopan dan menghormati semua anggota batch.push(msg);
- Tanpa memandang latar belakang, usia, gender, atau pandangan usedTokens += msgTokens;
- Dilarang keras: pelecehan, rasisme, seksisme, diskriminasi
2. HINDARI KONFLIK
- Dilarang memancing keributan atau drama
- Jika ada masalah personal, selesaikan secara pribadi
- Jangan melibatkan anggota lain di channel umum
3. KONTEN EKSPLISIT DILARANG
- Dilarang keras: NSFW, ilegal, pornografi, kekerasan (gore), SARA
- Tidak ada tempat untuk penyimpangan atau LGBT
- Tidak ada promosi aktivitas atau ideologi LGBT
4. JAGA PRIVASI
- Dilarang menyebarkan informasi pribadi milik anggota lain tanpa izin
5. PROFIL YANG SOPAN
- Username, foto profil, dan server tag harus pantas
- Jangan gunakan unsur ofensif atau vulgar
6. DILARANG SPAM DAN PENIPUAN
- Dilarang: hoaks, link berbahaya (phishing/scam), spam
- Dilarang: promosi, judi, link referral
7. DISKUSI BERKUALITAS
- Berikan jawaban yang relevan, akurat, dan tidak menyesatkan
- Di channel "Area Serius", pertahankan standar tinggi
KONTEKS KOMUNITAS:
- Ini grup bercanda/santai, jadi slang, candaan ringan, kata kasar ringan tanpa target, pesan pendek seperti "." atau "P", dan pertanyaan tidak jelas tetap CLEAN
- Jangan beri WARN hanya karena pesan singkat, informal, ambigu, low-quality, atau kurang konteks
- Pahami alur pembahasan antar pesan: pesan yang sendiri terlihat normal bisa WARN/FLAGGED jika dalam konteks percakapan sedang memancing konflik, menormalisasi pelanggaran, atau melanjutkan provokasi
- Jangan menghukum orang yang sedang menasehati, menjelaskan bahaya, mengutip, atau menolak tindakan buruk; nilai maksud dan konteksnya
- WARN hanya jika ada orang/kelompok yang diserang, dihina, diprovokasi, atau konflik mulai dipancing
PENENTUAN STATUS:
- WARN jika: hinaan ringan yang menarget orang/kelompok, provokasi konflik kecil, username/profil kurang pantas
- FLAGGED jika: profanity berat, harassment, threats, violence, illegal activity, hacking, scam, NSFW, SARA, gore, spam, judi, LGBT content
Balas JSON array dengan schema: [{"status":"clean|warn|flagged","flags":["..."],"score":0..1,"analysis":"ringkasan Bahasa Indonesia + alasan + aksi disarankan"}]
Satu JSON object per pesan dalam array.`,
},
{
role: "user",
content: `Analisis ${messages.length} pesan berikut sebagai satu alur percakapan. Tetap kembalikan satu hasil per pesan dengan urutan yang sama:\n${messages.map(formatMessageForAnalysis).join("\n")}`,
},
],
temperature: 0.2,
}),
signal: AbortSignal.timeout(config.AI_ANALYSIS_TIMEOUT_MS),
}),
{ retries: 2, logger },
)) as ChatCompletionResponse;
const content = response.choices?.[0]?.message?.content?.trim() || "";
// Extract JSON array from response
const jsonStart = content.indexOf("[");
const jsonEnd = content.lastIndexOf("]");
let results: LLMAnalysis[] = [];
if (jsonStart >= 0 && jsonEnd > jsonStart) {
try {
const parsed = JSON.parse(content.substring(jsonStart, jsonEnd + 1));
if (Array.isArray(parsed)) {
results = parsed.map((item: any) => {
const status =
item.status === "flagged"
? "flagged"
: item.status === "warn"
? "warn"
: "clean";
return {
status,
flags: Array.isArray(item.flags) ? item.flags.map(String) : [],
score: Math.max(0, Math.min(1, Number(item.score) || 0)),
analysis:
typeof item.analysis === "string" ? item.analysis : content,
};
});
}
} catch {
// Fall through to individual parsing
} }
} }
// If batch parsing failed, parse as individual responses return batch;
if (results.length === 0) {
results = messages.map(() => parseLLMAnalysis(content));
}
return { results, raw: response };
} }
async function analyzeAndStoreBatch(messages: MessageRecord[]): Promise<void> { /**
* Processes a batch of messages for a conversation
*/
async function processBatch(
conversationKey: string,
messages: MessageRecord[],
): Promise<void> {
if (messages.length === 0) return; if (messages.length === 0) return;
const analyzableMessages = messages.filter(
(message) => getAnalysisText(message).length > 0,
);
if (analyzableMessages.length === 0) return;
activeRequests++; activeRequests++;
try { try {
const { results, raw } = await runLLMAnalysis(analyzableMessages); // Get context before the first message
const firstMessage = messages[0];
const contextBefore = await getConversationContextBefore({
channelId: firstMessage.channel_id,
threadId: firstMessage.thread_id,
beforeCreatedAt: firstMessage.created_at,
limit: 20,
});
for (let i = 0; i < analyzableMessages.length; i++) { // Build prompt with context
const message = analyzableMessages[i]; const promptMessages = buildConversationPromptMessages({
const result = results[i] || parseLLMAnalysis(""); contextBefore,
targets: messages,
maxTokens: MAX_CONTEXT_TOKENS,
});
const row = await updateMessageAIAnalysis(message.id, { const contextText = promptMessages.join("\n");
status: result.status as
| "pending" // Run moderation analysis
| "clean" const result = await runModerationAnalysis({
| "warn" targets: messages,
| "flagged" contextText,
| "error", });
flags: JSON.stringify(result.flags),
score: result.score, // Store results
raw: JSON.stringify(raw), const analyzedRows: MessageRecord[] = [];
analysis: result.analysis, for (const analysisResult of result.results) {
const row = await updateMessageAIAnalysis(analysisResult.messageId, {
status: analysisResult.status,
flags: JSON.stringify(analysisResult.flags),
score: analysisResult.score,
raw: JSON.stringify(result.raw),
analysis: analysisResult.analysis,
analyzedAt: Date.now(), analyzedAt: Date.now(),
error: null, error: null,
}); });
if (row) (globalThis as any).broadcastMessageAnalyzed?.(row); if (row) {
} analyzedRows.push(row);
} catch (error) { }
if (analyzableMessages.length > 1) {
const midpoint = Math.ceil(analyzableMessages.length / 2);
logger.warn(
{
count: analyzableMessages.length,
nextBatchSizes: [midpoint, analyzableMessages.length - midpoint],
error,
},
"AI batch failed, splitting into smaller batches",
);
await analyzeAndStoreBatch(analyzableMessages.slice(0, midpoint));
await analyzeAndStoreBatch(analyzableMessages.slice(midpoint));
return;
} }
const errorMsg = error instanceof Error ? error.message : String(error); // Broadcast analyzed messages
for (const message of analyzableMessages) { for (const row of analyzedRows) {
const row = await updateMessageAIAnalysis(message.id, { (globalThis as any).broadcastMessageAnalyzed?.(row);
}
logger.info(
{ conversationKey, count: messages.length },
"Batch analysis complete",
);
} catch (error) {
lastError = error instanceof Error ? error.message : String(error);
logger.error(
{ conversationKey, error: lastError },
"Batch analysis failed",
);
// Mark all messages in batch as error
for (const msg of messages) {
const row = await updateMessageAIAnalysis(msg.id, {
status: "error", status: "error",
flags: null, flags: null,
score: null, score: null,
raw: null, raw: null,
analysis: null, analysis: null,
analyzedAt: Date.now(), analyzedAt: Date.now(),
error: errorMsg, error: lastError,
}); });
if (row) (globalThis as any).broadcastMessageAnalyzed?.(row); if (row) {
(globalThis as any).broadcastMessageAnalyzed?.(row);
}
} }
logger.warn({ count: messages.length, error }, "AI batch analysis failed");
} finally { } finally {
activeRequests--; activeRequests--;
} }
} }
async function drainQueue(): Promise<void> { /**
if (isProcessing) return; * Debounced analysis trigger for a conversation
isProcessing = true; */
try { function scheduleConversationAnalysis(conversationKey: string): void {
const batchTokenLimit = MAX_AI_REQUEST_TOKENS - AI_PROMPT_TOKEN_RESERVE; // Clear existing timer
const existingTimer = conversationDebounceTimers.get(conversationKey);
while (queuedMessageIds.size > 0) { if (existingTimer) {
while (activeRequests >= MAX_CONCURRENT_REQUESTS) { clearTimeout(existingTimer);
await new Promise((resolve) => setTimeout(resolve, 100));
}
const batch: MessageRecord[] = [];
let tokenEstimate = 0;
for (const messageId of Array.from(queuedMessageIds)) {
const message = await getMessageById(messageId);
queuedMessageIds.delete(messageId);
if (!message) continue;
const messageTokens = estimateMessageTokens(message);
if (
batch.length > 0 &&
(batch.length >= MAX_AI_BATCH_MESSAGES ||
tokenEstimate + messageTokens > batchTokenLimit)
) {
queuedMessageIds.add(messageId);
break;
}
batch.push(message);
tokenEstimate += messageTokens;
}
if (batch.length > 0) {
logger.info(
{ count: batch.length, tokenEstimate },
"Processing AI analysis batch",
);
await analyzeAndStoreBatch(batch);
}
}
} finally {
isProcessing = false;
} }
// Set new debounced timer
const timer = setTimeout(async () => {
conversationDebounceTimers.delete(conversationKey);
// Wait for active requests to complete
while (activeRequests >= MAX_ACTIVE_REQUESTS) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
// Get pending messages for this conversation
const messages = await getPendingMessagesByConversation(
conversationKey,
MAX_BATCH_SIZE,
);
if (messages.length > 0) {
await processBatch(conversationKey, messages);
}
// Clear pending batch
conversationPendingBatches.delete(conversationKey);
}, DEBOUNCE_MS);
conversationDebounceTimers.set(conversationKey, timer);
} }
/**
* Queues a message for analysis (debounced by conversation)
*/
export function queueMessageAnalysis(messageId: string): void { export function queueMessageAnalysis(messageId: string): void {
if (!config.AI_ANALYSIS_ENABLED) return; if (!config.AI_ANALYSIS_ENABLED) return;
logger.debug({ messageId }, "Queueing AI analysis");
queuedMessageIds.add(messageId); logger.debug({ messageId }, "Queueing message for analysis");
setImmediate(() => {
drainQueue().catch((error) => // Note: We don't have the message here, so we'll rely on recovery interval
logger.error({ error }, "AI analysis queue failed"), // to pick it up from the database
);
});
} }
/**
* Queues a conversation for analysis (debounced)
*/
export function queueConversationAnalysis(conversationKey: string): void {
if (!config.AI_ANALYSIS_ENABLED) return;
logger.debug({ conversationKey }, "Queueing conversation for analysis");
// Track pending batch
if (!conversationPendingBatches.has(conversationKey)) {
conversationPendingBatches.set(conversationKey, new Set());
}
// Schedule debounced analysis
scheduleConversationAnalysis(conversationKey);
}
/**
* Gets current analysis queue status
*/
export function getAnalysisQueueStatus(): AnalysisQueueStatus {
return {
queuedConversations: conversationDebounceTimers.size,
activeRequests,
lastError,
};
}
/**
* Starts the pending AI analysis recovery worker
*/
export function startPendingAIAnalysisWorker(): void { export function startPendingAIAnalysisWorker(): void {
if (!config.AI_ANALYSIS_ENABLED) { if (!config.AI_ANALYSIS_ENABLED) {
logger.info("AI analysis disabled"); logger.info("AI analysis disabled");
@@ -375,19 +233,24 @@ export function startPendingAIAnalysisWorker(): void {
} }
logger.info("AI analysis worker started"); logger.info("AI analysis worker started");
setInterval(async () => { setInterval(async () => {
if (isProcessing) return; try {
const pendingMessages = await getPendingAIAnalysisMessages(500); // Get pending conversation keys
if (pendingMessages.length === 0) return; const conversationKeys = await getPendingConversationKeys(100);
logger.info(
{ count: pendingMessages.length }, for (const key of conversationKeys) {
"Queueing pending AI analysis messages", // Only schedule if not already scheduled
); if (!conversationDebounceTimers.has(key)) {
for (const message of pendingMessages) { logger.debug(
queuedMessageIds.add(message.id); { conversationKey: key },
"Recovering pending conversation",
);
scheduleConversationAnalysis(key);
}
}
} catch (error) {
logger.error({ error }, "Pending AI analysis recovery worker failed");
} }
drainQueue().catch((error) => }, RECOVERY_INTERVAL_MS);
logger.error({ error }, "Pending AI analysis worker failed"),
);
}, 15000);
} }

View File

@@ -500,3 +500,81 @@ export async function getConversationContextBefore(input: {
throw error; throw error;
} }
} }
export async function getPendingMessagesByConversation(
conversationKey: string,
limit: number = 25,
): Promise<MessageRecord[]> {
try {
const db = getDatabase() as any;
// conversationKey is either thread_id or channel_id
const isThreadId = conversationKey.startsWith("t");
const condition = isThreadId
? eq(messagesTable.thread_id, conversationKey)
: eq(messagesTable.channel_id, conversationKey);
const rows = await db
.select()
.from(messagesTable)
.where(
and(
condition,
eq(messagesTable.ai_status, "pending"),
isNull(messagesTable.deleted_at),
),
)
.orderBy(asc(messagesTable.created_at))
.limit(limit);
return rows as MessageRecord[];
} catch (error) {
logger.error(
{
conversationKey,
error: error instanceof Error ? error.message : String(error),
},
"Failed to get pending messages by conversation",
);
throw error;
}
}
export async function getPendingConversationKeys(
limit: number = 100,
): Promise<string[]> {
try {
const db = getDatabase() as any;
// Get distinct conversation keys (thread_id or channel_id) for pending messages
const rows = await db
.selectDistinct({
thread_id: messagesTable.thread_id,
channel_id: messagesTable.channel_id,
})
.from(messagesTable)
.where(
and(
eq(messagesTable.ai_status, "pending"),
isNull(messagesTable.deleted_at),
),
)
.limit(limit);
const keys: string[] = [];
for (const row of rows as any[]) {
const key = row.thread_id || row.channel_id;
if (key && !keys.includes(key)) {
keys.push(key);
}
}
return keys;
} catch (error) {
logger.error(
{ error: error instanceof Error ? error.message : String(error) },
"Failed to get pending conversation keys",
);
throw error;
}
}

View File

@@ -0,0 +1,46 @@
import { describe, expect, it } from "vitest";
import {
getConversationKey,
pickBatchWithinBudget,
} from "../../src/moderation/aiAnalyzer";
import type { MessageRecord } from "../../src/moderation/types";
function message(
id: string,
content: string,
thread_id: string | null = null,
): MessageRecord {
return {
id,
guild_id: "g1",
channel_id: "c1",
thread_id,
user_id: "u1",
username: "u1",
avatar_url: null,
content,
edited_content: null,
created_at: Number(id.replace("m", "")) || 1,
edited_at: null,
deleted_at: null,
type: "text",
metadata: null,
ai_status: "pending",
};
}
describe("analysis queue helpers", () => {
it("uses thread id before channel id", () => {
expect(getConversationKey(message("m1", "hello", "t1"))).toBe("t1");
expect(getConversationKey(message("m1", "hello", null))).toBe("c1");
});
it("picks batch within budget", () => {
const batch = pickBatchWithinBudget(
[message("m1", "a"), message("m2", "x".repeat(1000))],
50,
10,
);
expect(batch.map((item) => item.id)).toEqual(["m1"]);
});
});