From 7c77c2cd52437739dfc48c5e3f1fe9be40a6af99 Mon Sep 17 00:00:00 2001 From: Aron Heinecke Date: Thu, 5 Aug 2021 16:49:54 +0200 Subject: [PATCH] Fix decoder issues with RTP extensions Signed-off-by: Aron Heinecke --- src/discord.rs | 47 +++++++++++++++++++++++++++++-------- src/discord_audiohandler.rs | 6 ++--- src/main.rs | 9 +++---- 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/src/discord.rs b/src/discord.rs index ea167c5..1d2450c 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -1,7 +1,11 @@ //! Discord handler +use std::sync::Arc; + +use audiopus::{Channels, SampleRate}; +use audiopus::coder::Decoder; use serde::Deserialize; -use serenity::prelude::Mentionable; +use serenity::prelude::{Mentionable, Mutex}; use slog::error; // This trait adds the `register_songbird` and `register_songbird_with` methods @@ -24,6 +28,8 @@ use serenity::{ model::{channel::Message, gateway::Ready}, Result as SerenityResult, }; +use songbird::packet::PacketSize; +use songbird::packet::rtp::RtpExtensionPacket; use songbird::{ model::payload::{ClientConnect, ClientDisconnect, Speaking}, CoreEvent, @@ -330,6 +336,7 @@ fn check_msg(result: SerenityResult) { struct Receiver{ sink: crate::AudioBufferDiscord, + decoder: Arc>, } impl Receiver { @@ -338,6 +345,7 @@ impl Receiver { // you can later store them in intervals. Self { sink: voice_receiver, + decoder: Arc::new(Mutex::new(Decoder::new(SampleRate::Hz48000, Channels::Stereo).unwrap())) } } } @@ -383,19 +391,38 @@ impl VoiceEventHandler for Receiver { // get raw opus package, we don't decode here and leave that to the AudioHandler let last_bytes = packet.payload.len() - payload_end_pad; - let opus_slice = &packet.payload[*payload_offset..last_bytes]; + let data = &packet.payload[*payload_offset..last_bytes]; + let start = if packet.extension != 0 { + match RtpExtensionPacket::new(data) { + Some(v) => v.packet_size(), + None => { + eprintln!("Extension packet indicated, but insufficient space."); + return None; + } + } + } else { + 0 + }; + let opus_slice = &data[start..]; let dur; { - let time = std::time::Instant::now(); - let mut lock = self.sink.lock().await; - dur = time.elapsed(); - if let Err(e) = lock.handle_packet(packet.ssrc, packet.sequence.0.0, opus_slice.to_vec()) { - eprintln!("Failed to handle Discord voice packet: {}",e); + let mut lock_decoder = self.decoder.lock().await; + let mut decoded: [i16; 48000 *2 ] = [0; 48000 * 2]; + if let Err(e) = lock_decoder.decode(Some(opus_slice), &mut decoded[..], false) { + eprintln!("Failed to handle Discord voice packet: {:?}",e); + } else { + let time = std::time::Instant::now(); + let mut lock = self.sink.lock().await; + dur = time.elapsed(); + if let Err(e) = lock.handle_packet(packet.ssrc, packet.sequence.0.0, opus_slice.to_vec()) { + eprintln!("Failed to handle Discord voice packet: {}",e); + } + if dur.as_millis() > 1 { + eprintln!("Acquiring lock took {}ms",dur.as_millis()); + } } } - if dur.as_millis() > 1 { - eprintln!("Acquiring lock took {}ms",dur.as_millis()); - } + }, Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => { // An event which fires for every received rtcp packet, diff --git a/src/discord_audiohandler.rs b/src/discord_audiohandler.rs index f1fcbd1..56f08bc 100644 --- a/src/discord_audiohandler.rs +++ b/src/discord_audiohandler.rs @@ -17,7 +17,7 @@ use std::hash::Hash; use audiopus::coder::Decoder; use audiopus::{packet, Channels, SampleRate}; -use slog::{debug, o, trace, warn, Logger}; +use slog::{Logger, debug, info, o, trace, warn}; use tsclientlib::audio::Error; use tsproto_packets::packets::{AudioData, CodecType, InAudioBuf}; @@ -244,7 +244,7 @@ impl AudioQueue { } fn decode_packet(&mut self, packet: Option<&QueuePacket>, fec: bool) -> Result<()> { - trace!(self.logger, "Decoding packet"; "has_packet" => packet.is_some(), "fec" => fec); + debug!(self.logger, "Decoding packet"; "has_packet" => packet.is_some(), "fec" => fec); let packet_data; let len; if let Some(p) = packet { @@ -347,7 +347,7 @@ impl AudioQueue { cur_id ); // Packet loss - debug!(self.logger, "Audio packet loss"; "need" => cur_id, "have" => packet.id); + info!(self.logger, "Audio packet loss"; "need" => cur_id, "have" => packet.id); if packet.id == self.next_id { // Can use forward-error-correction self.decode_packet(Some(&packet), true)?; diff --git a/src/main.rs b/src/main.rs index 9ea2fab..92fe8fc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -92,7 +92,7 @@ impl TypeMapKey for ListenerHolder { /// teamspeak audio fragment timer /// We want to run every 20ms, but we only get ~1ms correctness -const TICK_TIME: u64 = 18; +const TICK_TIME: u64 = 20; const FRAME_SIZE_MS: usize = 20; const SAMPLE_RATE: usize = 48000; const STEREO_20MS: usize = SAMPLE_RATE * 2 * FRAME_SIZE_MS / 1000; @@ -104,6 +104,7 @@ const I16_CONVERSION_DIVIDER: f32 = 0x8000 as f32; const MAX_OPUS_FRAME_SIZE: usize = 1275; #[tokio::main] async fn main() -> Result<()> { + dbg!(STEREO_20MS); tracing_subscriber::fmt::init(); // init logging stuff used by tsclientlib let config: Config = toml::from_str(&std::fs::read_to_string(".credentials.toml").unwrap()).unwrap(); @@ -127,7 +128,7 @@ async fn main() -> Result<()> { let songbird = Songbird::serenity(); songbird.set_config( DriverConfig::default() - .decode_mode(DecodeMode::Decode) + .decode_mode(DecodeMode::Decrypt) ); // init discord client @@ -255,12 +256,12 @@ async fn process_discord_audio(voice_buffer: &AudioBufferDiscord, encoder: &Arc< // buffer_map = std::mem::replace(&mut *lock, HashMap::new()); // } - let mut data = [0.0; STEREO_20MS]; + let mut data = [0.0; 1920]; { let mut lock = voice_buffer.lock().await; lock.fill_buffer(&mut data); } - let mut encoded = [0; 1024]; + let mut encoded = [0; 1920]; let encoder_c = encoder.clone(); // don't block the async runtime let res = task::spawn_blocking(move || {