Bug 1739727 - Part 2: Improve rust async support in moz_task, r=emilio

This patch contains changes to moz_task to improve its support for
async execution on multiple threads. Unlike the previous executor
implementation, this new implementation substantially reduces the amount
of unsafe code by depending on the async-task crate
(https://crates.io/crates/async-task) for the core task implementation.
It also adds a few new features:

 * Both local (no Send bound) and non-local (with Send bound) execution support,
 * Support for spawning on arbitrary nsIEventTargets or the background task pool,
 * Task objects returned from spawned futures, which may be .await-ed or detach()-ed,
 * Support for spawning with the NS_DISPATCH_EVENT_MAY_BLOCK flag set,
 * Automatic use of NS_DISPATCH_AT_END when required,
 * Support for specifying the priority of spawned runnables.

There are also some correctness improvements, and a better API is now exposed
for dispatching plain runnable functions to background threads.

After these changes, the TaskRunnable API should no longer be necessary.
It has been re-implemented on top of the new executor and kept in place to
avoid rewriting all existing consumers.
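
As a rough sketch (not part of the patch itself), typical usage of the new
surface looks something like the following; `load_data` and `callback` are
hypothetical stand-ins:

    // Potentially-blocking work goes to the background pool; spawn_blocking
    // sets NS_DISPATCH_EVENT_MAY_BLOCK and returns an awaitable Task handle.
    let data = moz_task::spawn_blocking("Example: load", async move { load_data() });

    // Non-Send follow-up work runs on the spawning thread; detach() because
    // nothing will .await this task.
    moz_task::spawn_local("Example: deliver", async move {
        callback.completed(data.await);
    })
    .detach();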

Differential Revision: https://phabricator.services.mozilla.com/D130705
Nika Layzell 2021-12-07 20:01:41 +00:00
Parent 19317ab22a
Commit 3e323b7d7a
15 changed files with 599 additions and 478 deletions

View file

@ -4,17 +4,10 @@
use l10nregistry_ffi::load::{load_async, load_sync};
use moz_task;
use std::{
sync::atomic::{AtomicBool, Ordering::Relaxed},
sync::Arc,
};
#[no_mangle]
pub extern "C" fn Rust_L10NLoadAsync(it_worked: *mut bool) {
let done = Arc::new(AtomicBool::new(false));
let done2 = done.clone();
moz_task::spawn_current_thread(async move {
let future = async move {
match load_async("resource://gre/localization/en-US/toolkit/about/aboutAbout.ftl").await {
Ok(res) => {
assert_eq!(res.len(), 460);
@ -25,16 +18,12 @@ pub extern "C" fn Rust_L10NLoadAsync(it_worked: *mut bool) {
*it_worked = true;
}
}
Err(err) => println!("{:?}", err),
Err(err) => panic!("{:?}", err),
}
done.store(true, Relaxed);
})
.unwrap();
};
unsafe {
moz_task::gtest_only::spin_event_loop_until(move || done2.load(Relaxed)).unwrap();
*it_worked = true;
moz_task::gtest_only::spin_event_loop_until("Rust_L10NLoadAsync", future).unwrap();
}
}
@ -50,6 +39,6 @@ pub extern "C" fn Rust_L10NLoadSync(it_worked: *mut bool) {
*it_worked = true;
}
}
Err(err) => println!("{:?}", err),
Err(err) => panic!("{:?}", err),
}
}

View file

@ -440,7 +440,7 @@ pub unsafe extern "C" fn l10nregistry_generate_bundles(
// Immediately spawn the task which will handle the async calls, and use an `UnboundedSender`
// to send callbacks for specific `next()` calls to it.
let (sender, mut receiver) = unbounded::<NextRequest>();
moz_task::spawn_current_thread(async move {
moz_task::spawn_local("l10nregistry_generate_bundles", async move {
use futures::StreamExt;
while let Some(req) = receiver.next().await {
let result = match iter.next().await {
@ -450,7 +450,7 @@ pub unsafe extern "C" fn l10nregistry_generate_bundles(
(req.callback)(&req.promise, result);
}
})
.expect("Failed to spawn a task");
.detach();
let iter = GeckoFluentBundleAsyncIteratorWrapper(sender);
Box::into_raw(Box::new(iter))
}

View file

@ -344,10 +344,10 @@ pub unsafe extern "C" fn l10nfilesource_fetch_file(
ResourceStatus::Loaded(res) => callback(promise, Some(&res)),
res @ ResourceStatus::Loading(_) => {
let strong_promise = RefPtr::new(promise);
moz_task::spawn_current_thread(async move {
moz_task::spawn_local("l10nfilesource_fetch_file", async move {
callback(&strong_promise, res.await.as_ref().map(|r| &**r));
})
.expect("Failed to spawn future");
.detach();
}
}
}

View file

@ -49,7 +49,7 @@ pub fn convert_args_to_owned(args: &[L10nArg]) -> Option<FluentArgs<'static>> {
for arg in args {
let val = match arg.value {
FluentArgument::Double_(d) => FluentValue::from(d),
// We need this to be owned because we pass the result into `spawn_current_thread`.
// We need this to be owned because we pass the result into `spawn_local`.
FluentArgument::String(s) => FluentValue::from(Cow::Owned(s.to_utf8().to_string())),
};
result.set(arg.id.to_string(), val);
@ -290,7 +290,7 @@ impl LocalizationRc {
let id = nsCString::from(id);
let strong_promise = RefPtr::new(promise);
moz_task::spawn_current_thread(async move {
moz_task::spawn_local("LocalizationRc::format_value", async move {
let mut errors = vec![];
let value = if let Some(value) = bundles
.format_value(&id.to_utf8(), args.as_ref(), &mut errors)
@ -309,7 +309,7 @@ impl LocalizationRc {
.collect();
callback(&strong_promise, &value, &errors);
})
.expect("Failed to spawn future");
.detach();
}
pub fn format_values(
@ -324,7 +324,7 @@ impl LocalizationRc {
let strong_promise = RefPtr::new(promise);
moz_task::spawn_current_thread(async move {
moz_task::spawn_local("LocalizationRc::format_values", async move {
let mut errors = vec![];
let ret_val = bundles
.format_values(&keys, &mut errors)
@ -350,7 +350,7 @@ impl LocalizationRc {
callback(&strong_promise, &ret_val, &errors);
})
.expect("Failed to spawn future");
.detach();
}
pub fn format_messages(
@ -369,7 +369,7 @@ impl LocalizationRc {
let strong_promise = RefPtr::new(promise);
moz_task::spawn_current_thread(async move {
moz_task::spawn_local("LocalizationRc::format_messages", async move {
let mut errors = vec![];
let ret_val = bundles
.format_messages(&keys, &mut errors)
@ -399,7 +399,7 @@ impl LocalizationRc {
callback(&strong_promise, &ret_val, &errors);
})
.expect("Failed to spawn future");
.detach();
}
}

View file

@ -25,10 +25,7 @@ mod task;
use atomic_refcell::AtomicRefCell;
use error::KeyValueError;
use libc::c_void;
use moz_task::{
create_background_task_queue, dispatch_background_task_with_options, DispatchOptions,
TaskRunnable,
};
use moz_task::{create_background_task_queue, DispatchOptions, TaskRunnable};
use nserror::{nsresult, NS_ERROR_FAILURE, NS_ERROR_NO_AGGREGATION, NS_OK};
use nsstring::{nsACString, nsCString};
use owned_value::{owned_to_variant, variant_to_owned};
@ -126,10 +123,8 @@ impl KeyValueService {
nsCString::from(name),
));
dispatch_background_task_with_options(
RefPtr::new(TaskRunnable::new("KVService::GetOrCreate", task)?.coerce()),
DispatchOptions::default().may_block(true),
)
TaskRunnable::new("KVService::GetOrCreate", task)?
.dispatch_background_task_with_options(DispatchOptions::default().may_block(true))
}
}

View file

@ -44,9 +44,9 @@ type SingleStore = rkv::SingleStore<SafeModeDatabase>;
macro_rules! task_done {
(value) => {
fn done(&self) -> Result<(), nsresult> {
// If TaskRunnable.run() calls Task.done() to return a result
// on the main thread before TaskRunnable.run() returns on the database
// thread, then the Task will get dropped on the database thread.
// If TaskRunnable calls Task.done() to return a result on the
// main thread before TaskRunnable returns on the database thread,
// then the Task will get dropped on the database thread.
//
// But the callback is an nsXPCWrappedJS that isn't safe to release
// on the database thread. So we move it out of the Task here to ensure
@ -65,9 +65,9 @@ macro_rules! task_done {
(void) => {
fn done(&self) -> Result<(), nsresult> {
// If TaskRunnable.run() calls Task.done() to return a result
// on the main thread before TaskRunnable.run() returns on the database
// thread, then the Task will get dropped on the database thread.
// If TaskRunnable calls Task.done() to return a result on the
// main thread before TaskRunnable returns on the database thread,
// then the Task will get dropped on the database thread.
//
// But the callback is an nsXPCWrappedJS that isn't safe to release
// on the database thread. So we move it out of the Task here to ensure

View file

@ -19,12 +19,11 @@ use crate::{
statics::get_database,
};
use crossbeam_utils::atomic::AtomicCell;
use moz_task::{dispatch_background_task_with_options, DispatchOptions, Task, TaskRunnable};
use moz_task::{DispatchOptions, Task, TaskRunnable};
use nserror::nsresult;
use once_cell::sync::Lazy;
use rkv::{StoreError as RkvStoreError, Value};
use std::{collections::HashMap, sync::Mutex, thread::sleep, time::Duration};
use xpcom::RefPtr;
/// A map of key/value pairs to persist. Values are Options so we can
/// use the same structure for both puts and deletes, with a `None` value
@ -129,10 +128,8 @@ pub(crate) fn persist(key: String, value: Option<String>) -> XULStoreResult<()>
// If *changes* was `None`, then this is the first change since
// the last time we persisted, so dispatch a new PersistTask.
let task = Box::new(PersistTask::new());
dispatch_background_task_with_options(
RefPtr::new(TaskRunnable::new("XULStore::Persist", task)?.coerce()),
DispatchOptions::default().may_block(true),
)?;
TaskRunnable::new("XULStore::Persist", task)?
.dispatch_background_task_with_options(DispatchOptions::default().may_block(true))?;
}
// Now insert the key/value pair into the map. The unwrap() call here

View file

@ -6,8 +6,6 @@ use moz_task;
use std::{
future::Future,
pin::Pin,
sync::atomic::{AtomicBool, Ordering::Relaxed},
sync::Arc,
task::{Context, Poll, Waker},
};
@ -15,21 +13,25 @@ use std::{
struct MyFuture {
poll_count: u32,
waker: Option<Waker>,
expect_main_thread: bool,
}
impl Default for MyFuture {
fn default() -> Self {
impl MyFuture {
fn new(expect_main_thread: bool) -> Self {
Self {
poll_count: 0,
waker: None,
expect_main_thread,
}
}
}
impl Future for MyFuture {
type Output = ();
type Output = u32;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
assert_eq!(moz_task::is_main_thread(), self.expect_main_thread);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.poll_count += 1;
if let Some(waker) = &mut self.waker {
@ -43,7 +45,7 @@ impl Future for MyFuture {
println!("Poll count = {}", self.poll_count);
if self.poll_count > 5 {
Poll::Ready(())
Poll::Ready(self.poll_count)
} else {
// Just notify the task that we need to be re-polled.
if let Some(waker) = &self.waker {
@ -56,17 +58,21 @@ impl Future for MyFuture {
#[no_mangle]
pub extern "C" fn Rust_Future(it_worked: *mut bool) {
let done = Arc::new(AtomicBool::new(false));
let done2 = done.clone();
moz_task::spawn_current_thread(async move {
MyFuture::default().await;
done.store(true, Relaxed);
})
.unwrap();
let future = async move {
assert_eq!(MyFuture::new(true).await, 6);
assert_eq!(
moz_task::spawn_local("Rust_Future inner spawn_local", MyFuture::new(true)).await,
6
);
assert_eq!(
moz_task::spawn("Rust_Future inner spawn", MyFuture::new(false)).await,
6
);
unsafe {
*it_worked = true;
}
};
unsafe {
moz_task::gtest_only::spin_event_loop_until(move || done2.load(Relaxed)).unwrap();
*it_worked = true;
moz_task::gtest_only::spin_event_loop_until("Rust_Future", future).unwrap();
};
}

View file

@ -0,0 +1,155 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use crate::{
dispatch_background_task_runnable, dispatch_runnable, get_current_thread, DispatchOptions,
};
use nserror::{nsresult, NS_OK};
use nsstring::nsACString;
use std::sync::Mutex;
use xpcom::interfaces::{nsIEventTarget, nsIRunnablePriority};
use xpcom::xpcom;
/// Basic wrapper to convert a FnOnce callback into a `nsIRunnable` to be
/// dispatched using XPCOM.
#[derive(xpcom)]
#[xpimplements(nsIRunnable, nsINamed, nsIRunnablePriority)]
#[refcnt = "atomic"]
struct InitRunnableFunction<F: FnOnce() + 'static> {
name: &'static str,
priority: u32,
function: Mutex<Option<F>>,
}
impl<F: FnOnce() + 'static> RunnableFunction<F> {
#[allow(non_snake_case)]
fn Run(&self) -> nsresult {
let function = self.function.lock().unwrap().take();
debug_assert!(function.is_some(), "runnable invoked twice?");
if let Some(function) = function {
function();
}
NS_OK
}
#[allow(non_snake_case)]
unsafe fn GetName(&self, result: *mut nsACString) -> nsresult {
(*result).assign(self.name);
NS_OK
}
#[allow(non_snake_case)]
unsafe fn GetPriority(&self, result: *mut u32) -> nsresult {
*result = self.priority;
NS_OK
}
}
pub struct RunnableBuilder<F> {
name: &'static str,
function: F,
priority: u32,
options: DispatchOptions,
}
impl<F> RunnableBuilder<F> {
pub fn new(name: &'static str, function: F) -> Self {
RunnableBuilder {
name,
function,
priority: nsIRunnablePriority::PRIORITY_NORMAL as u32,
options: DispatchOptions::default(),
}
}
pub fn priority(mut self, priority: u32) -> Self {
self.priority = priority;
self
}
pub fn options(mut self, options: DispatchOptions) -> Self {
self.options = options;
self
}
pub fn may_block(mut self, may_block: bool) -> Self {
self.options = self.options.may_block(may_block);
self
}
pub unsafe fn at_end(mut self, at_end: bool) -> Self {
self.options = self.options.at_end(at_end);
self
}
}
impl<F> RunnableBuilder<F>
where
F: FnOnce() + Send + 'static,
{
/// Dispatch this Runnable to the specified EventTarget. The runnable function must be `Send`.
pub fn dispatch(self, target: &nsIEventTarget) -> Result<(), nsresult> {
let runnable = RunnableFunction::allocate(InitRunnableFunction {
name: self.name,
priority: self.priority,
function: Mutex::new(Some(self.function)),
});
unsafe { dispatch_runnable(runnable.coerce(), target, self.options) }
}
/// Dispatch this Runnable to the specified EventTarget as a background
/// task. The runnable function must be `Send`.
pub fn dispatch_background_task(self) -> Result<(), nsresult> {
let runnable = RunnableFunction::allocate(InitRunnableFunction {
name: self.name,
priority: self.priority,
function: Mutex::new(Some(self.function)),
});
unsafe { dispatch_background_task_runnable(runnable.coerce(), self.options) }
}
}
impl<F> RunnableBuilder<F>
where
F: FnOnce() + 'static,
{
/// Dispatch this Runnable to the current thread.
///
/// Unlike `dispatch` and `dispatch_background_task`, the runnable does not
/// need to be `Send` to dispatch to the current thread.
pub fn dispatch_local(self) -> Result<(), nsresult> {
let target = get_current_thread()?;
let runnable = RunnableFunction::allocate(InitRunnableFunction {
name: self.name,
priority: self.priority,
function: Mutex::new(Some(self.function)),
});
unsafe { dispatch_runnable(runnable.coerce(), target.coerce(), self.options) }
}
}
pub fn dispatch_onto<F>(
name: &'static str,
target: &nsIEventTarget,
function: F,
) -> Result<(), nsresult>
where
F: FnOnce() + Send + 'static,
{
RunnableBuilder::new(name, function).dispatch(target)
}
pub fn dispatch_background_task<F>(name: &'static str, function: F) -> Result<(), nsresult>
where
F: FnOnce() + Send + 'static,
{
RunnableBuilder::new(name, function).dispatch_background_task()
}
pub fn dispatch_local<F>(name: &'static str, function: F) -> Result<(), nsresult>
where
F: FnOnce() + 'static,
{
RunnableBuilder::new(name, function).dispatch_local()
}
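
For illustration only (not part of this file): a hedged sketch of the helpers
above, assumed to run inside a function returning Result<(), nsresult>;
`do_io_work` is a hypothetical stand-in.

    // Fire-and-forget, potentially-blocking closure on the background pool,
    // with the MAY_BLOCK hint set through the builder.
    moz_task::RunnableBuilder::new("Example: io", || do_io_work())
        .may_block(true)
        .dispatch_background_task()?;

    // A non-Send closure dispatched back onto the current thread's event loop.
    moz_task::dispatch_local("Example: local", || {
        println!("runs later on the dispatching thread");
    })?;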

View file

@ -5,28 +5,27 @@
extern crate nsstring;
use cstr::cstr;
use nserror::{nsresult, NS_ERROR_SERVICE_NOT_AVAILABLE, NS_OK};
use nserror::{nsresult, NS_ERROR_SERVICE_NOT_AVAILABLE, NS_ERROR_UNEXPECTED, NS_OK};
use nsstring::*;
use std::cell::UnsafeCell;
use std::cell::RefCell;
use std::future::Future;
use xpcom::{interfaces::nsIThreadManager, xpcom, xpcom_method};
type IsDoneClosure = dyn FnMut() -> bool + 'static;
#[derive(xpcom)]
#[xpimplements(nsINestedEventLoopCondition)]
#[refcnt = "atomic"]
struct InitEventLoopCondition {
closure: UnsafeCell<Box<IsDoneClosure>>,
#[refcnt = "nonatomic"]
struct InitFutureCompleteCondition<T: 'static> {
value: RefCell<Option<T>>,
}
impl EventLoopCondition {
impl<T: 'static> FutureCompleteCondition<T> {
xpcom_method!(is_done => IsDone() -> bool);
fn is_done(&self) -> Result<bool, nsresult> {
unsafe { Ok((&mut *self.closure.get())()) }
Ok(self.value.borrow().is_some())
}
}
/// Spin the event loop on the current thread until `pred` returns true.
/// Spin the event loop on the current thread until `future` is resolved.
///
/// # Safety
///
@ -36,23 +35,34 @@ impl EventLoopCondition {
/// codebase this method would only be ill-advised and not technically "unsafe",
/// it is marked as unsafe due to the potential for triggering unsafety in
/// unrelated C++ code.
pub unsafe fn spin_event_loop_until<P>(pred: P) -> Result<(), nsresult>
pub unsafe fn spin_event_loop_until<F>(
reason: &'static str,
future: F,
) -> Result<F::Output, nsresult>
where
P: FnMut() -> bool + 'static,
F: Future + 'static,
F::Output: 'static,
{
let closure = Box::new(pred) as Box<IsDoneClosure>;
let cond = EventLoopCondition::allocate(InitEventLoopCondition {
closure: UnsafeCell::new(closure),
});
let thread_manager =
xpcom::get_service::<nsIThreadManager>(cstr!("@mozilla.org/thread-manager;1"))
.ok_or(NS_ERROR_SERVICE_NOT_AVAILABLE)?;
// XXX: Pass in aVeryGoodReason from caller
let cond = FutureCompleteCondition::<F::Output>::allocate(InitFutureCompleteCondition {
value: RefCell::new(None),
});
// Spawn our future onto the current thread event loop, and record the
// completed value as it completes.
let cond2 = cond.clone();
crate::spawn_local(reason, async move {
let rv = future.await;
*cond2.value.borrow_mut() = Some(rv);
})
.detach();
thread_manager
.SpinEventLoopUntil(
&*nsCStr::from("event_loop.rs: Rust is spinning the event loop."),
cond.coerce(),
)
.to_result()
.SpinEventLoopUntil(&*nsCStr::from(reason), cond.coerce())
.to_result()?;
let rv = cond.value.borrow_mut().take();
rv.ok_or(NS_ERROR_UNEXPECTED)
}
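
A minimal usage sketch (gtest-only, mirroring the callers earlier in this
patch); the future here is a trivial stand-in:

    // Drive a local future to completion by spinning this thread's event loop.
    let value = unsafe {
        moz_task::gtest_only::spin_event_loop_until("example: demo", async { 21 * 2 })
    }
    .unwrap();
    assert_eq!(value, 42);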

View file

@ -0,0 +1,231 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use crate::{get_current_thread, DispatchOptions, RunnableBuilder};
use async_task::Task;
use std::cell::Cell;
use std::sync::Arc;
use std::{fmt::Debug, future::Future, ptr};
use xpcom::interfaces::{nsIEventTarget, nsIRunnablePriority};
use xpcom::RefPtr;
enum SpawnTarget {
BackgroundTask,
EventTarget(RefPtr<nsIEventTarget>),
}
// SAFETY: All XPCOM interfaces are considered !Send + !Sync, however all
// well-behaved nsIEventTarget instances must be threadsafe.
unsafe impl Send for SpawnTarget {}
unsafe impl Sync for SpawnTarget {}
/// Information used by tasks as they are spawned. Stored in an Arc such that
/// their identity can be used for `POLLING_TASK`.
struct TaskSpawnConfig {
name: &'static str,
priority: u32,
options: DispatchOptions,
target: SpawnTarget,
}
thread_local! {
/// Raw pointer to the TaskSpawnConfig for the currently polling task. Used
/// to detect scheduling callbacks for a runnable while it is polled, to set
/// `DISPATCH_AT_END` on the notification.
static POLLING_TASK: Cell<*const TaskSpawnConfig> = Cell::new(ptr::null());
}
fn schedule(config: Arc<TaskSpawnConfig>, runnable: async_task::Runnable) {
// If we're dispatching this task while it is currently running on the same
// thread, set the `DISPATCH_AT_END` flag in the dispatch options to tell
// our threadpool target to not bother to spin up another thread.
let currently_polling = POLLING_TASK.with(|t| t.get() == Arc::as_ptr(&config));
// SAFETY: We use the POLLING_TASK thread local to check if we meet the
// requirements for `at_end`.
let options = unsafe { config.options.at_end(currently_polling) };
// Build the RunnableBuilder for our task to be dispatched.
let config2 = config.clone();
let builder = RunnableBuilder::new(config.name, move || {
// Record the pointer for the currently executing task in the
// POLLING_TASK thread-local so that nested dispatches can detect it.
POLLING_TASK.with(|t| {
let prev = t.get();
t.set(Arc::as_ptr(&config2));
runnable.run();
t.set(prev);
});
})
.priority(config.priority)
.options(options);
let rv = match &config.target {
SpawnTarget::BackgroundTask => builder.dispatch_background_task(),
SpawnTarget::EventTarget(target) => builder.dispatch(&*target),
};
if let Err(err) = rv {
log::warn!(
"dispatch for spawned task '{}' failed: {:?}",
config.name,
err
);
}
}
/// Helper for starting an async task which will run a future to completion.
#[derive(Debug)]
pub struct TaskBuilder<F> {
name: &'static str,
future: F,
priority: u32,
options: DispatchOptions,
}
impl<F> TaskBuilder<F> {
pub fn new(name: &'static str, future: F) -> TaskBuilder<F> {
TaskBuilder {
name,
future,
priority: nsIRunnablePriority::PRIORITY_NORMAL as u32,
options: DispatchOptions::default(),
}
}
/// Specify the priority of the task's runnables.
pub fn priority(mut self, priority: u32) -> Self {
self.priority = priority;
self
}
/// Specify options to use when dispatching the task.
pub fn options(mut self, options: DispatchOptions) -> Self {
self.options = options;
self
}
/// Set whether or not the event may block, and should be run on the IO
/// thread pool.
pub fn may_block(mut self, may_block: bool) -> Self {
self.options = self.options.may_block(may_block);
self
}
}
impl<F> TaskBuilder<F>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
/// Run the future on the background task pool.
pub fn spawn(self) -> Task<F::Output> {
let config = Arc::new(TaskSpawnConfig {
name: self.name,
priority: self.priority,
options: self.options,
target: SpawnTarget::BackgroundTask,
});
let (runnable, task) = async_task::spawn(self.future, move |runnable| {
schedule(config.clone(), runnable)
});
runnable.schedule();
task
}
/// Run the future on the specified nsIEventTarget.
pub fn spawn_onto(self, target: &nsIEventTarget) -> Task<F::Output> {
let config = Arc::new(TaskSpawnConfig {
name: self.name,
priority: self.priority,
options: self.options,
target: SpawnTarget::EventTarget(RefPtr::new(target)),
});
let (runnable, task) = async_task::spawn(self.future, move |runnable| {
schedule(config.clone(), runnable)
});
runnable.schedule();
task
}
}
impl<F> TaskBuilder<F>
where
F: Future + 'static,
F::Output: 'static,
{
/// Run the future on the current thread.
///
/// Unlike the other `spawn` methods, this method supports non-Send futures.
///
/// # Panics
///
/// This method may panic if run on a thread which cannot run local futures
/// (e.g. because it is not an XPCOM thread, or because we are very late
/// during shutdown).
pub fn spawn_local(self) -> Task<F::Output> {
let current_thread = get_current_thread().expect("cannot get current thread");
let config = Arc::new(TaskSpawnConfig {
name: self.name,
priority: self.priority,
options: self.options,
target: SpawnTarget::EventTarget(RefPtr::new(current_thread.coerce())),
});
let (runnable, task) = async_task::spawn_local(self.future, move |runnable| {
schedule(config.clone(), runnable)
});
runnable.schedule();
task
}
}
/// Spawn a future onto the background task pool. The future will not be run on
/// the main thread.
pub fn spawn<F>(name: &'static str, future: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
TaskBuilder::new(name, future).spawn()
}
/// Spawn a potentially-blocking future onto the background task pool. The
/// future will not be run on the main thread.
pub fn spawn_blocking<F>(name: &'static str, future: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
TaskBuilder::new(name, future).may_block(true).spawn()
}
/// Spawn a local future onto the current thread.
pub fn spawn_local<F>(name: &'static str, future: F) -> Task<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
TaskBuilder::new(name, future).spawn_local()
}
pub fn spawn_onto<F>(name: &'static str, target: &nsIEventTarget, future: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
TaskBuilder::new(name, future).spawn_onto(target)
}
pub fn spawn_onto_blocking<F>(
name: &'static str,
target: &nsIEventTarget,
future: F,
) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
TaskBuilder::new(name, future)
.may_block(true)
.spawn_onto(target)
}
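
For illustration only (not part of the patch): a hedged sketch of TaskBuilder
for cases where the plain `spawn*` helpers above are not enough; `heavy_work`
and `report` are hypothetical stand-ins.

    // Spawn potentially-blocking work on the background pool with an explicit
    // priority (here just the default, to show the knob), then hand the
    // result to a follow-up task on the spawning thread.
    let handle = TaskBuilder::new("Example: heavy", async move { heavy_work() })
        .priority(nsIRunnablePriority::PRIORITY_NORMAL as u32)
        .may_block(true)
        .spawn();
    spawn_local("Example: report", async move {
        report(handle.await);
    })
    .detach();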

View file

@ -1,238 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use crate::dispatch;
use futures_task::{waker, ArcWake};
use nserror::{nsresult, NS_OK};
use std::{
future::Future,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Arc, Mutex,
},
task::{Context, Poll},
};
use xpcom::{interfaces::nsIEventTarget, xpcom, xpcom_method, RefPtr, ThreadBoundRefPtr};
#[derive(xpcom)]
#[xpimplements(nsIRunnable)]
#[refcnt = "atomic"]
struct InitLocalTask<F: Future<Output = ()> + 'static> {
future: Mutex<F>,
event_target: RefPtr<nsIEventTarget>,
state: TaskState,
}
impl<T> InitLocalTask<T>
where
T: Future<Output = ()> + 'static,
{
fn new(future: T, event_target: RefPtr<nsIEventTarget>) -> Self {
InitLocalTask {
future: Mutex::new(future),
event_target,
state: TaskState::default(),
}
}
}
impl<T> LocalTask<T>
where
T: Future<Output = ()> + 'static,
{
/// Runs a closure from the context of the task.
///
/// Any wake notifications resulting from the execution of the closure are
/// tracked.
fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut Context<'_>) -> R,
{
let task = ThreadBoundRefPtr::new(RefPtr::new(self));
let wake_handle = Arc::new(LocalWakeHandle { task });
let waker = waker(wake_handle);
let mut cx = Context::from_waker(&waker);
f(&mut cx)
}
xpcom_method!(run => Run());
fn run(&self) -> Result<(), nsresult> {
// # Safety
//
// Mutex ensures that future is polled serially.
self.enter(|cx| {
// The only way to have this `LocalTask` dispatched to the named
// event target is for it to be dispatched by the `Waker`, which will
// put the state into `POLL` before dispatching the runnable.
// Another waker may have put the state into `REPOLL` in the
// meantime, however we can clear that back to `POLL` now as we're
// about to begin polling.
self.state.start_poll();
loop {
// # Safety
//
// LocalTask is a heap allocation due to being an XPCOM object,
// so `fut` is effectively `Box`ed.
//
// Also, the value is never moved out of its owning `Mutex`.
let mut lock = self.future.lock().expect("Failed to lock future");
let fut = unsafe { Pin::new_unchecked(&mut *lock) };
let res = fut.poll(cx);
match res {
Poll::Pending => {}
Poll::Ready(()) => return unsafe { self.state.complete() },
}
if unsafe { !self.state.wait() } {
break;
}
}
});
Ok(())
}
fn wake_up(&self) {
if self.state.wake_up() {
unsafe { dispatch(self.coerce(), &self.event_target) }.unwrap()
}
}
}
// Task State Machine - This was heavily cribbed from futures-executor::ThreadPool
struct TaskState {
state: AtomicUsize,
}
// There are four possible task states, listed below with their possible
// transitions:
// The task is blocked, waiting on an event
const IDLE: usize = 0; // --> POLL
// The task is actively being polled by a thread; arrival of additional events
// of interest should move it to the REPOLL state
const POLL: usize = 1; // --> IDLE, REPOLL, or COMPLETE
// The task is actively being polled, but will need to be re-polled upon
// completion to ensure that all events were observed.
const REPOLL: usize = 2; // --> POLL
// The task has finished executing (either successfully or with an error/panic)
const COMPLETE: usize = 3; // No transitions out
impl Default for TaskState {
fn default() -> Self {
Self {
state: AtomicUsize::new(IDLE),
}
}
}
impl TaskState {
/// Attempt to "wake up" the task and poll the future.
///
/// A `true` result indicates that the `POLL` state has been entered, and
/// the caller can proceed to poll the future. A `false` result indicates
/// that polling is not necessary (because the task is finished or the
/// polling has been delegated).
fn wake_up(&self) -> bool {
let mut state = self.state.load(SeqCst);
loop {
match state {
// The task is idle, so try to run it immediately.
IDLE => match self.state.compare_exchange(IDLE, POLL, SeqCst, SeqCst) {
Ok(_) => {
return true;
}
Err(cur) => state = cur,
},
// The task is being polled, so we need to record that it should
// be *repolled* when complete.
POLL => match self.state.compare_exchange(POLL, REPOLL, SeqCst, SeqCst) {
Ok(_) => return false,
Err(cur) => state = cur,
},
// The task is already scheduled for polling, or is complete, so
// we've got nothing to do.
_ => return false,
}
}
}
/// Alert the Task that polling completed with `Pending`.
///
/// Returns true if a `REPOLL` is pending.
///
/// # Safety
///
/// Callable only from the `POLL`/`REPOLL` states, i.e. between
/// successful calls to `notify` and `wait`/`complete`.
unsafe fn wait(&self) -> bool {
debug_assert!(matches!(self.state.load(SeqCst), POLL | REPOLL));
match self.state.compare_exchange(POLL, IDLE, SeqCst, SeqCst) {
// no wakeups came in while we were running
Ok(_) => false,
// guaranteed to be in REPOLL state; just clobber the
// state and run again.
Err(state) => {
assert_eq!(state, REPOLL);
self.state.store(POLL, SeqCst);
true
}
}
}
/// Alert the Task that it has completed execution and should not be
/// notified again.
///
/// # Safety
///
/// Callable only from the `POLL`/`REPOLL` states, i.e. between
/// successful calls to `wake_up` and `wait`/`complete`.
unsafe fn complete(&self) {
debug_assert!(matches!(self.state.load(SeqCst), POLL | REPOLL));
self.state.store(COMPLETE, SeqCst);
}
/// We're about to begin polling, clear any accumulated re-poll requests.
///
/// Should only be called from the `POLL`/`REPOLL` states immediately before polling.
fn start_poll(&self) {
assert!(matches!(self.state.load(SeqCst), POLL | REPOLL));
self.state.store(POLL, SeqCst);
}
}
struct LocalWakeHandle<F: Future<Output = ()> + 'static> {
task: ThreadBoundRefPtr<LocalTask<F>>,
}
impl<F> ArcWake for LocalWakeHandle<F>
where
F: Future<Output = ()> + 'static,
{
fn wake_by_ref(arc_self: &Arc<Self>) {
if let Some(task) = arc_self.task.get_ref() {
task.wake_up();
} else {
panic!("Attempting to wake task from the wrong thread!");
}
}
}
/// # Safety
///
/// There is no guarantee that `current_thread` is actually the current thread.
pub unsafe fn local_task<T>(future: T, current_thread: &nsIEventTarget)
where
T: Future<Output = ()> + 'static,
{
let task = LocalTask::allocate(InitLocalTask::new(future, RefPtr::new(current_thread)));
task.wake_up();
}

View file

@ -1,30 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use crate::get_current_thread;
use std::{fmt::Debug, future::Future};
use thiserror::Error;
mod future_task;
pub fn spawn_current_thread<Fut>(future: Fut) -> Result<(), Shutdown>
where
Fut: Future<Output = ()> + 'static,
{
let current_thread = get_current_thread().map_err(|_| Shutdown { _priv: () })?;
// # Safety
//
// It's safe to use `local_task` since `future` is dispatched to `current_thread`.
unsafe {
future_task::local_task(future, current_thread.coerce());
}
Ok(())
}
/// An error that occurred during spawning on a shutdown event queue
#[derive(Error, Debug)]
#[error("Event target is shutdown")]
pub struct Shutdown {
_priv: (),
}

View file

@ -7,49 +7,44 @@
//! It also provides the Task trait and TaskRunnable struct,
//! which make it easier to dispatch tasks to threads.
extern crate cstr;
extern crate futures_task;
extern crate libc;
extern crate nserror;
extern crate nsstring;
extern crate thiserror;
extern crate xpcom;
mod dispatcher;
pub use dispatcher::{dispatch_background_task, dispatch_local, dispatch_onto, RunnableBuilder};
mod event_loop;
mod executor;
pub use executor::spawn_current_thread;
pub use executor::{
spawn, spawn_blocking, spawn_local, spawn_onto, spawn_onto_blocking, TaskBuilder,
};
// FIXME: Unfortunately directly re-exporting as `Task` conflicts with the task
// trait below. This type is useful for folks using the `spawn*` methods.
pub use async_task::Task as AsyncTask;
// Expose functions intended to be used only in gtest via this module.
// We don't use a feature gate here, to avoid having to compile every crate
// that depends upon `moz_task` twice.
pub mod gtest_only {
pub use event_loop::spin_event_loop_until;
pub use crate::event_loop::spin_event_loop_until;
}
use nserror::{nsresult, NS_OK};
use nserror::nsresult;
use nsstring::{nsACString, nsCString};
use std::{
ffi::CStr,
marker::PhantomData,
mem, ptr,
sync::atomic::{AtomicBool, Ordering},
};
use std::{ffi::CStr, marker::PhantomData, mem, ptr};
use xpcom::{
getter_addrefs,
interfaces::{nsIEventTarget, nsIRunnable, nsISerialEventTarget, nsISupports, nsIThread},
xpcom, xpcom_method, AtomicRefcnt, RefCounted, RefPtr, XpCom,
AtomicRefcnt, RefCounted, RefPtr, XpCom,
};
extern "C" {
fn NS_GetCurrentThreadEventTarget(result: *mut *const nsIThread) -> nsresult;
fn NS_GetMainThreadEventTarget(result: *mut *const nsIThread) -> nsresult;
fn NS_GetCurrentThreadRust(result: *mut *const nsIThread) -> nsresult;
fn NS_GetMainThreadRust(result: *mut *const nsIThread) -> nsresult;
fn NS_IsMainThread() -> bool;
fn NS_NewNamedThreadWithDefaultStackSize(
name: *const nsACString,
result: *mut *const nsIThread,
event: *const nsIRunnable,
) -> nsresult;
fn NS_IsCurrentThread(thread: *const nsIEventTarget) -> bool;
fn NS_IsOnCurrentThread(target: *const nsIEventTarget) -> bool;
fn NS_ProxyReleaseISupports(
name: *const libc::c_char,
target: *const nsIEventTarget,
@ -64,11 +59,11 @@ extern "C" {
}
pub fn get_current_thread() -> Result<RefPtr<nsIThread>, nsresult> {
getter_addrefs(|p| unsafe { NS_GetCurrentThreadEventTarget(p) })
getter_addrefs(|p| unsafe { NS_GetCurrentThreadRust(p) })
}
pub fn get_main_thread() -> Result<RefPtr<nsIThread>, nsresult> {
getter_addrefs(|p| unsafe { NS_GetMainThreadEventTarget(p) })
getter_addrefs(|p| unsafe { NS_GetMainThreadRust(p) })
}
pub fn is_main_thread() -> bool {
@ -81,8 +76,8 @@ pub fn create_thread(name: &str) -> Result<RefPtr<nsIThread>, nsresult> {
})
}
pub fn is_current_thread(thread: &nsIThread) -> bool {
unsafe { NS_IsCurrentThread(thread.coerce()) }
pub fn is_on_current_thread(target: &nsIEventTarget) -> bool {
unsafe { NS_IsOnCurrentThread(target) }
}
/// Creates a queue that runs tasks on the background thread pool. The tasks
@ -93,18 +88,6 @@ pub fn create_background_task_queue(
getter_addrefs(|p| unsafe { NS_CreateBackgroundTaskQueue(name.as_ptr(), p) })
}
/// Dispatches a one-shot runnable to an event target with the default options.
///
/// # Safety
///
/// As there is no guarantee that the runnable is actually `Send + Sync`, we
/// can't know that it's safe to dispatch an `nsIRunnable` to any
/// `nsIEventTarget`.
#[inline]
pub unsafe fn dispatch(runnable: &nsIRunnable, target: &nsIEventTarget) -> Result<(), nsresult> {
dispatch_with_options(runnable, target, DispatchOptions::default())
}
/// Dispatches a one-shot runnable to an event target, like a thread or a
/// task queue, with the given options.
///
@ -115,7 +98,7 @@ pub unsafe fn dispatch(runnable: &nsIRunnable, target: &nsIEventTarget) -> Resul
/// As there is no guarantee that the runnable is actually `Send + Sync`, we
/// can't know that it's safe to dispatch an `nsIRunnable` to any
/// `nsIEventTarget`.
pub unsafe fn dispatch_with_options(
pub unsafe fn dispatch_runnable(
runnable: &nsIRunnable,
target: &nsIEventTarget,
options: DispatchOptions,
@ -127,35 +110,34 @@ pub unsafe fn dispatch_with_options(
.to_result()
}
/// Dispatches a one-shot task runnable to the background thread pool with the
/// default options.
#[inline]
pub fn dispatch_background_task(runnable: RefPtr<nsIRunnable>) -> Result<(), nsresult> {
dispatch_background_task_with_options(runnable, DispatchOptions::default())
}
/// Dispatches a one-shot task runnable to the background thread pool with the
/// given options. The task may run concurrently with other background tasks.
/// If you need tasks to run in a specific order, please create a background
/// task queue using `create_background_task_queue`, and dispatch tasks to it
/// instead.
///
/// ### Safety
///
/// This function leaks the runnable if dispatch fails. This avoids a race where
/// a runnable can be destroyed on either the original or target thread, which
/// is important if the runnable holds thread-unsafe members.
pub fn dispatch_background_task_with_options(
runnable: RefPtr<nsIRunnable>,
///
/// ### Safety
///
/// As there is no guarantee that the runnable is actually `Send + Sync`, we
/// can't know that it's safe to dispatch an `nsIRunnable` to any
/// `nsIEventTarget`.
pub unsafe fn dispatch_background_task_runnable(
runnable: &nsIRunnable,
options: DispatchOptions,
) -> Result<(), nsresult> {
// This eventually calls the non-`already_AddRefed<nsIRunnable>` overload of
// `nsIEventTarget::Dispatch` (see xpcom/threads/nsIEventTarget.idl#20-25),
// which adds an owning reference and leaks if dispatch fails.
unsafe { NS_DispatchBackgroundTask(runnable.coerce(), options.flags()) }.to_result()
NS_DispatchBackgroundTask(runnable, options.flags()).to_result()
}
/// Options to control how task runnables are dispatched.
///
/// NOTE: The `DISPATCH_SYNC` flag is intentionally not supported by this type.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct DispatchOptions(u32);
@ -187,6 +169,24 @@ impl DispatchOptions {
}
}
/// Specifies that the dispatch is occurring from a running event that was
/// dispatched to the same event target, and that event is about to finish.
///
/// A thread pool can use this as an optimization hint to not spin up
/// another thread, since the current thread is about to become idle.
///
/// Setting this flag is unsafe, as it may only be used from the target
/// event target when the event is about to finish.
#[inline]
pub unsafe fn at_end(self, may_block: bool) -> DispatchOptions {
const FLAG: u32 = nsIEventTarget::DISPATCH_AT_END as u32;
if may_block {
DispatchOptions(self.flags() | FLAG)
} else {
DispatchOptions(self.flags() & !FLAG)
}
}
/// Returns the set of bitflags to pass to `DispatchFromScript`.
#[inline]
fn flags(self) -> u32 {
@ -196,85 +196,101 @@ impl DispatchOptions {
/// A task represents an operation that asynchronously executes on a target
/// thread, and returns its result to the original thread.
///
/// # Alternatives
///
/// This trait is no longer necessary for basic tasks to be dispatched to
/// another thread with a callback on the originating thread. `moz_task` now has
/// a series of more rust-like primitives which can be used instead. For
/// example, it may be preferable to use the async executor over `Task`:
///
/// ```ignore
/// // Spawn a task onto the background task pool, and capture the result of its
/// // execution.
/// let bg_task = moz_task::spawn("Example", async move {
/// do_background_work(captured_state)
/// });
///
/// // Spawn another task on the calling thread which will await on the result
/// // of the async operation, and invoke a non-Send callback. This task won't
/// // be awaited on, so needs to be `detach`-ed.
/// moz_task::spawn_local("Example", async move {
/// callback.completed(bg_task.await);
/// })
/// .detach();
/// ```
///
/// If no result is needed, the task returned from `spawn` may be also detached
/// directly.
pub trait Task {
// FIXME: These could accept `&mut`.
fn run(&self);
fn done(&self) -> Result<(), nsresult>;
}
/// The struct responsible for dispatching a Task by calling its run() method
/// on the target thread and returning its result by calling its done() method
/// on the original thread.
///
/// The struct uses its has_run field to determine whether it should call
/// run() or done(). It could instead check if task.result is Some or None,
/// but if run() failed to set task.result, then it would loop infinitely.
#[derive(xpcom)]
#[xpimplements(nsIRunnable, nsINamed)]
#[refcnt = "atomic"]
pub struct InitTaskRunnable {
pub struct TaskRunnable {
name: &'static str,
original_thread: RefPtr<nsIThread>,
task: Box<dyn Task + Send + Sync>,
has_run: AtomicBool,
}
impl TaskRunnable {
// XXX: Fixme: clean up this old API. (bug 1744312)
pub fn new(
name: &'static str,
task: Box<dyn Task + Send + Sync>,
) -> Result<RefPtr<TaskRunnable>, nsresult> {
Ok(TaskRunnable::allocate(InitTaskRunnable {
name,
original_thread: get_current_thread()?,
task,
has_run: AtomicBool::new(false),
}))
) -> Result<TaskRunnable, nsresult> {
Ok(TaskRunnable { name, task })
}
/// Dispatches this task runnable to an event target with the default
/// options.
#[inline]
pub fn dispatch(this: RefPtr<Self>, target: &nsIEventTarget) -> Result<(), nsresult> {
Self::dispatch_with_options(this, target, DispatchOptions::default())
pub fn dispatch(self, target: &nsIEventTarget) -> Result<(), nsresult> {
self.dispatch_with_options(target, DispatchOptions::default())
}
/// Dispatches this task runnable to an event target, like a thread or a
/// task queue, with the given options.
///
/// Note that this is an associated function, not a method, because it takes
/// an owned reference to the runnable, and must be called like
/// `TaskRunnable::dispatch_with_options(runnable, options)` and *not*
/// `runnable.dispatch_with_options(options)`.
///
/// This function leaks the runnable if dispatch fails.
pub fn dispatch_with_options(
this: RefPtr<Self>,
self,
target: &nsIEventTarget,
options: DispatchOptions,
) -> Result<(), nsresult> {
unsafe { target.DispatchFromScript(this.coerce(), options.flags()) }.to_result()
// Perform `task.run()` on a background thread.
let task = self.task;
let handle = TaskBuilder::new(self.name, async move {
task.run();
task
})
.options(options)
.spawn_onto(target);
// Run `task.done()` on the starting thread once the background thread
// is done with the task.
spawn_local(self.name, async move {
let task = handle.await;
let _ = task.done();
})
.detach();
Ok(())
}
xpcom_method!(run => Run());
fn run(&self) -> Result<(), nsresult> {
match self
.has_run
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => {
self.task.run();
Self::dispatch(RefPtr::new(self), &self.original_thread)
}
Err(_) => {
assert!(is_current_thread(&self.original_thread));
self.task.done()
}
}
}
pub fn dispatch_background_task_with_options(
self,
options: DispatchOptions,
) -> Result<(), nsresult> {
// Perform `task.run()` on a background thread.
let task = self.task;
let handle = TaskBuilder::new(self.name, async move {
task.run();
task
})
.options(options)
.spawn();
xpcom_method!(get_name => GetName() -> nsACString);
fn get_name(&self) -> Result<nsCString, nsresult> {
Ok(nsCString::from(self.name))
// Run `task.done()` on the starting thread once the background thread
// is done with the task.
spawn_local(self.name, async move {
let task = handle.await;
let _ = task.done();
})
.detach();
Ok(())
}
}
@ -309,7 +325,7 @@ unsafe impl<T: XpCom + 'static> RefCounted for ThreadPtrHolder<T> {
// owning thread, we can release the object directly. Otherwise,
// we need to post a proxy release event to release the object
// on the owning thread.
if is_current_thread(&self.owning_thread) {
if is_on_current_thread(&self.owning_thread) {
(*self.ptr).release()
} else {
NS_ProxyReleaseISupports(
@ -356,7 +372,7 @@ impl<T: XpCom + 'static> ThreadPtrHolder<T> {
/// Returns the wrapped object if called from the owning thread, or
/// `None` if called from any other thread.
pub fn get(&self) -> Option<&T> {
if is_current_thread(&self.owning_thread) && !self.ptr.is_null() {
if is_on_current_thread(&self.owning_thread) && !self.ptr.is_null() {
unsafe { Some(&*self.ptr) }
} else {
None

View file

@ -697,22 +697,12 @@ extern "C" {
// via the xpcom/rust/moz_task crate, which wraps them in safe Rust functions
// that enable Rust code to get/create threads and dispatch runnables on them.
nsresult NS_GetCurrentThreadEventTarget(nsIEventTarget** aResult) {
nsCOMPtr<nsIEventTarget> target = mozilla::GetCurrentEventTarget();
if (!target) {
return NS_ERROR_UNEXPECTED;
}
target.forget(aResult);
return NS_OK;
nsresult NS_GetCurrentThreadRust(nsIThread** aResult) {
return NS_GetCurrentThread(aResult);
}
nsresult NS_GetMainThreadEventTarget(nsIEventTarget** aResult) {
nsCOMPtr<nsIEventTarget> target = mozilla::GetMainThreadEventTarget();
if (!target) {
return NS_ERROR_UNEXPECTED;
}
target.forget(aResult);
return NS_OK;
nsresult NS_GetMainThreadRust(nsIThread** aResult) {
return NS_GetMainThread(aResult);
}
// NS_NewNamedThread's aStackSize parameter has the default argument
@ -727,8 +717,8 @@ nsresult NS_NewNamedThreadWithDefaultStackSize(const nsACString& aName,
return NS_NewNamedThread(aName, aResult, aEvent);
}
bool NS_IsCurrentThread(nsIEventTarget* aThread) {
return aThread->IsOnCurrentThread();
bool NS_IsOnCurrentThread(nsIEventTarget* aTarget) {
return aTarget->IsOnCurrentThread();
}
nsresult NS_DispatchBackgroundTask(nsIRunnable* aEvent,