This commit is contained in:
Marshall Quander 2017-11-01 19:11:20 -07:00
Родитель 3ef54007c8
Коммит 578f7d0d42
2 изменённых файлов: 59 добавлений и 70 удалений

Просмотреть файл

@ -33,6 +33,7 @@ use std::os::raw::{c_char, c_int};
use std::ptr;
use std::slice;
use std::sync::{mpsc, Arc, RwLock, Weak};
use std::sync::atomic::Ordering;
use std::thread;
use switchboard::Switchboard;
@ -140,36 +141,6 @@ fn send_notification(myself: &SessionState, json: JsonValue) -> Result<(), Box<E
Ok(())
}
fn send_data<T>(from: *mut PluginSession, send: T) -> Result<(), Box<Error>> where T: Fn(&Session) {
let sess = Session::from_ptr(from)?;
if let Some(state) = sess.get() {
janus::log(LogLevel::Dbg, &format!("Data packet received in room {:?} from {:?}.", state.room_id, state.user_id));
for other in STATE.sessions.read()?.iter() {
if let Some(other_state) = other.get() {
if other_state.room_id == state.room_id && other_state.user_id != state.user_id {
send(&other)
}
}
}
}
Ok(())
}
fn get_subscribers(from: *mut PluginSession, kind: Option<ContentKind>) -> Result<Vec<Arc<Session>>, Box<Error>> {
let sess = Session::from_ptr(from)?;
let switchboard = STATE.switchboard.read()?;
Ok(switchboard.subscribers_to(&sess, kind))
}
fn feedback<T>(from: *mut PluginSession, kind: Option<ContentKind>, send: T) -> Result<(), Box<Error>> where T: Fn(&Session) {
let sess = Session::from_ptr(from)?;
let switchboard = STATE.switchboard.read()?;
for subscription in switchboard.subscribers_to(&sess, kind) {
send(&subscription);
}
Ok(())
}
extern "C" fn init(callbacks: *mut PluginCallbacks, _config_path: *const c_char) -> c_int {
match unsafe { callbacks.as_ref() } {
Some(c) => {
@ -256,54 +227,67 @@ extern "C" fn setup_media(_handle: *mut PluginSession) {
}
extern "C" fn incoming_rtp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) {
let sess = Session::from_ptr(handle).expect("Session can't be null!");
let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue.");
let content_kind = if video == 1 { ContentKind::VIDEO } else { ContentKind::AUDIO };
let subscribers = get_subscribers(handle, Some(content_kind));
match subscribers {
Err(e) => janus::log(LogLevel::Huge, &format!("Discarding RTP packet: {}", e)),
Ok(others) => {
janus::log(LogLevel::Huge, &format!("Broadcasting RTP packet to subscribers: {:?}", others));
let relay_rtp = gateway_callbacks().relay_rtp;
for other in others {
relay_rtp(other.as_ptr(), video, buf, len);
}
}
let subscribers = switchboard.subscribers_to(&sess, Some(content_kind));
let relay_rtp = gateway_callbacks().relay_rtp;
janus::log(LogLevel::Huge, &format!("RTP packet received over {:?}.", sess));
for other in subscribers {
relay_rtp(other.as_ptr(), video, buf, len);
}
}
extern "C" fn incoming_rtcp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) {
let sess = Session::from_ptr(handle).expect("Session can't be null!");
let switchboard = STATE.switchboard.read().expect("Subscriptions lock poisoned; can't continue.");
let content_kind = if video == 1 { ContentKind::VIDEO } else { ContentKind::AUDIO };
let relay_rtcp = gateway_callbacks().relay_rtcp;
if content_kind == ContentKind::AUDIO {
let subscriptions = STATE.switchboard.read().expect("Subscriptions lock poisoned; can't continue.");
let subscribers = subscriptions.subscribers_to(&sess, Some(content_kind));
for subscriber in subscribers {
relay_rtcp(subscriber.as_ptr(), video, buf, len);
}
} else if content_kind == ContentKind::VIDEO {
let subscriptions = STATE.switchboard.read().expect("Subscriptions lock poisoned; can't continue.");
let publishers = subscriptions.publishers_to(&sess, Some(content_kind));
let packet = unsafe { slice::from_raw_parts(buf, len as usize) };
if janus::rtcp::has_pli(packet) {
let mut pli = janus::rtcp::gen_pli();
for publisher in publishers {
janus::log(LogLevel::Info, &format!("Relaying. "));
relay_rtcp(publisher.as_ptr(), video, pli.as_mut_ptr(), pli.len() as i32);
if let Some(state) = sess.get() {
janus::log(LogLevel::Dbg, &format!("RTCP packet received in {:?} from {:?} over {:?}.", state.room_id, state.user_id, sess));
if content_kind == ContentKind::AUDIO {
let subscribers = switchboard.subscribers_to(&sess, Some(content_kind));
for subscriber in subscribers {
relay_rtcp(subscriber.as_ptr(), video, buf, len);
}
if let Err(e) = feedback(handle, Some(content_kind), |other| {
// relay_rtcp(other.handle, video, pli.as_ptr() as *mut _, pli.len() as i32)
}) {
janus::log(LogLevel::Huge, &format!("Discarding RTCP packet: {}", e))
} else if content_kind == ContentKind::VIDEO {
let publishers = switchboard.publishers_to(&sess, Some(content_kind));
let packet = unsafe { slice::from_raw_parts(buf, len as usize) };
if janus::rtcp::has_pli(packet) {
let mut pli = janus::rtcp::gen_pli();
for publisher in publishers {
janus::log(LogLevel::Info, &format!("Relaying PLI."));
relay_rtcp(publisher.as_ptr(), video, pli.as_mut_ptr(), pli.len() as i32);
}
} else if janus::rtcp::has_fir(packet) {
let mut seq = state.fir_seq.fetch_add(1, Ordering::Relaxed) as i32;
let mut fir = janus::rtcp::gen_fir(&mut seq);
for publisher in publishers {
janus::log(LogLevel::Info, &format!("Relaying FIR."));
relay_rtcp(publisher.as_ptr(), video, fir.as_mut_ptr(), fir.len() as i32);
}
}
}
} else {
janus::log(LogLevel::Huge, &format!("Discarding RTCP packet from not-yet-joined peer."));
}
}
extern "C" fn incoming_data(handle: *mut PluginSession, buf: *mut c_char, len: c_int) {
let relay_data = gateway_callbacks().relay_data;
if let Err(e) = send_data(handle, |other| { relay_data(other.as_ptr(), buf, len); }) {
janus::log(LogLevel::Huge, &format!("Discarding data packet: {}", e))
let sess = Session::from_ptr(handle).expect("Session can't be null!");
let sessions = STATE.sessions.read().expect("Sessions lock poisoned; can't continue.");
if let Some(state) = sess.get() {
janus::log(LogLevel::Dbg, &format!("Data packet received in {:?} from {:?} over {:?}.", state.room_id, state.user_id, sess));
let relay_data = gateway_callbacks().relay_data;
for other in sessions.iter() {
if let Some(other_state) = other.get() {
if other_state.room_id == state.room_id && other_state.user_id != state.user_id {
relay_data(other.as_ptr(), buf, len);
}
}
}
} else {
janus::log(LogLevel::Huge, &format!("Discarding data packet from not-yet-joined peer."));
}
}
@ -316,12 +300,7 @@ extern "C" fn hangup_media(_handle: *mut PluginSession) {
}
fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, notify: Option<bool>, subscription_specs: Option<Vec<SubscriptionSpec>>) -> MessageProcessingResult {
let state = Arc::new(SessionState {
user_id,
room_id,
notify: notify.unwrap_or(false),
});
let state = Arc::new(SessionState::new(user_id, room_id, notify.unwrap_or(false)));
if from.set_if_none(state).is_some() {
return Err(From::from("Users may only join once!"))
}

Просмотреть файл

@ -1,11 +1,12 @@
/// Types for representing Janus session state.
use atom::AtomSetOnce;
use std::sync::atomic::AtomicIsize;
use messages::{RoomId, UserId};
use janus::session::SessionWrapper;
use std::sync::Arc;
/// The state associated with a single session.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug)]
pub struct SessionState {
/// An opaque ID uniquely identifying this user.
pub user_id: UserId,
@ -15,6 +16,15 @@ pub struct SessionState {
/// Whether or not this session should receive notifications.
pub notify: bool,
/// The current FIR sequence number for this session's video.
pub fir_seq: AtomicIsize,
}
impl SessionState {
pub fn new(user_id: UserId, room_id: RoomId, notify: bool) -> Self {
Self { user_id, room_id, notify, fir_seq: AtomicIsize::new(0) }
}
}
/// Rust representation of a single Janus session, i.e. a single RTCPeerConnection.