feat: new events (streamUpdate)

This commit is contained in:
Elysia
2024-10-29 15:49:10 +07:00
parent d410202709
commit f13e0e5a5a
6 changed files with 165 additions and 5 deletions

File diff suppressed because one or more lines are too long

View File

@@ -686,6 +686,10 @@ class VoiceConnection extends EventEmitter {
this.streamConnection.disconnect(); this.streamConnection.disconnect();
break; break;
} }
case 'STREAM_UPDATE': {
this.streamConnection.update(data);
break;
}
} }
} }
if (this.streamWatchConnection.has(StreamKey.userId) && this.channel.id == StreamKey.channelId) { if (this.streamWatchConnection.has(StreamKey.userId) && this.channel.id == StreamKey.channelId) {
@@ -703,6 +707,11 @@ class VoiceConnection extends EventEmitter {
} }
case 'STREAM_DELETE': { case 'STREAM_DELETE': {
streamConnection.disconnect(); streamConnection.disconnect();
streamConnection.receiver.packets.destroyAllStream();
break;
}
case 'STREAM_UPDATE': {
streamConnection.update(data);
break; break;
} }
} }
@@ -791,6 +800,10 @@ class VoiceConnection extends EventEmitter {
this.streamConnection.disconnect(); this.streamConnection.disconnect();
break; break;
} }
case 'STREAM_UPDATE': {
this.streamConnection.update(data);
break;
}
} }
} }
if (this.streamWatchConnection.has(StreamKey.userId) && this.channel.id == StreamKey.channelId) { if (this.streamWatchConnection.has(StreamKey.userId) && this.channel.id == StreamKey.channelId) {
@@ -808,6 +821,11 @@ class VoiceConnection extends EventEmitter {
} }
case 'STREAM_DELETE': { case 'STREAM_DELETE': {
streamConnection.disconnect(); streamConnection.disconnect();
streamConnection.receiver.packets.destroyAllStream();
break;
}
case 'STREAM_UPDATE': {
streamConnection.update(data);
break; break;
} }
} }
@@ -844,6 +862,20 @@ class VoiceConnection extends EventEmitter {
} }
}); });
} }
/**
* @event VoiceConnection#streamUpdate
* @description Emitted when the StreamConnection or StreamConnectionReadonly
* state changes, providing the previous and current stream state.
*
* @param {StreamState} oldData - The previous state of the stream.
* @param {StreamState} newData - The current state of the stream.
*
* @typedef {Object} StreamState
* @property {boolean} isPaused - Indicates whether the stream is currently paused.
* @property {string|null} region - The region where the stream is hosted, or null if not specified.
* @property {Snowflake[]} viewerIds - An array of Snowflake IDs representing the viewers connected to the stream.
*/
} }
/** /**
@@ -890,6 +922,18 @@ class StreamConnection extends VoiceConnection {
* @type {boolean} * @type {boolean}
*/ */
this.isPaused = false; this.isPaused = false;
/**
* Viewer IDs
* @type {Snowflake[]}
*/
this.viewerIds = [];
/**
* Voice region name
* @type {string | null}
*/
this.region = null;
} }
createStreamConnection() { createStreamConnection() {
@@ -951,6 +995,19 @@ class StreamConnection extends VoiceConnection {
*/ */
sendScreenshareState(isPaused = false) { sendScreenshareState(isPaused = false) {
if (isPaused == this.isPaused) return; if (isPaused == this.isPaused) return;
this.emit(
'streamUpdate',
{
isPaused: this.isPaused,
region: this.region,
viewerIds: this.viewerIds,
},
{
isPaused,
region: this.region,
viewerIds: this.viewerIds,
},
);
this.isPaused = isPaused; this.isPaused = isPaused;
this.channel.client.ws.broadcast({ this.channel.client.ws.broadcast({
op: Opcodes.STREAM_SET_PAUSED, op: Opcodes.STREAM_SET_PAUSED,
@@ -976,6 +1033,24 @@ class StreamConnection extends VoiceConnection {
}); });
} }
update(data) {
this.emit(
'streamUpdate',
{
isPaused: this.isPaused,
region: this.region,
viewerIds: this.viewerIds.slice(),
},
{
isPaused: data.paused,
region: data.region,
viewerIds: data.viewer_ids,
},
);
this.viewerIds = data.viewer_ids;
this.region = data.region;
}
/** /**
* Current stream key * Current stream key
* @type {string} * @type {string}
@@ -1032,6 +1107,24 @@ class StreamConnectionReadonly extends VoiceConnection {
* @type {string | null} * @type {string | null}
*/ */
this.serverId = null; this.serverId = null;
/**
* Stream state
* @type {boolean}
*/
this.isPaused = false;
/**
* Viewer IDs
* @type {Snowflake[]}
*/
this.viewerIds = [];
/**
* Voice region name
* @type {string | null}
*/
this.region = null;
} }
createStreamConnection() { createStreamConnection() {
@@ -1097,6 +1190,25 @@ class StreamConnectionReadonly extends VoiceConnection {
}); });
} }
update(data) {
this.emit(
'streamUpdate',
{
isPaused: this.isPaused,
region: this.region,
viewerIds: this.viewerIds.slice(),
},
{
isPaused: data.paused,
region: data.region,
viewerIds: data.viewer_ids,
},
);
this.isPaused = data.paused;
this.viewerIds = data.viewer_ids;
this.region = data.region;
}
/** /**
* Current stream key * Current stream key
* @type {string} * @type {string}

View File

@@ -15,9 +15,17 @@ const { StreamOutput } = require('../util/Socket');
* @extends {EventEmitter} * @extends {EventEmitter}
*/ */
class FFmpegHandler extends EventEmitter { class FFmpegHandler extends EventEmitter {
constructor(codec, portUdp, output, isEnableAudio) { constructor(receiver, userId, codec, portUdp, output, isEnableAudio) {
super(); super();
Object.defineProperty(this, 'receiver', { value: receiver });
/**
* The user ID
* @type {Snowflake}
*/
this.userId = userId;
/** /**
* If the audio is enabled * If the audio is enabled
* @type {boolean} * @type {boolean}
@@ -130,9 +138,21 @@ class FFmpegHandler extends EventEmitter {
let process = list.find(o => o.pid === ffmpegPid || o.ppid === ffmpegPid || o.cmd.includes(args)); let process = list.find(o => o.pid === ffmpegPid || o.ppid === ffmpegPid || o.cmd.includes(args));
if (process) { if (process) {
kill(process.pid); kill(process.pid);
this.receiver.videoStreams.delete(this.userId);
this.emit('closed');
} }
}); });
} }
/**
* Emitted when the FFmpegHandler becomes ready to start working.
* @event FFmpegHandler#ready
*/
/**
* Emitted when the FFmpegHandler is closed.
* @event FFmpegHandler#closed
*/
} }
module.exports = FFmpegHandler; module.exports = FFmpegHandler;

View File

@@ -58,7 +58,7 @@ class PacketHandler extends EventEmitter {
makeVideoStream(user, portUdp, codec, output, isEnableAudio = false) { makeVideoStream(user, portUdp, codec, output, isEnableAudio = false) {
if (this.videoStreams.has(user)) return this.videoStreams.get(user); if (this.videoStreams.has(user)) return this.videoStreams.get(user);
const stream = new FFmpegHandler(codec, portUdp, output, isEnableAudio); const stream = new FFmpegHandler(this, user, codec, portUdp, output, isEnableAudio);
stream.on('ready', () => { stream.on('ready', () => {
this.videoStreams.set(user, stream); this.videoStreams.set(user, stream);
}); });
@@ -231,6 +231,18 @@ class PacketHandler extends EventEmitter {
this.videoReceiver(buffer); this.videoReceiver(buffer);
this.audioReceiverForStream(buffer); this.audioReceiverForStream(buffer);
} }
// When udp connection is closed (STREAM_DELETE), destroy all streams (Memory leak)
destroyAllStream() {
for (const stream of this.streams.values()) {
stream.stream.destroy();
}
this.streams.clear();
for (const stream of this.videoStreams.values()) {
stream.destroy();
}
this.videoStreams.clear();
}
} }
module.exports = PacketHandler; module.exports = PacketHandler;

View File

@@ -42,7 +42,7 @@ const payloadTypes = [
payload_type: 103, payload_type: 103,
rtx_payload_type: 104, rtx_payload_type: 104,
encode: false, encode: false,
decode: false, decode: false, // Working but very glitchy
}, },
{ {
name: 'H264', name: 'H264',

18
typings/index.d.ts vendored
View File

@@ -1059,6 +1059,7 @@ export class VoiceConnection extends EventEmitter {
public on(event: 'error' | 'failed' | 'disconnect', listener: (error: Error) => void): this; public on(event: 'error' | 'failed' | 'disconnect', listener: (error: Error) => void): this;
public on(event: 'speaking', listener: (user: User, speaking: Readonly<Speaking>) => void): this; public on(event: 'speaking', listener: (user: User, speaking: Readonly<Speaking>) => void): this;
public on(event: 'warn', listener: (warning: string | Error) => void): this; public on(event: 'warn', listener: (warning: string | Error) => void): this;
public on(event: 'streamUpdate', listener: (oldState: StreamState, newState: StreamState) => void): this;
public on(event: string, listener: (...args: any[]) => void): this; public on(event: string, listener: (...args: any[]) => void): this;
public once(event: 'authenticated' | 'closing' | 'newSession' | 'ready' | 'reconnecting', listener: () => void): this; public once(event: 'authenticated' | 'closing' | 'newSession' | 'ready' | 'reconnecting', listener: () => void): this;
@@ -1066,18 +1067,27 @@ export class VoiceConnection extends EventEmitter {
public once(event: 'error' | 'failed' | 'disconnect', listener: (error: Error) => void): this; public once(event: 'error' | 'failed' | 'disconnect', listener: (error: Error) => void): this;
public once(event: 'speaking', listener: (user: User, speaking: Readonly<Speaking>) => void): this; public once(event: 'speaking', listener: (user: User, speaking: Readonly<Speaking>) => void): this;
public once(event: 'warn', listener: (warning: string | Error) => void): this; public once(event: 'warn', listener: (warning: string | Error) => void): this;
public once(event: 'streamUpdate', listener: (oldState: StreamState, newState: StreamState) => void): this;
public once(event: string, listener: (...args: any[]) => void): this; public once(event: string, listener: (...args: any[]) => void): this;
public createStreamConnection(): Promise<StreamConnection>; public createStreamConnection(): Promise<StreamConnection>;
public joinStreamConnection(user: UserResolvable): Promise<StreamConnectionReadonly>; public joinStreamConnection(user: UserResolvable): Promise<StreamConnectionReadonly>;
} }
export interface StreamState {
isPaused: boolean;
region: string | null;
viewerIds: Snowflake[];
}
export class StreamConnection extends VoiceConnection { export class StreamConnection extends VoiceConnection {
public createStreamConnection(): Promise<this>; public createStreamConnection(): Promise<this>;
public readonly voiceConnection: VoiceConnection; public readonly voiceConnection: VoiceConnection;
public serverId: Snowflake; public serverId: Snowflake;
public isPaused: boolean; public isPaused: boolean;
public region: string | null;
public streamConnection: this; public streamConnection: this;
public viewerIds: Snowflake[];
public sendSignalScreenshare(): void; public sendSignalScreenshare(): void;
public sendScreenshareState(isPause: boolean): void; public sendScreenshareState(isPause: boolean): void;
private sendStopScreenshare(): void; private sendStopScreenshare(): void;
@@ -1088,8 +1098,11 @@ export class StreamConnectionReadonly extends VoiceConnection {
public joinStreamConnection(): Promise<this>; public joinStreamConnection(): Promise<this>;
public readonly voiceConnection: VoiceConnection; public readonly voiceConnection: VoiceConnection;
public serverId: Snowflake; public serverId: Snowflake;
public isPaused: boolean;
public region: string | null;
public userId: Snowflake; public userId: Snowflake;
public streamConnection: null; public streamConnection: null;
public viewerIds: Snowflake[];
public sendSignalScreenshare(): void; public sendSignalScreenshare(): void;
private sendStopScreenshare(): void; private sendStopScreenshare(): void;
public readonly streamKey: string; public readonly streamKey: string;
@@ -1100,7 +1113,7 @@ export class StreamConnectionReadonly extends VoiceConnection {
} }
export class FFmpegHandler extends EventEmitter { export class FFmpegHandler extends EventEmitter {
public codec: VideoCodec | 'H265' | 'VP9' | 'AV1'; public codec: 'H264';
public portUdp: number; public portUdp: number;
public ready: boolean; public ready: boolean;
public stream: ChildProcessWithoutNullStreams; public stream: ChildProcessWithoutNullStreams;
@@ -1108,9 +1121,12 @@ export class FFmpegHandler extends EventEmitter {
public socketAudio: Socket; public socketAudio: Socket;
public output: Writable | string; public output: Writable | string;
public isEnableAudio: boolean; public isEnableAudio: boolean;
public userId: Snowflake;
public sendPayloadToFFmpeg(payload: Buffer, isAudio?: boolean): void; public sendPayloadToFFmpeg(payload: Buffer, isAudio?: boolean): void;
public on(event: 'ready', listener: () => void): this; public on(event: 'ready', listener: () => void): this;
public once(event: 'ready', listener: () => void): this; public once(event: 'ready', listener: () => void): this;
public on(event: 'closed', listener: () => void): this;
public once(event: 'closed', listener: () => void): this;
public destroy(): void; public destroy(): void;
} }