feat(Receiver): Add videoData event to allow users to process video data

This commit is contained in:
Elysia
2024-09-25 19:29:29 +07:00
parent 736b3ff2fb
commit 43d2ea1bc4
4 changed files with 87 additions and 142 deletions

View File

@@ -25,7 +25,6 @@ class PacketHandler extends EventEmitter {
super(); super();
this.receiver = receiver; this.receiver = receiver;
this.streams = new Map(); this.streams = new Map();
this.videoStreams = new Map(); // Placeholder
this.speakingTimeouts = new Map(); this.speakingTimeouts = new Map();
} }
@@ -55,7 +54,7 @@ class PacketHandler extends EventEmitter {
return stream; return stream;
} }
parseBuffer(buffer) { parseBuffer(buffer, shouldReturnTuple = false) {
const { secret_key, mode } = this.receiver.connection.authentication; const { secret_key, mode } = this.receiver.connection.authentication;
// Open packet // Open packet
if (!secret_key) return new Error('secret_key cannot be null or undefined'); if (!secret_key) return new Error('secret_key cannot be null or undefined');
@@ -100,10 +99,14 @@ class PacketHandler extends EventEmitter {
break; break;
} }
default: { default: {
throw new RangeError(`Unsupported decryption method: ${mode}`); return new RangeError(`Unsupported decryption method: ${mode}`);
} }
} }
if (shouldReturnTuple) {
return [header, packet];
}
// Strip decrypted RTP Header Extension if present // Strip decrypted RTP Header Extension if present
if (buffer.slice(12, 14).compare(HEADER_EXTENSION_BYTE) === 0) { if (buffer.slice(12, 14).compare(HEADER_EXTENSION_BYTE) === 0) {
const headerExtensionLength = buffer.slice(14).readUInt16BE(); const headerExtensionLength = buffer.slice(14).readUInt16BE();
@@ -177,38 +180,19 @@ class PacketHandler extends EventEmitter {
const userStat = this.connection.ssrcMap.get(ssrc - 1); // Video_ssrc const userStat = this.connection.ssrcMap.get(ssrc - 1); // Video_ssrc
if (!userStat) return; if (!userStat) return;
this.parseBuffer(buffer);
let opusPacket;
const videoStreamInfo = this.videoStreams.get(userStat.userId);
// If the user is in video, we need to check if the packet is just silence // If the user is in video, we need to check if the packet is just silence
if (userStat.hasVideo) { if (userStat.hasVideo) {
opusPacket = this.parseBuffer(buffer); const packet = this.parseBuffer(buffer, true);
if (opusPacket instanceof Error) { if (packet instanceof Error) {
// Only emit an error if we were actively receiving packets from this user
if (videoStreamInfo) {
this.emit('error', opusPacket);
}
return; return;
} }
if (SILENCE_FRAME.equals(opusPacket)) { let [header, videoPacket] = packet;
if (SILENCE_FRAME.equals(videoPacket)) {
// If this is a silence frame, pretend we never received it // If this is a silence frame, pretend we never received it
return; return;
} }
} this.receiver.emit('videoData', ssrc, userStat, header, videoPacket);
if (videoStreamInfo) {
const stream = videoStreamInfo;
if (!opusPacket) {
opusPacket = this.parseBuffer(buffer);
if (opusPacket instanceof Error) {
this.emit('error', opusPacket);
return;
}
}
stream.push(opusPacket);
} }
} }

View File

@@ -53,6 +53,58 @@ class VoiceReceiver extends EventEmitter {
} }
return stream; return stream;
} }
/**
* Emitted whenever raw video data is received.
* @event VoiceReceiver#videoData
* @param {number} ssrc SSRC
* @param {{ userId: Snowflake, hasVideo: boolean }} ssrcData SSRC Data
* @param {Buffer} header The unencrypted RTP header: 12 header bytes, followed by the extension marker bytes (0xbe, 0xde) and the extension size
* @param {Buffer} packetDecrypt The decrypted payload: the header extension data (if any) followed by the video packet
* @example
* // Send packet to VLC
* const dgram = require('dgram');
* // Replace these with your actual values
* const PORT = 5004; // The port VLC is listening on
* const HOST = '127.0.0.1'; // Your localhost or the IP address of the machine running VLC
* // Create a UDP socket
* const socket = dgram.createSocket('udp4');
* function sendRTPPacket(payload) {
* const message = Buffer.from(payload);
* socket.send(message, 0, message.length, PORT, HOST, err => {
* if (err) {
* console.error('Error sending packet:', err);
* } else {
* console.log(message);
* }
* });
* }
* const connection = await client.voice.joinChannel(channel, {
* selfMute: true,
* selfDeaf: true,
* selfVideo: false,
* });
* connection.receiver.on('videoData', (ssrc, ssrcData, header, packetDecrypt) => {
* if (ssrcData.hasVideo) {
* header[0] &= 0xef; // Clear the extension (X) bit (bit 4 of the first RTP header byte)
* // Strip decrypted RTP Header Extension if present
* if (header.slice(12, 14).compare(Buffer.from([0xbe, 0xde])) === 0) {
* const headerExtensionLength = header.slice(14).readUInt16BE();
* packetDecrypt = packetDecrypt.subarray(4 * headerExtensionLength);
* }
* sendRTPPacket(Buffer.concat([header.slice(0, 12), packetDecrypt]));
* }
* });
* // VLC SDP file (You can have it with FFmpeg)
* // ! Very buggy
* // o=- 0 0 IN IP4 <HOST>
* // s=No Name
* // c=IN IP4 <HOST>
* // t=0 0
* // a=tool:libavformat 61.1.100
* // m=video <PORT> RTP/AVP <RTP Dynamic Payload Type>
* // a=rtpmap:<RTP Dynamic Payload Type> <VP8|VP9|H264|H265>/90000
*/
} }
module.exports = VoiceReceiver; module.exports = VoiceReceiver;

View File

@@ -1,106 +0,0 @@
'use strict';
const { Buffer } = require('buffer');
const { setTimeout } = require('timers');
/**
 * Readable stream whose `_read` hook is a no-op, so reading never pulls data on demand.
 * Within this file the stream is fed by `IvfJoinner`, which calls `push()` on it.
 */
class Readable extends require('stream').Readable {
// Intentionally empty: there is nothing to fetch when the consumer asks for more data
_read() {} // eslint-disable-line no-empty-function
}
/**
* Receives video packets from a voice connection.
*/
class IvfJoinner {
constructor(codec = 'VP8') {
this.codec = codec;
this.ivfHeader = this.getHeaderIvf();
this.count = 0;
/**
* Readable stream
* @type {Readable}
*/
this.stream = new Readable();
this._tempBuffer = null;
this._fps = 0;
this.timeConvert = null;
this.lastConvert = null;
this.firstFrame = Buffer.from([0x90, 0x80]);
this._timeoutFps = null;
}
getHeaderIvf() {
const ivfHeader = Buffer.alloc(32);
ivfHeader.write('DKIF'); // Signature
ivfHeader.writeUInt16LE(0, 4); // Version
ivfHeader.writeUInt16LE(32, 6); // Header length
ivfHeader.write(`${this.codec}0`, 8); // Codec FourCC
ivfHeader.writeUInt16LE(0, 12); // Width
ivfHeader.writeUInt16LE(0, 14); // Height
ivfHeader.writeUInt32LE(this._fps, 16); // Frame rate
ivfHeader.writeUInt32LE(1, 20); // Framerate denominator
ivfHeader.writeUInt32LE(this.count + 1, 24); // Frame count
return ivfHeader;
}
getFramedata() {
const frameHeader = Buffer.alloc(12);
frameHeader.writeUInt32LE(this._tempBuffer.length, 0); // Frame size
frameHeader.writeUInt32LE(this.count, 4); // Timestamp
return frameHeader;
}
push(bufferRaw) {
if (!this._timeoutFps) {
this._timeoutFps = setTimeout(() => {
if (this.stream.destroyed) return;
this._fps = Math.round((this.lastConvert - this.timeConvert) / this.count);
// ! Todo: need improved
this._timeoutFps = null;
}, 500).unref();
}
if (!this.timeConvert) {
this.timeConvert = performance.now();
}
// Ex VP8
// <Buffer 90 80 80 00 30 b7 01 9d 01 2a 80 07 38 04 0b c7 08 85 85 88 99 84 88 3f 82 00 06 16 04 f7 06 81 64 9f 6b db 9b 27 38 7b 27 38 7b 27 38 7b 27 38 7b 27 ... 1154 more bytes>
// 90 80: payloadDescriptorBuf (90 80 if first frame | 80 80 else)
// 80 00: pictureIdBuf
// n bytes: chunk raw (Ivf splitter)
const payloadDescriptorBuf = bufferRaw.slice(0, 2);
const data = bufferRaw.slice(4);
const isFirstFrame = Buffer.compare(payloadDescriptorBuf, this.firstFrame) === 0;
if (isFirstFrame && this._tempBuffer) {
this.count++;
this.lastConvert = performance.now();
this.stream.push(Buffer.concat([this.getFramedata(), this._tempBuffer]));
this._tempBuffer = null;
}
if (!this._tempBuffer) {
this._tempBuffer = data;
} else {
this._tempBuffer = Buffer.concat([this._tempBuffer, data]);
}
}
/**
* Force stop stream
* @returns {void}
*/
stop() {
this.stream.push(null);
this.stream.emit('end'); // Force close stream;
this.stream.destroy();
}
/**
* Convert partial file to full file
* @param {Readable} readable File created by stream (Raw)
* @param {Writable} writeable Output (Ivf)
* @returns {void}
*/
createFinalFile(readable, writeable) {
if (this.stream.destroyed) {
writeable.write(this.getHeaderIvf());
readable.pipe(writeable);
}
}
}
// Public API of this module: only the IvfJoinner class is exported
module.exports = {
IvfJoinner,
};

33
typings/index.d.ts vendored
View File

@@ -1083,21 +1083,36 @@ export class VoiceReceiver extends EventEmitter {
public createStream(user: UserResolvable, options?: { mode?: 'opus' | 'pcm'; end?: 'silence' | 'manual' }): Readable; public createStream(user: UserResolvable, options?: { mode?: 'opus' | 'pcm'; end?: 'silence' | 'manual' }): Readable;
public on(event: 'debug', listener: (error: Error | string) => void): this; public on(event: 'debug', listener: (error: Error | string) => void): this;
public on(
event: 'videoData',
listener: (
ssrc: number,
ssrcData: {
userId: Snowflake;
hasVideo: boolean;
},
headerRaw: Buffer,
packetDecrypt: Buffer,
) => void,
): this;
public on(event: string, listener: (...args: any[]) => void): this; public on(event: string, listener: (...args: any[]) => void): this;
public once(event: 'debug', listener: (error: Error | string) => void): this; public once(event: 'debug', listener: (error: Error | string) => void): this;
public once(
event: 'videoData',
listener: (
ssrc: number,
ssrcData: {
userId: Snowflake;
hasVideo: boolean;
},
headerRaw: Buffer,
packetDecrypt: Buffer,
) => void,
): this;
public once(event: string, listener: (...args: any[]) => void): this; public once(event: string, listener: (...args: any[]) => void): this;
} }
/*
export class IvfJoinner {
constructor(codec: 'VP8');
public stream: Readable;
public stop(): void;
public createFinalFile(read: Readable, write: Writable): void;
}
*/
export { Collection } from '@discordjs/collection'; export { Collection } from '@discordjs/collection';
export interface CollectorEventTypes<K, V, F extends unknown[] = []> { export interface CollectorEventTypes<K, V, F extends unknown[] = []> {