feat: add API endpoint for syncing selected channel backlog
This commit is contained in:
@@ -6,65 +6,6 @@ import { captureMessage } from "./messageCapture";
|
|||||||
|
|
||||||
const logger = createChildLogger("backlog-sync");
|
const logger = createChildLogger("backlog-sync");
|
||||||
|
|
||||||
function isWatchableChannel(channel: { type?: string; messages?: unknown }): boolean {
|
|
||||||
return Boolean(
|
|
||||||
channel.messages &&
|
|
||||||
["GUILD_TEXT", "GUILD_PUBLIC_THREAD", "GUILD_PRIVATE_THREAD"].includes(
|
|
||||||
channel.type ?? "",
|
|
||||||
),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async function collectWatchableChannels(guild: any): Promise<any[]> {
|
|
||||||
const channels: any[] = [];
|
|
||||||
|
|
||||||
// Fast pass: collect text channels from cache only
|
|
||||||
for (const channel of guild.channels.cache.values()) {
|
|
||||||
if (isWatchableChannel(channel)) {
|
|
||||||
channels.push(channel);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Slow pass: discover threads with timeout per channel (non-blocking to message sync)
|
|
||||||
const threadPromises: Promise<void>[] = [];
|
|
||||||
for (const channel of guild.channels.cache.values()) {
|
|
||||||
if (!channel.threads?.fetch) continue;
|
|
||||||
|
|
||||||
threadPromises.push(
|
|
||||||
(async () => {
|
|
||||||
for (const archived of [false, true]) {
|
|
||||||
try {
|
|
||||||
const controller = new AbortController();
|
|
||||||
const timeout = setTimeout(() => controller.abort(), 5000);
|
|
||||||
const fetched = await Promise.race([
|
|
||||||
channel.threads.fetch({ archived, limit: 100 }),
|
|
||||||
new Promise((_, reject) => controller.signal.addEventListener('abort', () => reject(new Error('timeout')))),
|
|
||||||
]).catch(() => null);
|
|
||||||
clearTimeout(timeout);
|
|
||||||
|
|
||||||
if (!fetched?.threads) continue;
|
|
||||||
for (const thread of fetched.threads.values()) {
|
|
||||||
if (isWatchableChannel(thread)) channels.push(thread);
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// Skip this channel's threads on timeout/error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for all thread discoveries with overall timeout
|
|
||||||
await Promise.race([
|
|
||||||
Promise.all(threadPromises),
|
|
||||||
new Promise((resolve) => setTimeout(resolve, 30000)),
|
|
||||||
]).catch(() => {
|
|
||||||
logger.warn("Thread discovery timeout, proceeding with cached channels");
|
|
||||||
});
|
|
||||||
|
|
||||||
return Array.from(new Map(channels.map((channel) => [channel.id, channel])).values());
|
|
||||||
}
|
|
||||||
|
|
||||||
async function syncChannelMessages(
|
async function syncChannelMessages(
|
||||||
db: SqliteDatabase,
|
db: SqliteDatabase,
|
||||||
channel: any,
|
channel: any,
|
||||||
@@ -116,63 +57,45 @@ export async function syncBacklogMessages(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.info({ guildId: guild.id }, "Backlog sync ready (will sync on-demand per selected channel)");
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function syncSelectedChannelBacklog(
|
||||||
|
client: Client,
|
||||||
|
db: SqliteDatabase,
|
||||||
|
guildId: string,
|
||||||
|
channelId: string,
|
||||||
|
): Promise<number> {
|
||||||
|
const guild = client.guilds.cache.get(guildId);
|
||||||
|
if (!guild) {
|
||||||
|
logger.warn({ guildId }, "Guild not found for backlog sync");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
const channel = guild.channels.cache.get(channelId);
|
||||||
|
if (!channel) {
|
||||||
|
logger.warn({ guildId, channelId }, "Channel not found for backlog sync");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
const cutoffTime = Date.now() - config.BACKLOG_SYNC_HOURS * 60 * 60 * 1000;
|
const cutoffTime = Date.now() - config.BACKLOG_SYNC_HOURS * 60 * 60 * 1000;
|
||||||
logger.info(
|
logger.info(
|
||||||
{ guildId: guild.id, hours: config.BACKLOG_SYNC_HOURS },
|
{ guildId, channelId, hours: config.BACKLOG_SYNC_HOURS },
|
||||||
"Starting message backlog sync",
|
"Starting backlog sync for selected channel",
|
||||||
);
|
);
|
||||||
|
|
||||||
logger.info({ guildId: guild.id }, "Fetching guild channels for backlog sync");
|
|
||||||
await guild.channels.fetch().catch((error) => {
|
|
||||||
logger.warn(
|
|
||||||
{ guildId: guild.id, error: error instanceof Error ? error.message : String(error) },
|
|
||||||
"Failed to fetch guild channels before backlog sync",
|
|
||||||
);
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.info({ guildId: guild.id }, "Collecting watchable channels for backlog sync");
|
|
||||||
const channels = await collectWatchableChannels(guild);
|
|
||||||
|
|
||||||
let total = 0;
|
|
||||||
logger.info(
|
|
||||||
{ guildId: guild.id, channels: channels.length, hours: config.BACKLOG_SYNC_HOURS },
|
|
||||||
"Watchable channels collected for backlog sync",
|
|
||||||
);
|
|
||||||
|
|
||||||
// Sync channels in parallel with concurrency limit of 3
|
|
||||||
const concurrency = 3;
|
|
||||||
const queue = [...channels];
|
|
||||||
const active: Promise<number>[] = [];
|
|
||||||
|
|
||||||
while (queue.length > 0 || active.length > 0) {
|
|
||||||
while (active.length < concurrency && queue.length > 0) {
|
|
||||||
const channel = queue.shift()!;
|
|
||||||
const promise = (async () => {
|
|
||||||
try {
|
try {
|
||||||
const count = await syncChannelMessages(db, channel as any, cutoffTime);
|
const count = await syncChannelMessages(db, channel as any, cutoffTime);
|
||||||
logger.info({ channelId: channel.id, count }, "Backlog channel sync completed");
|
logger.info({ channelId, count }, "Backlog sync completed for selected channel");
|
||||||
return count;
|
return count;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
{
|
{
|
||||||
channelId: channel.id,
|
channelId,
|
||||||
error: error instanceof Error ? error.message : String(error),
|
error: error instanceof Error ? error.message : String(error),
|
||||||
},
|
},
|
||||||
"Backlog channel sync failed",
|
"Backlog sync failed for selected channel",
|
||||||
);
|
);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
})();
|
|
||||||
active.push(promise);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (active.length > 0) {
|
|
||||||
const result = await Promise.race(active);
|
|
||||||
total += result;
|
|
||||||
active.splice(active.findIndex((p) => p === Promise.resolve(result)), 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info({ total }, "Message backlog sync completed");
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import { discordPlayer } from "./player";
|
|||||||
import type { VoiceController } from "./voiceController";
|
import type { VoiceController } from "./voiceController";
|
||||||
import { getDatabase, getPersistedValue, setPersistedValue } from "./muxer-queue";
|
import { getDatabase, getPersistedValue, setPersistedValue } from "./muxer-queue";
|
||||||
import { getMessagesByChannel, getAttachmentsByChannel } from "./moderation/messageStore";
|
import { getMessagesByChannel, getAttachmentsByChannel } from "./moderation/messageStore";
|
||||||
|
import { syncSelectedChannelBacklog } from "./moderation/backlogSync";
|
||||||
|
|
||||||
const wsLogger = createChildLogger("webserver");
|
const wsLogger = createChildLogger("webserver");
|
||||||
|
|
||||||
@@ -277,6 +278,32 @@ export function startWebserver(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
app.post("/api/backlog-sync", async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const { guildId, channelId } = req.body as {
|
||||||
|
guildId?: string;
|
||||||
|
channelId?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!guildId || !channelId) {
|
||||||
|
throw new AppError(
|
||||||
|
"guildId and channelId are required",
|
||||||
|
"MISSING_BACKLOG_PARAMS",
|
||||||
|
400,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const count = await syncSelectedChannelBacklog(_client, getDatabase(), guildId, channelId);
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
channelId,
|
||||||
|
messagesSync: count,
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
next(error);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Inbound: Discord PCM → tagged chunks → browser
|
// Inbound: Discord PCM → tagged chunks → browser
|
||||||
(global as any).broadcastPcmToWeb = (chunk: Buffer, userId: string) => {
|
(global as any).broadcastPcmToWeb = (chunk: Buffer, userId: string) => {
|
||||||
let hash = 0;
|
let hash = 0;
|
||||||
|
|||||||
Reference in New Issue
Block a user