Merge pull request #21 from mozilla/fixes

Miscellaneous fixes
This commit is contained in:
Marshall Quander 2018-05-22 15:03:46 -07:00 коммит произвёл GitHub
Родитель 75a300cc68 b622ce8121
Коммит 59f97d89c4
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 151 добавлений и 118 удалений

Просмотреть файл

@ -15,6 +15,7 @@ mod messages;
mod sessions;
mod switchboard;
mod config;
mod txid;
use atom::AtomSetOnce;
use messages::{RoomId, UserId};
@ -25,6 +26,7 @@ use janus::sdp::{AudioCodec, MediaDirection, OfferAnswerParameters, Sdp, VideoCo
use messages::{JsepKind, MessageKind, OptionalField, Subscription};
use serde_json::Value as JsonValue;
use sessions::{JoinState, Session, SessionState};
use txid::TransactionId;
use std::error::Error;
use std::ffi::CStr;
use std::os::raw::{c_char, c_int};
@ -46,12 +48,6 @@ macro_rules! c_str {
}
}
/// A Janus transaction ID. Used to correlate signalling requests and responses.
#[derive(Debug)]
struct TransactionId(pub *mut c_char);
unsafe impl Send for TransactionId {}
/// A single signalling message that came in off the wire, associated with one session.
///
/// These will be queued up asynchronously and processed in order later.
@ -72,10 +68,22 @@ struct RawMessage {
}
/// Inefficiently converts a serde JSON value to a Jansson JSON value.
fn from_serde_json(input: &JsonValue) -> JanssonValue {
fn serde_to_jansson(input: &JsonValue) -> JanssonValue {
JanssonValue::from_str(&input.to_string(), JanssonDecodingFlags::empty()).unwrap()
}
fn jansson_to_str(json: &JanssonValue) -> Result<String, Box<Error>> {
Ok(json.to_libcstring(JanssonEncodingFlags::empty()).to_str()?.to_owned())
}
fn transpose<T, E>(val: Result<Option<T>, E>) -> Option<Result<T, E>> {
match val {
Ok(None) => None,
Ok(Some(y)) => Some(Ok(y)),
Err(e) => Some(Err(e))
}
}
/// A response to a signalling message. May carry either a response body, a JSEP, or both.
struct MessageResponse {
pub body: Option<JsonValue>,
@ -155,24 +163,26 @@ fn notify_except<T: IntoIterator<Item=U>, U: AsRef<Session>>(json: &JsonValue, m
}
fn send_notification<T: IntoIterator<Item=U>, U: AsRef<Session>>(body: &JsonValue, sessions: T) -> JanusResult {
let mut msg = from_serde_json(body);
let mut msg = serde_to_jansson(body);
let push_event = gateway_callbacks().push_event;
for session in sessions {
janus_info!("Notification going to {:?}: {:?}.", session.as_ref(), msg);
let handle = session.as_ref().handle;
janus_verb!("Notification going to {:p}: {}.", handle, body);
// probably a hack -- we shouldn't stop notifying if we fail one
JanusError::from(push_event(session.as_ref().as_ptr(), &mut PLUGIN, ptr::null(), msg.as_mut_ref(), ptr::null_mut()))?
JanusError::from(push_event(handle, &mut PLUGIN, ptr::null(), msg.as_mut_ref(), ptr::null_mut()))?
}
Ok(())
}
fn send_offer<T: IntoIterator<Item=U>, U: AsRef<Session>>(offer: &JsonValue, sessions: T) -> JanusResult {
let mut msg = from_serde_json(&json!({}));
let mut jsep = from_serde_json(offer);
let mut msg = serde_to_jansson(&json!({}));
let mut jsep = serde_to_jansson(offer);
let push_event = gateway_callbacks().push_event;
for session in sessions {
janus_info!("Offer going to {:?}: {:?}.", session.as_ref(), jsep);
let handle = session.as_ref().handle;
janus_verb!("Offer going to {:p}: {}.", handle, offer);
// probably a hack -- we shouldn't stop notifying if we fail one
JanusError::from(push_event(session.as_ref().as_ptr(), &mut PLUGIN, ptr::null(), msg.as_mut_ref(), jsep.as_mut_ref()))?
JanusError::from(push_event(handle, &mut PLUGIN, ptr::null(), msg.as_mut_ref(), jsep.as_mut_ref()))?
}
Ok(())
}
@ -221,7 +231,6 @@ extern "C" fn init(callbacks: *mut PluginCallbacks, config_path: *const c_char)
thread::spawn(move || {
janus_verb!("Message processing thread is alive.");
for msg in messages_rx.iter() {
janus_verb!("Processing message: {:?}", msg);
handle_message_async(msg).err().map(|e| {
janus_err!("Error processing message: {}", e);
});
@ -253,7 +262,7 @@ extern "C" fn create_session(handle: *mut PluginSession, error: *mut c_int) {
match unsafe { Session::associate(handle, initial_state) } {
Ok(sess) => {
janus_info!("Initializing SFU session {:?}...", sess);
janus_info!("Initializing SFU session {:p}...", sess.handle);
STATE.switchboard.write().expect("Switchboard is poisoned :(").connect(sess);
}
Err(e) => {
@ -266,7 +275,7 @@ extern "C" fn create_session(handle: *mut PluginSession, error: *mut c_int) {
extern "C" fn destroy_session(handle: *mut PluginSession, error: *mut c_int) {
match unsafe { Session::from_ptr(handle) } {
Ok(sess) => {
janus_info!("Destroying SFU session {:?}...", sess);
janus_info!("Destroying SFU session {:p}...", sess.handle);
let mut destroyed = sess.destroyed.lock().expect("Session destruction mutex is poisoned :(");
let mut switchboard = STATE.switchboard.write().expect("Switchboard is poisoned :(");
switchboard.remove_session(&sess);
@ -293,14 +302,14 @@ extern "C" fn destroy_session(handle: *mut PluginSession, error: *mut c_int) {
extern "C" fn query_session(_handle: *mut PluginSession) -> *mut RawJanssonValue {
let output = json!({});
from_serde_json(&output).into_raw()
serde_to_jansson(&output).into_raw()
}
extern "C" fn setup_media(handle: *mut PluginSession) {
let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") };
let switchboard = STATE.switchboard.read().expect("Switchboard is poisoned :(");
send_fir(switchboard.media_senders_to(&sess));
janus_verb!("WebRTC media is now available on {:?}.", sess);
janus_verb!("WebRTC media is now available on {:p}.", sess.handle);
}
extern "C" fn incoming_rtp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) {
@ -341,12 +350,14 @@ extern "C" fn incoming_data(handle: *mut PluginSession, buf: *mut c_char, len: c
}
}
extern "C" fn slow_link(_handle: *mut PluginSession, _uplink: c_int, _video: c_int) {
janus_verb!("Slow link message received!");
extern "C" fn slow_link(handle: *mut PluginSession, _uplink: c_int, _video: c_int) {
let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") };
janus_verb!("Slow link message received on {:p}.", sess.handle);
}
extern "C" fn hangup_media(_handle: *mut PluginSession) {
janus_verb!("Hanging up WebRTC media.");
extern "C" fn hangup_media(handle: *mut PluginSession) {
let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") };
janus_verb!("Hanging up WebRTC media on {:p}.", sess.handle);
}
fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe: Option<Subscription>) -> MessageResult {
@ -384,7 +395,7 @@ fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe
let notification = json!({ "event": "join", "user_id": user_id, "room_id": room_id });
switchboard.join_room(Arc::clone(from), room_id.clone());
if let Err(e) = notify_except(&notification, &user_id, switchboard.occupants_of(&room_id)) {
janus_err!("Error sending notification for user join: {:?}", e)
janus_err!("Error sending notification for user join: {}", e)
}
}
if let Some(ref publisher_id) = subscription.media {
@ -454,20 +465,12 @@ fn process_subscribe(from: &Arc<Session>, what: Subscription) -> MessageResult {
Ok(MessageResponse::msg(json!({})))
}
fn process_message(from: &Arc<Session>, msg: &JanssonValue) -> MessageResult {
let msg_str = msg.to_libcstring(JanssonEncodingFlags::empty());
let msg_contents: OptionalField<MessageKind> = serde_json::from_str(msg_str.to_str()?)?;
match msg_contents {
OptionalField::None {} => Ok(MessageResponse::msg(json!({}))),
OptionalField::Some(kind) => {
janus_info!("Processing {:?} on connection {:?}.", kind, from);
match kind {
MessageKind::Subscribe { what } => process_subscribe(from, what),
MessageKind::Block { whom } => process_block(from, whom),
MessageKind::Unblock { whom } => process_unblock(from, whom),
MessageKind::Join { room_id, user_id, subscribe } => process_join(from, room_id, user_id, subscribe),
}
}
fn process_message(from: &Arc<Session>, msg: MessageKind) -> MessageResult {
match msg {
MessageKind::Subscribe { what } => process_subscribe(from, what),
MessageKind::Block { whom } => process_block(from, whom),
MessageKind::Unblock { whom } => process_unblock(from, whom),
MessageKind::Join { room_id, user_id, subscribe } => process_join(from, room_id, user_id, subscribe),
}
}
@ -480,7 +483,7 @@ fn process_offer(from: &Session, offer: &Sdp) -> JsepResult {
OfferAnswerParameters::VideoCodec, VIDEO_CODEC.to_cstr().as_ptr(),
OfferAnswerParameters::VideoDirection, MediaDirection::JANUS_SDP_RECVONLY,
);
janus_huge!("Providing answer to {:?}: {}", from, answer.to_string().to_str().unwrap());
janus_huge!("Providing answer to {:p}: {}", from.handle, answer.to_string().to_str()?);
// it's fishy, but we provide audio and video streams to subscribers regardless of whether the client is sending
// audio and video right now or not -- this is basically working around pains in renegotiation to do with
@ -502,7 +505,7 @@ fn process_offer(from: &Session, offer: &Sdp) -> JsepResult {
OfferAnswerParameters::VideoPayloadType, video_payload_type.unwrap_or(100),
OfferAnswerParameters::VideoDirection, MediaDirection::JANUS_SDP_SENDONLY,
);
janus_huge!("Storing subscriber offer for {:?}: {}", from, subscriber_offer.to_string().to_str().unwrap());
janus_huge!("Storing subscriber offer for {:p}: {}", from.handle, subscriber_offer.to_string().to_str()?);
let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue.");
let jsep = json!({ "type": "offer", "sdp": subscriber_offer });
@ -519,36 +522,33 @@ fn process_answer(_from: &Session, _answer: &Sdp) -> JsepResult {
Ok(json!({})) // todo: check that this guy should actually be sending us an answer?
}
fn process_jsep(from: &Session, jsep: &JanssonValue) -> JsepResult {
let jsep_str = jsep.to_libcstring(JanssonEncodingFlags::empty());
let jsep_contents: OptionalField<JsepKind> = serde_json::from_str(jsep_str.to_str()?)?;
match jsep_contents {
OptionalField::None {} => Ok(json!({})),
OptionalField::Some(kind) => {
janus_info!("Processing {:?} from {:?}.", kind, from);
match kind {
JsepKind::Offer { sdp } => process_offer(from, &sdp),
JsepKind::Answer { sdp } => process_answer(from, &sdp),
}
}
fn process_jsep(from: &Session, jsep: JsepKind) -> JsepResult {
match jsep {
JsepKind::Offer { sdp } => process_offer(from, &sdp),
JsepKind::Answer { sdp } => process_answer(from, &sdp),
}
}
fn push_response(from: &Session, txn: TransactionId, body: &JsonValue, jsep: Option<JsonValue>) -> JanusResult {
let push_event = gateway_callbacks().push_event;
let jsep = jsep.unwrap_or_else(|| json!({}));
janus_info!("{:?} sending response to {:?}: body = {}.", from.as_ptr(), txn, body);
JanusError::from(push_event(from.as_ptr(), &mut PLUGIN, txn.0, from_serde_json(body).as_mut_ref(), from_serde_json(&jsep).as_mut_ref()))
janus_info!("Responding to {:p} for txid {}: body={}, jsep={}", from.handle, txn, body, jsep);
JanusError::from(push_event(from.as_ptr(), &mut PLUGIN, txn.0, serde_to_jansson(body).as_mut_ref(), serde_to_jansson(&jsep).as_mut_ref()))
}
fn handle_message_async(RawMessage { jsep, msg, txn, from }: RawMessage) -> JanusResult {
if let Some(ref from) = from.upgrade() {
let destroyed = from.destroyed.lock().expect("Session destruction mutex is poisoned :(");
if !*destroyed {
// handle the message first, because handling a JSEP can cause us to want to send an RTCP
let parsed_msg = msg.and_then(|x| transpose(jansson_to_str(&x).and_then(OptionalField::try_parse)));
let parsed_jsep = jsep.and_then(|x| transpose(jansson_to_str(&x).and_then(OptionalField::try_parse)));
janus_info!("Processing txid {} from {:p}: msg={:?}, jsep={:?}",
txn, from.handle, parsed_msg, parsed_jsep);
// process the message first, because processing a JSEP can cause us to want to send an RTCP
// FIR to our subscribers, which may have been established in the message
let msg_result = msg.map(|x| process_message(from, &x));
let jsep_result = jsep.map(|x| process_jsep(from, &x));
let msg_result = parsed_msg.map(|x| x.and_then(|msg| process_message(from, msg)));
let jsep_result = parsed_jsep.map(|x| x.and_then(|jsep| process_jsep(from, jsep)));
return match (msg_result, jsep_result) {
(Some(Err(msg_err)), _) => {
let resp = json!({ "success": false, "error": { "msg": format!("{}", msg_err) }});
@ -587,7 +587,6 @@ fn handle_message_async(RawMessage { jsep, msg, txn, from }: RawMessage) -> Janu
extern "C" fn handle_message(handle: *mut PluginSession, transaction: *mut c_char,
message: *mut RawJanssonValue, jsep: *mut RawJanssonValue) -> *mut RawPluginResult {
janus_verb!("Queueing signalling message.");
let result = match unsafe { Session::from_ptr(handle) } {
Ok(sess) => {
let msg = RawMessage {
@ -596,6 +595,7 @@ extern "C" fn handle_message(handle: *mut PluginSession, transaction: *mut c_cha
msg: unsafe { JanssonValue::from_raw(message) },
jsep: unsafe { JanssonValue::from_raw(jsep) }
};
janus_verb!("Queueing signalling message on {:p}.", sess.handle);
STATE.message_channel.get().unwrap().send(msg).ok();
PluginResult::ok_wait(Some(c_str!("Processing.")))
},

Просмотреть файл

@ -1,5 +1,9 @@
/// Types and code related to handling signalling messages.
use super::Sdp;
use super::serde_json;
use serde::de::DeserializeOwned;
use std::error::Error;
use std::borrow::Borrow;
/// A room ID representing a Janus multicast room.
pub type RoomId = String;
@ -17,6 +21,21 @@ pub enum OptionalField<T> {
None {}
}
impl<T> Into<Option<T>> for OptionalField<T> {
fn into(self) -> Option<T> {
match self {
OptionalField::None {} => None,
OptionalField::Some(x) => Some(x)
}
}
}
impl<T> OptionalField<T> where T: DeserializeOwned {
pub fn try_parse(val: impl Borrow<str>) -> Result<Option<T>, Box<Error>> {
Ok(serde_json::from_str::<OptionalField<T>>(val.borrow()).map(|x| x.into())?)
}
}
/// A signalling message carrying a JSEP SDP offer or answer.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "lowercase", tag = "type")]

Просмотреть файл

@ -176,73 +176,65 @@ impl Switchboard {
self.occupants.get(room).map(Vec::as_slice).unwrap_or(&[])
}
pub fn media_recipients_for(&self, sender: &Session) -> Vec<&Arc<Session>> {
let mut subscribers: Vec<_> = self.subscribers_to(sender).iter().collect();
if let Some(joined) = sender.join_state.get() {
let forward_blocks = self.blockers_to_miscreants.get_keys(&joined.user_id);
let reverse_blocks = self.blockers_to_miscreants.get_values(&joined.user_id);
let blocks_exist = !forward_blocks.is_empty() || !reverse_blocks.is_empty();
if blocks_exist {
subscribers.retain(|recipient| {
match recipient.join_state.get() {
None => true,
Some(other) => {
let blocks = forward_blocks.contains(&other.user_id);
let is_blocked = reverse_blocks.contains(&other.user_id);
return !blocks && !is_blocked;
}
}
});
pub fn media_recipients_for(&self, sender: &Session) -> impl Iterator<Item=&Arc<Session>> {
let (forward_blocks, reverse_blocks) = match sender.join_state.get() {
None => (&[] as &[_], &[] as &[_]),
Some(joined) => (
self.blockers_to_miscreants.get_keys(&joined.user_id),
self.blockers_to_miscreants.get_values(&joined.user_id)
)
};
self.subscribers_to(sender).iter().filter(move |subscriber| {
match subscriber.join_state.get() {
None => true,
Some(other) => {
let blocks = forward_blocks.contains(&other.user_id);
let is_blocked = reverse_blocks.contains(&other.user_id);
!blocks && !is_blocked
}
}
}
subscribers
})
}
pub fn media_senders_to(&self, recipient: &Session) -> Vec<&Arc<Session>> {
let mut publishers: Vec<_> = self.publishers_to(recipient).iter().collect();
if let Some(joined) = recipient.join_state.get() {
let forward_blocks = self.blockers_to_miscreants.get_values(&joined.user_id);
let reverse_blocks = self.blockers_to_miscreants.get_keys(&joined.user_id);
let blocks_exist = !forward_blocks.is_empty() || !reverse_blocks.is_empty();
if blocks_exist {
publishers.retain(|sender| {
match sender.join_state.get() {
None => true,
Some(other) => {
let blocks = forward_blocks.contains(&other.user_id);
let is_blocked = reverse_blocks.contains(&other.user_id);
return !blocks && !is_blocked;
}
}
});
pub fn media_senders_to(&self, recipient: &Session) -> impl Iterator<Item=&Arc<Session>> {
let (forward_blocks, reverse_blocks) = match recipient.join_state.get() {
None => (&[] as &[_], &[] as &[_]),
Some(joined) => (
self.blockers_to_miscreants.get_values(&joined.user_id),
self.blockers_to_miscreants.get_keys(&joined.user_id)
)
};
self.publishers_to(recipient).iter().filter(move |publisher| {
match publisher.join_state.get() {
None => true,
Some(other) => {
let blocks = forward_blocks.contains(&other.user_id);
let is_blocked = reverse_blocks.contains(&other.user_id);
!blocks && !is_blocked
}
}
}
publishers
})
}
pub fn data_recipients_for(&self, session: &Session) -> Vec<&Arc<Session>> {
if let Some(joined) = session.join_state.get() {
let mut cohabitators: Vec<_> = self.occupants_of(&joined.room_id).iter().collect();
let forward_blocks = self.blockers_to_miscreants.get_keys(&joined.user_id);
let reverse_blocks = self.blockers_to_miscreants.get_values(&joined.user_id);
let blocks_exist = !forward_blocks.is_empty() || !reverse_blocks.is_empty();
cohabitators.retain(|cohabitator| cohabitator.handle != session.handle);
if blocks_exist {
cohabitators.retain(|cohabitator| {
match cohabitator.join_state.get() {
None => true,
Some(other) => {
let blocks = forward_blocks.contains(&other.user_id);
let is_blocked = reverse_blocks.contains(&other.user_id);
return !blocks && !is_blocked;
}
}
});
pub fn data_recipients_for<'s>(&'s self, session: &'s Session) -> impl Iterator<Item=&'s Arc<Session>> {
let (forward_blocks, reverse_blocks, cohabitators) = match session.join_state.get() {
None => (&[] as &[_], &[] as &[_], &[] as &[_]),
Some(joined) => (
self.blockers_to_miscreants.get_keys(&joined.user_id),
self.blockers_to_miscreants.get_values(&joined.user_id),
self.occupants_of(&joined.room_id)
)
};
cohabitators.iter().filter(move |cohabitator| {
cohabitator.handle != session.handle && match cohabitator.join_state.get() {
None => true,
Some(other) => {
let blocks = forward_blocks.contains(&other.user_id);
let is_blocked = reverse_blocks.contains(&other.user_id);
!blocks && !is_blocked
}
}
cohabitators
} else {
Vec::new()
}
})
}
pub fn get_users(&self, room: &RoomId) -> HashSet<&UserId> {

22
src/txid.rs Normal file
Просмотреть файл

@ -0,0 +1,22 @@
use std::ffi::CStr;
use std::fmt;
use std::os::raw::c_char;
/// A Janus transaction ID. Used to correlate signalling requests and responses.
#[derive(Debug)]
pub struct TransactionId(pub *mut c_char);
unsafe impl Send for TransactionId {}
impl fmt::Display for TransactionId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
unsafe {
if self.0.is_null() {
f.write_str("<null>")
} else {
let contents = CStr::from_ptr(self.0);
f.write_str(&contents.to_string_lossy())
}
}
}
}