зеркало из https://github.com/mozilla/gecko-dev.git
servo: Merge #2500 - Workqueue cleanups (from brendanzab:workqueue); r=pcwalton
cc. @pcwalton Source-Repo: https://github.com/servo/servo Source-Revision: 14ae870d75187225e2072f1062850a310c3ed8d2
This commit is contained in:
Родитель
066e8db6b1
Коммит
aa42b9e344
|
@ -18,66 +18,65 @@ use std::task::TaskOpts;
|
|||
|
||||
/// A unit of work.
|
||||
///
|
||||
/// The type parameter `QUD` stands for "queue user data" and represents global custom data for the
|
||||
/// entire work queue, and the type parameter `WUD` stands for "work user data" and represents
|
||||
/// custom data specific to each unit of work.
|
||||
pub struct WorkUnit<QUD,WUD> {
|
||||
/// # Type parameters
|
||||
///
|
||||
/// - `QueueData`: global custom data for the entire work queue.
|
||||
/// - `WorkData`: custom data specific to each unit of work.
|
||||
pub struct WorkUnit<QueueData, WorkData> {
|
||||
/// The function to execute.
|
||||
pub fun: extern "Rust" fn(WUD, &mut WorkerProxy<QUD,WUD>),
|
||||
pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>),
|
||||
/// Arbitrary data.
|
||||
pub data: WUD,
|
||||
pub data: WorkData,
|
||||
}
|
||||
|
||||
/// Messages from the supervisor to the worker.
|
||||
enum WorkerMsg<QUD,WUD> {
|
||||
enum WorkerMsg<QueueData, WorkData> {
|
||||
/// Tells the worker to start work.
|
||||
StartMsg(Worker<WorkUnit<QUD,WUD>>, *mut AtomicUint, *QUD),
|
||||
|
||||
StartMsg(Worker<WorkUnit<QueueData, WorkData>>, *mut AtomicUint, *QueueData),
|
||||
/// Tells the worker to stop. It can be restarted again with a `StartMsg`.
|
||||
StopMsg,
|
||||
|
||||
/// Tells the worker thread to terminate.
|
||||
ExitMsg,
|
||||
}
|
||||
|
||||
/// Messages to the supervisor.
|
||||
enum SupervisorMsg<QUD,WUD> {
|
||||
enum SupervisorMsg<QueueData, WorkData> {
|
||||
FinishedMsg,
|
||||
ReturnDequeMsg(uint, Worker<WorkUnit<QUD,WUD>>),
|
||||
ReturnDequeMsg(uint, Worker<WorkUnit<QueueData, WorkData>>),
|
||||
}
|
||||
|
||||
/// Information that the supervisor thread keeps about the worker threads.
|
||||
struct WorkerInfo<QUD,WUD> {
|
||||
struct WorkerInfo<QueueData, WorkData> {
|
||||
/// The communication channel to the workers.
|
||||
chan: Sender<WorkerMsg<QUD,WUD>>,
|
||||
chan: Sender<WorkerMsg<QueueData, WorkData>>,
|
||||
/// The buffer pool for this deque.
|
||||
pool: BufferPool<WorkUnit<QUD,WUD>>,
|
||||
pool: BufferPool<WorkUnit<QueueData, WorkData>>,
|
||||
/// The worker end of the deque, if we have it.
|
||||
deque: Option<Worker<WorkUnit<QUD,WUD>>>,
|
||||
deque: Option<Worker<WorkUnit<QueueData, WorkData>>>,
|
||||
/// The thief end of the work-stealing deque.
|
||||
thief: Stealer<WorkUnit<QUD,WUD>>,
|
||||
thief: Stealer<WorkUnit<QueueData, WorkData>>,
|
||||
}
|
||||
|
||||
/// Information specific to each worker thread that the thread keeps.
|
||||
struct WorkerThread<QUD,WUD> {
|
||||
struct WorkerThread<QueueData, WorkData> {
|
||||
/// The index of this worker.
|
||||
index: uint,
|
||||
/// The communication port from the supervisor.
|
||||
port: Receiver<WorkerMsg<QUD,WUD>>,
|
||||
port: Receiver<WorkerMsg<QueueData, WorkData>>,
|
||||
/// The communication channel on which messages are sent to the supervisor.
|
||||
chan: Sender<SupervisorMsg<QUD,WUD>>,
|
||||
chan: Sender<SupervisorMsg<QueueData, WorkData>>,
|
||||
/// The thief end of the work-stealing deque for all other workers.
|
||||
other_deques: Vec<Stealer<WorkUnit<QUD,WUD>>>,
|
||||
other_deques: Vec<Stealer<WorkUnit<QueueData, WorkData>>>,
|
||||
/// The random number generator for this worker.
|
||||
rng: XorShiftRng,
|
||||
}
|
||||
|
||||
static SPIN_COUNT: uint = 1000;
|
||||
|
||||
impl<QUD:Send,WUD:Send> WorkerThread<QUD,WUD> {
|
||||
impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> {
|
||||
/// The main logic. This function starts up the worker and listens for
|
||||
/// messages.
|
||||
pub fn start(&mut self) {
|
||||
fn start(&mut self) {
|
||||
loop {
|
||||
// Wait for a start message.
|
||||
let (mut deque, ref_count, queue_data) = match self.port.recv() {
|
||||
|
@ -160,16 +159,16 @@ impl<QUD:Send,WUD:Send> WorkerThread<QUD,WUD> {
|
|||
}
|
||||
|
||||
/// A handle to the work queue that individual work units have.
|
||||
pub struct WorkerProxy<'a,QUD,WUD> {
|
||||
pub worker: &'a mut Worker<WorkUnit<QUD,WUD>>,
|
||||
pub ref_count: *mut AtomicUint,
|
||||
pub queue_data: *QUD,
|
||||
pub struct WorkerProxy<'a, QueueData, WorkData> {
|
||||
worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>,
|
||||
ref_count: *mut AtomicUint,
|
||||
queue_data: *QueueData,
|
||||
}
|
||||
|
||||
impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> {
|
||||
impl<'a, QueueData, WorkData: Send> WorkerProxy<'a, QueueData, WorkData> {
|
||||
/// Enqueues a block into the work queue.
|
||||
#[inline]
|
||||
pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) {
|
||||
pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
|
||||
unsafe {
|
||||
drop((*self.ref_count).fetch_add(1, SeqCst));
|
||||
}
|
||||
|
@ -178,7 +177,7 @@ impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> {
|
|||
|
||||
/// Retrieves the queue user data.
|
||||
#[inline]
|
||||
pub fn user_data<'a>(&'a self) -> &'a QUD {
|
||||
pub fn user_data<'a>(&'a self) -> &'a QueueData {
|
||||
unsafe {
|
||||
cast::transmute(self.queue_data)
|
||||
}
|
||||
|
@ -186,21 +185,21 @@ impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> {
|
|||
}
|
||||
|
||||
/// A work queue on which units of work can be submitted.
|
||||
pub struct WorkQueue<QUD,WUD> {
|
||||
pub struct WorkQueue<QueueData, WorkData> {
|
||||
/// Information about each of the workers.
|
||||
workers: Vec<WorkerInfo<QUD,WUD>>,
|
||||
workers: Vec<WorkerInfo<QueueData, WorkData>>,
|
||||
/// A port on which deques can be received from the workers.
|
||||
port: Receiver<SupervisorMsg<QUD,WUD>>,
|
||||
port: Receiver<SupervisorMsg<QueueData, WorkData>>,
|
||||
/// The amount of work that has been enqueued.
|
||||
pub work_count: uint,
|
||||
work_count: uint,
|
||||
/// Arbitrary user data.
|
||||
pub data: QUD,
|
||||
pub data: QueueData,
|
||||
}
|
||||
|
||||
impl<QUD:Send,WUD:Send> WorkQueue<QUD,WUD> {
|
||||
impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> {
|
||||
/// Creates a new work queue and spawns all the threads associated with
|
||||
/// it.
|
||||
pub fn new(task_name: &'static str, thread_count: uint, user_data: QUD) -> WorkQueue<QUD,WUD> {
|
||||
pub fn new(task_name: &'static str, thread_count: uint, user_data: QueueData) -> WorkQueue<QueueData, WorkData> {
|
||||
// Set up data structures.
|
||||
let (supervisor_chan, supervisor_port) = channel();
|
||||
let (mut infos, mut threads) = (vec!(), vec!());
|
||||
|
@ -253,7 +252,7 @@ impl<QUD:Send,WUD:Send> WorkQueue<QUD,WUD> {
|
|||
|
||||
/// Enqueues a block into the work queue.
|
||||
#[inline]
|
||||
pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) {
|
||||
pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
|
||||
match self.workers.get_mut(0).deque {
|
||||
None => {
|
||||
fail!("tried to push a block but we don't have the deque?!")
|
||||
|
|
Загрузка…
Ссылка в новой задаче