Make user IDs mandatory, fix bugs

This commit is contained in:
Marshall Quander 2017-10-23 15:18:40 -07:00
Родитель ed7c2b44af
Коммит 3e0431edba
9 изменённых файлов: 229 добавлений и 353 удалений

7
Cargo.lock сгенерированный
Просмотреть файл

@ -2,6 +2,7 @@
name = "janus-plugin-sfu"
version = "0.1.0"
dependencies = [
"atom 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"bitflags 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"cstr-macro 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"janus-plugin 0.1.0 (git+https://github.com/mquander/janus-plugin-rs)",
@ -11,6 +12,11 @@ dependencies = [
"serde_json 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "atom"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bitflags"
version = "0.9.1"
@ -238,6 +244,7 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum atom 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4cd7b80cba09d9c6679f5ac66af2e5eb9c17fa1b914f142d690b069ba51eacaf"
"checksum bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4efd02e230a02e18f92fc2735f44597385ed02ad8f831e7c1c1156ee5e1ab3a5"
"checksum bitflags 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f5cde24d1b2e2216a726368b2363a273739c91f4e3eb4e0dd12d672d396ad989"
"checksum chrono 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7c20ebe0b2b08b0aeddba49c609fe7957ba2e33449882cb186a180bc60682fa9"

Просмотреть файл

@ -9,6 +9,7 @@ publish = false
crate_type = ["cdylib"]
[dependencies]
atom = "0.3.4"
bitflags = "1.0.0"
cstr-macro = "0.1.0"
janus-plugin = { git = "https://github.com/mquander/janus-plugin-rs" }

Просмотреть файл

@ -1,4 +1,4 @@
var USER_ID = null; // set on initial connection
var USER_ID = Math.floor(Math.random() * (1000000001));
var ROOM_ID = 42;
const PEER_CONNECTION_CONFIG = {
@ -88,6 +88,7 @@ function attachPublisher(session) {
return handle.attach("janus.plugin.sfu").then(() => {
var iceReady = negotiateIce(conn, handle);
var channel = conn.createDataChannel("reliable", { ordered: true });
channel.addEventListener("message", ev => console.info("Message received on channel: ", ev));
var mediaReady = navigator.mediaDevices.getUserMedia({ audio: true });
var offerReady = mediaReady
.then(media => {
@ -100,10 +101,9 @@ function attachPublisher(session) {
.then(answer => conn.setRemoteDescription(answer.jsep));
var connectionReady = Promise.all([iceReady, localReady, remoteReady]);
return connectionReady
.then(() => handle.sendMessage({ kind: "join", room_id: ROOM_ID, notify: true }))
.then(() => handle.sendMessage({ kind: "join", room_id: ROOM_ID, user_id: USER_ID, notify: true }))
.then(reply => {
var response = reply.plugindata.data.response;
USER_ID = response.user_id;
response.user_ids.forEach(otherId => {
if (USER_ID !== otherId) {
addUser(session, otherId);

Просмотреть файл

@ -1,6 +1,6 @@
# Signalling API
**This documentation is in-progress and currently incomplete.**
**This API is very WIP. So is this documentation.**
The plugin exposes a signalling API for establishing connections and managing connection state.
@ -14,10 +14,13 @@ expect consumers of this plugin to use WebSockets, but you can probably use what
2. Create an RTC connection and perform session negotation.
3. Join a room. If you have a user ID, send your user ID; else obtain a user ID. Establish an initial set of subscriptions;
subscriptions tell the server which data from other clients to send down your connection.
3. Determine your user ID. This should be a unique ID that nobody else is likely to share. In the future, we will actually
have authentication; as it stands just pick a big random ID and pray for no collisions. I'm serious.
4. When done, close your connection, which will implicitly leave the room.
4. Join a room. Establish an initial set of subscriptions; subscriptions tell the server which data from other clients
to send down your connection.
5. When done, close your connection, which will implicitly leave the room.
## Application protocol
@ -38,16 +41,12 @@ join a room. You can only join one room with any connection.
{
"kind": "join",
"room_id": unsigned integer ID
"user_id": [none|unsigned integer ID],
"user_id": unsigned integer ID,
"notify": [none|boolean],
"subscription_specs": [none|array of spec objects]
}
```
The first time you join a room, you should allow Janus to assign you a user ID; if you don't, you might overlap with
someone else's. For future connections, you should provide your user ID again. User IDs are used to identify the target
for subscriptions, so changing your user ID will make it impossible for people to subscribe to your audio.
If `notify: true` is passed, you will receive notifications from Janus for this handle when things relevant to your
interest occur in the room; for example, if someone joins or leaves. If you create multiple connections, you probably
don't want those notifications on every connection.

Просмотреть файл

@ -1,114 +0,0 @@
/// Types for representing unique entity IDs.
use std::error::Error;
use std::sync::atomic::{AtomicUsize, Ordering};
/// A room ID representing a Janus multicast room.
///
/// Room IDs are represented as usizes >= 1; 0 indicates an un-set room ID.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct RoomId(usize);
impl RoomId {
/// Attempts to construct a room ID from a usize. Any usize >= 1 is valid.
pub fn try_from(val: usize) -> Result<Self, Box<Error + Send + Sync>> {
match val {
0 => Err(From::from("Room IDs must be positive integers.")),
_ => Ok(RoomId(val)),
}
}
}
// An atomic container representing an optional room ID.
#[derive(Debug)]
pub struct AtomicRoomId {
v: AtomicUsize,
}
impl AtomicRoomId {
pub fn empty() -> Self {
Self {
v: AtomicUsize::new(0),
}
}
pub fn load(&self, order: Ordering) -> Option<RoomId> {
match self.v.load(order) {
0 => None,
n => Some(RoomId(n)),
}
}
pub fn store(&self, val: RoomId, order: Ordering) {
self.v.store(val.0, order);
}
}
/// A user ID representing a single Janus client. Used to correlate multiple Janus connections back to the same
/// conceptual user for managing subscriptions.
///
/// User IDs are represented as usizes >= 1; 0 indicates an un-set user ID.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct UserId(usize);
impl UserId {
/// Attempts to construct a user ID from a usize. Any usize >= 1 is valid.
pub fn try_from(val: usize) -> Result<Self, Box<Error + Send + Sync>> {
match val {
0 => Err(From::from("User IDs must be positive integers.")),
_ => Ok(UserId(val)),
}
}
}
/// An atomic container representing an optional user ID. A sequence of successive user IDs can be generated via
/// AtomicUserId::first() followed by repeated invocations of AtomicUserId::next().
#[derive(Debug)]
pub struct AtomicUserId {
v: AtomicUsize,
}
impl AtomicUserId {
pub fn empty() -> Self {
Self {
v: AtomicUsize::new(0),
}
}
pub fn first() -> Self {
Self {
v: AtomicUsize::new(1),
}
}
pub fn next(&self, order: Ordering) -> Option<UserId> {
match self.v.fetch_add(1, order) {
0 => None,
n => Some(UserId(n)),
}
}
pub fn load(&self, order: Ordering) -> Option<UserId> {
match self.v.load(order) {
0 => None,
n => Some(UserId(n)),
}
}
pub fn store(&self, val: UserId, order: Ordering) {
self.v.store(val.0, order);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_userid_functionality() {
let a = AtomicUserId::empty();
let b = AtomicUserId::first();
assert_eq!(None, a.next(Ordering::SeqCst));
assert_eq!(a.load(Ordering::SeqCst), b.load(Ordering::SeqCst));
}
}

Просмотреть файл

@ -1,3 +1,4 @@
extern crate atom;
#[macro_use]
extern crate bitflags;
#[macro_use]
@ -12,37 +13,28 @@ extern crate serde_derive;
#[macro_use]
extern crate serde_json;
mod entityids;
mod messages;
mod sessions;
mod subscriptions;
use entityids::{AtomicUserId, RoomId, UserId};
use atom::AtomSetOnce;
use messages::{RoomId, UserId};
use janus::{JanssonDecodingFlags, JanssonEncodingFlags, JanssonValue, LogLevel, Plugin, PluginCallbacks, PluginMetadata, PluginResultInfo,
PluginResultType, PluginSession, RawJanssonValue};
use janus::sdp::{MediaType, Sdp};
use messages::{JsepKind, MessageKind, RawMessage, SubscriptionSpec};
use janus::sdp::Sdp;
use messages::{JsepKind, MessageKind, OptionalField, RawMessage, SubscriptionSpec};
use serde_json::Result as JsonResult;
use serde_json::Value as JsonValue;
use sessions::Session;
use sessions::{Session, SessionState};
use std::collections::HashSet;
use std::error::Error;
use std::ffi::CString;
use std::os::raw::{c_char, c_int};
use std::ptr;
use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::sync::atomic::Ordering;
use std::thread;
use subscriptions::{ContentKind, SubscriptionMap};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
enum OptionalPayload<T> {
Some(T),
None {}
}
/// Inefficiently converts a serde JSON value to a Jansson JSON value.
fn from_serde_json(input: JsonValue) -> JanssonValue {
JanssonValue::from_str(&input.to_string(), JanssonDecodingFlags::empty()).unwrap()
@ -80,7 +72,6 @@ struct State {
pub sessions: RwLock<Vec<Box<Arc<Session>>>>,
pub subscriptions: RwLock<SubscriptionMap>,
pub message_channel: Mutex<Option<mpsc::SyncSender<RawMessage>>>,
pub next_user_id: AtomicUserId,
}
lazy_static! {
@ -88,51 +79,64 @@ lazy_static! {
sessions: RwLock::new(Vec::new()),
subscriptions: RwLock::new(SubscriptionMap::new()),
message_channel: Mutex::new(None),
next_user_id: AtomicUserId::first()
};
}
fn get_room_ids(sessions: &Vec<Box<Arc<Session>>>) -> HashSet<RoomId> {
sessions.iter().filter_map(|s| s.room_id.load(Ordering::Relaxed)).collect()
sessions.iter().filter_map(|s| s.get()).map(|s| s.room_id).collect()
}
fn get_user_ids(sessions: &Vec<Box<Arc<Session>>>, room_id: RoomId) -> HashSet<UserId> {
sessions.iter()
.filter(|s| s.room_id.load(Ordering::Relaxed) == Some(room_id))
.filter_map(|s| s.user_id.load(Ordering::Relaxed))
.collect()
sessions.iter().filter_map(|s| s.get()).filter(|s| s.room_id == room_id).map(|s| s.user_id).collect()
}
fn send_notification(myself: UserId, json: JsonValue) -> Result<(), Box<Error>> {
fn send_notification(myself: &SessionState, json: JsonValue) -> Result<(), Box<Error>> {
janus::log(LogLevel::Info, &format!("{:?} sending notification: {}.", myself, json));
let msg = from_serde_json(json);
let push_event = gateway_callbacks().push_event;
for other in STATE.sessions.read()?.iter() {
if other.notify.load(Ordering::Relaxed) && other.user_id.load(Ordering::Relaxed) != Some(myself) {
janus::get_result(push_event(other.handle, &mut PLUGIN, ptr::null(), msg.as_mut_ref(), ptr::null_mut()))?
if let Some(other_state) = other.get() {
if other_state.user_id != myself.user_id && other_state.notify {
janus::get_result(push_event(other.handle, &mut PLUGIN, ptr::null(), msg.as_mut_ref(), ptr::null_mut()))?
}
}
}
Ok(())
}
fn relay<T>(from: *mut PluginSession, kind: Option<ContentKind>, send: T) -> Result<(), Box<Error>> where T: Fn(&Session) {
fn send_data<T>(from: *mut PluginSession, send: T) -> Result<(), Box<Error>> where T: Fn(&Session) {
let sess = Session::from_ptr(from)?;
if let Some(user_id) = sess.user_id.load(Ordering::Relaxed) {
if let Some(room_id) = sess.room_id.load(Ordering::Relaxed) {
janus::log(LogLevel::Dbg, &format!("Packet of kind {:?} received in room {:?} from {:?}.", kind, room_id, user_id));
let subscriptions = STATE.subscriptions.read()?;
subscriptions::for_each_subscriber(&subscriptions, user_id, kind, |s| {
if Some(user_id) != s.user_id.load(Ordering::Relaxed) {
// if there's a cross-room subscription, relay it -- presume the client knows what it's doing.
send(s)
if let Some(state) = sess.get() {
janus::log(LogLevel::Dbg, &format!("Data packet received in room {:?} from {:?}.", state.room_id, state.user_id));
for other in STATE.sessions.read()?.iter() {
if let Some(other_state) = other.get() {
if other_state.room_id == state.room_id && other_state.user_id != state.user_id {
send(&other)
}
});
Ok(())
} else {
Err(From::from("No room ID associated with connection; can't relay."))
}
}
}
Ok(())
}
fn publish<T>(from: *mut PluginSession, kind: Option<ContentKind>, send: T) -> Result<(), Box<Error>> where T: Fn(&Session) {
let sess = Session::from_ptr(from)?;
if let Some(state) = sess.get() {
janus::log(LogLevel::Dbg, &format!("Packet of kind {:?} received in room {:?} from {:?}.", kind, state.room_id, state.user_id));
let subscriptions = STATE.subscriptions.read()?;
for subscription in subscriptions::subscribers_to(&subscriptions, state.user_id, kind) {
if let Some(subscriber) = subscription.sess.upgrade() {
if let Some(subscriber_state) = subscriber.get() {
if state.user_id != subscriber_state.user_id {
// if there's a cross-room subscription, relay it -- presume the client knows what it's doing.
send(&subscriber);
}
}
}
}
Ok(())
} else {
Err(From::from("No user ID associated with connection; can't relay."))
Err(From::from("Session not yet configured; can't relay."))
}
}
@ -168,7 +172,7 @@ extern "C" fn destroy() {
}
extern "C" fn create_session(handle: *mut PluginSession, error: *mut c_int) {
match Session::associate(handle, Default::default()) {
match Session::associate(handle, AtomSetOnce::empty()) {
Ok(sess) => {
janus::log(LogLevel::Info, &format!("Initializing SFU session {:?}...", sess));
STATE.sessions.write().unwrap().push(sess);
@ -184,17 +188,20 @@ extern "C" fn destroy_session(handle: *mut PluginSession, error: *mut c_int) {
match Session::from_ptr(handle) {
Ok(sess) => {
janus::log(LogLevel::Info, &format!("Destroying SFU session {:?}...", sess));
let user_id = sess.user_id.load(Ordering::Relaxed);
let room_id = sess.room_id.load(Ordering::Relaxed);
STATE.sessions.write().unwrap().retain(|ref s| s.handle != handle);
if let Some(user_id) = user_id {
let user_exists = STATE.sessions.read().unwrap().iter().any(|ref s| Some(user_id) == s.user_id.load(Ordering::Relaxed));
if let Some(state) = sess.get() {
let user_exists = STATE.sessions.read().unwrap().iter().any(|ref s| {
match s.get() {
None => false,
Some(other_state) => state.user_id == other_state.user_id
}
});
if !user_exists {
let mut subscriptions = STATE.subscriptions.write().unwrap();
subscriptions::unpublish(&mut subscriptions, user_id);
let response = json!({ "event": "leave", "user_id": user_id, "room_id": room_id });
send_notification(user_id, response).unwrap_or_else(|e| {
subscriptions::unpublish(&mut subscriptions, state.user_id);
let response = json!({ "event": "leave", "user_id": state.user_id, "room_id": state.room_id });
send_notification(state, response).unwrap_or_else(|e| {
janus::log(LogLevel::Err, &format!("Error notifying publishers on leave: {}", e));
});
}
@ -218,7 +225,7 @@ extern "C" fn setup_media(_handle: *mut PluginSession) {
extern "C" fn incoming_rtp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) {
let content_kind = if video == 1 { ContentKind::VIDEO } else { ContentKind::AUDIO };
let relay_rtp = gateway_callbacks().relay_rtp;
if let Err(e) = relay(handle, Some(content_kind), |other| { relay_rtp(other.handle, video, buf, len); }) {
if let Err(e) = publish(handle, Some(content_kind), |other| { relay_rtp(other.handle, video, buf, len); }) {
janus::log(LogLevel::Huge, &format!("Discarding RTP packet: {}", e))
}
}
@ -226,17 +233,14 @@ extern "C" fn incoming_rtp(handle: *mut PluginSession, video: c_int, buf: *mut c
extern "C" fn incoming_rtcp(handle: *mut PluginSession, video: c_int, buf: *mut c_char, len: c_int) {
let content_kind = if video == 1 { ContentKind::VIDEO } else { ContentKind::AUDIO };
let relay_rtcp = gateway_callbacks().relay_rtcp;
if let Err(e) = relay(handle, Some(content_kind), |other| { relay_rtcp(other.handle, video, buf, len); }) {
if let Err(e) = publish(handle, Some(content_kind), |other| { relay_rtcp(other.handle, video, buf, len); }) {
janus::log(LogLevel::Huge, &format!("Discarding RTCP packet: {}", e))
}
}
extern "C" fn incoming_data(handle: *mut PluginSession, buf: *mut c_char, len: c_int) {
let relay_data = gateway_callbacks().relay_data;
let result = relay(handle, None, |other| {
relay_data(other.handle, buf, len);
});
if let Err(e) = result {
if let Err(e) = send_data(handle, |other| { relay_data(other.handle, buf, len); }) {
janus::log(LogLevel::Huge, &format!("Discarding data packet: {}", e))
}
}
@ -249,35 +253,29 @@ extern "C" fn hangup_media(_handle: *mut PluginSession) {
janus::log(LogLevel::Verb, "Hanging up WebRTC media.");
}
fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: Option<UserId>, notify: Option<bool>, subscription_specs: Option<Vec<SubscriptionSpec>>) -> MessageProcessingResult {
from.room_id.store(room_id, Ordering::Relaxed);
from.notify.store(notify.unwrap_or(false), Ordering::Relaxed);
fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, notify: Option<bool>, subscription_specs: Option<Vec<SubscriptionSpec>>) -> MessageProcessingResult {
let state = Box::new(SessionState {
user_id,
room_id,
notify: notify.unwrap_or(false)
});
if from.set_if_none(state).is_some() {
return Err(From::from("Users may only join once!"))
}
let subscription_specs = subscription_specs.unwrap_or_else(Vec::new);
let mut subscriptions = STATE.subscriptions.write()?;
let new_user_id = match user_id {
Some(n) => {
from.user_id.store(n, Ordering::Relaxed);
subscriptions::subscribe_all(&mut subscriptions, from, &subscription_specs)?;
n
},
None => {
let new_user_id = STATE.next_user_id.next(Ordering::Relaxed)
.expect("next_user_id is always a non-empty user ID.");
from.user_id.store(new_user_id, Ordering::Relaxed);
subscriptions::subscribe_all(&mut subscriptions, from, &subscription_specs)?;
let notification = json!({ "event": "join", "user_id": new_user_id, "room_id": room_id });
if let Err(e) = send_notification(new_user_id, notification) {
janus::log(LogLevel::Err, &format!("Error sending notification for user join: {:?}", e))
}
new_user_id
subscriptions::subscribe_all(&mut subscriptions, from, &subscription_specs)?;
if notify == Some(true) {
let notification = json!({ "event": "join", "user_id": user_id, "room_id": room_id });
if let Err(e) = send_notification(from.get().unwrap(), notification) {
janus::log(LogLevel::Err, &format!("Error sending notification for user join: {:?}", e))
}
};
}
let sessions = STATE.sessions.read()?;
Ok(json!({
"user_id": new_user_id,
"user_ids": get_user_ids(&sessions, room_id)
}))
Ok(json!({ "user_ids": get_user_ids(&sessions, room_id) }))
}
fn process_list_users(room_id: RoomId) -> MessageProcessingResult {
@ -301,9 +299,9 @@ fn process_unsubscribe(from: &Arc<Session>, specs: Vec<SubscriptionSpec>) -> Mes
}
fn process_message(from: &Arc<Session>, msg: JanssonValue) -> MessageProcessingResult {
match to_serde_json::<OptionalPayload<MessageKind>>(msg) {
Ok(OptionalPayload::None {}) => Ok(json!({})),
Ok(OptionalPayload::Some(kind)) => {
match to_serde_json::<OptionalField<MessageKind>>(msg) {
Ok(OptionalField::None {}) => Ok(json!({})),
Ok(OptionalField::Some(kind)) => {
janus::log(LogLevel::Info, &format!("Processing {:?} on connection {:?}.", kind, from));
match kind {
MessageKind::ListRooms => process_list_rooms(),
@ -318,22 +316,20 @@ fn process_message(from: &Arc<Session>, msg: JanssonValue) -> MessageProcessingR
}
}
fn process_offer(from: &Arc<Session>, sdp: String) -> JsepResult {
fn process_offer(sdp: String) -> JsepResult {
let offer = Sdp::parse(CString::new(sdp)?)?;
let answer = answer_sdp!(offer, janus::sdp::OfferAnswerParameters::Video, 0);
let answer_str = Sdp::to_string(&answer);
from.has_data.store(answer.get_mlines().contains_key(&MediaType::JANUS_SDP_APPLICATION), Ordering::Relaxed);
janus::log(LogLevel::Info, &format!("{:?}.", answer.get_mlines()));
Ok(serde_json::to_value(JsepKind::Answer { sdp: answer_str.to_str()?.to_owned() })?)
}
fn process_jsep(from: &Arc<Session>, jsep: JanssonValue) -> JsepResult {
match to_serde_json::<OptionalPayload<JsepKind>>(jsep) {
Ok(OptionalPayload::None {}) => Ok(json!({})),
Ok(OptionalPayload::Some(kind)) => {
match to_serde_json::<OptionalField<JsepKind>>(jsep) {
Ok(OptionalField::None {}) => Ok(json!({})),
Ok(OptionalField::Some(kind)) => {
janus::log(LogLevel::Info, &format!("Processing {:?} from {:?}.", kind, from));
match kind {
JsepKind::Offer { sdp } => process_offer(from, sdp),
JsepKind::Offer { sdp } => process_offer(sdp),
JsepKind::Answer { .. } => Err(From::from("JSEP answers not yet supported.")),
}
}
@ -409,102 +405,3 @@ const PLUGIN: Plugin = build_plugin!(
);
export_plugin!(&PLUGIN);
#[cfg(test)]
mod tests {
use super::*;
mod jsep_parsing {
use super::*;
#[test]
fn parse_offer() {
let jsep = r#"{"type": "offer", "sdp": "..."}"#;
let result: JsepKind = serde_json::from_str(jsep).unwrap();
assert_eq!(result, JsepKind::Offer { sdp: "...".to_owned() });
}
#[test]
fn parse_answer() {
let jsep = r#"{"type": "answer", "sdp": "..."}"#;
let result: JsepKind = serde_json::from_str(jsep).unwrap();
assert_eq!(result, JsepKind::Answer { sdp: "...".to_owned() });
}
}
mod message_parsing {
use super::*;
#[test]
fn parse_empty() {
let json = r#"{}"#;
let result: OptionalPayload<MessageKind> = serde_json::from_str(json).unwrap();
assert_eq!(result, OptionalPayload::None {});
}
#[test]
fn parse_list_rooms() {
let json = r#"{"kind": "listrooms"}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::ListRooms);
}
#[test]
fn parse_list_users() {
let json = r#"{"kind": "listusers", "room_id": 5}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::ListUsers { room_id: RoomId::try_from(5).unwrap() });
}
#[test]
fn parse_join_no_user_id() {
let json = r#"{"kind": "join", "room_id": 5, "notify": true, "subscription_specs": []}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::Join {
user_id: None,
room_id: RoomId::try_from(5).unwrap(),
notify: Some(true),
subscription_specs: Some(vec!()),
});
}
#[test]
fn parse_join_user_id() {
let json = r#"{"kind": "join", "user_id": 10, "room_id": 5}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::Join {
user_id: Some(UserId::try_from(10).unwrap()),
room_id: RoomId::try_from(5).unwrap(),
notify: None,
subscription_specs: None,
});
}
#[test]
fn parse_subscribe() {
let json = r#"{"kind": "subscribe", "specs": [{"publisher_id": 2, "content_kind": 1}]}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::Subscribe {
specs: vec!(SubscriptionSpec {
publisher_id: UserId::try_from(2).unwrap(),
content_kind: 1
})
});
}
#[test]
fn parse_unsubscribe() {
let json = r#"{"kind": "unsubscribe", "specs": [{"publisher_id": 5, "content_kind": 2}]}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::Unsubscribe {
specs: vec!(SubscriptionSpec {
publisher_id: UserId::try_from(5).unwrap(),
content_kind: 2
})
});
}
}
}

Просмотреть файл

@ -1,10 +1,26 @@
/// Types and code related to handling signalling messages.
use super::JanssonValue;
use entityids::{RoomId, UserId};
use sessions::Session;
use std::os::raw::c_char;
use std::sync::Weak;
/// Useful to represent a JSON message field which may or may not be present.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum OptionalField<T> {
Some(T),
None {}
}
/// A room ID representing a Janus multicast room.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct RoomId(u64);
/// A user ID representing a single Janus client. Used to correlate multiple Janus connections back to the same
/// conceptual user for managing subscriptions.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct UserId(u64);
/// A signalling message carrying a JSEP SDP offer or answer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase", tag = "type")]
@ -23,9 +39,6 @@ pub enum MessageKind {
/// Indicates that a client wishes to "join" a room on the server. Prior to this, no audio, video, or data
/// received from the client will be forwarded to anyone.
///
/// The first session associated with a client should pass no user ID; the server will generate
/// an ID and return it. Subsequent sessions associated with the same client should pass the same ID.
///
/// The "notify" option controls whether room notifications (e.g. join, leave) should be sent to this session.
///
/// If subscriptions are specified, some initial subscriptions for this session will be configured. This is
@ -33,7 +46,7 @@ pub enum MessageKind {
/// get a join event for this user.
Join {
room_id: RoomId,
user_id: Option<UserId>,
user_id: UserId,
notify: Option<bool>,
subscription_specs: Option<Vec<SubscriptionSpec>>
},
@ -82,3 +95,104 @@ pub struct RawMessage {
// covers the txn pointer -- careful that the other fields are really threadsafe!
unsafe impl Send for RawMessage {}
#[cfg(test)]
mod tests {
use super::*;
mod jsep_parsing {
use super::*;
use ::serde_json;
#[test]
fn parse_offer() {
let jsep = r#"{"type": "offer", "sdp": "..."}"#;
let result: JsepKind = serde_json::from_str(jsep).unwrap();
assert_eq!(result, JsepKind::Offer { sdp: "...".to_owned() });
}
#[test]
fn parse_answer() {
let jsep = r#"{"type": "answer", "sdp": "..."}"#;
let result: JsepKind = serde_json::from_str(jsep).unwrap();
assert_eq!(result, JsepKind::Answer { sdp: "...".to_owned() });
}
}
mod message_parsing {
use super::*;
use ::serde_json;
#[test]
fn parse_empty() {
let json = r#"{}"#;
let result: OptionalField<MessageKind> = serde_json::from_str(json).unwrap();
assert_eq!(result, OptionalField::None {});
}
#[test]
fn parse_list_rooms() {
let json = r#"{"kind": "listrooms"}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::ListRooms);
}
#[test]
fn parse_list_users() {
let json = r#"{"kind": "listusers", "room_id": 5}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::ListUsers { room_id: RoomId(5) });
}
#[test]
fn parse_join_subscriptions() {
let json = r#"{"kind": "join", "user_id": 10, "room_id": 5, "notify": true, "subscription_specs": []}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::Join {
user_id: UserId(10),
room_id: RoomId(5),
notify: Some(true),
subscription_specs: Some(vec!()),
});
}
#[test]
fn parse_join_user_id() {
let json = r#"{"kind": "join", "user_id": 10, "room_id": 5}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::Join {
user_id: UserId(10),
room_id: RoomId(5),
notify: None,
subscription_specs: None,
});
}
#[test]
fn parse_subscribe() {
let json = r#"{"kind": "subscribe", "specs": [{"publisher_id": 100, "content_kind": 1}]}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::Subscribe {
specs: vec!(SubscriptionSpec {
publisher_id: UserId(100),
content_kind: 1
})
});
}
#[test]
fn parse_unsubscribe() {
let json = r#"{"kind": "unsubscribe", "specs": [{"publisher_id": 100, "content_kind": 2}]}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(result, MessageKind::Unsubscribe {
specs: vec!(SubscriptionSpec {
publisher_id: UserId(100),
content_kind: 2
})
});
}
}
}

Просмотреть файл

@ -1,40 +1,20 @@
/// Types for representing Janus session state.
use entityids::{AtomicRoomId, AtomicUserId};
use atom::AtomSetOnce;
use messages::{RoomId, UserId};
use janus::session::SessionWrapper;
use std::sync::atomic::AtomicBool;
/// The state associated with a single session.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SessionState {
/// The user ID associated with this session. Used to correlate multiple sessions that represent
/// the same client, so that other code can refer to a client's packets consistently without
/// regard to which session those packets are being transported on.
///
/// By convention, this starts out empty during every session and is immutable once set.
pub user_id: AtomicUserId,
/// An opaque ID uniquely identifying this user.
pub user_id: UserId,
/// The room ID that this session is in. Only users in the same room can subscribe to each other.
///
/// By convention, this starts out empty during every session and is immutable once set.
pub room_id: AtomicRoomId,
/// Whether or not this session has negotiated a data connection.
pub has_data: AtomicBool,
pub room_id: RoomId,
/// Whether or not this session should receive notifications.
pub notify: AtomicBool,
}
impl Default for SessionState {
fn default() -> Self {
Self {
room_id: AtomicRoomId::empty(),
user_id: AtomicUserId::empty(),
has_data: AtomicBool::new(false),
notify: AtomicBool::new(false),
}
}
pub notify: bool,
}
/// Rust representation of a single Janus session, i.e. a single RTCPeerConnection.
pub type Session = SessionWrapper<SessionState>;
pub type Session = SessionWrapper<AtomSetOnce<Box<SessionState>>>;

Просмотреть файл

@ -1,5 +1,5 @@
/// Types and code related to managing session subscriptions to incoming data.
use entityids::UserId;
use messages::UserId;
use sessions::Session;
use messages::SubscriptionSpec;
use std::collections::HashMap;
@ -85,11 +85,3 @@ pub fn subscribers_to(subscriptions: &SubscriptionMap, publisher: UserId, kind:
Some(k) => all_subscriptions.filter(|s| { s.kind.contains(k) }).collect()
}
}
pub fn for_each_subscriber<T>(subscriptions: &SubscriptionMap, publisher: UserId, kind: Option<ContentKind>, send: T) where T: Fn(&Session) {
for subscription in subscribers_to(subscriptions, publisher, kind) {
if let Some(subscriber_sess) = subscription.sess.upgrade() {
send(subscriber_sess.as_ref());
}
}
}