Bug 1414623 - P2: Connect callback send/receive with Mutex. r=kinetik

state_callback() and data_callback() can be called from multiple
threads.  To protect the send/receive pair of calls, a Mutex is added
to Connection to prevent one thread from starving when two threads try
to wait on the socket in recvmsg.

MozReview-Commit-ID: LUXcqnw2Hm1

--HG--
extra : rebase_source : e98f50a109510e35bab6516febc9e76539c3228f
This commit is contained in:
Dan Glastonbury 2017-11-14 09:17:38 +10:00
Родитель db190864d1
Коммит 702bf586df
1 изменённых файлов: 19 добавлений и 11 удалений

Просмотреть файл

@ -25,12 +25,12 @@ use cubeb_core::ffi;
use mio::{Ready, Token};
use mio_uds::{UnixListener, UnixStream};
use std::{slice, thread};
use std::collections::VecDeque;
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
use std::convert::From;
use std::io::Cursor;
use std::os::raw::c_void;
use std::os::unix::prelude::*;
use std::sync::{Mutex, MutexGuard};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
@ -65,11 +65,18 @@ struct Callback {
input_frame_size: u16,
/// Size of output frame in bytes
output_frame_size: u16,
connection: audioipc::Connection,
connection: Mutex<audioipc::Connection>,
input_shm: SharedMemWriter,
output_shm: SharedMemReader
}
impl Callback {
#[doc(hidden)]
fn connection(&self) -> MutexGuard<audioipc::Connection> {
self.connection.lock().unwrap()
}
}
impl cubeb::StreamCallback for Callback {
type Frame = u8;
@ -89,7 +96,8 @@ impl cubeb::StreamCallback for Callback {
self.input_shm.write(real_input).unwrap();
let r = self.connection.send(ClientMessage::StreamDataCallback(
let mut conn = self.connection();
let r = conn.send(ClientMessage::StreamDataCallback(
output.len() as isize,
self.output_frame_size as usize
));
@ -98,7 +106,7 @@ impl cubeb::StreamCallback for Callback {
return -1;
}
let r = self.connection.receive();
let r = conn.receive();
match r {
Ok(ServerMessage::StreamDataCallback(cb_result)) => {
if cb_result >= 0 {
@ -125,9 +133,8 @@ impl cubeb::StreamCallback for Callback {
cubeb::State::Drained => ffi::CUBEB_STATE_DRAINED,
cubeb::State::Error => ffi::CUBEB_STATE_ERROR,
};
let r = self.connection.send(
ClientMessage::StreamStateCallback(state)
);
let mut conn = self.connection();
let r = conn.send(ClientMessage::StreamStateCallback(state));
if r.is_err() {
debug!("state_callback: Failed to send to client - got={:?}", r);
}
@ -136,7 +143,7 @@ impl cubeb::StreamCallback for Callback {
// side to make state_callback synchronous. If not, then there
// exists a race on cubeb_stream_stop()/cubeb_stream_destroy()
// in Gecko that results in a UAF.
let r = self.connection.receive();
let r = conn.receive();
match r {
Ok(ServerMessage::StreamStateCallback) => {},
_ => {
@ -148,7 +155,8 @@ impl cubeb::StreamCallback for Callback {
impl Drop for Callback {
fn drop(&mut self) {
let r = self.connection.send(ClientMessage::StreamDestroyed);
let mut conn = self.connection();
let r = conn.send(ClientMessage::StreamDestroyed);
if r.is_err() {
debug!("Callback::drop failed to send StreamDestroyed = {:?}", r);
}
@ -447,7 +455,7 @@ impl ServerConn {
Callback {
input_frame_size: input_frame_size,
output_frame_size: output_frame_size,
connection: conn2,
connection: Mutex::new(conn2),
input_shm: input_shm,
output_shm: output_shm
}