Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] RTMP output #675

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions compositor_api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub use register_input::Mp4Input;
pub use register_input::RtpInput;
pub use register_input::WhipInput;
pub use register_output::Mp4Output;
pub use register_output::RtmpOutput;
pub use register_output::RtpOutput;
pub use register_output::WhipOutput;

Expand Down
74 changes: 74 additions & 0 deletions compositor_api/src/types/from_register_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use compositor_pipeline::pipeline::{
output::{
self,
mp4::{Mp4AudioTrack, Mp4OutputOptions, Mp4VideoTrack},
rtmp::{RtmpAudioTrack, RtmpVideoTrack},
whip::WhipAudioOptions,
},
};
Expand Down Expand Up @@ -262,6 +263,65 @@ impl TryFrom<WhipOutput> for pipeline::RegisterOutputOptions<output::OutputOptio
}
}

impl TryFrom<RtmpOutput> for pipeline::RegisterOutputOptions<output::OutputOptions> {
type Error = TypeError;

fn try_from(value: RtmpOutput) -> Result<Self, Self::Error> {
let RtmpOutput { url, video, audio } = value;
let video_track = video.as_ref().map(|v| match v.encoder {
VideoEncoderOptions::FfmpegH264 { .. } => RtmpVideoTrack {
width: v.resolution.width as u32,
height: v.resolution.height as u32,
},
});
let audio_track = audio.as_ref().map(|a| match &a.encoder {
RtmpAudioEncoderOptions::Aac {
channels,
sample_rate,
} => RtmpAudioTrack {
channels: channels.clone().into(),
sample_rate: sample_rate.unwrap_or(44100),
},
});

let (video_encoder_options, output_video_options) = maybe_video_options(video)?;
let (audio_encoder_options, output_audio_options) = match audio {
Some(OutputRtmpAudioOptions {
mixing_strategy,
send_eos_when,
encoder,
initial,
}) => {
let audio_encoder_options: AudioEncoderOptions = encoder.into();
let output_audio_options = pipeline::OutputAudioOptions {
initial: initial.try_into()?,
end_condition: send_eos_when.unwrap_or_default().try_into()?,
mixing_strategy: mixing_strategy.unwrap_or(MixingStrategy::SumClip).into(),
channels: audio_encoder_options.channels(),
};

(Some(audio_encoder_options), Some(output_audio_options))
}
None => (None, None),
};

let output_options = output::OutputOptions {
output_protocol: output::OutputProtocolOptions::Rtmp(output::rtmp::RtmpSenderOptions {
url,
video: video_track,
audio: audio_track,
}),
video: video_encoder_options,
audio: audio_encoder_options,
};
Ok(Self {
output_options,
video: output_video_options,
audio: output_audio_options,
})
}
}

fn maybe_video_options(
options: Option<OutputVideoOptions>,
) -> Result<
Expand Down Expand Up @@ -308,6 +368,20 @@ impl From<Mp4AudioEncoderOptions> for pipeline::encoder::AudioEncoderOptions {
}
}

impl From<RtmpAudioEncoderOptions> for pipeline::encoder::AudioEncoderOptions {
fn from(value: RtmpAudioEncoderOptions) -> Self {
match value {
RtmpAudioEncoderOptions::Aac {
channels,
sample_rate,
} => AudioEncoderOptions::Aac(AacEncoderOptions {
channels: channels.into(),
sample_rate: sample_rate.unwrap_or(44100),
}),
}
}
}

impl From<RtpAudioEncoderOptions> for pipeline::encoder::AudioEncoderOptions {
fn from(value: RtpAudioEncoderOptions) -> Self {
match value {
Expand Down
33 changes: 33 additions & 0 deletions compositor_api/src/types/register_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ pub struct RtpOutput {
pub audio: Option<OutputRtpAudioOptions>,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct RtmpOutput {
pub url: String,
/// Video stream configuration.
pub video: Option<OutputVideoOptions>,
/// Audio stream configuration.
pub audio: Option<OutputRtmpAudioOptions>,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Mp4Output {
Expand Down Expand Up @@ -90,6 +100,19 @@ pub struct OutputMp4AudioOptions {
pub initial: Audio,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputRtmpAudioOptions {
/// (**default="sum_clip"**) Specifies how audio should be mixed.
pub mixing_strategy: Option<MixingStrategy>,
/// Condition for termination of output stream based on the input streams states.
pub send_eos_when: Option<OutputEndCondition>,
/// Audio encoder options.
pub encoder: RtmpAudioEncoderOptions,
/// Initial audio mixer configuration for output.
pub initial: Audio,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputWhipAudioOptions {
Expand Down Expand Up @@ -141,6 +164,16 @@ pub enum Mp4AudioEncoderOptions {
},
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)]
pub enum RtmpAudioEncoderOptions {
Aac {
channels: AudioChannels,
/// (**default=`44100`**) Sample rate. Allowed values: [8000, 16000, 24000, 44100, 48000].
sample_rate: Option<u32>,
},
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)]
pub enum WhipAudioEncoderOptions {
Expand Down
21 changes: 21 additions & 0 deletions compositor_pipeline/src/pipeline/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use compositor_render::{
};
use crossbeam_channel::{bounded, Receiver, Sender};
use mp4::{Mp4FileWriter, Mp4OutputOptions};
use rtmp::RtmpSenderOptions;
use tracing::debug;

use crate::{audio_mixer::OutputSamples, error::RegisterOutputError, queue::PipelineEvent};
Expand All @@ -19,6 +20,7 @@ use super::{
use whip::{WhipSender, WhipSenderOptions};

pub mod mp4;
pub mod rtmp;
pub mod rtp;
pub mod whip;

Expand All @@ -33,6 +35,7 @@ pub struct OutputOptions {
#[derive(Debug, Clone)]
pub enum OutputProtocolOptions {
Rtp(RtpSenderOptions),
Rtmp(RtmpSenderOptions),
Mp4(Mp4OutputOptions),
Whip(WhipSenderOptions),
}
Expand Down Expand Up @@ -70,6 +73,10 @@ pub enum Output {
sender: RtpSender,
encoder: Encoder,
},
Rtmp {
sender: rtmp::RmtpSender,
encoder: Encoder,
},
Mp4 {
writer: Mp4FileWriter,
encoder: Encoder,
Expand Down Expand Up @@ -118,6 +125,12 @@ impl OutputOptionsExt<Option<Port>> for OutputOptions {

Ok((Output::Rtp { sender, encoder }, Some(port)))
}
OutputProtocolOptions::Rtmp(rtmp_options) => {
let sender = rtmp::RmtpSender::new(output_id, rtmp_options.clone(), packets)
.map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?;

Ok((Output::Rtmp { sender, encoder }, None))
}
OutputProtocolOptions::Mp4(mp4_opt) => {
let writer = Mp4FileWriter::new(output_id.clone(), mp4_opt.clone(), packets, ctx)
.map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?;
Expand Down Expand Up @@ -196,6 +209,7 @@ impl Output {
pub fn frame_sender(&self) -> Option<&Sender<PipelineEvent<Frame>>> {
match &self {
Output::Rtp { encoder, .. } => encoder.frame_sender(),
Output::Rtmp { encoder, .. } => encoder.frame_sender(),
Output::Mp4 { encoder, .. } => encoder.frame_sender(),
Output::Whip { encoder, .. } => encoder.frame_sender(),
Output::EncodedData { encoder } => encoder.frame_sender(),
Expand All @@ -206,6 +220,7 @@ impl Output {
pub fn samples_batch_sender(&self) -> Option<&Sender<PipelineEvent<OutputSamples>>> {
match &self {
Output::Rtp { encoder, .. } => encoder.samples_batch_sender(),
Output::Rtmp { encoder, .. } => encoder.samples_batch_sender(),
Output::Mp4 { encoder, .. } => encoder.samples_batch_sender(),
Output::Whip { encoder, .. } => encoder.samples_batch_sender(),
Output::EncodedData { encoder } => encoder.samples_batch_sender(),
Expand All @@ -216,6 +231,7 @@ impl Output {
pub fn resolution(&self) -> Option<Resolution> {
match &self {
Output::Rtp { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()),
Output::Rtmp { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()),
Output::Mp4 { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()),
Output::Whip { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()),
Output::EncodedData { encoder } => encoder.video.as_ref().map(|v| v.resolution()),
Expand All @@ -226,6 +242,7 @@ impl Output {
pub fn request_keyframe(&self, output_id: OutputId) -> Result<(), RequestKeyframeError> {
let encoder = match &self {
Output::Rtp { encoder, .. } => encoder,
Output::Rtmp { encoder, .. } => encoder,
Output::Mp4 { encoder, .. } => encoder,
Output::Whip { encoder, .. } => encoder,
Output::EncodedData { encoder } => encoder,
Expand All @@ -252,6 +269,10 @@ impl Output {
.video
.as_ref()
.map(|_| OutputFrameFormat::PlanarYuv420Bytes),
Output::Rtmp { encoder, .. } => encoder
.video
.as_ref()
.map(|_| OutputFrameFormat::PlanarYuv420Bytes),
Output::EncodedData { encoder } => encoder
.video
.as_ref()
Expand Down
15 changes: 1 addition & 14 deletions compositor_pipeline/src/pipeline/output/mp4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,6 @@ pub struct Mp4AudioTrack {
pub sample_rate: u32,
}

pub enum Mp4OutputVideoTrack {
H264 { width: u32, height: u32 },
}

pub struct Mp4WriterOptions {
pub output_path: PathBuf,
pub video: Option<Mp4OutputVideoTrack>,
}

pub struct Mp4FileWriter;

impl Mp4FileWriter {
Expand Down Expand Up @@ -118,18 +109,14 @@ fn init_ffmpeg_output(
.map(|v| {
const VIDEO_TIME_BASE: i32 = 90000;

let codec = match v.codec {
VideoCodec::H264 => ffmpeg::codec::Id::H264,
};

let mut stream = output_ctx
.add_stream(ffmpeg::codec::Id::H264)
.map_err(OutputInitError::FfmpegMp4Error)?;

stream.set_time_base(ffmpeg::Rational::new(1, VIDEO_TIME_BASE));

let codecpar = unsafe { &mut *(*stream.as_mut_ptr()).codecpar };
codecpar.codec_id = codec.into();
codecpar.codec_id = ffmpeg::codec::Id::H264.into();
codecpar.codec_type = ffmpeg::ffi::AVMediaType::AVMEDIA_TYPE_VIDEO;
codecpar.width = v.width as i32;
codecpar.height = v.height as i32;
Expand Down
Loading
Loading