Bug 1639018 - Change `TaskRunnable::dispatch` to take owned runnables. r=froydnj

This matches how the `Dispatch(already_AddRefed<nsIRunnable>)`
overloads work in C++: `Dispatch` takes ownership of the runnable, and
leaks it if dispatch fails—because the thread manager is shutting down,
for instance. This avoids a race where a runnable can be released on
either the owning or target thread.

Rust doesn't allow arbitrary `Self` types yet (see
rust-lang/rust#44874), so we need to change `dispatch` and
`dispatch_with_options` to be associated methods.

Differential Revision: https://phabricator.services.mozilla.com/D75858
This commit is contained in:
Lina Cambridge 2020-05-20 20:54:49 +00:00
Родитель 28cc6477f4
Коммит faf2fd15e4
11 изменённых файлов: 66 добавлений и 41 удалений

Просмотреть файл

@ -1164,7 +1164,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("HasPriorData", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}
@ -1226,7 +1226,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("SetRevocations", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}
@ -1305,7 +1305,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("SetCRLiteState", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}
@ -1351,7 +1351,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("SetFullCRLiteFilter", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}
@ -1417,7 +1417,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("AddCerts", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}
@ -1445,7 +1445,7 @@ impl CertStorage {
));
let thread = try_ns!(self.thread.lock());
let runnable = try_ns!(TaskRunnable::new("RemoveCertsByHashes", task));
try_ns!(runnable.dispatch(&*thread));
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
NS_OK
}

Просмотреть файл

@ -372,7 +372,11 @@ impl PuntTask {
let runnable = TaskRunnable::new(self.name, Box::new(self))?;
// `may_block` schedules the task on the I/O thread pool, since we
// expect most operations to wait on I/O.
runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
target,
DispatchOptions::default().may_block(true),
)?;
Ok(())
}

Просмотреть файл

@ -118,7 +118,7 @@ impl Log for LogSink {
};
let _ =
TaskRunnable::new("extension_storage_sync::Logger::log", Box::new(task))
.and_then(|r| r.dispatch(logger.owning_thread()));
.and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
}
Err(_) => {}
}

Просмотреть файл

@ -196,7 +196,11 @@ where
let runnable = TaskRunnable::new(self.ferry.name(), Box::new(self))?;
// `may_block` schedules the task on the I/O thread pool, since we
// expect most operations to wait on I/O.
runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
target,
DispatchOptions::default().may_block(true),
)?;
Ok(())
}
}
@ -339,7 +343,11 @@ where
/// Dispatches the task to the given thread `target`.
pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> {
let runnable = TaskRunnable::new(Self::name(), Box::new(self))?;
runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
target,
DispatchOptions::default().may_block(true),
)?;
Ok(())
}
}

Просмотреть файл

@ -130,7 +130,7 @@ impl BitsService {
let runnable = TaskRunnable::new(task_runnable_name, task).map_err(|rv| {
BitsTaskError::from_nsresult(FailedToConstructTaskRunnable, action, Pretask, rv)
})?;
runnable.dispatch(&command_thread).map_err(|rv| {
TaskRunnable::dispatch(runnable, &command_thread).map_err(|rv| {
BitsTaskError::from_nsresult(FailedToDispatchRunnable, action, Pretask, rv)
})
}

Просмотреть файл

@ -77,8 +77,11 @@ impl StorageSyncArea {
let task = PuntTask::new(Arc::downgrade(&*self.store()?), punt, callback)?;
let runnable = TaskRunnable::new(name, Box::new(task))?;
// `may_block` schedules the runnable on a dedicated I/O pool.
runnable
.dispatch_with_options(self.queue.coerce(), DispatchOptions::new().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
self.queue.coerce(),
DispatchOptions::new().may_block(true),
)?;
Ok(())
}
@ -240,15 +243,8 @@ impl StorageSyncArea {
Some(store) => {
// Interrupt any currently-running statements.
store.interrupt();
// If dispatching the runnable fails, we'll drop the store and
// close its database connection on the main thread. This is a
// last resort, and can also happen if the last `RefPtr` to this
// storage area is released without calling `teardown`. In that
// case, the destructor for `self.store` will run, which
// automatically closes its database connection. mozStorage's
// `Connection::Release` also falls back to closing the
// connection on the main thread if it can't dispatch to the
// background thread.
// If dispatching the runnable fails, we'll leak the store
// without closing its database connection.
teardown(&self.queue, store, callback)?;
}
None => return Err(Error::AlreadyTornDown),
@ -264,7 +260,11 @@ fn teardown(
) -> Result<()> {
let task = TeardownTask::new(store, callback)?;
let runnable = TaskRunnable::new(TeardownTask::name(), Box::new(task))?;
runnable.dispatch_with_options(queue.coerce(), DispatchOptions::new().may_block(true))?;
TaskRunnable::dispatch_with_options(
runnable,
queue.coerce(),
DispatchOptions::new().may_block(true),
)?;
Ok(())
}

Просмотреть файл

@ -134,7 +134,7 @@ impl KeyValueService {
nsCString::from(name),
));
TaskRunnable::new("KVService::GetOrCreate", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVService::GetOrCreate", task)?, thread)
}
}
@ -182,7 +182,7 @@ impl KeyValueDatabase {
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
TaskRunnable::new("KVDatabase::Put", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Put", task)?, thread)
}
xpcom_method!(
@ -220,7 +220,7 @@ impl KeyValueDatabase {
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
TaskRunnable::new("KVDatabase::WriteMany", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::WriteMany", task)?, thread)
}
xpcom_method!(
@ -247,7 +247,7 @@ impl KeyValueDatabase {
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
TaskRunnable::new("KVDatabase::Get", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Get", task)?, thread)
}
xpcom_method!(
@ -264,7 +264,7 @@ impl KeyValueDatabase {
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
TaskRunnable::new("KVDatabase::Has", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Has", task)?, thread)
}
xpcom_method!(
@ -281,7 +281,7 @@ impl KeyValueDatabase {
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
TaskRunnable::new("KVDatabase::Delete", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Delete", task)?, thread)
}
xpcom_method!(
@ -297,7 +297,7 @@ impl KeyValueDatabase {
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
TaskRunnable::new("KVDatabase::Clear", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Clear", task)?, thread)
}
xpcom_method!(
@ -324,7 +324,7 @@ impl KeyValueDatabase {
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
TaskRunnable::new("KVDatabase::Enumerate", task)?.dispatch(thread)
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Enumerate", task)?, thread)
}
}

Просмотреть файл

@ -100,7 +100,7 @@ impl dogear::Driver for Driver {
"bookmark_sync::Driver::record_telemetry_event",
Box::new(task),
)
.and_then(|r| r.dispatch(progress.owning_thread()));
.and_then(|r| TaskRunnable::dispatch(r, progress.owning_thread()));
}
}
}
@ -140,7 +140,7 @@ impl Log for Logger {
message,
};
let _ = TaskRunnable::new("bookmark_sync::Logger::log", Box::new(task))
.and_then(|r| r.dispatch(logger.owning_thread()));
.and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
}
Err(_) => {}
}

Просмотреть файл

@ -108,7 +108,7 @@ impl SyncedBookmarksMerger {
"bookmark_sync::SyncedBookmarksMerger::merge",
Box::new(task),
)?;
runnable.dispatch(&async_thread)?;
TaskRunnable::dispatch(runnable, &async_thread)?;
let op = MergeOp::new(controller);
Ok(RefPtr::new(op.coerce()))
}

Просмотреть файл

@ -176,7 +176,7 @@ pub(crate) fn persist(key: String, value: Option<String>) -> XULStoreResult<()>
.ok_or(XULStoreError::Unavailable)?
.get_ref()
.ok_or(XULStoreError::Unavailable)?;
TaskRunnable::new("XULStore::Persist", task)?.dispatch(thread)?;
TaskRunnable::dispatch(TaskRunnable::new("XULStore::Persist", task)?, thread)?;
}
// Now insert the key/value pair into the map. The unwrap() call here

Просмотреть файл

@ -183,17 +183,30 @@ impl TaskRunnable {
}))
}
/// Dispatches this task runnable to an event target with the default
/// options.
#[inline]
pub fn dispatch(&self, target_thread: &nsIEventTarget) -> Result<(), nsresult> {
self.dispatch_with_options(target_thread, DispatchOptions::default())
pub fn dispatch(this: RefPtr<Self>, target: &nsIEventTarget) -> Result<(), nsresult> {
Self::dispatch_with_options(this, target, DispatchOptions::default())
}
/// Dispatches this task runnable to an event target, like a thread or a
/// task queue, with the given options.
///
/// Note that this is an associated function, not a method, because it takes
/// an owned reference to the runnable, and must be called like
/// `TaskRunnable::dispatch_with_options(runnable, options)` and *not*
/// `runnable.dispatch_with_options(options)`.
///
/// ### Safety
///
/// This function leaks the runnable if dispatch fails.
pub fn dispatch_with_options(
&self,
target_thread: &nsIEventTarget,
this: RefPtr<Self>,
target: &nsIEventTarget,
options: DispatchOptions,
) -> Result<(), nsresult> {
unsafe { target_thread.DispatchFromScript(self.coerce(), options.flags()) }.to_result()
unsafe { target.DispatchFromScript(this.coerce(), options.flags()) }.to_result()
}
xpcom_method!(run => Run());
@ -205,7 +218,7 @@ impl TaskRunnable {
Ok(_) => {
assert!(!is_current_thread(&self.original_thread));
self.task.run();
self.dispatch(&self.original_thread)
Self::dispatch(RefPtr::new(self), &self.original_thread)
}
Err(_) => {
assert!(is_current_thread(&self.original_thread));