2014-01-25 08:51:59 +04:00
|
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
|
|
|
|
//! A work queue for scheduling units of work across threads in a fork-join fashion.
|
|
|
|
//!
|
|
|
|
//! Each unit of work carries caller-defined data, and the queue as a whole carries one piece of
//! caller-defined shared data. It is expected that a
|
|
|
|
//! higher-level API on top of this could allow safe fork-join parallelism.
|
|
|
|
|
2014-10-25 04:09:27 +04:00
|
|
|
use task_state;
|
|
|
|
|
2014-08-03 07:41:10 +04:00
|
|
|
use native::task::NativeTaskBuilder;
|
2014-04-05 02:52:50 +04:00
|
|
|
use rand::{Rng, XorShiftRng};
|
2014-03-19 20:35:17 +04:00
|
|
|
use std::mem;
|
2014-06-28 05:25:07 +04:00
|
|
|
use std::rand::weak_rng;
|
2014-01-25 08:51:59 +04:00
|
|
|
use std::sync::atomics::{AtomicUint, SeqCst};
|
|
|
|
use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker};
|
2014-08-03 07:41:10 +04:00
|
|
|
use std::task::TaskBuilder;
|
2014-01-25 08:51:59 +04:00
|
|
|
|
|
|
|
/// A unit of work.
|
|
|
|
///
|
2014-05-29 03:16:05 +04:00
|
|
|
/// # Type parameters
|
|
|
|
///
|
|
|
|
/// - `QueueData`: global custom data for the entire work queue.
|
|
|
|
/// - `WorkData`: custom data specific to each unit of work.
|
|
|
|
pub struct WorkUnit<QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The function to execute.
|
2014-05-29 03:16:05 +04:00
|
|
|
pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>),
|
2014-01-25 08:51:59 +04:00
|
|
|
/// Arbitrary data.
|
2014-05-29 03:16:05 +04:00
|
|
|
pub data: WorkData,
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Messages from the supervisor to the worker.
|
2014-05-29 03:16:05 +04:00
|
|
|
enum WorkerMsg<QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
/// Tells the worker to start work.
|
2014-08-03 07:41:10 +04:00
|
|
|
StartMsg(Worker<WorkUnit<QueueData, WorkData>>, *mut AtomicUint, *const QueueData),
|
2014-01-25 08:51:59 +04:00
|
|
|
/// Tells the worker to stop. It can be restarted again with a `StartMsg`.
|
|
|
|
StopMsg,
|
|
|
|
/// Tells the worker thread to terminate.
|
|
|
|
ExitMsg,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Messages to the supervisor.
|
2014-05-29 03:16:05 +04:00
|
|
|
enum SupervisorMsg<QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
FinishedMsg,
|
2014-05-29 03:16:05 +04:00
|
|
|
ReturnDequeMsg(uint, Worker<WorkUnit<QueueData, WorkData>>),
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Information that the supervisor thread keeps about the worker threads.
|
2014-05-29 03:16:05 +04:00
|
|
|
struct WorkerInfo<QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The communication channel to the workers.
|
2014-05-29 03:16:05 +04:00
|
|
|
chan: Sender<WorkerMsg<QueueData, WorkData>>,
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The worker end of the deque, if we have it.
|
2014-05-29 03:16:05 +04:00
|
|
|
deque: Option<Worker<WorkUnit<QueueData, WorkData>>>,
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The thief end of the work-stealing deque.
|
2014-05-29 03:16:05 +04:00
|
|
|
thief: Stealer<WorkUnit<QueueData, WorkData>>,
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Information specific to each worker thread that the thread keeps.
|
2014-05-29 03:16:05 +04:00
|
|
|
struct WorkerThread<QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The index of this worker.
|
|
|
|
index: uint,
|
|
|
|
/// The communication port from the supervisor.
|
2014-05-29 03:16:05 +04:00
|
|
|
port: Receiver<WorkerMsg<QueueData, WorkData>>,
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The communication channel on which messages are sent to the supervisor.
|
2014-05-29 03:16:05 +04:00
|
|
|
chan: Sender<SupervisorMsg<QueueData, WorkData>>,
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The thief end of the work-stealing deque for all other workers.
|
2014-05-29 03:16:05 +04:00
|
|
|
other_deques: Vec<Stealer<WorkUnit<QueueData, WorkData>>>,
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The random number generator for this worker.
|
|
|
|
rng: XorShiftRng,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Threshold for a thieving worker's failed-steal counter: once the counter reaches this value
/// the worker polls its message port (without blocking) and resets the counter to zero.
static SPIN_COUNT: uint = 1000;
|
|
|
|
|
2014-05-29 03:16:05 +04:00
|
|
|
impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The main logic. This function starts up the worker and listens for
|
|
|
|
/// messages.
|
2014-05-29 03:16:05 +04:00
|
|
|
fn start(&mut self) {
|
2014-01-25 08:51:59 +04:00
|
|
|
loop {
|
|
|
|
// Wait for a start message.
|
|
|
|
let (mut deque, ref_count, queue_data) = match self.port.recv() {
|
|
|
|
StartMsg(deque, ref_count, queue_data) => (deque, ref_count, queue_data),
|
|
|
|
StopMsg => fail!("unexpected stop message"),
|
|
|
|
ExitMsg => return,
|
|
|
|
};
|
|
|
|
|
|
|
|
// We're off!
|
|
|
|
//
|
|
|
|
// FIXME(pcwalton): Can't use labeled break or continue cross-crate due to a Rust bug.
|
|
|
|
loop {
|
|
|
|
// FIXME(pcwalton): Nasty workaround for the lack of labeled break/continue
|
|
|
|
// cross-crate.
|
|
|
|
let mut work_unit = unsafe {
|
2014-06-05 21:58:44 +04:00
|
|
|
mem::uninitialized()
|
2014-01-25 08:51:59 +04:00
|
|
|
};
|
|
|
|
match deque.pop() {
|
|
|
|
Some(work) => work_unit = work,
|
|
|
|
None => {
|
|
|
|
// Become a thief.
|
|
|
|
let mut i = 0;
|
|
|
|
let mut should_continue = true;
|
|
|
|
loop {
|
|
|
|
let victim = (self.rng.next_u32() as uint) % self.other_deques.len();
|
2014-05-07 01:19:25 +04:00
|
|
|
match self.other_deques.get_mut(victim).steal() {
|
2014-01-25 08:51:59 +04:00
|
|
|
Empty | Abort => {
|
|
|
|
// Continue.
|
|
|
|
}
|
|
|
|
Data(work) => {
|
|
|
|
work_unit = work;
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if i == SPIN_COUNT {
|
|
|
|
match self.port.try_recv() {
|
2014-05-23 04:14:24 +04:00
|
|
|
Ok(StopMsg) => {
|
2014-01-25 08:51:59 +04:00
|
|
|
should_continue = false;
|
|
|
|
break
|
|
|
|
}
|
2014-05-23 04:14:24 +04:00
|
|
|
Ok(ExitMsg) => return,
|
|
|
|
Ok(_) => fail!("unexpected message"),
|
2014-03-19 20:35:17 +04:00
|
|
|
_ => {}
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
i = 0
|
|
|
|
} else {
|
|
|
|
i += 1
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !should_continue {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// At this point, we have some work. Perform it.
|
|
|
|
let mut proxy = WorkerProxy {
|
|
|
|
worker: &mut deque,
|
|
|
|
ref_count: ref_count,
|
|
|
|
queue_data: queue_data,
|
|
|
|
};
|
|
|
|
(work_unit.fun)(work_unit.data, &mut proxy);
|
|
|
|
|
|
|
|
// The work is done. Now decrement the count of outstanding work items. If this was
|
|
|
|
// the last work unit in the queue, then send a message on the channel.
|
|
|
|
unsafe {
|
|
|
|
if (*ref_count).fetch_sub(1, SeqCst) == 1 {
|
|
|
|
self.chan.send(FinishedMsg)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Give the deque back to the supervisor.
|
|
|
|
self.chan.send(ReturnDequeMsg(self.index, deque))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// A handle to the work queue that individual work units have.
|
2014-09-21 02:35:08 +04:00
|
|
|
pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a> {
|
2014-05-29 03:16:05 +04:00
|
|
|
worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>,
|
|
|
|
ref_count: *mut AtomicUint,
|
2014-08-03 07:41:10 +04:00
|
|
|
queue_data: *const QueueData,
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
|
2014-09-21 02:35:08 +04:00
|
|
|
impl<'a, QueueData: 'static, WorkData: Send> WorkerProxy<'a, QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
/// Enqueues a block into the work queue.
|
|
|
|
#[inline]
|
2014-05-29 03:16:05 +04:00
|
|
|
pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
|
2014-01-25 08:51:59 +04:00
|
|
|
unsafe {
|
|
|
|
drop((*self.ref_count).fetch_add(1, SeqCst));
|
|
|
|
}
|
|
|
|
self.worker.push(work_unit);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Retrieves the queue user data.
|
|
|
|
#[inline]
|
2014-05-29 03:16:05 +04:00
|
|
|
pub fn user_data<'a>(&'a self) -> &'a QueueData {
|
2014-01-25 08:51:59 +04:00
|
|
|
unsafe {
|
2014-06-05 21:58:44 +04:00
|
|
|
mem::transmute(self.queue_data)
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// A work queue on which units of work can be submitted.
|
2014-05-29 03:16:05 +04:00
|
|
|
pub struct WorkQueue<QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
/// Information about each of the workers.
|
2014-05-29 03:16:05 +04:00
|
|
|
workers: Vec<WorkerInfo<QueueData, WorkData>>,
|
2014-01-25 08:51:59 +04:00
|
|
|
/// A port on which deques can be received from the workers.
|
2014-05-29 03:16:05 +04:00
|
|
|
port: Receiver<SupervisorMsg<QueueData, WorkData>>,
|
2014-01-25 08:51:59 +04:00
|
|
|
/// The amount of work that has been enqueued.
|
2014-05-29 03:16:05 +04:00
|
|
|
work_count: uint,
|
2014-01-25 08:51:59 +04:00
|
|
|
/// Arbitrary user data.
|
2014-05-29 03:16:05 +04:00
|
|
|
pub data: QueueData,
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
|
2014-05-29 03:16:05 +04:00
|
|
|
impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
/// Creates a new work queue and spawns all the threads associated with
|
|
|
|
/// it.
|
2014-10-25 04:09:27 +04:00
|
|
|
pub fn new(task_name: &'static str,
|
|
|
|
state: task_state::TaskState,
|
|
|
|
thread_count: uint,
|
|
|
|
user_data: QueueData) -> WorkQueue<QueueData, WorkData> {
|
2014-01-25 08:51:59 +04:00
|
|
|
// Set up data structures.
|
2014-04-05 02:52:50 +04:00
|
|
|
let (supervisor_chan, supervisor_port) = channel();
|
2014-05-07 01:19:25 +04:00
|
|
|
let (mut infos, mut threads) = (vec!(), vec!());
|
2014-01-25 08:51:59 +04:00
|
|
|
for i in range(0, thread_count) {
|
2014-04-05 02:52:50 +04:00
|
|
|
let (worker_chan, worker_port) = channel();
|
2014-06-05 21:58:44 +04:00
|
|
|
let pool = BufferPool::new();
|
2014-01-25 08:51:59 +04:00
|
|
|
let (worker, thief) = pool.deque();
|
|
|
|
infos.push(WorkerInfo {
|
|
|
|
chan: worker_chan,
|
|
|
|
deque: Some(worker),
|
|
|
|
thief: thief,
|
|
|
|
});
|
|
|
|
threads.push(WorkerThread {
|
|
|
|
index: i,
|
|
|
|
port: worker_port,
|
|
|
|
chan: supervisor_chan.clone(),
|
2014-05-07 01:19:25 +04:00
|
|
|
other_deques: vec!(),
|
2014-06-28 05:25:07 +04:00
|
|
|
rng: weak_rng(),
|
2014-01-25 08:51:59 +04:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
// Connect workers to one another.
|
|
|
|
for i in range(0, thread_count) {
|
|
|
|
for j in range(0, thread_count) {
|
|
|
|
if i != j {
|
2014-08-09 07:00:27 +04:00
|
|
|
threads.get_mut(i).other_deques.push(infos[j].thief.clone())
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
}
|
2014-09-29 20:45:27 +04:00
|
|
|
assert!(threads[i].other_deques.len() == thread_count - 1)
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
// Spawn threads.
|
2014-09-21 02:35:08 +04:00
|
|
|
for thread in threads.into_iter() {
|
2014-08-03 07:41:10 +04:00
|
|
|
TaskBuilder::new().named(task_name).native().spawn(proc() {
|
2014-10-25 04:09:27 +04:00
|
|
|
task_state::initialize(state | task_state::InWorker);
|
2014-01-25 08:51:59 +04:00
|
|
|
let mut thread = thread;
|
|
|
|
thread.start()
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
WorkQueue {
|
|
|
|
workers: infos,
|
|
|
|
port: supervisor_port,
|
|
|
|
work_count: 0,
|
|
|
|
data: user_data,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Enqueues a block into the work queue.
|
|
|
|
#[inline]
|
2014-05-29 03:16:05 +04:00
|
|
|
pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
|
2014-05-07 01:19:25 +04:00
|
|
|
match self.workers.get_mut(0).deque {
|
2014-01-25 08:51:59 +04:00
|
|
|
None => {
|
|
|
|
fail!("tried to push a block but we don't have the deque?!")
|
|
|
|
}
|
|
|
|
Some(ref mut deque) => deque.push(work_unit),
|
|
|
|
}
|
|
|
|
self.work_count += 1
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Synchronously runs all the enqueued tasks and waits for them to complete.
|
|
|
|
pub fn run(&mut self) {
|
|
|
|
// Tell the workers to start.
|
|
|
|
let mut work_count = AtomicUint::new(self.work_count);
|
2014-09-21 02:35:08 +04:00
|
|
|
for worker in self.workers.iter_mut() {
|
|
|
|
worker.chan.send(StartMsg(worker.deque.take().unwrap(), &mut work_count, &self.data))
|
2014-01-25 08:51:59 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
// Wait for the work to finish.
|
|
|
|
drop(self.port.recv());
|
|
|
|
self.work_count = 0;
|
|
|
|
|
|
|
|
// Tell everyone to stop.
|
|
|
|
for worker in self.workers.iter() {
|
|
|
|
worker.chan.send(StopMsg)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get our deques back.
|
|
|
|
for _ in range(0, self.workers.len()) {
|
|
|
|
match self.port.recv() {
|
2014-05-07 01:19:25 +04:00
|
|
|
ReturnDequeMsg(index, deque) => self.workers.get_mut(index).deque = Some(deque),
|
2014-01-25 08:51:59 +04:00
|
|
|
FinishedMsg => fail!("unexpected finished message!"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn shutdown(&mut self) {
|
|
|
|
for worker in self.workers.iter() {
|
|
|
|
worker.chan.send(ExitMsg)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|