Document a bunch of stuff on the main

Signed-off-by: Aron Heinecke <aron.heinecke@t-online.de>
This commit is contained in:
Aron Heinecke 2021-05-22 19:25:58 +02:00
parent 18a8be0e10
commit 4bb9f7d848
2 changed files with 30 additions and 34 deletions

View file

@ -1,3 +1,5 @@
//! Discord handler
use serde::Deserialize; use serde::Deserialize;
use serenity::prelude::Mentionable; use serenity::prelude::Mentionable;
@ -377,11 +379,6 @@ impl VoiceEventHandler for Receiver {
Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => { Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => {
// An event which fires for every received audio packet, // An event which fires for every received audio packet,
// containing the decoded data. // containing the decoded data.
// let data: &[u8] = &packet.payload.as_slice()[*payload_offset..(packet.payload.len()-payload_end_pad)];
// let packet = OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusVoice, data });
// if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await {
// eprint!("Can't send voice to sender: {}",e);
// }
if let Some(audio) = audio { if let Some(audio) = audio {
{ {
let time = std::time::Instant::now(); let time = std::time::Instant::now();
@ -397,25 +394,6 @@ impl VoiceEventHandler for Receiver {
let _ = lock.insert(packet.ssrc, audio.clone()); let _ = lock.insert(packet.ssrc, audio.clone());
} }
} }
// println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len())));
// // println!(
// // "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}",
// // packet.sequence.0,
// // audio.len() * std::mem::size_of::<i16>(),
// // packet.payload.len(),
// // packet.ssrc,
// // );
// let mut values_converted = Vec::with_capacity(2*audio.len());
// for value in audio {
// // maybe "be" ?
// // TODO: we could optimize this, data isn't directly used
// values_converted.extend(&value.to_le_bytes());
// }
// let packet =
// OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusMusic, data: &values_converted });
// if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await {
// eprint!("Can't send voice to sender: {}",e);
// }
} else { } else {
println!("RTP packet, but no audio. Driver may not be configured to decode."); println!("RTP packet, but no audio. Driver may not be configured to decode.");
} }

View file

@ -89,7 +89,9 @@ impl TypeMapKey for ListenerHolder {
type Value = (TsToDiscordPipeline,AudioBufferDiscord); type Value = (TsToDiscordPipeline,AudioBufferDiscord);
} }
const TICK_TIME: u64 = 15; /// teamspeak audio fragment timer
/// We want to run every 20ms, but we only get ~1ms correctness
const TICK_TIME: u64 = 18;
const FRAME_SIZE_MS: usize = 20; const FRAME_SIZE_MS: usize = 20;
const STEREO_20MS: usize = 48000 * 2 * FRAME_SIZE_MS / 1000; const STEREO_20MS: usize = 48000 * 2 * FRAME_SIZE_MS / 1000;
/// The maximum size of an opus frame is 1275 as from RFC6716. /// The maximum size of an opus frame is 1275 as from RFC6716.
@ -97,7 +99,7 @@ const MAX_OPUS_FRAME_SIZE: usize = 1275;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
// init logging stuff used by tsclientlib
let config: Config = toml::from_str(&std::fs::read_to_string(".credentials.toml").unwrap()).unwrap(); let config: Config = toml::from_str(&std::fs::read_to_string(".credentials.toml").unwrap()).unwrap();
let logger = { let logger = {
let decorator = slog_term::TermDecorator::new().build(); let decorator = slog_term::TermDecorator::new().build();
@ -107,7 +109,7 @@ async fn main() -> Result<()> {
Logger::root(drain, o!()) Logger::root(drain, o!())
}; };
// init discord framework
let framework = StandardFramework::new() let framework = StandardFramework::new()
.configure(|c| c .configure(|c| c
.prefix("~")) .prefix("~"))
@ -122,7 +124,7 @@ async fn main() -> Result<()> {
.decode_mode(DecodeMode::Decode) .decode_mode(DecodeMode::Decode)
); );
// init discord client
let mut client = Client::builder(&config.discord_token) let mut client = Client::builder(&config.discord_token)
.event_handler(discord::Handler) .event_handler(discord::Handler)
.framework(framework) .framework(framework)
@ -130,11 +132,14 @@ async fn main() -> Result<()> {
.await .await
.expect("Err creating client"); .expect("Err creating client");
// init teamspeak -> discord pipeline
let ts_voice_logger = logger.new(o!("pipeline" => "voice-ts")); let ts_voice_logger = logger.new(o!("pipeline" => "voice-ts"));
let teamspeak_voice_handler = TsToDiscordPipeline::new(ts_voice_logger); let teamspeak_voice_handler = TsToDiscordPipeline::new(ts_voice_logger);
// init discord -> teamspeak pipeline
let map = HashMap::new(); let map = HashMap::new();
let discord_voice_buffer: AudioBufferDiscord = Arc::new(Mutex::new(map)); let discord_voice_buffer: AudioBufferDiscord = Arc::new(Mutex::new(map));
// stuff discord -> teamspeak pipeline into discord context for retrieval inside the client
{ {
// Open the data lock in write mode, so keys can be inserted to it. // Open the data lock in write mode, so keys can be inserted to it.
let mut data = client.data.write().await; let mut data = client.data.write().await;
@ -145,24 +150,27 @@ async fn main() -> Result<()> {
data.insert::<ListenerHolder>((teamspeak_voice_handler.clone(),discord_voice_buffer.clone())); data.insert::<ListenerHolder>((teamspeak_voice_handler.clone(),discord_voice_buffer.clone()));
} }
// spawn client runner
tokio::spawn(async move { tokio::spawn(async move {
let _ = client.start().await.map_err(|why| println!("Client ended: {:?}", why)); let _ = client.start().await.map_err(|why| println!("Client ended: {:?}", why));
}); });
let con_id = ConnectionId(0); let con_id = ConnectionId(0);
// configure teamspeak client
let con_config = Connection::build(config.teamspeak_server) let con_config = Connection::build(config.teamspeak_server)
.log_commands(config.verbose >= 1) .log_commands(config.verbose >= 1)
.log_packets(config.verbose >= 2) .log_packets(config.verbose >= 2)
.log_udp_packets(config.verbose >= 3); .log_udp_packets(config.verbose >= 3);
// Optionally set the key of this client, otherwise a new key is generated. // teamspeak: Optionally set the key of this client, otherwise a new key is generated.
let id = Identity::new_from_str(&config.teamspeak_identity).expect("Can't load identity!"); let id = Identity::new_from_str(&config.teamspeak_identity).expect("Can't load identity!");
let con_config = con_config.identity(id); let con_config = con_config.identity(id);
// Connect // Connect teamspeak client
let mut con = con_config.connect()?; let mut con = con_config.connect()?;
// todo: something something discord connection events?
let r = con let r = con
.events() .events()
.try_filter(|e| future::ready(matches!(e, StreamItem::BookEvents(_)))) .try_filter(|e| future::ready(matches!(e, StreamItem::BookEvents(_))))
@ -171,16 +179,23 @@ async fn main() -> Result<()> {
if let Some(r) = r { if let Some(r) = r {
r?; r?;
} }
// init discord -> teamspeak opus encoder
let encoder = audiopus::coder::Encoder::new( let encoder = audiopus::coder::Encoder::new(
audiopus::SampleRate::Hz48000, audiopus::SampleRate::Hz48000,
audiopus::Channels::Stereo, audiopus::Channels::Stereo,
audiopus::Application::Voip) audiopus::Application::Voip)
.expect("Can't construct encoder!"); .expect("Can't construct encoder!");
// we have to stuff this inside an arc-mutex to avoid lifetime shenanigans
let encoder = Arc::new(Mutex::new(encoder)); let encoder = Arc::new(Mutex::new(encoder));
// teamspeak playback timer
let mut interval = tokio::time::interval(Duration::from_millis(TICK_TIME)); let mut interval = tokio::time::interval(Duration::from_millis(TICK_TIME));
loop { loop {
// handle teamspeak events
let events = con.events().try_for_each(|e| async { let events = con.events().try_for_each(|e| async {
// handle teamspeak audio packets
if let StreamItem::Audio(packet) = e { if let StreamItem::Audio(packet) = e {
let from = ClientId(match packet.data().data() { let from = ClientId(match packet.data().data() {
AudioData::S2C { from, .. } => *from, AudioData::S2C { from, .. } => *from,
@ -189,17 +204,19 @@ async fn main() -> Result<()> {
}); });
let mut ts_voice: std::sync::MutexGuard<TsAudioHandler> = teamspeak_voice_handler.data.lock().expect("Can't lock ts audio buffer!"); let mut ts_voice: std::sync::MutexGuard<TsAudioHandler> = teamspeak_voice_handler.data.lock().expect("Can't lock ts audio buffer!");
// feed mixer+jitter buffer, consumed by discord
if let Err(e) = ts_voice.handle_packet((con_id, from), packet) { if let Err(e) = ts_voice.handle_packet((con_id, from), packet) {
debug!(logger, "Failed to play TS_Voice packet"; "error" => %e); debug!(logger, "Failed to play TS_Voice packet"; "error" => %e);
} }
} }
Ok(()) Ok(())
}); });
// Wait for ctrl + c // Wait for ctrl + c and run everything else, end on who ever stops first
tokio::select! { tokio::select! {
_send = interval.tick() => { _send = interval.tick() => {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
if let Some(processed) = process_audio(&discord_voice_buffer,&encoder).await { // send audio frame to teamspeak
if let Some(processed) = process_discord_audio(&discord_voice_buffer,&encoder).await {
con.send_audio(processed)?; con.send_audio(processed)?;
let dur = start.elapsed(); let dur = start.elapsed();
if dur >= Duration::from_millis(1) { if dur >= Duration::from_millis(1) {
@ -223,8 +240,9 @@ async fn main() -> Result<()> {
} }
/// Create an audio frame for consumption by teamspeak.
async fn process_audio(voice_buffer: &AudioBufferDiscord, encoder: &Arc<Mutex<Encoder>>) -> Option<OutPacket> { /// Merges all streams and converts them to opus
async fn process_discord_audio(voice_buffer: &AudioBufferDiscord, encoder: &Arc<Mutex<Encoder>>) -> Option<OutPacket> {
let mut buffer_map; let mut buffer_map;
{ {
let mut lock = voice_buffer.lock().await; let mut lock = voice_buffer.lock().await;