Skip to content

Commit

Permalink
Add AbortSignal to prepareStream and playStream (#153)
Browse files Browse the repository at this point in the history
  • Loading branch information
longnguyen2004 authored Feb 20, 2025
1 parent 4dc6cdc commit 6434c25
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 16 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"@libav.js/variant-webcodecs": "^6.4.7",
"debug-level": "^3.2.1",
"fluent-ffmpeg": "^2.1.3",
"p-cancelable": "^4.0.1",
"p-debounce": "^4.0.0",
"sodium-plus": "^0.9.0",
"uid": "^2.0.2",
Expand Down
9 changes: 0 additions & 9 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 41 additions & 6 deletions src/media/newApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ export type EncoderOptions = {

export function prepareStream(
input: string | Readable,
options: Partial<EncoderOptions> = {}
options: Partial<EncoderOptions> = {},
cancelSignal?: AbortSignal
) {
cancelSignal?.throwIfAborted();
const defaultOptions = {
noTranscoding: false,
// negative values = resize by aspect ratio, see https://trac.ffmpeg.org/wiki/Scaling
Expand Down Expand Up @@ -281,8 +283,26 @@ export function prepareStream(
.audioCodec("libopus")
.audioBitrate(`${bitrateAudio}k`);

// exit handling
const promise = new Promise<void>((resolve, reject) => {
command.on("error", (err) => {
if (cancelSignal?.aborted)
/**
* fluent-ffmpeg might throw an error when SIGTERM is sent to
* the process, so we check if the abort signal is triggered
* and throw that instead
*/
reject(cancelSignal.reason);
else
reject(err);
});
command.on("end", () => resolve());
})
promise.catch(() => {});
cancelSignal?.addEventListener("abort", () => command.kill("SIGTERM"), { once: true });
command.run();
return { command, output }

return { command, output, promise }
}

export type PlayStreamOptions = {
Expand Down Expand Up @@ -321,12 +341,18 @@ export type PlayStreamOptions = {
}

export async function playStream(
input: Readable, streamer: Streamer, options: Partial<PlayStreamOptions> = {})
input: Readable, streamer: Streamer,
options: Partial<PlayStreamOptions> = {},
cancelSignal?: AbortSignal
)
{
cancelSignal?.throwIfAborted();
if (!streamer.voiceConnection)
throw new Error("Bot is not connected to a voice channel");

const { video, audio } = await demux(input);
cancelSignal?.throwIfAborted();

if (!video)
throw new Error("No video stream in media");

Expand Down Expand Up @@ -421,12 +447,21 @@ export async function playStream(
vStream.on("pts", stopBurst);
}
}
return new Promise<void>((resolve) => {
vStream.once("finish", () => {
return new Promise<void>((resolve, reject) => {
const cleanup = () => {
stopStream();
udp.mediaConnection.setSpeaking(false);
udp.mediaConnection.setVideoAttributes(false);
}
cancelSignal?.addEventListener("abort", () => {
cleanup();
reject(cancelSignal.reason);
}, { once: true })
vStream.once("finish", () => {
if (cancelSignal?.aborted)
return;
cleanup();
resolve();
});
});
}).catch(() => {});
}

0 comments on commit 6434c25

Please sign in to comment.