From 578f7d0d42ffca7b0018f63fa1b17b1c3072044c Mon Sep 17 00:00:00 2001 From: Marshall Quander Date: Wed, 1 Nov 2017 19:11:20 -0700 Subject: [PATCH] Clean up, forward FIRs --- src/lib.rs | 117 ++++++++++++++++++++---------------------------- src/sessions.rs | 12 ++++- 2 files changed, 59 insertions(+), 70 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index cfdf507..9cbd6c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ use std::os::raw::{c_char, c_int}; use std::ptr; use std::slice; use std::sync::{mpsc, Arc, RwLock, Weak}; +use std::sync::atomic::Ordering; use std::thread; use switchboard::Switchboard; @@ -140,36 +141,6 @@ fn send_notification(myself: &SessionState, json: JsonValue) -> Result<(), Box(from: *mut PluginSession, send: T) -> Result<(), Box> where T: Fn(&Session) { - let sess = Session::from_ptr(from)?; - if let Some(state) = sess.get() { - janus::log(LogLevel::Dbg, &format!("Data packet received in room {:?} from {:?}.", state.room_id, state.user_id)); - for other in STATE.sessions.read()?.iter() { - if let Some(other_state) = other.get() { - if other_state.room_id == state.room_id && other_state.user_id != state.user_id { - send(&other) - } - } - } - } - Ok(()) -} - -fn get_subscribers(from: *mut PluginSession, kind: Option) -> Result>, Box> { - let sess = Session::from_ptr(from)?; - let switchboard = STATE.switchboard.read()?; - Ok(switchboard.subscribers_to(&sess, kind)) -} - -fn feedback(from: *mut PluginSession, kind: Option, send: T) -> Result<(), Box> where T: Fn(&Session) { - let sess = Session::from_ptr(from)?; - let switchboard = STATE.switchboard.read()?; - for subscription in switchboard.subscribers_to(&sess, kind) { - send(&subscription); - } - Ok(()) -} - extern "C" fn init(callbacks: *mut PluginCallbacks, _config_path: *const c_char) -> c_int { match unsafe { callbacks.as_ref() } { Some(c) => { @@ -256,54 +227,67 @@ extern "C" fn setup_media(_handle: *mut PluginSession) { } extern "C" fn incoming_rtp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) { + let sess = Session::from_ptr(handle).expect("Session can't be null!"); + let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue."); let content_kind = if video == 1 { ContentKind::VIDEO } else { ContentKind::AUDIO }; - let subscribers = get_subscribers(handle, Some(content_kind)); - match subscribers { - Err(e) => janus::log(LogLevel::Huge, &format!("Discarding RTP packet: {}", e)), - Ok(others) => { - janus::log(LogLevel::Huge, &format!("Broadcasting RTP packet to subscribers: {:?}", others)); - let relay_rtp = gateway_callbacks().relay_rtp; - for other in others { - relay_rtp(other.as_ptr(), video, buf, len); - } - } + let subscribers = switchboard.subscribers_to(&sess, Some(content_kind)); + let relay_rtp = gateway_callbacks().relay_rtp; + janus::log(LogLevel::Huge, &format!("RTP packet received over {:?}.", sess)); + for other in subscribers { + relay_rtp(other.as_ptr(), video, buf, len); } } extern "C" fn incoming_rtcp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) { let sess = Session::from_ptr(handle).expect("Session can't be null!"); + let switchboard = STATE.switchboard.read().expect("Subscriptions lock poisoned; can't continue."); let content_kind = if video == 1 { ContentKind::VIDEO } else { ContentKind::AUDIO }; let relay_rtcp = gateway_callbacks().relay_rtcp; - if content_kind == ContentKind::AUDIO { - let subscriptions = STATE.switchboard.read().expect("Subscriptions lock poisoned; can't continue."); - let subscribers = subscriptions.subscribers_to(&sess, Some(content_kind)); - for subscriber in subscribers { - relay_rtcp(subscriber.as_ptr(), video, buf, len); - } - } else if content_kind == ContentKind::VIDEO { - let subscriptions = STATE.switchboard.read().expect("Subscriptions lock poisoned; can't continue."); - let publishers = subscriptions.publishers_to(&sess, Some(content_kind)); - let packet = unsafe { slice::from_raw_parts(buf, len as usize) }; - if janus::rtcp::has_pli(packet) { - let mut pli = janus::rtcp::gen_pli(); - for publisher in publishers { - janus::log(LogLevel::Info, &format!("Relaying. ")); - relay_rtcp(publisher.as_ptr(), video, pli.as_mut_ptr(), pli.len() as i32); + if let Some(state) = sess.get() { + janus::log(LogLevel::Dbg, &format!("RTCP packet received in {:?} from {:?} over {:?}.", state.room_id, state.user_id, sess)); + if content_kind == ContentKind::AUDIO { + let subscribers = switchboard.subscribers_to(&sess, Some(content_kind)); + for subscriber in subscribers { + relay_rtcp(subscriber.as_ptr(), video, buf, len); } - if let Err(e) = feedback(handle, Some(content_kind), |other| { - // relay_rtcp(other.handle, video, pli.as_ptr() as *mut _, pli.len() as i32) - }) { - janus::log(LogLevel::Huge, &format!("Discarding RTCP packet: {}", e)) - + } else if content_kind == ContentKind::VIDEO { + let publishers = switchboard.publishers_to(&sess, Some(content_kind)); + let packet = unsafe { slice::from_raw_parts(buf, len as usize) }; + if janus::rtcp::has_pli(packet) { + let mut pli = janus::rtcp::gen_pli(); + for publisher in publishers { + janus::log(LogLevel::Info, &format!("Relaying PLI.")); + relay_rtcp(publisher.as_ptr(), video, pli.as_mut_ptr(), pli.len() as i32); + } + } else if janus::rtcp::has_fir(packet) { + let mut seq = state.fir_seq.fetch_add(1, Ordering::Relaxed) as i32; + let mut fir = janus::rtcp::gen_fir(&mut seq); + for publisher in publishers { + janus::log(LogLevel::Info, &format!("Relaying FIR.")); + relay_rtcp(publisher.as_ptr(), video, fir.as_mut_ptr(), fir.len() as i32); + } } } + } else { + janus::log(LogLevel::Huge, &format!("Discarding RTCP packet from not-yet-joined peer.")); } } extern "C" fn incoming_data(handle: *mut PluginSession, buf: *mut c_char, len: c_int) { - let relay_data = gateway_callbacks().relay_data; - if let Err(e) = send_data(handle, |other| { relay_data(other.as_ptr(), buf, len); }) { - janus::log(LogLevel::Huge, &format!("Discarding data packet: {}", e)) + let sess = Session::from_ptr(handle).expect("Session can't be null!"); + let sessions = STATE.sessions.read().expect("Sessions lock poisoned; can't continue."); + if let Some(state) = sess.get() { + janus::log(LogLevel::Dbg, &format!("Data packet received in {:?} from {:?} over {:?}.", state.room_id, state.user_id, sess)); + let relay_data = gateway_callbacks().relay_data; + for other in sessions.iter() { + if let Some(other_state) = other.get() { + if other_state.room_id == state.room_id && other_state.user_id != state.user_id { + relay_data(other.as_ptr(), buf, len); + } + } + } + } else { + janus::log(LogLevel::Huge, &format!("Discarding data packet from not-yet-joined peer.")); } } @@ -316,12 +300,7 @@ extern "C" fn hangup_media(_handle: *mut PluginSession) { } fn process_join(from: &Arc, room_id: RoomId, user_id: UserId, notify: Option, subscription_specs: Option>) -> MessageProcessingResult { - let state = Arc::new(SessionState { - user_id, - room_id, - notify: notify.unwrap_or(false), - }); - + let state = Arc::new(SessionState::new(user_id, room_id, notify.unwrap_or(false))); if from.set_if_none(state).is_some() { return Err(From::from("Users may only join once!")) } diff --git a/src/sessions.rs b/src/sessions.rs index 6b50963..ef0b101 100644 --- a/src/sessions.rs +++ b/src/sessions.rs @@ -1,11 +1,12 @@ /// Types for representing Janus session state. use atom::AtomSetOnce; +use std::sync::atomic::AtomicIsize; use messages::{RoomId, UserId}; use janus::session::SessionWrapper; use std::sync::Arc; /// The state associated with a single session. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug)] pub struct SessionState { /// An opaque ID uniquely identifying this user. pub user_id: UserId, @@ -15,6 +16,15 @@ pub struct SessionState { /// Whether or not this session should receive notifications. pub notify: bool, + + /// The current FIR sequence number for this session's video. + pub fir_seq: AtomicIsize, +} + +impl SessionState { + pub fn new(user_id: UserId, room_id: RoomId, notify: bool) -> Self { + Self { user_id, room_id, notify, fir_seq: AtomicIsize::new(0) } + } } /// Rust representation of a single Janus session, i.e. a single RTCPeerConnection.