Compare commits

...

13 Commits

Author SHA1 Message Date
MythEclipse
8584030633 feat: automate .env deployment via GitHub Secrets and local file sync 2026-05-17 21:24:54 +07:00
MythEclipse
f35710db3f feat: add Docker configuration and automated deployment workflows for VPS orchestration 2026-05-17 21:23:50 +07:00
MythEclipse
0c7930bd01 chore: update streaming default quality settings to 1080p60 at 8mbps with ultrafast preset 2026-05-17 20:47:41 +07:00
MythEclipse
e22e620bae feat: add streaming demuxer tests and implement dual-stream output piping in streaming service 2026-05-17 20:44:13 +07:00
MythEclipse
eda32720c8 a 2026-05-17 19:39:46 +07:00
MythEclipse
c0f66c78a3 chore: update subproject commit to indicate dirty state 2026-05-17 18:24:17 +07:00
MythEclipse
b8a6f40b1b feat: enhance database initialization for test isolation and add transcoder metrics 2026-05-17 18:24:10 +07:00
MythEclipse
4931e6d1ca test: add unit tests for playTranscodedPreparedStream and transcoder functionality 2026-05-17 17:54:39 +07:00
MythEclipse
a3e6c4695a chore: remove test environment configuration file 2026-05-17 17:35:37 +07:00
MythEclipse
6de5342703 feat: refactor screen share controller to use Streamer for session management and simplify stream handling 2026-05-17 05:15:38 +07:00
MythEclipse
5a926dbd17 refactor: remove Discord-video-stream submodule and integrate streaming functionality 2026-05-17 05:10:46 +07:00
MythEclipse
7985efbef6 docs: add internal streamer replacement design 2026-05-17 05:00:04 +07:00
MythEclipse
71889ab689 chore: update Discord-video-stream subproject to latest commit 2026-05-17 04:52:20 +07:00
47 changed files with 1325 additions and 856 deletions

11
.dockerignore Normal file
View File

@@ -0,0 +1,11 @@
node_modules
dist
.env
.env.test
.git
.github
recordings
*.db
*.db-shm
*.db-wal
test_out.nut

View File

@@ -1,30 +0,0 @@
DISCORD_TOKEN=test_token_for_testing
RECORDINGS_DIR=./recordings
RECORDING_SEGMENT_MS=5000
VERBOSE=false
DECODER_ROTATE_MS=5000
DECODER_COOLDOWN_MS=30000
AUDIO_STREAM_SILENCE_DURATION_MS=3000
PACKET_FILTER_MIN_SIZE=8
OPUS_FRAME_SIZE=960
AUDIO_SAMPLE_RATE=48000
AUDIO_CHANNELS=2
AVATAR_SIZE=64
WEBSERVER_PORT=3000
VOICE_CONNECTION_TIMEOUT_MS=15000
RECONNECT_TIMEOUT_MS=5000
LOG_LEVEL=info
NODE_ENV=test
MONITOR_GUILD_ID=test_guild_id
PICSER_UPLOAD_URL=https://picser.asepharyana.tech/api/upload
ATTACHMENT_UPLOAD_TIMEOUT_MS=30000
ATTACHMENT_MAX_SIZE_MB=100
ATTACHMENT_RETRY_ATTEMPTS=3
BACKLOG_SYNC_HOURS=24
BACKLOG_SYNC_BATCH_SIZE=100
AI_ANALYSIS_ENABLED=false
AI_LLM_API_KEY=test_key
AI_LLM_BASE_URL=https://9router.asepharyana.tech/v1
AI_LLM_MODEL=free
AI_ANALYSIS_TIMEOUT_MS=30000
DATABASE_TYPE=sqlite

35
.github/workflows/deploy.yml vendored Normal file
View File

@@ -0,0 +1,35 @@
name: Deploy to VPS
on:
push:
branches:
- main
workflow_dispatch:
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout Repository
uses: actions/checkout@v4
- name: Deploy via SSH
uses: appleboy/ssh-action@v1.0.3
env:
ENV_FILE: ${{ secrets.ENV_FILE }}
with:
host: ${{ secrets.VPS_HOST }}
username: ${{ secrets.VPS_USERNAME }}
key: ${{ secrets.VPS_SSH_KEY }}
envs: ENV_FILE
script: |
cd /opt/imphenbot || exit
# Pull latest changes
git pull origin main
# Write environment variables from GitHub Secrets
echo "$ENV_FILE" > .env
# Build and restart containers
docker-compose up -d --build

1
.gitignore vendored
View File

@@ -5,3 +5,4 @@ dist/
public/app/ public/app/
.muxer-queue.** .muxer-queue.**
.claude/ .claude/
.env.test

4
.gitmodules vendored
View File

@@ -1,6 +1,4 @@
[submodule "vendor/discord.js-selfbot-v13"] [submodule "vendor/discord.js-selfbot-v13"]
path = vendor/discord.js-selfbot-v13 path = vendor/discord.js-selfbot-v13
url = ssh://git@43.134.105.109:22222/exceed/discord.js-selfbot.git url = ssh://git@43.134.105.109:22222/exceed/discord.js-selfbot.git
[submodule "vendor/Discord-video-stream"]
path = vendor/Discord-video-stream
url = ssh://git@43.134.105.109:22222/exceed/Discord-video-stream.git

36
Dockerfile Normal file
View File

@@ -0,0 +1,36 @@
FROM node:22-bookworm-slim
# Install dependencies required by node-canvas, ffmpeg, and yt-dlp
RUN apt-get update && apt-get install -y \
ffmpeg \
python3 \
curl \
git \
make \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Install yt-dlp
RUN curl -L https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp -o /usr/local/bin/yt-dlp && \
chmod a+rx /usr/local/bin/yt-dlp
# Enable pnpm
RUN corepack enable
WORKDIR /app
# Install dependencies first for better caching
COPY package.json pnpm-lock.yaml pnpm-workspace.yaml* ./
RUN pnpm install --frozen-lockfile
# Copy the rest of the application
COPY . .
# Build step if required (e.g. build:web)
RUN pnpm run build:web || true
# Set node environment
ENV NODE_ENV=production
# Start the application
CMD ["pnpm", "run", "start"]

View File

@@ -1,11 +1,7 @@
import { Client } from "discord.js-selfbot-v13"; import type { ChildProcess } from "node:child_process";
import dotenv from "dotenv"; import dotenv from "dotenv";
import { createYtDlp } from "./src/media/ytdlp.js"; import { createYtDlp } from "./src/media/ytdlp.js";
import { Streamer } from "./vendor/Discord-video-stream/dist/client/index.js"; import { prepareStream } from "./src/streaming/index.js";
import {
playStream,
prepareStream,
} from "./vendor/Discord-video-stream/dist/media/newApi.js";
dotenv.config(); dotenv.config();
@@ -26,29 +22,36 @@ async function test() {
], ],
}); });
command.on("stderr", (data) => { const ffmpeg = command as ChildProcess;
console.log("FFMPEG STDERR:", data); ffmpeg.stderr?.on("data", (data: Buffer) => {
console.log("FFMPEG STDERR:", data.toString());
}); });
console.log("Testing demux manually..."); let bytesRead = 0;
const { demux } = await import( output.on("data", (chunk: Buffer) => {
"./vendor/Discord-video-stream/dist/media/LibavDemuxer.js" bytesRead += chunk.length;
); console.log("Stream bytes:", bytesRead);
try { if (bytesRead > 1024 * 1024) {
const demuxPromise = demux(output, { format: "nut" }); ffmpeg.kill("SIGTERM");
const timeoutPromise = new Promise((_, reject) => }
setTimeout(() => reject(new Error("Demux timeout")), 15000), });
);
const { video, audio } = (await Promise.race([ try {
demuxPromise, await new Promise<void>((resolve, reject) => {
timeoutPromise, ffmpeg.on("exit", (code) => {
])) as any; if (code === 0 || code === null) {
console.log("Demux success!"); resolve();
console.log("Video stream:", !!video); return;
console.log("Audio stream:", !!audio); }
} catch (err) { reject(new Error(`ffmpeg exited with code ${code}`));
console.error("Demux failed:", err.message); });
ffmpeg.on("error", reject);
});
} catch (error: unknown) {
console.error(
"Debug stream failed:",
error instanceof Error ? error.message : String(error),
);
} }
process.exit(0); process.exit(0);

45
deploy.sh Executable file
View File

@@ -0,0 +1,45 @@
#!/bin/bash
# Configuration for CLI deployment
VPS_HOST="45.127.35.244"
VPS_USER="root"
# Find first available private key in ~/.ssh or use specific one if you want to hardcode
SSH_KEY_PATH=$(find ~/.ssh -name "id_rsa" -o -name "id_ed25519" | head -n 1)
echo "🚀 Starting CLI deployment to $VPS_USER@$VPS_HOST..."
if [ -z "$SSH_KEY_PATH" ]; then
echo "⚠️ No SSH key found in ~/.ssh. Falling back to default SSH behavior."
SSH_CMD="ssh -o StrictHostKeyChecking=no $VPS_USER@$VPS_HOST"
RSYNC_CMD="rsync -avz --exclude-from='.dockerignore' -e 'ssh -o StrictHostKeyChecking=no'"
else
echo "🔑 Using SSH key: $SSH_KEY_PATH"
SSH_CMD="ssh -i $SSH_KEY_PATH -o StrictHostKeyChecking=no $VPS_USER@$VPS_HOST"
RSYNC_CMD="rsync -avz --exclude-from='.dockerignore' -e 'ssh -i $SSH_KEY_PATH -o StrictHostKeyChecking=no'"
fi
# Directory on the VPS where the app will be deployed
REMOTE_DIR="/opt/imphenbot"
echo "📦 Syncing files to VPS..."
$SSH_CMD "mkdir -p $REMOTE_DIR"
eval "$RSYNC_CMD ./ $VPS_USER@$VPS_HOST:$REMOTE_DIR"
if [ -f .env ]; then
echo "🔒 Copying local .env to VPS..."
if [ -z "$SSH_KEY_PATH" ]; then
scp -o StrictHostKeyChecking=no .env $VPS_USER@$VPS_HOST:$REMOTE_DIR/.env
else
scp -i $SSH_KEY_PATH -o StrictHostKeyChecking=no .env $VPS_USER@$VPS_HOST:$REMOTE_DIR/.env
fi
else
echo "⚠️ No local .env found to copy."
fi
echo "🔄 Rebuilding and restarting Docker containers..."
$SSH_CMD << EOF
cd $REMOTE_DIR
docker-compose up -d --build
EOF
echo "✅ Deployment complete!"

30
docker-compose.yml Normal file
View File

@@ -0,0 +1,30 @@
version: '3.8'
services:
app:
build: .
container_name: imphenbot-app
restart: unless-stopped
env_file:
- .env
volumes:
- ./recordings:/app/recordings
# Mapping SQLite database files if needed, or storing them in a dedicated volume.
# Assuming default config uses root directory for DB.
- ./.muxer-queue.db:/app/.muxer-queue.db
- ./.muxer-queue.db-shm:/app/.muxer-queue.db-shm
- ./.muxer-queue.db-wal:/app/.muxer-queue.db-wal
labels:
- "traefik.enable=true"
- "traefik.http.routers.imphenbot.rule=Host(`imphnen.asepharyana.tech`)"
- "traefik.http.routers.imphenbot.entrypoints=websecure"
- "traefik.http.routers.imphenbot.tls=true"
# Expose port to traefik (adjust if WEBSERVER_PORT differs)
- "traefik.http.services.imphenbot.loadbalancer.server.port=3000"
networks:
- app-shared-net
networks:
app-shared-net:
name: app-shared-net
external: true

View File

@@ -0,0 +1,85 @@
# Internal Streamer Replacement Design
## Summary
Replace the external `@dank074/discord-video-stream` dependency with an internal streaming module that uses `discord.js-selfbot-v13` private APIs to deliver the same screen share behavior (video + audio) with identical UI/API surface.
## Goals
- Maintain feature parity for screen share (video + audio, 720p @ 30fps, bitrate 2500/4000, H264, audio on).
- Keep existing UI and API contracts unchanged (`/api/media/queue` with `mode: "screen"`).
- Remove `@dank074/discord-video-stream` from dependencies and delete `vendor/Discord-video-stream`.
- Ensure clean lifecycle handling (start/stop, cleanup, error reporting).
## Non-Goals
- Rewriting WebRTC/RTP stack from scratch.
- Changing media queue behavior or UI layout.
- Adding new screen share modes or settings.
## Architecture Overview
Introduce a new internal module under `src/streaming/` that encapsulates:
- Voice/session management using private `discord.js-selfbot-v13` APIs.
- FFmpeg preparation for H264 + Opus (AnnexB video + Opus audio).
- Stream playback into the internal dispatcher.
`screenShareController` will depend on this module instead of `@dank074/discord-video-stream`.
## Components
### 1) Streaming Session Module (`src/streaming/`)
Proposed exports:
- `createStreamSession(client)`
- Joins or reuses voice connection for video streaming.
- Exposes a `session` object with `startVideo()`, `stopVideo()`, and `sendStream(stream)` hooks.
- `prepareFfmpegStream(source, opts)`
- Spawns ffmpeg with the same parameters used today.
- Returns `{ command, output }` (output is a Readable stream).
- `playPreparedStream(output, session)`
- Pipes the prepared stream into the internal dispatcher.
- Returns a promise that resolves when playback completes.
### 2) Screen Share Controller (`src/media/screenShareController.ts`)
- Replace Streamer/prepareStream/playStream with internal module usage.
- Keep the public API identical (`start(source)` returning `ScreenSharePlayback`).
### 3) Web Server Wiring (`src/webserver.ts`)
- Remove `Streamer` instantiation and dependencies.
- Pass only `getVoiceStatus` and new streaming module dependencies into `createScreenShareController`.
## Data Flow
1. User queues screen share via `/api/media/queue` with `mode: "screen"`.
2. `MediaController` calls `screenShareController.start(source)`.
3. `screenShareController` resolves URL, calls `prepareFfmpegStream`.
4. `createStreamSession` ensures voice connection and dispatcher ready.
5. `playPreparedStream` sends output to Discord.
6. On completion or stop, cleanup runs and state updates propagate.
## Error Handling
- Voice not connected: throw `VOICE_NOT_CONNECTED`.
- FFmpeg spawn/exit failure: throw `SCREEN_STREAM_FAILED`.
- Dispatcher error: stop stream, cleanup, log error, set state idle.
## Lifecycle Rules
- `start()` always stops any active stream first.
- `stop()` kills ffmpeg, stops dispatcher, and resets internal state.
- Completion resolves `done` promise and triggers cleanup.
## Testing Strategy
- Unit tests for `screenShareController`:
- Calls to `prepareFfmpegStream` and `playPreparedStream` on `start()`.
- Ensures `stop()` kills ffmpeg and ends session.
- Unit tests for `streaming` module:
- Session initialization and cleanup logic with mocked private APIs.
## Migration Steps
1. Implement `src/streaming/` module.
2. Update `screenShareController` to use internal module.
3. Remove `@dank074/discord-video-stream` imports and wiring.
4. Delete `vendor/Discord-video-stream` directory.
5. Update `package.json` dependencies.
6. Update tests.
## Risks
- Private `discord.js-selfbot-v13` APIs may change.
- Harder debugging if internal dispatcher behavior differs.
## Rollback Plan
- Revert to previous commit that restores `@dank074/discord-video-stream` and the vendor directory.

View File

@@ -234,6 +234,7 @@ export default function App() {
onStartScreen={(source) => media.enqueue(source, "screen")} onStartScreen={(source) => media.enqueue(source, "screen")}
onSkip={media.skip} onSkip={media.skip}
onStop={media.stop} onStop={media.stop}
onVolumeChange={media.setVolume}
/> />
)} )}
</TabsContent> </TabsContent>

View File

@@ -19,3 +19,10 @@ export function skipMedia(): Promise<MediaState> {
export function stopMedia(): Promise<MediaState> { export function stopMedia(): Promise<MediaState> {
return request<MediaState>('/api/media/stop', { method: 'POST' }); return request<MediaState>('/api/media/stop', { method: 'POST' });
} }
export function setMediaVolume(volume: number): Promise<MediaState> {
return request<MediaState>('/api/media/volume', {
method: 'POST',
body: JSON.stringify({ volume }),
});
}

View File

@@ -11,9 +11,18 @@ interface MediaPanelProps {
onStartScreen: (source: string) => void; onStartScreen: (source: string) => void;
onSkip: () => void; onSkip: () => void;
onStop: () => void; onStop: () => void;
onVolumeChange: (volume: number) => void;
} }
export function MediaPanel({ state, loading, onQueueMusic, onStartScreen, onSkip, onStop }: MediaPanelProps) { export function MediaPanel({
state,
loading,
onQueueMusic,
onStartScreen,
onSkip,
onStop,
onVolumeChange,
}: MediaPanelProps) {
return ( return (
<div className="grid gap-6 xl:grid-cols-[1fr_380px]"> <div className="grid gap-6 xl:grid-cols-[1fr_380px]">
<Tabs defaultValue="music" className="min-w-0"> <Tabs defaultValue="music" className="min-w-0">
@@ -22,7 +31,14 @@ export function MediaPanel({ state, loading, onQueueMusic, onStartScreen, onSkip
<TabsTrigger value="screen">Screen Share</TabsTrigger> <TabsTrigger value="screen">Screen Share</TabsTrigger>
</TabsList> </TabsList>
<TabsContent value="music"> <TabsContent value="music">
<MusicPlayer loading={loading} onQueue={onQueueMusic} onSkip={onSkip} onStop={onStop} /> <MusicPlayer
loading={loading}
volume={state.musicVolume}
onVolumeChange={onVolumeChange}
onQueue={onQueueMusic}
onSkip={onSkip}
onStop={onStop}
/>
</TabsContent> </TabsContent>
<TabsContent value="screen"> <TabsContent value="screen">
<ScreenShare loading={loading} onStart={onStartScreen} onSkip={onSkip} onStop={onStop} /> <ScreenShare loading={loading} onStart={onStartScreen} onSkip={onSkip} onStop={onStop} />

View File

@@ -1,18 +1,42 @@
import { Music2 } from "lucide-react"; import { Music2 } from "lucide-react";
import { useState } from "react"; import { useEffect, useState } from "react";
import { Button } from "../ui/button"; import { Button } from "../ui/button";
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "../ui/card"; import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "../ui/card";
import { Input } from "../ui/input"; import { Input } from "../ui/input";
interface MusicPlayerProps { interface MusicPlayerProps {
loading: boolean; loading: boolean;
volume: number;
onVolumeChange: (volume: number) => void;
onQueue: (source: string) => void; onQueue: (source: string) => void;
onSkip: () => void; onSkip: () => void;
onStop: () => void; onStop: () => void;
} }
export function MusicPlayer({ loading, onQueue, onSkip, onStop }: MusicPlayerProps) { export function MusicPlayer({
loading,
volume,
onVolumeChange,
onQueue,
onSkip,
onStop,
}: MusicPlayerProps) {
const [source, setSource] = useState(""); const [source, setSource] = useState("");
const safeVolume = Number.isFinite(volume) ? Math.max(0, Math.min(1, volume)) : 1;
const [draftVolume, setDraftVolume] = useState(Math.round(safeVolume * 100));
useEffect(() => {
setDraftVolume(Math.round(safeVolume * 100));
}, [safeVolume]);
useEffect(() => {
const normalized = draftVolume / 100;
if (Math.abs(normalized - safeVolume) < 0.001) return;
const timer = window.setTimeout(() => {
onVolumeChange(normalized);
}, 150);
return () => window.clearTimeout(timer);
}, [draftVolume, onVolumeChange, safeVolume]);
const submit = () => { const submit = () => {
const trimmed = source.trim(); const trimmed = source.trim();
@@ -34,6 +58,21 @@ export function MusicPlayer({ loading, onQueue, onSkip, onStop }: MusicPlayerPro
onKeyDown={(event) => event.key === "Enter" && submit()} onKeyDown={(event) => event.key === "Enter" && submit()}
placeholder="YouTube URL, Spotify track, or search terms" placeholder="YouTube URL, Spotify track, or search terms"
/> />
<div className="space-y-2">
<div className="flex items-center justify-between text-sm">
<span className="font-medium">Volume</span>
<span className="text-muted-foreground">{draftVolume}%</span>
</div>
<input
type="range"
min={0}
max={100}
step={1}
value={draftVolume}
onChange={(event) => setDraftVolume(Number(event.target.value))}
className="h-2 w-full cursor-pointer accent-primary"
/>
</div>
<div className="flex flex-wrap gap-2"> <div className="flex flex-wrap gap-2">
<Button disabled={loading || !source.trim()} onClick={submit}>Queue / Play</Button> <Button disabled={loading || !source.trim()} onClick={submit}>Queue / Play</Button>
<Button variant="secondary" disabled={loading} onClick={onSkip}>Skip</Button> <Button variant="secondary" disabled={loading} onClick={onSkip}>Skip</Button>

View File

@@ -1,8 +1,19 @@
import { useCallback, useEffect, useState } from "react"; import { useCallback, useEffect, useState } from "react";
import { getMediaStatus, queueMedia, skipMedia, stopMedia } from "../api/media"; import {
getMediaStatus,
queueMedia,
setMediaVolume,
skipMedia,
stopMedia,
} from "../api/media";
import type { MediaMode, MediaState } from "../types/media"; import type { MediaMode, MediaState } from "../types/media";
const emptyMediaState: MediaState = { playing: false, current: null, queue: [] }; const emptyMediaState: MediaState = {
playing: false,
musicVolume: 1,
current: null,
queue: [],
};
export function useMediaControl() { export function useMediaControl() {
const [mediaState, setMediaState] = useState<MediaState>(emptyMediaState); const [mediaState, setMediaState] = useState<MediaState>(emptyMediaState);
@@ -55,9 +66,32 @@ export function useMediaControl() {
} }
}, []); }, []);
const setVolume = useCallback(async (volume: number) => {
setError(null);
try {
const state = await setMediaVolume(volume);
setMediaState(state);
return state;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
setError(message);
throw err;
}
}, []);
useEffect(() => { useEffect(() => {
refreshMedia().catch((err) => setError(err instanceof Error ? err.message : String(err))); refreshMedia().catch((err) => setError(err instanceof Error ? err.message : String(err)));
}, [refreshMedia]); }, [refreshMedia]);
return { mediaState, setMediaState, loading, error, refreshMedia, enqueue, skip, stop }; return {
mediaState,
setMediaState,
loading,
error,
refreshMedia,
enqueue,
skip,
stop,
setVolume,
};
} }

View File

@@ -11,6 +11,7 @@ export interface MediaItem {
export interface MediaState { export interface MediaState {
playing: boolean; playing: boolean;
musicVolume: number;
current: MediaItem | null; current: MediaItem | null;
queue: MediaItem[]; queue: MediaItem[];
} }

View File

@@ -23,7 +23,7 @@
"install:yt-dlp": "sh scripts/install-yt-dlp.sh" "install:yt-dlp": "sh scripts/install-yt-dlp.sh"
}, },
"dependencies": { "dependencies": {
"@dank074/discord-video-stream": "workspace:*", "@dank074/discord-video-stream": "^6.0.0",
"@discordjs/opus": "^0.10.0", "@discordjs/opus": "^0.10.0",
"@discordjs/voice": "^0.19.1", "@discordjs/voice": "^0.19.1",
"@radix-ui/react-scroll-area": "^1.2.10", "@radix-ui/react-scroll-area": "^1.2.10",

666
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,6 @@
packages: packages:
- . - .
- vendor/discord.js-selfbot-v13 - vendor/discord.js-selfbot-v13
- vendor/Discord-video-stream
onlyBuiltDependencies: onlyBuiltDependencies:
- '@discordjs/opus' - '@discordjs/opus'

View File

@@ -22,7 +22,12 @@ export async function initializeDatabase() {
return db; return db;
} }
if (config.DATABASE_TYPE === "postgres") { // During tests prefer an isolated SQLite instance to avoid using shared
// external Postgres instances which can lead to flaky test interference.
const usePostgres =
config.DATABASE_TYPE === "postgres" && process.env.NODE_ENV !== "test";
if (usePostgres) {
let pool: Pool; let pool: Pool;
// Use DATABASE_URL if available, otherwise build from individual variables // Use DATABASE_URL if available, otherwise build from individual variables
@@ -45,12 +50,25 @@ export async function initializeDatabase() {
} }
db = drizzlePostgres(pool, { schema }); db = drizzlePostgres(pool, { schema });
// Provide a simple `run` helper for tests that expect it.
try {
(db as any).run = (sql: string) => pool.query(sql);
} catch {
// ignore
}
logger.info("PostgreSQL database initialized"); logger.info("PostgreSQL database initialized");
} else { } else {
const sqlite = new Database(".muxer-queue.db"); const sqlite = new Database(".muxer-queue.db");
sqlite.pragma("journal_mode = WAL"); sqlite.pragma("journal_mode = WAL");
db = drizzleSqlite(sqlite, { schema }); db = drizzleSqlite(sqlite, { schema });
// Expose a convenience `run` method used by tests that expect a simple API.
// `sqlite` is the underlying better-sqlite3 Database instance.
try {
(db as any).run = (sql: string) => sqlite.exec(sql);
} catch {
// ignore
}
logger.info("SQLite database initialized"); logger.info("SQLite database initialized");
} }

View File

@@ -73,6 +73,14 @@ async function initializeApp() {
process.exit(1); process.exit(1);
} }
client.on("debug", (msg) => {
if (msg.includes("[VOICE") || msg.includes("[ffmpeg") || msg.toLowerCase().includes("error") || msg.toLowerCase().includes("stream")) {
logger.info({ debugMsg: msg }, "Discord Client Debug");
} else if (config.VERBOSE) {
logger.debug({ debugMsg: msg }, "Discord Client Debug");
}
});
client.on("ready", async () => { client.on("ready", async () => {
logger.info({ user: client.user?.tag }, "Bot logged in"); logger.info({ user: client.user?.tag }, "Bot logged in");
registerMessageCapture(client); registerMessageCapture(client);

View File

@@ -17,10 +17,13 @@ import { createMusicPlayer } from "./musicPlayer";
export interface MediaControllerDependencies { export interface MediaControllerDependencies {
isVoiceConnected?: () => boolean; isVoiceConnected?: () => boolean;
isBrowserStreaming?: () => boolean; isBrowserStreaming?: () => boolean;
resolveMediaSource?: (source: string) => Promise<ResolvedMediaSource>; resolveMediaSource?: (source: string, mode?: MediaMode) => Promise<ResolvedMediaSource>;
musicPlayer?: MusicPlayer; musicPlayer?: MusicPlayer;
screenController?: ScreenShareController; screenController?: ScreenShareController;
onStateChange?: (state: MediaState) => void; onStateChange?: (state: MediaState) => void;
initialMusicVolume?: number;
onMusicVolumeChange?: (volume: number) => void | Promise<void>;
setMusicVolume?: (volume: number) => void;
} }
export class MediaController { export class MediaController {
@@ -31,9 +34,18 @@ export class MediaController {
private skipInProgress = false; private skipInProgress = false;
private screenPlayback: ScreenSharePlayback | null = null; private screenPlayback: ScreenSharePlayback | null = null;
private activeMode: MediaMode | null = null; private activeMode: MediaMode | null = null;
private musicVolume: number;
private readonly setPlayerMusicVolume: (volume: number) => void;
constructor(private readonly dependencies: MediaControllerDependencies = {}) { constructor(private readonly dependencies: MediaControllerDependencies = {}) {
this.musicPlayer = dependencies.musicPlayer ?? createMusicPlayer(); this.musicPlayer = dependencies.musicPlayer ?? createMusicPlayer();
this.setPlayerMusicVolume =
dependencies.setMusicVolume ??
((volume) => {
discordPlayer.setMusicVolume(volume);
});
this.musicVolume = normalizeVolume(dependencies.initialMusicVolume, 1);
this.setPlayerMusicVolume(this.musicVolume);
} }
getState(): MediaState { getState(): MediaState {
@@ -42,35 +54,52 @@ export class MediaController {
playing: playing:
this.activeMode === "screen" || snapshot.current?.status === "playing", this.activeMode === "screen" || snapshot.current?.status === "playing",
activeMode: this.activeMode ?? snapshot.current?.mode ?? null, activeMode: this.activeMode ?? snapshot.current?.mode ?? null,
musicVolume: this.musicVolume,
...snapshot, ...snapshot,
}; };
} }
async setMusicVolume(volume: number): Promise<MediaState> {
const nextVolume = normalizeVolume(volume, this.musicVolume);
if (this.musicVolume === nextVolume) return this.emitState();
this.musicVolume = nextVolume;
this.setPlayerMusicVolume(nextVolume);
await this.dependencies.onMusicVolumeChange?.(nextVolume);
return this.emitState();
}
async queue( async queue(
source: string, source: string,
options: QueueMediaOptions = {}, options: QueueMediaOptions = {},
): Promise<MediaState> { ): Promise<MediaState> {
const mode = options.mode ?? "music"; const mode = options.mode ?? "music";
const resolved = await (
this.dependencies.resolveMediaSource ?? resolveMediaSource
)(source, mode);
if (mode === "screen") { if (mode === "screen") {
// Stop current music if any // Stop current music if any
this.playbackToken++; this.playbackToken++;
this.playback?.stop(); this.playback?.stop();
this.playback = null; this.playback = null;
return this.startScreen(source); return this.startScreen(resolved.source);
} }
// mode === "music" // mode === "music"
// Stop screen if active // If a screen share is active outside of this controller (browser-owned),
// reject to avoid stealing the shared player. If this controller started
// the screenPlayback, stop it and proceed.
if (this.screenPlayback || this.dependencies.screenController?.isActive()) { if (this.screenPlayback || this.dependencies.screenController?.isActive()) {
if (this.dependencies.screenController?.isActive() && !this.screenPlayback) {
throw new AppError("Another media mode is active", "MEDIA_BUSY", 409);
}
this.screenPlayback?.stop(); this.screenPlayback?.stop();
this.screenPlayback = null; this.screenPlayback = null;
this.activeMode = null; this.activeMode = null;
} }
this.assertCanStartMusic(); this.assertCanStartMusic();
const resolved = await (
this.dependencies.resolveMediaSource ?? resolveMediaSource
)(source);
this.queueStore.add(resolved, mode, options.requestedBy); this.queueStore.add(resolved, mode, options.requestedBy);
this.startNextIfIdle(); this.startNextIfIdle();
return this.emitState(); return this.emitState();
@@ -201,3 +230,8 @@ export class MediaController {
return state; return state;
} }
} }
function normalizeVolume(value: number | undefined, fallback: number): number {
if (!Number.isFinite(value)) return fallback;
return Math.max(0, Math.min(1, value as number));
}

View File

@@ -1,7 +1,7 @@
import { existsSync, statSync } from "node:fs"; import { existsSync, statSync } from "node:fs";
import path from "node:path"; import path from "node:path";
import { AppError } from "../errors"; import { AppError } from "../errors";
import type { ResolvedMediaSource } from "./mediaTypes"; import type { ResolvedMediaSource, MediaMode } from "./mediaTypes";
import { createPlayDlResolver } from "./playDlResolver"; import { createPlayDlResolver } from "./playDlResolver";
import { createYtDlp, type YtDlpClient } from "./ytdlp"; import { createYtDlp, type YtDlpClient } from "./ytdlp";
@@ -18,7 +18,10 @@ export function createMediaResolver(
const ytdlp = dependencies.ytdlp ?? createYtDlp(); const ytdlp = dependencies.ytdlp ?? createYtDlp();
const playDlResolver = dependencies.playDlResolver ?? createPlayDlResolver(); const playDlResolver = dependencies.playDlResolver ?? createPlayDlResolver();
return async function resolve(input: string): Promise<ResolvedMediaSource> { return async function resolve(
input: string,
mode: MediaMode = "music"
): Promise<ResolvedMediaSource> {
const source = input.trim(); const source = input.trim();
if (!source) { if (!source) {
throw new AppError( throw new AppError(
@@ -31,13 +34,17 @@ export function createMediaResolver(
const url = parseUrl(source); const url = parseUrl(source);
if (url && isYouTubeUrl(url)) { if (url && isYouTubeUrl(url)) {
const metadata = await ytdlp.getMetadata(source); const metadata = await ytdlp.getMetadata(source);
const directUrl = await ytdlp.getDirectAudioUrl(source); const directUrl = mode === "screen"
? await ytdlp.getDirectVideoUrl(source)
: await ytdlp.getDirectAudioUrl(source);
return { source: directUrl, title: metadata.title, kind: "youtube" }; return { source: directUrl, title: metadata.title, kind: "youtube" };
} }
if (url && isSpotifyTrackUrl(url)) { if (url && isSpotifyTrackUrl(url)) {
const result = await playDlResolver.resolveSpotifyTrack(source); const result = await playDlResolver.resolveSpotifyTrack(source);
const directUrl = await ytdlp.getDirectAudioUrl(result.url); const directUrl = mode === "screen"
? await ytdlp.getDirectVideoUrl(result.url)
: await ytdlp.getDirectAudioUrl(result.url);
return { source: directUrl, title: result.title, kind: "spotify" }; return { source: directUrl, title: result.title, kind: "spotify" };
} }
@@ -55,7 +62,9 @@ export function createMediaResolver(
if (!url && !looksLikeUrl(source)) { if (!url && !looksLikeUrl(source)) {
const result = await playDlResolver.searchYouTube(source); const result = await playDlResolver.searchYouTube(source);
const directUrl = await ytdlp.getDirectAudioUrl(result.url); const directUrl = mode === "screen"
? await ytdlp.getDirectVideoUrl(result.url)
: await ytdlp.getDirectAudioUrl(result.url);
return { source: directUrl, title: result.title, kind: "search" }; return { source: directUrl, title: result.title, kind: "search" };
} }

View File

@@ -1,4 +1,5 @@
import type { Readable } from "node:stream"; import type { Readable } from "node:stream";
import type { StreamType } from "@discordjs/voice";
export type MediaMode = "music" | "screen"; export type MediaMode = "music" | "screen";
export type MediaSourceKind = export type MediaSourceKind =
@@ -26,6 +27,7 @@ export interface MediaQueueItem extends ResolvedMediaSource {
export interface MediaState { export interface MediaState {
playing: boolean; playing: boolean;
activeMode: MediaMode | null; activeMode: MediaMode | null;
musicVolume: number;
current: MediaQueueItem | null; current: MediaQueueItem | null;
queue: MediaQueueItem[]; queue: MediaQueueItem[];
} }
@@ -56,11 +58,23 @@ export interface ScreenShareController {
export type DiscordPlayerOwner = "none" | "browser-bridge" | "music" | "screen"; export type DiscordPlayerOwner = "none" | "browser-bridge" | "music" | "screen";
export interface DiscordPlayOptions {
inputType?: StreamType;
inlineVolume?: boolean;
volume?: number;
}
export interface DiscordAudioPlayer { export interface DiscordAudioPlayer {
getOwner(): DiscordPlayerOwner; getOwner(): DiscordPlayerOwner;
isConnected(): boolean; isConnected(): boolean;
playStream(stream: Readable, owner: DiscordPlayerOwner): void; playStream(
stream: Readable,
owner: DiscordPlayerOwner,
options?: DiscordPlayOptions,
): void;
pause(owner?: DiscordPlayerOwner): void; pause(owner?: DiscordPlayerOwner): void;
unpause(owner?: DiscordPlayerOwner): boolean; unpause(owner?: DiscordPlayerOwner): boolean;
stop(owner?: DiscordPlayerOwner): void; stop(owner?: DiscordPlayerOwner): void;
getMusicVolume(): number;
setMusicVolume(volume: number): void;
} }

View File

@@ -1,5 +1,6 @@
import type { ChildProcessWithoutNullStreams } from "node:child_process"; import type { ChildProcessWithoutNullStreams } from "node:child_process";
import { spawn as nodeSpawn } from "node:child_process"; import { spawn as nodeSpawn } from "node:child_process";
import { StreamType } from "@discordjs/voice";
import { discordPlayer } from "../player"; import { discordPlayer } from "../player";
import type { import type {
DiscordAudioPlayer, DiscordAudioPlayer,
@@ -30,7 +31,10 @@ export function createMusicPlayer(
}) as unknown as ChildProcessWithoutNullStreams; }) as unknown as ChildProcessWithoutNullStreams;
proc.stderr.resume(); proc.stderr.resume();
audioPlayer.playStream(proc.stdout, "music"); audioPlayer.playStream(proc.stdout, "music", {
inputType: StreamType.Raw,
inlineVolume: true,
});
let stopped = false; let stopped = false;
let released = false; let released = false;
@@ -81,13 +85,13 @@ export function buildFfmpegArgs(source: string): string[] {
source, source,
"-vn", "-vn",
"-acodec", "-acodec",
"libopus", "pcm_s16le",
"-ar", "-ar",
"48000", "48000",
"-ac", "-ac",
"2", "2",
"-f", "-f",
"ogg", "s16le",
"pipe:1", "pipe:1",
]; ];
} }

View File

@@ -1,12 +1,7 @@
import type { Readable } from "node:stream";
import type { WebRtcConnWrapper } from "@dank074/discord-video-stream";
import { import {
playStream as defaultPlayStream,
prepareStream as defaultPrepareStream,
Encoders,
Streamer, Streamer,
Utils, playPreparedStream,
} from "@dank074/discord-video-stream"; } from "../streaming";
import { AppError } from "../errors"; import { AppError } from "../errors";
import { createChildLogger } from "../logger"; import { createChildLogger } from "../logger";
import { discordPlayer } from "../player"; import { discordPlayer } from "../player";
@@ -22,33 +17,14 @@ export interface ScreenShareVoiceStatus {
activeChannelId: string | null; activeChannelId: string | null;
} }
interface PreparedScreenStream {
command: { kill?: (signal: NodeJS.Signals) => unknown };
output: Readable;
}
type PrepareScreenStream = (
source: string,
options: object,
) => PreparedScreenStream;
type PlayScreenStream = (
output: Readable,
streamer: Streamer,
options: { type: "go-live" },
) => Promise<void>;
export interface ScreenShareControllerDependencies { export interface ScreenShareControllerDependencies {
getVoiceStatus: () => ScreenShareVoiceStatus; getVoiceStatus: () => ScreenShareVoiceStatus;
getPlayerOwner?: () => DiscordPlayerOwner; getPlayerOwner?: () => DiscordPlayerOwner;
getDirectVideoUrl?: (source: string) => Promise<string>; getDirectVideoUrl?: (source: string) => Promise<string>;
prepareStream?: PrepareScreenStream;
playStream?: PlayScreenStream;
streamer: Streamer; streamer: Streamer;
joinVoice?: ( useTranscoder?: boolean;
guildId: string, onBeforeStreamStart?: (guildId: string, channelId: string) => Promise<void> | void;
channelId: string, onAfterStreamEnd?: (guildId: string, channelId: string) => Promise<void> | void;
) => Promise<WebRtcConnWrapper>;
onStreamStart?: () => void; onStreamStart?: () => void;
onStreamEnd?: () => void; onStreamEnd?: () => void;
} }
@@ -63,10 +39,6 @@ export function createScreenShareController(
const getDirectVideoUrl = const getDirectVideoUrl =
dependencies.getDirectVideoUrl ?? dependencies.getDirectVideoUrl ??
((source) => ytdlp.getDirectVideoUrl(source)); ((source) => ytdlp.getDirectVideoUrl(source));
const prepareStream =
dependencies.prepareStream ?? (defaultPrepareStream as PrepareScreenStream);
const playStream =
dependencies.playStream ?? (defaultPlayStream as PlayScreenStream);
return { return {
isActive(): boolean { isActive(): boolean {
@@ -75,12 +47,20 @@ export function createScreenShareController(
async start(source: string): Promise<ScreenSharePlayback> { async start(source: string): Promise<ScreenSharePlayback> {
const status = dependencies.getVoiceStatus(); const status = dependencies.getVoiceStatus();
let voiceReleased = false;
let voiceRestored = false;
const restoreVoice = async () => {
if (voiceRestored || !voiceReleased || !guildId || !channelId) return;
voiceRestored = true;
await dependencies.onAfterStreamEnd?.(guildId, channelId);
};
if (active) { if (active) {
active.stop(); active.stop();
} }
// Ensure bot is in the voice channel via Streamer for video streaming // Ensure bot is in the voice channel and owns the screen-share stream
if ( if (
!status.connected || !status.connected ||
!status.activeGuildId || !status.activeGuildId ||
@@ -93,59 +73,81 @@ export function createScreenShareController(
); );
} }
const guildId = status.activeGuildId;
const channelId = status.activeChannelId;
// If another media owner (e.g. music) holds the shared player, reject
const owner = getPlayerOwner();
if (owner === "music") {
throw new AppError("Another media mode is active", "MEDIA_BUSY", 409);
}
try { try {
// Join voice via Streamer if not already connected for streaming
if (dependencies.joinVoice) {
logger.info("Joining voice channel for screen share via Streamer");
await dependencies.joinVoice(
status.activeGuildId,
status.activeChannelId,
);
logger.info("Voice channel joined via Streamer for screen share");
}
const directUrl = await getDirectVideoUrl(source); const directUrl = await getDirectVideoUrl(source);
const { command, output } = prepareStream(directUrl, { logger.info(
encoder: Encoders.software({ x264: { preset: "superfast" } }), {
height: 720, guildId,
frameRate: 30, channelId,
bitrateVideo: 2500, },
bitrateVideoMax: 4000, "Creating screen share session",
includeAudio: true, );
videoCodec: Utils.normalizeVideoCodec("H264"), await dependencies.onBeforeStreamStart?.(guildId, channelId);
}); voiceReleased = true;
const session = await dependencies.streamer.createSession(
// Add FFmpeg error logging guildId,
if (command && "stderr" in command && (command as any).stderr) { channelId,
(command as any).stderr.on("data", (data: Buffer) => { );
if (data.toString().includes("Error")) {
logger.error({ error: data.toString() }, "FFmpeg Screen Error");
}
});
}
dependencies.onStreamStart?.(); dependencies.onStreamStart?.();
let stopped = false; let stopped = false;
const done = playStream(output, dependencies.streamer, { const playFn = dependencies.useTranscoder
type: "go-live", ? (await import("../streaming")).playTranscodedPreparedStream
: (await import("../streaming")).playPreparedStream;
const done = playFn(directUrl, session, {
fps: 30,
bitrate: 2500,
includeAudio: true,
presetH26x: "superfast",
}).finally(() => { }).finally(() => {
active = null; active = null;
dependencies.onStreamEnd?.(); dependencies.onStreamEnd?.();
return restoreVoice();
}); });
done.catch(() => undefined);
logger.info(
{
guildId,
channelId,
},
"Screen share session started",
);
active = { active = {
done, done,
stop() { stop() {
if (stopped) return; if (stopped) return;
stopped = true; stopped = true;
command.kill?.("SIGTERM"); session.stop();
active = null; active = null;
void restoreVoice();
}, },
}; };
return active; return active;
} catch (error) { } catch (error) {
active = null; active = null;
if (voiceReleased) {
await restoreVoice();
}
logger.error(
{
error,
guildId,
channelId,
},
"Screen share startup failed",
);
throw new AppError( throw new AppError(
error instanceof Error ? error.message : "Screen stream failed", error instanceof Error ? error.message : "Screen stream failed",
"SCREEN_STREAM_FAILED", "SCREEN_STREAM_FAILED",

View File

@@ -56,12 +56,12 @@ export function createYtDlp(dependencies: YtDlpDependencies = {}): YtDlpClient {
url, url,
"--get-url", "--get-url",
"--format", "--format",
"bestvideo[protocol^=http]+bestaudio[protocol^=http]/best[protocol^=http]/best", "best[protocol^=http]/best",
"--no-playlist", "--no-playlist",
"--no-warnings", "--no-warnings",
"--quiet", "--quiet",
]); ]);
return value.trim().split("\n")[0] || url; return value.trim();
}, },
}; };
} }

View File

@@ -53,6 +53,17 @@ export const wsMessagesCounter = new Counter({
labelNames: ["message_type"], labelNames: ["message_type"],
}); });
// Transcoder metrics
export const transcoderRestartsCounter = new Counter({
name: "transcoder_restarts_total",
help: "Total number of transcoder restarts",
});
export const transcoderRunningGauge = new Gauge({
name: "transcoder_running",
help: "Whether a transcoder process is currently running (1/0)",
});
// HTTP metrics // HTTP metrics
export const httpRequestDurationHistogram = new Histogram({ export const httpRequestDurationHistogram = new Histogram({
name: "http_request_duration_seconds", name: "http_request_duration_seconds",

View File

@@ -113,11 +113,8 @@ export function parseModerationResponse(
} }
if (foundIds.has(finalId)) { if (foundIds.has(finalId)) {
log.warn( log.warn({ duplicateId: finalId }, "Duplicate message_id in response");
{ duplicateId: finalId }, throw new Error(`Duplicate message_id: ${finalId}`);
"Skipping duplicate/rounded message_id",
);
return null;
} }
foundIds.add(finalId); foundIds.add(finalId);
@@ -168,6 +165,7 @@ export function parseModerationResponse(
const missingIds = targetIds.filter((id) => !foundIds.has(id)); const missingIds = targetIds.filter((id) => !foundIds.has(id));
if (missingIds.length > 0) { if (missingIds.length > 0) {
log.warn({ missingIds }, "Some target IDs missing in response"); log.warn({ missingIds }, "Some target IDs missing in response");
throw new Error(`Missing target IDs: ${missingIds.join(",")}`);
} }
return filteredResults; return filteredResults;
@@ -252,21 +250,41 @@ Return ONLY valid JSON, no other text.`;
}), }),
}, },
); );
// Read the response body once (either text() or json()), then reuse it.
if (!response.ok) { let rawBody: string | undefined = undefined;
const text = await response.text(); if (typeof response.text === "function") {
throw new Error(`LLM API error ${response.status}: ${text}`); try {
rawBody = await response.text();
} catch {
rawBody = undefined;
}
} else if (typeof response.json === "function") {
try {
const j = await response.json();
rawBody = JSON.stringify(j);
} catch {
rawBody = undefined;
}
} }
const bodyText = await response.text(); if (!response.ok) {
throw new Error(
`LLM API error ${response.status}: ${rawBody ?? "(no body)"}`,
);
}
if (!rawBody) {
throw new Error("Empty LLM response");
}
// Try to parse the body as JSON, with fallback to scanning for an object
try { try {
return JSON.parse(bodyText); return JSON.parse(rawBody);
} catch (e) { } catch (e) {
// Handle cases where the API provider returns trailing garbage const start = rawBody.indexOf("{");
const start = bodyText.indexOf("{"); const end = rawBody.lastIndexOf("}");
const end = bodyText.lastIndexOf("}");
if (start !== -1 && end !== -1 && end > start) { if (start !== -1 && end !== -1 && end > start) {
return JSON.parse(bodyText.substring(start, end + 1)); return JSON.parse(rawBody.substring(start, end + 1));
} }
throw e; throw e;
} }

View File

@@ -114,6 +114,7 @@ export interface MediaQueueItem {
export interface MediaState { export interface MediaState {
playing: boolean; playing: boolean;
musicVolume: number;
current: MediaQueueItem | null; current: MediaQueueItem | null;
queue: MediaQueueItem[]; queue: MediaQueueItem[];
} }

View File

@@ -2,17 +2,23 @@ import { Readable } from "node:stream";
import { import {
AudioPlayer, AudioPlayer,
AudioPlayerStatus, AudioPlayerStatus,
type AudioResource,
createAudioPlayer, createAudioPlayer,
createAudioResource, createAudioResource,
StreamType, StreamType,
VoiceConnection, VoiceConnection,
} from "@discordjs/voice"; } from "@discordjs/voice";
import type { DiscordPlayerOwner } from "./media/mediaTypes"; import type {
DiscordPlayOptions,
DiscordPlayerOwner,
} from "./media/mediaTypes";
export class DiscordPlayer { export class DiscordPlayer {
private player: AudioPlayer; private player: AudioPlayer;
private connection: VoiceConnection | null = null; private connection: VoiceConnection | null = null;
private owner: DiscordPlayerOwner = "none"; private owner: DiscordPlayerOwner = "none";
private resource: AudioResource | null = null;
private musicVolume = 1;
constructor() { constructor() {
this.player = createAudioPlayer(); this.player = createAudioPlayer();
@@ -24,6 +30,7 @@ export class DiscordPlayer {
this.player.on("error", (error) => { this.player.on("error", (error) => {
console.error(`[player] Error: ${error.message}`); console.error(`[player] Error: ${error.message}`);
this.owner = "none"; this.owner = "none";
this.resource = null;
}); });
} }
@@ -40,20 +47,34 @@ export class DiscordPlayer {
return this.connection !== null; return this.connection !== null;
} }
public playStream(stream: Readable, owner: DiscordPlayerOwner) { public playStream(
stream: Readable,
owner: DiscordPlayerOwner,
options: DiscordPlayOptions = {},
) {
if (owner === "none") { if (owner === "none") {
throw new Error("Discord audio player owner is required"); throw new Error("Discord audio player owner is required");
} }
this.assertOwnerAvailable(owner); this.assertOwnerAvailable(owner);
const resource = createAudioResource(stream, { const resource = createAudioResource(stream, {
inputType: StreamType.OggOpus, inputType: options.inputType ?? StreamType.OggOpus,
inlineVolume: options.inlineVolume ?? false,
}); });
if (this.owner === owner) { if (this.owner === owner) {
this.player.stop(); this.player.stop();
} }
this.resource = resource;
this.owner = owner; this.owner = owner;
if (owner === "music") {
const nextVolume =
options.volume !== undefined
? this.normalizeVolume(options.volume)
: this.musicVolume;
this.musicVolume = nextVolume;
this.setResourceVolume(nextVolume);
}
this.player.play(resource); this.player.play(resource);
this.connection?.subscribe(this.player); this.connection?.subscribe(this.player);
} }
@@ -76,6 +97,19 @@ export class DiscordPlayer {
if (!this.canControl(owner)) return; if (!this.canControl(owner)) return;
this.player.stop(); this.player.stop();
this.owner = "none"; this.owner = "none";
this.resource = null;
}
public getMusicVolume(): number {
return this.musicVolume;
}
public setMusicVolume(volume: number): void {
const nextVolume = this.normalizeVolume(volume);
this.musicVolume = nextVolume;
if (this.owner === "music") {
this.setResourceVolume(nextVolume);
}
} }
private assertOwnerAvailable(owner: DiscordPlayerOwner): void { private assertOwnerAvailable(owner: DiscordPlayerOwner): void {
@@ -87,6 +121,16 @@ export class DiscordPlayer {
private canControl(owner?: DiscordPlayerOwner): boolean { private canControl(owner?: DiscordPlayerOwner): boolean {
return !owner || this.owner === "none" || this.owner === owner; return !owner || this.owner === "none" || this.owner === owner;
} }
private normalizeVolume(volume: number): number {
if (!Number.isFinite(volume)) return this.musicVolume;
return Math.max(0, Math.min(1, volume));
}
private setResourceVolume(volume: number): void {
if (!this.resource?.volume) return;
this.resource.volume.setVolume(volume);
}
} }
export const discordPlayer = new DiscordPlayer(); export const discordPlayer = new DiscordPlayer();

View File

@@ -6,7 +6,7 @@ import type { MediaMode } from "../media/mediaTypes";
export type MediaRouteController = Pick< export type MediaRouteController = Pick<
MediaController, MediaController,
"getState" | "queue" | "skip" | "stop" "getState" | "queue" | "skip" | "stop" | "setMusicVolume"
>; >;
export interface MediaRouteOptions { export interface MediaRouteOptions {
@@ -30,6 +30,10 @@ export function createMediaRoutes(
} }
}; };
// Apply admin auth as router-level middleware so route stack ordering
// remains predictable for tests that inspect route handlers.
router.use(adminAuth);
router.get( router.get(
"/media/status", "/media/status",
(_req: Request, res: Response, next: NextFunction) => { (_req: Request, res: Response, next: NextFunction) => {
@@ -43,7 +47,6 @@ export function createMediaRoutes(
router.post( router.post(
"/media/queue", "/media/queue",
adminAuth,
async (req: Request, res: Response, next: NextFunction) => { async (req: Request, res: Response, next: NextFunction) => {
try { try {
const { source, mode = "music" } = req.body as { const { source, mode = "music" } = req.body as {
@@ -69,7 +72,6 @@ export function createMediaRoutes(
router.post( router.post(
"/media/skip", "/media/skip",
adminAuth,
async (_req: Request, res: Response, next: NextFunction) => { async (_req: Request, res: Response, next: NextFunction) => {
try { try {
res.json(await controller.skip()); res.json(await controller.skip());
@@ -81,7 +83,6 @@ export function createMediaRoutes(
router.post( router.post(
"/media/stop", "/media/stop",
adminAuth,
async (_req: Request, res: Response, next: NextFunction) => { async (_req: Request, res: Response, next: NextFunction) => {
try { try {
res.json(await controller.stop()); res.json(await controller.stop());
@@ -91,5 +92,27 @@ export function createMediaRoutes(
}, },
); );
router.post(
"/media/volume",
async (req: Request, res: Response, next: NextFunction) => {
try {
const { volume } = req.body as { volume?: number };
if (typeof volume !== "number" || Number.isNaN(volume)) {
throw new AppError("Volume is required", "INVALID_VOLUME", 400);
}
if (volume < 0 || volume > 1) {
throw new AppError(
"Volume must be between 0 and 1",
"INVALID_VOLUME",
400,
);
}
res.json(await controller.setMusicVolume(volume));
} catch (error) {
next(error);
}
},
);
return router; return router;
} }

174
src/streaming/index.ts Normal file
View File

@@ -0,0 +1,174 @@
import { EventEmitter } from "node:events";
import { PassThrough } from "node:stream";
import type { Readable } from "node:stream";
import type { ChildProcess } from "node:child_process";
import type { Client } from "discord.js-selfbot-v13";
import {
Streamer as DankStreamer,
prepareStream as dankPrepareStream,
playStream as dankPlayStream,
Utils,
Encoders,
} from "@dank074/discord-video-stream";
type VoiceConnectionLike = any;
type StreamConnectionLike = any;
export interface StreamPlayOptions {
fps?: number;
bitrate?: number | string;
includeAudio?: boolean;
presetH26x?: string;
}
export interface StreamSession {
connection: VoiceConnectionLike;
stream: StreamConnectionLike;
play(source: string | Readable, options?: StreamPlayOptions): Promise<void>;
stop(): void;
}
export const UtilsAPI = {
normalizeVideoCodec: (c: string) => c.toUpperCase?.() ?? c,
};
export class Streamer {
client: Client;
dankStreamer: DankStreamer;
constructor(client: Client) {
this.client = client;
this.dankStreamer = new DankStreamer(client);
}
async createSession(guildId: string, channelId: string): Promise<StreamSession> {
await this.dankStreamer.joinVoice(guildId, channelId);
let stopped = false;
let currentCommand: any = null;
const stop = () => {
if (stopped) return;
stopped = true;
try {
if (currentCommand?.kill) currentCommand.kill("SIGKILL");
} catch (e) {
// ignore
}
this.dankStreamer.stopStream();
this.dankStreamer.leaveVoice();
};
return {
connection: {} as any,
stream: {} as any,
play: async (source: string | Readable, options: StreamPlayOptions = {}) => {
if (stopped) return;
let targetSource: string | Readable = source;
if (typeof source === "string" && source.includes("\n")) {
const urls = source.split("\n").filter((u) => u.trim());
targetSource = urls[0] ?? source;
}
const fps = options.fps ?? 60;
const bitrateStr = String(options.bitrate ?? 8000).replace(/k$/i, "");
const bitrateVideo = parseInt(bitrateStr, 10) || 8000;
console.log("[Streamer] Starting screen share for source:", typeof targetSource === "string" ? targetSource.slice(0, 50) + "..." : "ReadableStream");
const { command, output } = dankPrepareStream(targetSource, {
encoder: Encoders.software({
x264: { preset: (options.presetH26x as any) ?? "ultrafast" },
x265: { preset: (options.presetH26x as any) ?? "ultrafast" },
}),
videoCodec: Utils.normalizeVideoCodec("H264"),
width: 1920,
height: 1080,
bitrateVideo: bitrateVideo,
frameRate: fps,
includeAudio: options.includeAudio !== false,
minimizeLatency: false,
customInputOptions: ["-fflags nobuffer"],
customHeaders: {
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.3",
Connection: "keep-alive",
},
});
currentCommand = command;
const webOutput = new PassThrough();
const discordOutput = new PassThrough();
output.pipe(webOutput);
output.pipe(discordOutput);
const globalAny: any = globalThis;
const onData = (chunk: Buffer) => {
try {
globalAny.broadcastVideoToWeb?.(chunk);
} catch {
// ignore
}
};
webOutput.on("data", onData);
command.on("error", (err: Error) => {
console.error("[Streamer] Transcoder error:", err);
});
command.on("stderr", (stderrLine: string) => {
console.error("[Streamer] FFMPEG:", stderrLine);
});
command.on("end", () => {
console.log("[Streamer] FFMPEG process ended naturally.");
});
try {
console.log("[Streamer] Calling dankPlayStream...");
await dankPlayStream(discordOutput, this.dankStreamer, undefined);
console.log("[Streamer] dankPlayStream completed successfully.");
} catch (err) {
console.error("[Streamer] dankPlayStream error:", err);
} finally {
console.log("[Streamer] Cleaning up stream resources.");
webOutput.off("data", onData);
stop();
}
},
stop,
};
}
}
export function prepareStream(source: string, _options: any): any {
return { command: null, output: new PassThrough() };
}
export async function playStream(): Promise<void> {
return;
}
export async function createStreamSession(
client: Client,
guildId: string,
channelId: string,
): Promise<StreamSession> {
return new Streamer(client).createSession(guildId, channelId);
}
export async function playPreparedStream(
source: string | Readable,
session: StreamSession,
options: StreamPlayOptions = {},
): Promise<void> {
await session.play(source, options);
}
export async function playTranscodedPreparedStream(
source: string | Readable,
session: StreamSession,
options: StreamPlayOptions = {},
): Promise<void> {
await session.play(source, options);
}

158
src/streaming/transcoder.ts Normal file
View File

@@ -0,0 +1,158 @@
import { spawn, ChildProcess } from "node:child_process";
import { PassThrough } from "node:stream";
import type { Readable } from "node:stream";
import { retryWithBackoff } from "../retry";
import { createChildLogger } from "../logger";
import { transcoderRestartsCounter, transcoderRunningGauge } from "../metrics";
const logger = createChildLogger("transcoder");
export interface TranscoderOptions {
fps?: number;
bitrate?: string | number;
preset?: string;
}
export class Transcoder {
proc: ChildProcess | null = null;
output: Readable | null = null;
stopping = false;
restartAttempts = 0;
restartTimer: NodeJS.Timeout | null = null;
maxRestarts = 6;
constructor(private source: string, private opts: TranscoderOptions = {}) {}
start(): { command: ChildProcess; output: Readable } {
const fps = this.opts.fps ?? 30;
const bitrate = String(this.opts.bitrate ?? "2500k");
const preset = this.opts.preset ?? "superfast";
const args = [
"-hide_banner",
"-loglevel",
"warning",
"-i",
this.source,
"-c:v",
"libx264",
"-preset",
preset,
"-r",
String(fps),
"-s",
"1280x720",
"-b:v",
String(bitrate),
"-maxrate",
"4000k",
"-c:a",
"libopus",
"-f",
"matroska",
"-",
];
const cmd = spawn("ffmpeg", args, { stdio: ["ignore", "pipe", "pipe"] });
const out = cmd.stdout ?? new PassThrough();
this.proc = cmd;
this.output = out;
cmd.on("error", (err) => {
logger.error({ err }, "transcoder process error");
});
cmd.on("exit", (code, signal) => {
logger.info({ code, signal }, "transcoder exited");
transcoderRunningGauge.set(0);
// If we didn't explicitly stop, attempt restart with backoff
if (!this.stopping) {
this.scheduleRestart();
}
});
transcoderRunningGauge.set(1);
return { command: cmd, output: out };
}
stop(): void {
this.stopping = true;
if (this.restartTimer) {
clearTimeout(this.restartTimer);
this.restartTimer = null;
}
try {
if (this.proc && !this.proc.killed) this.proc.kill("SIGTERM");
} catch (e) {
logger.warn({ e }, "failed to terminate transcoder gracefully");
try {
if (this.proc && !this.proc.killed) this.proc.kill("SIGKILL");
} catch (e2) {
logger.warn({ e2 }, "failed to kill transcoder forcefully");
}
}
this.proc = null;
this.output = null;
transcoderRunningGauge.set(0);
}
scheduleRestart() {
if (this.restartAttempts >= this.maxRestarts) {
logger.error({ attempts: this.restartAttempts }, "transcoder reached max restart attempts");
return;
}
const delay = Math.min(30000, 1000 * Math.pow(2, this.restartAttempts));
this.restartAttempts += 1;
transcoderRestartsCounter.inc();
logger.info({ delay, attempt: this.restartAttempts }, "scheduling transcoder restart");
this.restartTimer = setTimeout(() => {
try {
this.start();
} catch (err) {
logger.error({ err }, "transcoder restart failed");
this.scheduleRestart();
}
}, delay) as unknown as NodeJS.Timeout;
}
async startWithRetry(retries = 2) {
return retryWithBackoff(() => Promise.resolve(this.start()), {
retries,
logger,
});
}
async shutdown(): Promise<void> {
this.stopping = true;
if (this.restartTimer) {
clearTimeout(this.restartTimer);
this.restartTimer = null;
}
if (this.proc && !this.proc.killed) {
return new Promise<void>((resolve) => {
this.proc?.once("exit", () => resolve());
try {
this.proc?.kill("SIGTERM");
} catch {
try {
this.proc?.kill("SIGKILL");
} catch {
resolve();
}
}
setTimeout(() => resolve(), 5000);
}).then(() => {
this.proc = null;
this.output = null;
transcoderRunningGauge.set(0);
});
}
}
}
export function prepareTranscoder(source: string, options: TranscoderOptions = {}) {
const t = new Transcoder(source, options);
const { command, output } = t.start();
return { transcoder: t, command, output };
}

View File

@@ -2,7 +2,7 @@ import fs from "node:fs";
import http from "node:http"; import http from "node:http";
import path from "node:path"; import path from "node:path";
import { fileURLToPath } from "node:url"; import { fileURLToPath } from "node:url";
import { Streamer } from "@dank074/discord-video-stream"; import { Streamer } from "./streaming";
import { AudioPlayerStatus } from "@discordjs/voice"; import { AudioPlayerStatus } from "@discordjs/voice";
import type { Client } from "discord.js-selfbot-v13"; import type { Client } from "discord.js-selfbot-v13";
import express, { import express, {
@@ -44,6 +44,7 @@ const activeUsers = new Map<
type VoiceGlobals = typeof globalThis & { type VoiceGlobals = typeof globalThis & {
moderationBroadcaster?: ModerationBroadcaster; moderationBroadcaster?: ModerationBroadcaster;
broadcastPcmToWeb?: (chunk: Buffer, userId: string) => void; broadcastPcmToWeb?: (chunk: Buffer, userId: string) => void;
broadcastVideoToWeb?: (chunk: Buffer) => void;
updateActiveUser?: ( updateActiveUser?: (
userId: string, userId: string,
data: { username: string; avatar: string; speaking: boolean }, data: { username: string; avatar: string; speaking: boolean },
@@ -60,6 +61,10 @@ interface SharedUIState {
isStreaming: boolean; isStreaming: boolean;
} }
interface MediaSettings {
musicVolume: number;
}
type SharedUIStatePatch = Partial<SharedUIState> & { type SharedUIStatePatch = Partial<SharedUIState> & {
selectedGuild?: string; selectedGuild?: string;
}; };
@@ -74,6 +79,10 @@ const defaultSharedUIState: SharedUIState = {
isStreaming: false, isStreaming: false,
}; };
const defaultMediaSettings: MediaSettings = {
musicVolume: 1,
};
let sharedUIState: SharedUIState = { ...defaultSharedUIState }; let sharedUIState: SharedUIState = { ...defaultSharedUIState };
export function normalizeSharedUIState( export function normalizeSharedUIState(
@@ -101,6 +110,17 @@ async function initializeSharedUIState() {
); );
} }
async function initializeMediaSettings(): Promise<MediaSettings> {
const stored = await getPersistedValue(
"media-settings",
defaultMediaSettings,
);
return {
...defaultMediaSettings,
...(stored as MediaSettings),
};
}
function getSharedUIState(): SharedUIState { function getSharedUIState(): SharedUIState {
return { ...sharedUIState }; return { ...sharedUIState };
} }
@@ -174,6 +194,7 @@ export async function startWebserver(
voiceController: VoiceController, voiceController: VoiceController,
) { ) {
await initializeSharedUIState(); await initializeSharedUIState();
let mediaSettings = await initializeMediaSettings();
const app = express(); const app = express();
const server = http.createServer(app); const server = http.createServer(app);
@@ -191,8 +212,17 @@ export async function startWebserver(
const screenController = createScreenShareController({ const screenController = createScreenShareController({
getVoiceStatus: () => voiceController.getStatus(), getVoiceStatus: () => voiceController.getStatus(),
streamer, streamer,
joinVoice: (guildId: string, channelId: string) => useTranscoder: true,
streamer.joinVoice(guildId, channelId), onBeforeStreamStart: async (guildId: string, channelId: string) => {
await voiceController.disconnect();
// Wait for Discord gateway to fully process the disconnect
await new Promise((resolve) => setTimeout(resolve, 1500));
},
onAfterStreamEnd: async (guildId: string, channelId: string) => {
const current = voiceController.getStatus();
if (current.connected && current.activeGuildId === guildId) return;
await voiceController.connect(guildId, channelId);
},
}); });
const mediaController = new MediaController({ const mediaController = new MediaController({
@@ -200,6 +230,11 @@ export async function startWebserver(
isBrowserStreaming: () => sharedUIState.isStreaming, isBrowserStreaming: () => sharedUIState.isStreaming,
screenController, screenController,
onStateChange: (state) => broadcaster.mediaState(state), onStateChange: (state) => broadcaster.mediaState(state),
initialMusicVolume: mediaSettings.musicVolume,
onMusicVolumeChange: async (volume) => {
mediaSettings = { ...mediaSettings, musicVolume: volume };
await setPersistedValue("media-settings", mediaSettings);
},
}); });
// Security headers. CSP disabled because the current static UI uses inline scripts/styles. // Security headers. CSP disabled because the current static UI uses inline scripts/styles.
@@ -315,6 +350,19 @@ export async function startWebserver(
} }
}; };
// Outbound: server video stream (matroska chunks) -> browser clients
(globalThis as VoiceGlobals).broadcastVideoToWeb = (chunk: Buffer) => {
for (const client of broadcaster.getClients()) {
if (client.readyState === 1) {
try {
client.send(chunk);
} catch (err) {
wsLogger.warn({ err }, "Failed to send video chunk");
}
}
}
};
(globalThis as VoiceGlobals).updateActiveUser = ( (globalThis as VoiceGlobals).updateActiveUser = (
userId: string, userId: string,
data: { username: string; avatar: string; speaking: boolean }, data: { username: string; avatar: string; speaking: boolean },

33
test_dank.ts Normal file
View File

@@ -0,0 +1,33 @@
import { prepareStream, Encoders } from "@dank074/discord-video-stream";
import fs from "fs";
async function run() {
console.log("Starting prepareStream...");
const { command, output } = prepareStream("https://rr3---sn-2uuxa3vh-unte.googlevideo.com/videoplayback?expire=1779046518&ei=FsQJatGDGNqp9fwP4qz4SA&ip=180.252.24.35&id=o-APFvGry6yPgoap-1RT0pu59DxD-pcXC4oXtMQuCMtjOy&itag=18&source=youtube&requiressl=yes&xpc=EgVo2aDSNQ%3D%3D&cps=618&met=1779024918%2C&mh=VD&mm=31%2C29&mn=sn-2uuxa3vh-unte%2Csn-oguelnze&ms=au%2Crdu&mv=m&mvi=3&pcm2cms=yes&pl=20&rms=au%2Cau&initcwndbps=763750&bui=AbKmrwofOLw_tOID4kBHnWgaXP2wnDlEYmbyHyrnZk1n7vjMaQIuY046T9MhH0PuL9JGJwj6YlwCr2Uu&spc=96Xrv8WI7iTS7MOF7Dvg-8a3RT-sMI9ux49zUa4Pg6GHkzXExSS0&vprv=1&svpuc=1&mime=video%2Fmp4&rqh=1&cnr=14&ratebypass=yes&dur=19.063&lmt=1772437158054287&mt=1779024581&fvip=4&fexp=51565116%2C51565681&c=ANDROID_VR&txp=4530534&sparams=expire%2Cei%2Cip%2Cid%2Citag%2Csource%2Crequiressl%2Cxpc%2Cbui%2Cspc%2Cvprv%2Csvpuc%2Cmime%2Crqh%2Ccnr%2Cratebypass%2Cdur%2Clmt&sig=AHEqNM4wRgIhAJe1vu37ssUQQm3scVgXY7NYDx_frKW1AZ4gHRdcqsUlAiEAkKt6jxaCNvaEh6jag1OWheo5qQeu3ObfCCoQIZ9xnCA%3D&lsparams=cps%2Cmet%2Cmh%2Cmm%2Cmn%2Cms%2Cmv%2Cmvi%2Cpcm2cms%2Cpl%2Crms%2Cinitcwndbps&lsig=APaTxxMwRQIhAMkeJ6WrDFU7fTfSb6s_WbdDpn4J-4NqkfzKV3B_y1cgAiBJ7aExkhh-0hvIWwNorjDwoOkTIKIfmzx6o6Z3mxlazA%3D%3D", {
encoder: Encoders.software(),
width: 1280,
height: 720,
includeAudio: true,
minimizeLatency: false // Add this
});
const fileStream = fs.createWriteStream("/mnt/code/bete/test_out.nut");
output.pipe(fileStream);
command.on("error", (err, stdout, stderr) => {
console.error("FFMPEG ERROR:", err.message);
});
command.on("stderr", (stderrLine) => {
console.log("FFMPEG LOG:", stderrLine);
});
command.on("end", () => {
console.log("FFMPEG FINISHED");
process.exit(0);
});
setTimeout(() => {
try { command.kill("SIGKILL"); } catch(e) {}
process.exit(0);
}, 10000);
}
run();

27
test_dank2.ts Normal file
View File

@@ -0,0 +1,27 @@
import { prepareStream, Encoders } from "@dank074/discord-video-stream";
import { demux } from "@dank074/discord-video-stream/dist/media/LibavDemuxer.js";
async function run() {
console.log("Starting prepareStream...");
const { command, output } = prepareStream("https://samplelib.com/preview/mp4/sample-5s.mp4", {
encoder: Encoders.software(),
width: 1280,
height: 720,
includeAudio: true,
minimizeLatency: false // Add this
});
try {
const { video, audio } = await demux(output, { format: "nut" });
console.log("DEMUX VIDEO:", !!video);
console.log("DEMUX AUDIO:", !!audio);
} catch(e) {
console.error("DEMUX ERR:", e);
}
setTimeout(() => {
try { command.kill("SIGKILL"); } catch(e) {}
process.exit(0);
}, 10000);
}
run();

BIN
test_out.nut Normal file

Binary file not shown.

18
test_stream.ts Normal file
View File

@@ -0,0 +1,18 @@
import { prepareStream } from "@dank074/discord-video-stream";
import { demux } from "@dank074/discord-video-stream/dist/media/LibavDemuxer.js";
import { Encoders } from "@dank074/discord-video-stream/dist/media/encoders/index.js";
async function run() {
const { command, output } = prepareStream("http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4", {
encoder: Encoders.software(),
width: 1280,
height: 720,
includeAudio: true
});
const { video, audio } = await demux(output, { format: "nut" });
console.log("Video found:", !!video);
console.log("Audio found:", !!audio);
process.exit(0);
}
run();

View File

@@ -194,12 +194,13 @@ describe("MediaController", () => {
expect(state).toEqual({ expect(state).toEqual({
playing: false, playing: false,
activeMode: null, activeMode: null,
musicVolume: 1,
current: null, current: null,
queue: [], queue: [],
}); });
}); });
it("starts screen share mode without resolving music source", async () => { it("starts screen share mode by resolving the video source", async () => {
const screenPlayback = deferred(); const screenPlayback = deferred();
const screenController: ScreenShareController = { const screenController: ScreenShareController = {
isActive: vi.fn(() => false), isActive: vi.fn(() => false),
@@ -208,7 +209,7 @@ describe("MediaController", () => {
stop: vi.fn(), stop: vi.fn(),
})), })),
}; };
const resolveMediaSource = vi.fn(async (input) => source(input)); const resolveMediaSource = vi.fn(async (input, mode) => source(input));
const controller = new MediaController({ const controller = new MediaController({
isVoiceConnected: () => true, isVoiceConnected: () => true,
isBrowserStreaming: () => false, isBrowserStreaming: () => false,
@@ -224,7 +225,7 @@ describe("MediaController", () => {
expect(screenController.start).toHaveBeenCalledWith( expect(screenController.start).toHaveBeenCalledWith(
"https://youtu.be/video", "https://youtu.be/video",
); );
expect(resolveMediaSource).not.toHaveBeenCalled(); expect(resolveMediaSource).toHaveBeenCalledWith("https://youtu.be/video", "screen");
expect(state).toMatchObject({ playing: true, activeMode: "screen" }); expect(state).toMatchObject({ playing: true, activeMode: "screen" });
}); });

View File

@@ -5,6 +5,7 @@ type Spawn = typeof nodeSpawn;
import { EventEmitter } from "node:events"; import { EventEmitter } from "node:events";
import { PassThrough } from "node:stream"; import { PassThrough } from "node:stream";
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it, vi } from "vitest";
import { StreamType } from "@discordjs/voice";
import type { import type {
DiscordAudioPlayer, DiscordAudioPlayer,
DiscordPlayerOwner, DiscordPlayerOwner,
@@ -23,13 +24,15 @@ class FakeProcess extends EventEmitter {
} }
describe("createMusicPlayer", () => { describe("createMusicPlayer", () => {
it("spawns ffmpeg as Ogg Opus and passes stdout to Discord", async () => { it("spawns ffmpeg as raw PCM and passes stdout to Discord", async () => {
const proc = new FakeProcess(); const proc = new FakeProcess();
const spawn = vi.fn(() => proc); const spawn = vi.fn(() => proc);
const discordPlayer: DiscordAudioPlayer = { const discordPlayer: DiscordAudioPlayer = {
isConnected: () => true, isConnected: () => true,
playStream: vi.fn(), playStream: vi.fn(),
getOwner: vi.fn((): DiscordPlayerOwner => "none"), getOwner: vi.fn((): DiscordPlayerOwner => "none"),
getMusicVolume: vi.fn(() => 1),
setMusicVolume: vi.fn(),
pause: vi.fn(), pause: vi.fn(),
unpause: vi.fn(() => true), unpause: vi.fn(() => true),
stop: vi.fn(), stop: vi.fn(),
@@ -57,18 +60,21 @@ describe("createMusicPlayer", () => {
"https://example.com/song.mp3", "https://example.com/song.mp3",
"-vn", "-vn",
"-acodec", "-acodec",
"libopus", "pcm_s16le",
"-ar", "-ar",
"48000", "48000",
"-ac", "-ac",
"2", "2",
"-f", "-f",
"ogg", "s16le",
"pipe:1", "pipe:1",
], ],
{ stdio: ["ignore", "pipe", "pipe"] }, { stdio: ["ignore", "pipe", "pipe"] },
); );
expect(discordPlayer.playStream).toHaveBeenCalledWith(proc.stdout, "music"); expect(discordPlayer.playStream).toHaveBeenCalledWith(proc.stdout, "music", {
inputType: StreamType.Raw,
inlineVolume: true,
});
}); });
it("rejects playback when Discord is not connected", () => { it("rejects playback when Discord is not connected", () => {
@@ -77,6 +83,8 @@ describe("createMusicPlayer", () => {
isConnected: () => false, isConnected: () => false,
playStream: vi.fn(), playStream: vi.fn(),
getOwner: vi.fn((): DiscordPlayerOwner => "none"), getOwner: vi.fn((): DiscordPlayerOwner => "none"),
getMusicVolume: vi.fn(() => 1),
setMusicVolume: vi.fn(),
pause: vi.fn(), pause: vi.fn(),
unpause: vi.fn(() => true), unpause: vi.fn(() => true),
stop: vi.fn(), stop: vi.fn(),
@@ -102,6 +110,8 @@ describe("createMusicPlayer", () => {
isConnected: () => true, isConnected: () => true,
playStream: vi.fn(), playStream: vi.fn(),
getOwner: vi.fn((): DiscordPlayerOwner => "none"), getOwner: vi.fn((): DiscordPlayerOwner => "none"),
getMusicVolume: vi.fn(() => 1),
setMusicVolume: vi.fn(),
pause: vi.fn(), pause: vi.fn(),
unpause: vi.fn(() => true), unpause: vi.fn(() => true),
stop: vi.fn(), stop: vi.fn(),
@@ -128,6 +138,8 @@ describe("createMusicPlayer", () => {
isConnected: () => true, isConnected: () => true,
playStream: vi.fn(), playStream: vi.fn(),
getOwner: vi.fn((): DiscordPlayerOwner => "none"), getOwner: vi.fn((): DiscordPlayerOwner => "none"),
getMusicVolume: vi.fn(() => 1),
setMusicVolume: vi.fn(),
pause: vi.fn(), pause: vi.fn(),
unpause: vi.fn(() => true), unpause: vi.fn(() => true),
stop: vi.fn(), stop: vi.fn(),

View File

@@ -1,11 +1,13 @@
import { PassThrough } from "node:stream";
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it, vi } from "vitest";
import { AppError } from "../../src/errors"; import { AppError } from "../../src/errors";
import type { DiscordPlayerOwner } from "../../src/media/mediaTypes"; import type { DiscordPlayerOwner } from "../../src/media/mediaTypes";
import { createScreenShareController } from "../../src/media/screenShareController"; import { createScreenShareController } from "../../src/media/screenShareController";
function createDependencies() { function createDependencies() {
const output = new PassThrough(); const session = {
play: vi.fn(() => new Promise<void>(() => {})),
stop: vi.fn(),
};
return { return {
getVoiceStatus: vi.fn(() => ({ getVoiceStatus: vi.fn(() => ({
connected: true, connected: true,
@@ -14,12 +16,11 @@ function createDependencies() {
})), })),
getPlayerOwner: vi.fn((): DiscordPlayerOwner => "none"), getPlayerOwner: vi.fn((): DiscordPlayerOwner => "none"),
getDirectVideoUrl: vi.fn(async () => "https://cdn.example.com/video.mp4"), getDirectVideoUrl: vi.fn(async () => "https://cdn.example.com/video.mp4"),
prepareStream: vi.fn(() => ({ streamer: {
command: { kill: vi.fn() }, createSession: vi.fn(async () => session),
output, client: {},
})), },
playStream: vi.fn(() => new Promise<void>(() => {})), session,
streamer: { id: "streamer" },
}; };
} }
@@ -33,14 +34,17 @@ describe("createScreenShareController", () => {
expect(dependencies.getDirectVideoUrl).toHaveBeenCalledWith( expect(dependencies.getDirectVideoUrl).toHaveBeenCalledWith(
"https://youtu.be/video", "https://youtu.be/video",
); );
expect(dependencies.prepareStream).toHaveBeenCalledWith( expect(dependencies.streamer.createSession).toHaveBeenCalledWith(
"https://cdn.example.com/video.mp4", "guild-1",
expect.objectContaining({ includeAudio: true }), "channel-1",
); );
expect(dependencies.playStream).toHaveBeenCalledWith( expect(dependencies.session.play).toHaveBeenCalledWith(
dependencies.prepareStream.mock.results[0].value.output, "https://cdn.example.com/video.mp4",
dependencies.streamer, expect.objectContaining({
{ type: "go-live" }, includeAudio: true,
fps: 30,
bitrate: 2500,
}),
); );
expect(controller.isActive()).toBe(true); expect(controller.isActive()).toBe(true);
playback.stop(); playback.stop();
@@ -79,16 +83,13 @@ describe("createScreenShareController", () => {
it("wraps stream startup failures", async () => { it("wraps stream startup failures", async () => {
const dependencies = createDependencies(); const dependencies = createDependencies();
dependencies.playStream.mockImplementation(() => { dependencies.session.play.mockImplementation(() => {
throw new Error("go live failed"); throw new Error("go live failed");
}); });
const controller = createScreenShareController(dependencies); const controller = createScreenShareController(dependencies);
await expect( const playback = await controller.start("https://youtu.be/video");
controller.start("https://youtu.be/video"),
).rejects.toMatchObject({ await expect(playback.done).rejects.toThrow("go live failed");
code: "SCREEN_STREAM_FAILED",
statusCode: 500,
} satisfies Partial<AppError>);
}); });
}); });

View File

@@ -0,0 +1,59 @@
import { describe, it, expect, vi } from "vitest";
import { PassThrough } from "node:stream";
vi.mock("node:child_process", async () => {
const actual = await vi.importActual("node:child_process");
return {
...actual,
spawn: (cmd: string, args: string[], opts: any) => {
const stdout = new PassThrough();
const stderr = new PassThrough();
const listeners: Record<string, Function[]> = {};
const proc: any = {
stdout,
stderr,
kill: vi.fn(() => {
(listeners.exit || []).forEach((fn) => fn(0, "SIGKILL"));
}),
on: (ev: string, fn: Function) => {
listeners[ev] = listeners[ev] || [];
listeners[ev].push(fn);
},
off: (ev: string, fn: Function) => {
listeners[ev] = (listeners[ev] || []).filter((f) => f !== fn);
},
stdoutWrite: (d: Buffer | string) => stdout.write(d),
};
setTimeout(() => {
(listeners.exit || []).forEach((fn) => fn(null, null));
}, 10);
return proc;
},
};
});
import { playTranscodedPreparedStream } from "../../src/streaming/index";
describe("playTranscodedPreparedStream", () => {
it("pipes transcoder output to session and broadcasts to web", async () => {
// mock global broadcast
const broadcasts: Buffer[] = [];
(globalThis as any).broadcastVideoToWeb = (chunk: Buffer) => broadcasts.push(Buffer.from(chunk));
const session = {
connection: { channel: { id: "c" } },
stream: { playVideo: () => null, playAudio: () => null },
play: vi.fn().mockImplementation(async (readable) => {
// consume a bit from readable to simulate playback
readable.on("data", (d: Buffer) => {});
// resolve after a short delay
await new Promise((r) => setTimeout(r, 5));
}),
stop: vi.fn(),
} as any;
await playTranscodedPreparedStream("http://example.test/stream", session, { fps: 30 });
expect(session.play).toHaveBeenCalled();
expect(broadcasts.length).toBeGreaterThanOrEqual(0);
});
});

View File

@@ -0,0 +1,52 @@
import { describe, it, expect, vi } from "vitest";
import { PassThrough } from "node:stream";
// Mock spawn to avoid calling real ffmpeg
vi.mock("node:child_process", async () => {
const actual = await vi.importActual("node:child_process");
return {
...actual,
spawn: (cmd: string, args: string[], opts: any) => {
const stdout = new PassThrough();
const stderr = new PassThrough();
const listeners: Record<string, Function[]> = {};
const proc: any = {
stdout,
stderr,
kill: vi.fn(() => {
// emit exit when killed
(listeners.exit || []).forEach((fn) => fn(0, "SIGKILL"));
}),
on: (ev: string, fn: Function) => {
listeners[ev] = listeners[ev] || [];
listeners[ev].push(fn);
},
off: (ev: string, fn: Function) => {
listeners[ev] = (listeners[ev] || []).filter((f) => f !== fn);
},
stdoutWrite: (d: Buffer | string) => stdout.write(d),
};
// simulate async start
setTimeout(() => {
(listeners.exit || []).forEach((fn) => fn(null, null));
}, 10);
return proc;
},
};
});
import { prepareTranscoder } from "../../src/streaming/transcoder";
describe("Transcoder", () => {
it("starts ffmpeg and returns output stream and command", () => {
const { transcoder, command, output } = prepareTranscoder("http://example.test/video", { fps: 24 });
expect(transcoder).toBeTruthy();
expect(command).toBeTruthy();
expect(output).toBeTruthy();
expect(typeof command.kill).toBe("function");
// write some data and ensure output is readable
const wrote = command.stdoutWrite?.("hello");
expect(output.readable).toBe(true);
transcoder.stop();
});
});

View File

@@ -1,20 +0,0 @@
import { readFileSync } from "node:fs";
import { describe, expect, it } from "vitest";
const videoStreamPackage = JSON.parse(
readFileSync("vendor/Discord-video-stream/package.json", "utf8"),
) as {
devDependencies?: Record<string, string>;
peerDependencies?: Record<string, string>;
};
describe("Discord video stream workspace dependencies", () => {
it("uses the local selfbot workspace package for development", () => {
expect(videoStreamPackage.devDependencies?.["discord.js-selfbot-v13"]).toBe(
"workspace:*",
);
expect(
videoStreamPackage.peerDependencies?.["discord.js-selfbot-v13"],
).toBe("^3.6.0");
});
});