From 7df3af8a412b0ec5497c427c96deeaa08652e407 Mon Sep 17 00:00:00 2001 From: Marshall Polaris Date: Mon, 9 Dec 2019 17:14:50 -0800 Subject: [PATCH] Use once_cell instead of atom --- Cargo.lock | 7 ----- Cargo.toml | 1 - src/lib.rs | 81 +++++++++++++++++++------------------------------ src/sessions.rs | 6 ++-- 4 files changed, 35 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6927925..8d728ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,10 +1,5 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -[[package]] -name = "atom" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "atty" version = "0.2.13" @@ -100,7 +95,6 @@ dependencies = [ name = "janus-plugin-sfu" version = "0.1.0" dependencies = [ - "atom 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "janus-plugin 0.11.1 (git+https://github.com/mozilla/janus-plugin-rs)", "jsonwebtoken 6.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "multimap 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -305,7 +299,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] -"checksum atom 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3c86699c3f02778ec07158376991c8f783dd1f2f95c579ffaf0738dc984b2fe2" "checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90" "checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2" "checksum base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e" diff --git a/Cargo.toml b/Cargo.toml index 305fcc9..db64f04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ crate_type = ["cdylib"] lto = true [dependencies] -atom = "0.3" janus-plugin = { git = "https://github.com/mozilla/janus-plugin-rs" } multimap = "0.8" once_cell = "1.2" diff --git a/src/lib.rs b/src/lib.rs index f6af03e..c57d1cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,6 @@ mod switchboard; mod config; mod txid; -use atom::AtomSetOnce; use auth::ValidatedToken; use messages::{RoomId, UserId}; use config::Config; @@ -15,7 +14,7 @@ use janus_plugin::{answer_sdp, offer_sdp, build_plugin, export_plugin, janus_err use janus_plugin::rtcp::{gen_fir, gen_pli, has_fir, has_pli}; use janus_plugin::sdp::{AudioCodec, MediaDirection, OfferAnswerParameters, Sdp, VideoCodec}; use janus_plugin::utils::LibcString; -use once_cell::sync::Lazy; +use once_cell::sync::{Lazy, OnceCell}; use messages::{JsepKind, MessageKind, OptionalField, Subscription}; use serde::de::DeserializeOwned; use serde_json::json; @@ -106,20 +105,9 @@ fn gateway_callbacks() -> &'static PluginCallbacks { unsafe { CALLBACKS.expect("Callbacks not initialized -- did plugin init() succeed?") } } -#[derive(Debug)] -struct State { - pub switchboard: RwLock, - pub message_channel: AtomSetOnce>>, - pub config: AtomSetOnce>, -} - -static STATE: Lazy = Lazy::new(|| { - State { - switchboard: RwLock::new(Switchboard::new()), - message_channel: AtomSetOnce::empty(), - config: AtomSetOnce::empty(), - } -}); +static SWITCHBOARD: Lazy> = Lazy::new(|| { RwLock::new(Switchboard::new()) }); +static MESSAGE_CHANNEL: OnceCell> = OnceCell::new(); +static CONFIG: OnceCell = OnceCell::new(); // todo: clean up duplication here @@ -250,12 +238,12 @@ extern "C" fn init(callbacks: *mut PluginCallbacks, config_path: *const c_char) Config::default() } }; - STATE.config.set_if_none(Box::new(config)); + CONFIG.set(config).expect("Big problem: config already initialized!"); match unsafe { callbacks.as_ref() } { Some(c) => { unsafe { CALLBACKS = Some(c) }; let (messages_tx, messages_rx) = mpsc::sync_channel(0); - STATE.message_channel.set_if_none(Box::new(messages_tx)); + MESSAGE_CHANNEL.set(messages_tx).expect("Big problem: message channel already initialized!"); thread::spawn(move || { janus_verb!("Message processing thread is alive."); @@ -283,16 +271,16 @@ extern "C" fn destroy() { extern "C" fn create_session(handle: *mut PluginSession, error: *mut c_int) { let initial_state = SessionState { destroyed: AtomicBool::new(false), - join_state: AtomSetOnce::empty(), + join_state: OnceCell::new(), subscriber_offer: Arc::new(Mutex::new(None)), - subscription: AtomSetOnce::empty(), + subscription: OnceCell::new(), fir_seq: AtomicIsize::new(0), }; match unsafe { Session::associate(handle, initial_state) } { Ok(sess) => { janus_info!("Initializing SFU session {:p}...", sess.handle); - STATE.switchboard.write().expect("Switchboard is poisoned :(").connect(sess); + SWITCHBOARD.write().expect("Switchboard is poisoned :(").connect(sess); } Err(e) => { janus_err!("{}", e); @@ -305,7 +293,7 @@ extern "C" fn destroy_session(handle: *mut PluginSession, error: *mut c_int) { match unsafe { Session::from_ptr(handle) } { Ok(sess) => { janus_info!("Destroying SFU session {:p}...", sess.handle); - let mut switchboard = STATE.switchboard.write().expect("Switchboard is poisoned :("); + let mut switchboard = SWITCHBOARD.write().expect("Switchboard is poisoned :("); switchboard.remove_session(&sess); if let Some(joined) = sess.join_state.get() { // if they are entirely disconnected, notify their roommates @@ -331,14 +319,14 @@ extern "C" fn query_session(_handle: *mut PluginSession) -> *mut RawJanssonValue extern "C" fn setup_media(handle: *mut PluginSession) { let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") }; - let switchboard = STATE.switchboard.read().expect("Switchboard is poisoned :("); + let switchboard = SWITCHBOARD.read().expect("Switchboard is poisoned :("); send_fir(switchboard.media_senders_to(&sess)); janus_info!("WebRTC media is now available on {:p}.", sess.handle); } extern "C" fn incoming_rtp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) { let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") }; - let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue."); + let switchboard = SWITCHBOARD.read().expect("Switchboard lock poisoned; can't continue."); let relay_rtp = gateway_callbacks().relay_rtp; for other in switchboard.media_recipients_for(&sess) { relay_rtp(other.as_ptr(), video, buf, len); @@ -347,7 +335,7 @@ extern "C" fn incoming_rtp(handle: *mut PluginSession, video: c_int, buf: *mut c extern "C" fn incoming_rtcp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) { let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") }; - let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue."); + let switchboard = SWITCHBOARD.read().expect("Switchboard lock poisoned; can't continue."); let packet = unsafe { slice::from_raw_parts(buf, len as usize) }; match video { 1 if has_pli(packet) => { @@ -367,7 +355,7 @@ extern "C" fn incoming_rtcp(handle: *mut PluginSession, video: c_int, buf: *mut extern "C" fn incoming_data(handle: *mut PluginSession, label: *mut c_char, buf: *mut c_char, len: c_int, ) { let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") }; - let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue."); + let switchboard = SWITCHBOARD.read().expect("Switchboard lock poisoned; can't continue."); let relay_data = gateway_callbacks().relay_data; for other in switchboard.data_recipients_for(&sess) { // we presume that clients have matching labels on their channels -- in our case we have one @@ -388,7 +376,7 @@ extern "C" fn hangup_media(handle: *mut PluginSession) { fn process_join(from: &Arc, room_id: RoomId, user_id: UserId, subscribe: Option, token: Option) -> MessageResult { // todo: holy shit clean this function up somehow - let config = STATE.config.get().unwrap(); + let config = CONFIG.get().unwrap(); match (&config.auth_key, token) { (None, _) => { janus_verb!("No auth_key configured. Allowing join from {:p} to room {} as user {}.", from.handle, room_id, user_id); @@ -414,18 +402,9 @@ fn process_join(from: &Arc, room_id: RoomId, user_id: UserId, subscribe } } - let mut switchboard = STATE.switchboard.write()?; + let mut switchboard = SWITCHBOARD.write()?; let body = json!({ "users": { room_id.as_str(): switchboard.get_users(&room_id) }}); - let already_joined = !from.join_state.is_none(); - let already_subscribed = !from.subscription.is_none(); - if already_joined { - return Err(From::from("Handles may only join once!")) - } - if already_subscribed && subscribe.is_some() { - return Err(From::from("Handles may only subscribe once!")) - } - let mut is_master_handle = false; if let Some(subscription) = subscribe.as_ref() { let room_is_full = switchboard.occupants_of(&room_id).len() > config.max_room_size; @@ -439,10 +418,15 @@ fn process_join(from: &Arc, room_id: RoomId, user_id: UserId, subscribe } } - from.join_state.set_if_none(Box::new(JoinState::new(room_id.clone(), user_id.clone()))); + if let Err(_existing) = from.join_state.set(JoinState::new(room_id.clone(), user_id.clone())) { + return Err(From::from("Handles may only join once!")); + } + if let Some(subscription) = subscribe { janus_info!("Processing join-time subscription from {:p}: {:?}.", from.handle, subscription); - from.subscription.set_if_none(Box::new(subscription.clone())); + if let Err(_existing) = from.subscription.set(subscription.clone()) { + return Err(From::from("Handles may only subscribe once!")); + }; if is_master_handle { let notification = json!({ "event": "join", "user_id": user_id, "room_id": room_id }); switchboard.join_room(Arc::clone(from), room_id.clone()); @@ -462,14 +446,14 @@ fn process_join(from: &Arc, room_id: RoomId, user_id: UserId, subscribe } fn process_kick(from: &Arc, room_id: RoomId, user_id: UserId, token: String) -> MessageResult { - let config = STATE.config.get().unwrap(); + let config = CONFIG.get().unwrap(); if let Some(ref key) = config.auth_key { match ValidatedToken::from_str(&token, key) { Ok(tok) => { if tok.kick_users { janus_info!("Processing kick from {:p} targeting user ID {} in room ID {}.", from.handle, user_id, room_id); let end_session = gateway_callbacks().end_session; - let switchboard = STATE.switchboard.read()?; + let switchboard = SWITCHBOARD.read()?; let sessions = switchboard.get_sessions(&room_id, &user_id); for sess in sessions { janus_info!("Kicking session {:p}.", from.handle); @@ -492,7 +476,7 @@ fn process_kick(from: &Arc, room_id: RoomId, user_id: UserId, token: St fn process_block(from: &Arc, whom: UserId) -> MessageResult { janus_info!("Processing block from {:p} to {}", from.handle, whom); if let Some(joined) = from.join_state.get() { - let mut switchboard = STATE.switchboard.write()?; + let mut switchboard = SWITCHBOARD.write()?; let event = json!({ "event": "blocked", "by": &joined.user_id }); notify_user(&event, &whom, switchboard.occupants_of(&joined.room_id)); switchboard.establish_block(joined.user_id.clone(), whom); @@ -505,7 +489,7 @@ fn process_block(from: &Arc, whom: UserId) -> MessageResult { fn process_unblock(from: &Arc, whom: UserId) -> MessageResult { janus_info!("Processing unblock from {:p} to {}", from.handle, whom); if let Some(joined) = from.join_state.get() { - let mut switchboard = STATE.switchboard.write()?; + let mut switchboard = SWITCHBOARD.write()?; switchboard.lift_block(&joined.user_id, &whom); if let Some(publisher) = switchboard.get_publisher(&whom) { send_fir(&[publisher]); @@ -520,12 +504,11 @@ fn process_unblock(from: &Arc, whom: UserId) -> MessageResult { fn process_subscribe(from: &Arc, what: &Subscription) -> MessageResult { janus_info!("Processing subscription from {:p}: {:?}", from.handle, what); - let subscription_state = Box::new(what.clone()); - if from.subscription.set_if_none(subscription_state).is_some() { + if let Err(_existing) = from.subscription.set(what.clone()) { return Err(From::from("Users may only subscribe once!")) } - let mut switchboard = STATE.switchboard.write()?; + let mut switchboard = SWITCHBOARD.write()?; if let Some(ref publisher_id) = what.media { let publisher = switchboard.get_publisher(publisher_id).ok_or("Can't subscribe to a nonexistent publisher.")?.clone(); let jsep = json!({ @@ -541,7 +524,7 @@ fn process_subscribe(from: &Arc, what: &Subscription) -> MessageResult fn process_data(from: &Arc, whom: Option, body: &str) -> MessageResult { janus_huge!("Processing data message from {:p}: {:?}", from.handle, body); let payload = json!({ "event": "data", "body": body }); - let switchboard = STATE.switchboard.write()?; + let switchboard = SWITCHBOARD.write()?; if let Some(joined) = from.join_state.get() { let occupants = switchboard.occupants_of(&joined.room_id); if let Some(user_id) = whom { @@ -611,7 +594,7 @@ fn process_offer(from: &Session, offer: &Sdp) -> JsepResult { } janus_verb!("Storing subscriber offer for {:p}: {:?}", from.handle, subscriber_offer); - let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue."); + let switchboard = SWITCHBOARD.read().expect("Switchboard lock poisoned; can't continue."); let jsep = json!({ "type": "offer", "sdp": subscriber_offer }); send_offer(&jsep, switchboard.subscribers_to(from)); *from.subscriber_offer.lock().unwrap() = Some(subscriber_offer); @@ -699,7 +682,7 @@ extern "C" fn handle_message(handle: *mut PluginSession, transaction: *mut c_cha jsep: unsafe { JanssonValue::from_raw(jsep) } }; janus_info!("Queueing signalling message on {:p}.", sess.handle); - STATE.message_channel.get().unwrap().send(msg).ok(); + MESSAGE_CHANNEL.get().unwrap().send(msg).ok(); PluginResult::ok_wait(Some(c_str!("Processing."))) }, Err(_) => PluginResult::error(c_str!("No handle associated with message!")) diff --git a/src/sessions.rs b/src/sessions.rs index f5806fc..4f8ac23 100644 --- a/src/sessions.rs +++ b/src/sessions.rs @@ -1,9 +1,9 @@ /// Types for representing Janus session state. -use atom::AtomSetOnce; use std::sync::atomic::{AtomicIsize, AtomicBool}; use std::sync::{Arc, Mutex}; use janus_plugin::sdp::Sdp; use janus_plugin::session::SessionWrapper; +use once_cell::sync::OnceCell; use crate::messages::{RoomId, UserId, Subscription}; /// State pertaining to this session's join of a particular room as a particular user ID. @@ -29,10 +29,10 @@ pub struct SessionState { pub destroyed: AtomicBool, /// Information pertaining to this session's user and room, if joined. - pub join_state: AtomSetOnce>, + pub join_state: OnceCell, /// The subscription this user has established, if any. - pub subscription: AtomSetOnce>, + pub subscription: OnceCell, /// If this is a publisher, the offer for subscribing to it. pub subscriber_offer: Arc>>,