This commit is contained in:
Marshall Polaris 2019-12-09 17:14:50 -08:00
Родитель 57e68c9eb0
Коммит 7df3af8a41
4 изменённых файлов: 35 добавлений и 60 удалений

7
Cargo.lock сгенерированный
Просмотреть файл

@ -1,10 +1,5 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "atom"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "atty"
version = "0.2.13"
@ -100,7 +95,6 @@ dependencies = [
name = "janus-plugin-sfu"
version = "0.1.0"
dependencies = [
"atom 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"janus-plugin 0.11.1 (git+https://github.com/mozilla/janus-plugin-rs)",
"jsonwebtoken 6.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"multimap 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -305,7 +299,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum atom 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3c86699c3f02778ec07158376991c8f783dd1f2f95c579ffaf0738dc984b2fe2"
"checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90"
"checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2"
"checksum base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e"

Просмотреть файл

@ -16,7 +16,6 @@ crate_type = ["cdylib"]
lto = true
[dependencies]
atom = "0.3"
janus-plugin = { git = "https://github.com/mozilla/janus-plugin-rs" }
multimap = "0.8"
once_cell = "1.2"

Просмотреть файл

@ -5,7 +5,6 @@ mod switchboard;
mod config;
mod txid;
use atom::AtomSetOnce;
use auth::ValidatedToken;
use messages::{RoomId, UserId};
use config::Config;
@ -15,7 +14,7 @@ use janus_plugin::{answer_sdp, offer_sdp, build_plugin, export_plugin, janus_err
use janus_plugin::rtcp::{gen_fir, gen_pli, has_fir, has_pli};
use janus_plugin::sdp::{AudioCodec, MediaDirection, OfferAnswerParameters, Sdp, VideoCodec};
use janus_plugin::utils::LibcString;
use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};
use messages::{JsepKind, MessageKind, OptionalField, Subscription};
use serde::de::DeserializeOwned;
use serde_json::json;
@ -106,20 +105,9 @@ fn gateway_callbacks() -> &'static PluginCallbacks {
unsafe { CALLBACKS.expect("Callbacks not initialized -- did plugin init() succeed?") }
}
#[derive(Debug)]
struct State {
pub switchboard: RwLock<Switchboard>,
pub message_channel: AtomSetOnce<Box<mpsc::SyncSender<RawMessage>>>,
pub config: AtomSetOnce<Box<Config>>,
}
static STATE: Lazy<State> = Lazy::new(|| {
State {
switchboard: RwLock::new(Switchboard::new()),
message_channel: AtomSetOnce::empty(),
config: AtomSetOnce::empty(),
}
});
static SWITCHBOARD: Lazy<RwLock<Switchboard>> = Lazy::new(|| { RwLock::new(Switchboard::new()) });
static MESSAGE_CHANNEL: OnceCell<mpsc::SyncSender<RawMessage>> = OnceCell::new();
static CONFIG: OnceCell<Config> = OnceCell::new();
// todo: clean up duplication here
@ -250,12 +238,12 @@ extern "C" fn init(callbacks: *mut PluginCallbacks, config_path: *const c_char)
Config::default()
}
};
STATE.config.set_if_none(Box::new(config));
CONFIG.set(config).expect("Big problem: config already initialized!");
match unsafe { callbacks.as_ref() } {
Some(c) => {
unsafe { CALLBACKS = Some(c) };
let (messages_tx, messages_rx) = mpsc::sync_channel(0);
STATE.message_channel.set_if_none(Box::new(messages_tx));
MESSAGE_CHANNEL.set(messages_tx).expect("Big problem: message channel already initialized!");
thread::spawn(move || {
janus_verb!("Message processing thread is alive.");
@ -283,16 +271,16 @@ extern "C" fn destroy() {
extern "C" fn create_session(handle: *mut PluginSession, error: *mut c_int) {
let initial_state = SessionState {
destroyed: AtomicBool::new(false),
join_state: AtomSetOnce::empty(),
join_state: OnceCell::new(),
subscriber_offer: Arc::new(Mutex::new(None)),
subscription: AtomSetOnce::empty(),
subscription: OnceCell::new(),
fir_seq: AtomicIsize::new(0),
};
match unsafe { Session::associate(handle, initial_state) } {
Ok(sess) => {
janus_info!("Initializing SFU session {:p}...", sess.handle);
STATE.switchboard.write().expect("Switchboard is poisoned :(").connect(sess);
SWITCHBOARD.write().expect("Switchboard is poisoned :(").connect(sess);
}
Err(e) => {
janus_err!("{}", e);
@ -305,7 +293,7 @@ extern "C" fn destroy_session(handle: *mut PluginSession, error: *mut c_int) {
match unsafe { Session::from_ptr(handle) } {
Ok(sess) => {
janus_info!("Destroying SFU session {:p}...", sess.handle);
let mut switchboard = STATE.switchboard.write().expect("Switchboard is poisoned :(");
let mut switchboard = SWITCHBOARD.write().expect("Switchboard is poisoned :(");
switchboard.remove_session(&sess);
if let Some(joined) = sess.join_state.get() {
// if they are entirely disconnected, notify their roommates
@ -331,14 +319,14 @@ extern "C" fn query_session(_handle: *mut PluginSession) -> *mut RawJanssonValue
extern "C" fn setup_media(handle: *mut PluginSession) {
let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") };
let switchboard = STATE.switchboard.read().expect("Switchboard is poisoned :(");
let switchboard = SWITCHBOARD.read().expect("Switchboard is poisoned :(");
send_fir(switchboard.media_senders_to(&sess));
janus_info!("WebRTC media is now available on {:p}.", sess.handle);
}
extern "C" fn incoming_rtp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) {
let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") };
let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue.");
let switchboard = SWITCHBOARD.read().expect("Switchboard lock poisoned; can't continue.");
let relay_rtp = gateway_callbacks().relay_rtp;
for other in switchboard.media_recipients_for(&sess) {
relay_rtp(other.as_ptr(), video, buf, len);
@ -347,7 +335,7 @@ extern "C" fn incoming_rtp(handle: *mut PluginSession, video: c_int, buf: *mut c
extern "C" fn incoming_rtcp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) {
let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") };
let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue.");
let switchboard = SWITCHBOARD.read().expect("Switchboard lock poisoned; can't continue.");
let packet = unsafe { slice::from_raw_parts(buf, len as usize) };
match video {
1 if has_pli(packet) => {
@ -367,7 +355,7 @@ extern "C" fn incoming_rtcp(handle: *mut PluginSession, video: c_int, buf: *mut
extern "C" fn incoming_data(handle: *mut PluginSession, label: *mut c_char, buf: *mut c_char, len: c_int, ) {
let sess = unsafe { Session::from_ptr(handle).expect("Session can't be null!") };
let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue.");
let switchboard = SWITCHBOARD.read().expect("Switchboard lock poisoned; can't continue.");
let relay_data = gateway_callbacks().relay_data;
for other in switchboard.data_recipients_for(&sess) {
// we presume that clients have matching labels on their channels -- in our case we have one
@ -388,7 +376,7 @@ extern "C" fn hangup_media(handle: *mut PluginSession) {
fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe: Option<Subscription>, token: Option<String>) -> MessageResult {
// todo: holy shit clean this function up somehow
let config = STATE.config.get().unwrap();
let config = CONFIG.get().unwrap();
match (&config.auth_key, token) {
(None, _) => {
janus_verb!("No auth_key configured. Allowing join from {:p} to room {} as user {}.", from.handle, room_id, user_id);
@ -414,18 +402,9 @@ fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe
}
}
let mut switchboard = STATE.switchboard.write()?;
let mut switchboard = SWITCHBOARD.write()?;
let body = json!({ "users": { room_id.as_str(): switchboard.get_users(&room_id) }});
let already_joined = !from.join_state.is_none();
let already_subscribed = !from.subscription.is_none();
if already_joined {
return Err(From::from("Handles may only join once!"))
}
if already_subscribed && subscribe.is_some() {
return Err(From::from("Handles may only subscribe once!"))
}
let mut is_master_handle = false;
if let Some(subscription) = subscribe.as_ref() {
let room_is_full = switchboard.occupants_of(&room_id).len() > config.max_room_size;
@ -439,10 +418,15 @@ fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe
}
}
from.join_state.set_if_none(Box::new(JoinState::new(room_id.clone(), user_id.clone())));
if let Err(_existing) = from.join_state.set(JoinState::new(room_id.clone(), user_id.clone())) {
return Err(From::from("Handles may only join once!"));
}
if let Some(subscription) = subscribe {
janus_info!("Processing join-time subscription from {:p}: {:?}.", from.handle, subscription);
from.subscription.set_if_none(Box::new(subscription.clone()));
if let Err(_existing) = from.subscription.set(subscription.clone()) {
return Err(From::from("Handles may only subscribe once!"));
};
if is_master_handle {
let notification = json!({ "event": "join", "user_id": user_id, "room_id": room_id });
switchboard.join_room(Arc::clone(from), room_id.clone());
@ -462,14 +446,14 @@ fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe
}
fn process_kick(from: &Arc<Session>, room_id: RoomId, user_id: UserId, token: String) -> MessageResult {
let config = STATE.config.get().unwrap();
let config = CONFIG.get().unwrap();
if let Some(ref key) = config.auth_key {
match ValidatedToken::from_str(&token, key) {
Ok(tok) => {
if tok.kick_users {
janus_info!("Processing kick from {:p} targeting user ID {} in room ID {}.", from.handle, user_id, room_id);
let end_session = gateway_callbacks().end_session;
let switchboard = STATE.switchboard.read()?;
let switchboard = SWITCHBOARD.read()?;
let sessions = switchboard.get_sessions(&room_id, &user_id);
for sess in sessions {
janus_info!("Kicking session {:p}.", from.handle);
@ -492,7 +476,7 @@ fn process_kick(from: &Arc<Session>, room_id: RoomId, user_id: UserId, token: St
fn process_block(from: &Arc<Session>, whom: UserId) -> MessageResult {
janus_info!("Processing block from {:p} to {}", from.handle, whom);
if let Some(joined) = from.join_state.get() {
let mut switchboard = STATE.switchboard.write()?;
let mut switchboard = SWITCHBOARD.write()?;
let event = json!({ "event": "blocked", "by": &joined.user_id });
notify_user(&event, &whom, switchboard.occupants_of(&joined.room_id));
switchboard.establish_block(joined.user_id.clone(), whom);
@ -505,7 +489,7 @@ fn process_block(from: &Arc<Session>, whom: UserId) -> MessageResult {
fn process_unblock(from: &Arc<Session>, whom: UserId) -> MessageResult {
janus_info!("Processing unblock from {:p} to {}", from.handle, whom);
if let Some(joined) = from.join_state.get() {
let mut switchboard = STATE.switchboard.write()?;
let mut switchboard = SWITCHBOARD.write()?;
switchboard.lift_block(&joined.user_id, &whom);
if let Some(publisher) = switchboard.get_publisher(&whom) {
send_fir(&[publisher]);
@ -520,12 +504,11 @@ fn process_unblock(from: &Arc<Session>, whom: UserId) -> MessageResult {
fn process_subscribe(from: &Arc<Session>, what: &Subscription) -> MessageResult {
janus_info!("Processing subscription from {:p}: {:?}", from.handle, what);
let subscription_state = Box::new(what.clone());
if from.subscription.set_if_none(subscription_state).is_some() {
if let Err(_existing) = from.subscription.set(what.clone()) {
return Err(From::from("Users may only subscribe once!"))
}
let mut switchboard = STATE.switchboard.write()?;
let mut switchboard = SWITCHBOARD.write()?;
if let Some(ref publisher_id) = what.media {
let publisher = switchboard.get_publisher(publisher_id).ok_or("Can't subscribe to a nonexistent publisher.")?.clone();
let jsep = json!({
@ -541,7 +524,7 @@ fn process_subscribe(from: &Arc<Session>, what: &Subscription) -> MessageResult
fn process_data(from: &Arc<Session>, whom: Option<UserId>, body: &str) -> MessageResult {
janus_huge!("Processing data message from {:p}: {:?}", from.handle, body);
let payload = json!({ "event": "data", "body": body });
let switchboard = STATE.switchboard.write()?;
let switchboard = SWITCHBOARD.write()?;
if let Some(joined) = from.join_state.get() {
let occupants = switchboard.occupants_of(&joined.room_id);
if let Some(user_id) = whom {
@ -611,7 +594,7 @@ fn process_offer(from: &Session, offer: &Sdp) -> JsepResult {
}
janus_verb!("Storing subscriber offer for {:p}: {:?}", from.handle, subscriber_offer);
let switchboard = STATE.switchboard.read().expect("Switchboard lock poisoned; can't continue.");
let switchboard = SWITCHBOARD.read().expect("Switchboard lock poisoned; can't continue.");
let jsep = json!({ "type": "offer", "sdp": subscriber_offer });
send_offer(&jsep, switchboard.subscribers_to(from));
*from.subscriber_offer.lock().unwrap() = Some(subscriber_offer);
@ -699,7 +682,7 @@ extern "C" fn handle_message(handle: *mut PluginSession, transaction: *mut c_cha
jsep: unsafe { JanssonValue::from_raw(jsep) }
};
janus_info!("Queueing signalling message on {:p}.", sess.handle);
STATE.message_channel.get().unwrap().send(msg).ok();
MESSAGE_CHANNEL.get().unwrap().send(msg).ok();
PluginResult::ok_wait(Some(c_str!("Processing.")))
},
Err(_) => PluginResult::error(c_str!("No handle associated with message!"))

Просмотреть файл

@ -1,9 +1,9 @@
/// Types for representing Janus session state.
use atom::AtomSetOnce;
use std::sync::atomic::{AtomicIsize, AtomicBool};
use std::sync::{Arc, Mutex};
use janus_plugin::sdp::Sdp;
use janus_plugin::session::SessionWrapper;
use once_cell::sync::OnceCell;
use crate::messages::{RoomId, UserId, Subscription};
/// State pertaining to this session's join of a particular room as a particular user ID.
@ -29,10 +29,10 @@ pub struct SessionState {
pub destroyed: AtomicBool,
/// Information pertaining to this session's user and room, if joined.
pub join_state: AtomSetOnce<Box<JoinState>>,
pub join_state: OnceCell<JoinState>,
/// The subscription this user has established, if any.
pub subscription: AtomSetOnce<Box<Subscription>>,
pub subscription: OnceCell<Subscription>,
/// If this is a publisher, the offer for subscribing to it.
pub subscriber_offer: Arc<Mutex<Option<Sdp>>>,