Updating the foreign executor API

Added parameters/return values to signal failure.  In particular, we
need a way to signal that the foreign executor has been cancelled/closed
and therefore any futures that depend on it should be halted.  This is
needed to fix #1669.

Updated the Rust code to check for executor failures.  We need to
release any references passed to the waker callback since that callback
will not run in that case.

Updated the foreign code to handle cancelled futures.

Reworked and updated unit tests to test the new system.

Bumped `UNIFFI_CONTRACT_VERSION` since this requires changing the
scaffolding API.  I think this is correct, although you could make the
argument that it's not needed since the async code is still fairly
experimental.

Updated the `uniffi-versioning.md` doc now that
`UNIFFI_CONTRACT_VERSION` lives in `uniffi_meta`.
This commit is contained in:
Ben Dean-Kawamura 2023-08-07 17:05:00 -04:00
Родитель 689f099a2b
Коммит e7a4130d2d
15 изменённых файлов: 434 добавлений и 185 удалений

Просмотреть файл

@ -23,6 +23,7 @@
### What's Fixed
- Updated the async functionality to correctly handle cancellation (#1669)
- Kotlin: Fixed low-level issue with exported async APIs
## v0.24.3 (backend crates: v0.24.3) - (_2023-08-01_)

Просмотреть файл

@ -37,4 +37,4 @@ To expand on the previous point, here are the scenarios where `uniffi` should ge
* Increment the minor version of `uniffi`
* Once we get to `1.0` then this will change to be a major version bump.
* Update the `uniffi_bindgen::UNIFFI_CONTRACT_VERSION` value
* Update the `uniffi_meta::UNIFFI_CONTRACT_VERSION` value

Просмотреть файл

@ -207,25 +207,31 @@ runBlocking {
// Test a future that uses a lock and that is cancelled.
runBlocking {
val job = launch {
useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U))
val time = measureTimeMillis {
val job = launch {
useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U))
}
// Wait some time to ensure the task has locked the shared resource
delay(50)
// Cancel the job before the shared resource has been released.
job.cancel()
// Try accessing the shared resource again. The initial task should release the shared resource
// before the timeout expires.
useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=1000U))
}
// Wait some time to ensure the task has locked the shared resource
delay(50)
// Cancel the job before the shared resource has been released.
job.cancel()
// Try accessing the shared resource again. The initial task should release the shared resource
// before the timeout expires.
useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=1000U))
println("useSharedResource: ${time}ms")
}
// Test a future that uses a lock and that is not cancelled.
runBlocking {
useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U))
val time = measureTimeMillis {
useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U))
useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=1000U))
useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=1000U))
}
println("useSharedResource (not canceled): ${time}ms")
}
// Test that we properly cleaned up future callback references

Просмотреть файл

@ -242,9 +242,9 @@ Task {
// Wait some time to ensure the task has locked the shared resource
try await Task.sleep(nanoseconds: 50_000_000)
// Cancel the job task the shared resource has been released.
//
// FIXME: this test currently passes because `test.cancel()` doesn't actually cancel the
// operation. We need to rework the Swift async handling to handle this properly.
//
// FIXME: this test currently passes because `test.cancel()` doesn't actually cancel the
// operation. We need to rework the Swift async handling to handle this properly.
task.cancel()
// Try accessing the shared resource again. The initial task should release the shared resource

Просмотреть файл

@ -6,7 +6,7 @@
{{ self.add_import("kotlin.coroutines.resumeWithException") }}
{# We use these in the generated functions, which don't have access to add_import() -- might as well add it here #}
{{ self.add_import("kotlin.coroutines.suspendCoroutine") }}
{{ self.add_import("kotlinx.coroutines.suspendCancellableCoroutine") }}
{{ self.add_import("kotlinx.coroutines.coroutineScope") }}
// Stores all active future callbacks to ensure they're not GC'ed while waiting for the Rust code to

Просмотреть файл

@ -1,24 +1,42 @@
{{ self.add_import("kotlinx.coroutines.CoroutineScope") }}
{{ self.add_import("kotlinx.coroutines.delay") }}
{{ self.add_import("kotlinx.coroutines.isActive") }}
{{ self.add_import("kotlinx.coroutines.launch") }}
internal const val UNIFFI_RUST_TASK_CALLBACK_SUCCESS = 0.toByte()
internal const val UNIFFI_RUST_TASK_CALLBACK_CANCELLED = 1.toByte()
internal const val UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS = 0.toByte()
internal const val UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELLED = 1.toByte()
internal const val UNIFFI_FOREIGN_EXECUTOR_CALLBACK_ERROR = 2.toByte()
// Callback function to execute a Rust task. The Kotlin code schedules these in a coroutine then
// invokes them.
internal interface UniFfiRustTaskCallback : com.sun.jna.Callback {
fun callback(rustTaskData: Pointer?)
fun callback(rustTaskData: Pointer?, statusCode: Byte)
}
internal object UniFfiForeignExecutorCallback : com.sun.jna.Callback {
fun callback(handle: USize, delayMs: Int, rustTask: UniFfiRustTaskCallback?, rustTaskData: Pointer?) {
fun callback(handle: USize, delayMs: Int, rustTask: UniFfiRustTaskCallback?, rustTaskData: Pointer?) : Byte {
if (rustTask == null) {
FfiConverterForeignExecutor.drop(handle)
return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS
} else {
val coroutineScope = FfiConverterForeignExecutor.lift(handle)
coroutineScope.launch {
if (delayMs > 0) {
delay(delayMs.toLong())
if (coroutineScope.isActive) {
val job = coroutineScope.launch {
if (delayMs > 0) {
delay(delayMs.toLong())
}
rustTask.callback(rustTaskData, UNIFFI_RUST_TASK_CALLBACK_SUCCESS)
}
rustTask.callback(rustTaskData)
job.invokeOnCompletion { cause ->
if (cause != null) {
rustTask.callback(rustTaskData, UNIFFI_RUST_TASK_CALLBACK_CANCELLED)
}
}
return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS
} else {
return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELLED
}
}
}

Просмотреть файл

@ -61,10 +61,11 @@ class {{ type_name }}(
// scaffolding function, passing it one of the callback handlers from `AsyncTypes.kt`.
return coroutineScope {
val scope = this
return@coroutineScope suspendCoroutine { continuation ->
return@coroutineScope suspendCancellableCoroutine { continuation ->
try {
val callback = {{ meth.result_type().borrow()|future_callback_handler }}(continuation)
uniffiActiveFutureCallbacks.add(callback)
continuation.invokeOnCancellation { uniffiActiveFutureCallbacks.remove(callback) }
callWithPointer { thisPtr ->
rustCall { status ->
_UniFFILib.INSTANCE.{{ meth.ffi_func().name() }}(

Просмотреть файл

@ -11,10 +11,11 @@ suspend fun {{ func.name()|fn_name }}({%- call kt::arg_list_decl(func) -%}){% ma
// scaffolding function, passing it one of the callback handlers from `AsyncTypes.kt`.
return coroutineScope {
val scope = this
return@coroutineScope suspendCoroutine { continuation ->
return@coroutineScope suspendCancellableCoroutine { continuation ->
try {
val callback = {{ func.result_type().borrow()|future_callback_handler }}(continuation)
uniffiActiveFutureCallbacks.add(callback)
continuation.invokeOnCancellation { uniffiActiveFutureCallbacks.remove(callback) }
rustCall { status ->
_UniFFILib.INSTANCE.{{ func.ffi_func().name() }}(
{% call kt::arg_list_lowered(func) %}

Просмотреть файл

@ -2,6 +2,12 @@
{{ self.add_import("asyncio") }}
_UNIFFI_RUST_TASK_CALLBACK_SUCCESS = 0
_UNIFFI_RUST_TASK_CALLBACK_CANCELLED = 1
_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS = 0
_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELED = 1
_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_ERROR = 2
class {{ ffi_converter_name }}:
_pointer_manager = _UniffiPointerManager()
@ -27,16 +33,26 @@ class {{ ffi_converter_name }}:
def _uniffi_executor_callback(eventloop_address, delay, task_ptr, task_data):
if task_ptr is None:
{{ ffi_converter_name }}._pointer_manager.release_pointer(eventloop_address)
return _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS
else:
eventloop = {{ ffi_converter_name }}._pointer_manager.lookup(eventloop_address)
if eventloop.is_closed():
return _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELED
callback = _UNIFFI_RUST_TASK(task_ptr)
# FIXME: there's no easy way to get a callback when an eventloop is closed. This means that
# if eventloop is called before the `call_soon_threadsafe()` calls are invoked, the call
# will never happen and we will probably leak a resource.
if delay == 0:
# This can be called from any thread, so make sure to use `call_soon_threadsafe'
eventloop.call_soon_threadsafe(callback, task_data)
eventloop.call_soon_threadsafe(callback, task_data,
_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS)
else:
# For delayed tasks, we use `call_soon_threadsafe()` + `call_later()` to make the
# operation threadsafe
eventloop.call_soon_threadsafe(eventloop.call_later, delay / 1000.0, callback, task_data)
eventloop.call_soon_threadsafe(eventloop.call_later, delay / 1000.0, callback,
task_data, _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS)
return _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS
# Register the callback with the scaffolding
_UniffiLib.uniffi_foreign_executor_callback_set(_uniffi_executor_callback)

Просмотреть файл

@ -14,12 +14,12 @@ Normally we should call task(task_data) after the detail.
However, when task is NULL this indicates that Rust has dropped the ForeignExecutor and we should
decrease the EventLoop refcount.
"""
_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T = ctypes.CFUNCTYPE(None, ctypes.c_size_t, ctypes.c_uint32, ctypes.c_void_p, ctypes.c_void_p)
_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T = ctypes.CFUNCTYPE(ctypes.c_int8, ctypes.c_size_t, ctypes.c_uint32, ctypes.c_void_p, ctypes.c_void_p)
"""
Function pointer for a Rust task, which a callback function that takes a opaque pointer
"""
_UNIFFI_RUST_TASK = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
_UNIFFI_RUST_TASK = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int8)
def _uniffi_future_callback_t(return_type):
"""

Просмотреть файл

@ -32,7 +32,7 @@ typedef struct RustBuffer
typedef int32_t (*ForeignCallback)(uint64_t, int32_t, const uint8_t *_Nonnull, int32_t, RustBuffer *_Nonnull);
// Task defined in Rust that Swift executes
typedef void (*UniFfiRustTaskCallback)(const void * _Nullable);
typedef void (*UniFfiRustTaskCallback)(const void * _Nullable, int8_t);
// Callback to execute Rust tasks using a Swift Task
//
@ -41,7 +41,7 @@ typedef void (*UniFfiRustTaskCallback)(const void * _Nullable);
// delay: Delay in MS
// task: UniFfiRustTaskCallback to call
// task_data: data to pass the task callback
typedef void (*UniFfiForeignExecutorCallback)(size_t, uint32_t, UniFfiRustTaskCallback _Nullable, const void * _Nullable);
typedef int8_t (*UniFfiForeignExecutorCallback)(size_t, uint32_t, UniFfiRustTaskCallback _Nullable, const void * _Nullable);
typedef struct ForeignBytes
{

Просмотреть файл

@ -1,3 +1,9 @@
private let UNIFFI_RUST_TASK_CALLBACK_SUCCESS: Int8 = 0
private let UNIFFI_RUST_TASK_CALLBACK_CANCELLED: Int8 = 1
private let UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS: Int8 = 0
private let UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELED: Int8 = 1
private let UNIFFI_FOREIGN_EXECUTOR_CALLBACK_ERROR: Int8 = 2
// Encapsulates an executor that can run Rust tasks
//
// On Swift, `Task.detached` can handle this we just need to know what priority to send it.
@ -35,7 +41,7 @@ fileprivate struct FfiConverterForeignExecutor: FfiConverter {
}
fileprivate func uniffiForeignExecutorCallback(executorHandle: Int, delayMs: UInt32, rustTask: UniFfiRustTaskCallback?, taskData: UnsafeRawPointer?) {
fileprivate func uniffiForeignExecutorCallback(executorHandle: Int, delayMs: UInt32, rustTask: UniFfiRustTaskCallback?, taskData: UnsafeRawPointer?) -> Int8 {
if let rustTask = rustTask {
let executor = try! FfiConverterForeignExecutor.lift(executorHandle)
Task.detached(priority: executor.priority) {
@ -43,12 +49,14 @@ fileprivate func uniffiForeignExecutorCallback(executorHandle: Int, delayMs: UIn
let nanoseconds: UInt64 = numericCast(delayMs * 1000000)
try! await Task.sleep(nanoseconds: nanoseconds)
}
rustTask(taskData)
rustTask(taskData, UNIFFI_RUST_TASK_CALLBACK_SUCCESS)
}
return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS
} else {
// When rustTask is null, we should drop the foreign executor.
// However, since its just a value type, we don't need to do anything here.
return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS
}
// No else branch: when rustTask is null, we should drop the foreign executor. However, since
// its just a value type, we don't need to do anything here.
}
fileprivate func uniffiInitForeignExecutor() {

Просмотреть файл

@ -39,19 +39,77 @@ unsafe impl Sync for ForeignExecutorHandle {}
/// bindings should release the reference to the executor that was reserved for Rust.
///
/// This callback can be invoked from any thread, including threads created by Rust.
///
/// The callback should return one of the `ForeignExecutorCallbackResult` values.
pub type ForeignExecutorCallback = extern "C" fn(
executor: ForeignExecutorHandle,
delay: u32,
task: Option<RustTaskCallback>,
task_data: *const (),
);
) -> i8;
/// Result code returned by `ForeignExecutorCallback`
#[repr(i8)]
#[derive(Debug, PartialEq, Eq)]
pub enum ForeignExecutorCallbackResult {
/// Callback was scheduled successfully
Success = 0,
/// Callback couldn't be scheduled because the foreign executor is canceled/closed.
Cancelled = 1,
/// Callback couldn't be scheduled because of some other error
Error = 2,
}
impl ForeignExecutorCallbackResult {
/// Check the result code for the foreign executor callback
///
/// If the result was `ForeignExecutorCallbackResult.Success`, this method returns `true`.
///
/// If not, this method returns `false`, logging errors for any unexpected return values
pub fn check_result_code(result: i8) -> bool {
match result {
n if n == ForeignExecutorCallbackResult::Success as i8 => true,
n if n == ForeignExecutorCallbackResult::Cancelled as i8 => false,
n if n == ForeignExecutorCallbackResult::Error as i8 => {
log::error!(
"ForeignExecutorCallbackResult::Error returned by foreign executor callback"
);
false
}
n => {
log::error!("Unknown code ({n}) returned by foreign executor callback");
false
}
}
}
}
// Option<RustTaskCallback> should use the null pointer optimization and be represented in C as a
// regular pointer. Let's check that.
static_assertions::assert_eq_size!(usize, Option<RustTaskCallback>);
/// Callback for a Rust task, this is what the foreign executor invokes
pub type RustTaskCallback = extern "C" fn(*const ());
///
/// The task will be passed the `task_data` passed to `ForeignExecutorCallback` in addition to one
/// of the `RustTaskCallbackCode` values.
pub type RustTaskCallback = extern "C" fn(*const (), RustTaskCallbackCode);
/// Passed to a `RustTaskCallback` function when the executor invokes them.
///
/// Every `RustTaskCallback` will be invoked eventually, this code is used to distinguish the times
/// when it's invoked successfully vs times when the callback is being called because the foreign
/// executor has been cancelled / shutdown
#[repr(i8)]
#[derive(Debug, PartialEq, Eq)]
pub enum RustTaskCallbackCode {
/// Successful task callback invocation
Success = 0,
/// The `ForeignExecutor` has been cancelled.
///
/// This signals that any progress using the executor should be halted. In particular, Futures
/// should not continue to progress.
Cancelled = 1,
}
static FOREIGN_EXECUTOR_CALLBACK: AtomicUsize = AtomicUsize::new(0);
@ -116,18 +174,21 @@ impl ForeignExecutor {
///
/// When using this function, take care to ensure that the `ForeignExecutor` that holds the
/// `ForeignExecutorHandle` has not been dropped.
///
/// Returns true if the callback was successfully scheduled
pub(crate) fn schedule_raw(
handle: ForeignExecutorHandle,
delay: u32,
callback: RustTaskCallback,
data: *const (),
) {
(get_foreign_executor_callback())(handle, delay, Some(callback), data)
) -> bool {
let result_code = (get_foreign_executor_callback())(handle, delay, Some(callback), data);
ForeignExecutorCallbackResult::check_result_code(result_code)
}
impl Drop for ForeignExecutor {
fn drop(&mut self) {
(get_foreign_executor_callback())(self.handle, 0, None, std::ptr::null())
(get_foreign_executor_callback())(self.handle, 0, None, std::ptr::null());
}
}
/// Struct that handles the ForeignExecutor::schedule() method
@ -144,12 +205,24 @@ where
}
fn schedule_callback(self, handle: ForeignExecutorHandle, delay: u32) {
let leaked_ptr: *const Self = Box::leak(Box::new(self));
schedule_raw(handle, delay, Self::callback, leaked_ptr as *const ());
let leaked_ptr: *mut Self = Box::leak(Box::new(self));
if !schedule_raw(handle, delay, Self::callback, leaked_ptr as *const ()) {
// If schedule_raw() failed, drop the leaked box since `Self::callback()` has not been
// scheduled to run.
unsafe {
// Note: specifying the Box generic is a good safety measure. Things would go very
// bad if Rust inferred the wrong type.
drop(Box::<Self>::from_raw(leaked_ptr));
};
}
}
extern "C" fn callback(data: *const ()) {
run_task(unsafe { Box::from_raw(data as *mut Self).task });
extern "C" fn callback(data: *const (), status_code: RustTaskCallbackCode) {
// No matter what, we need to call Box::from_raw() to balance the Box::leak() call.
let scheduled_task = unsafe { Box::from_raw(data as *mut Self) };
if status_code == RustTaskCallbackCode::Success {
run_task(scheduled_task.task);
}
}
}
@ -189,13 +262,25 @@ where
fn schedule_callback(&self, handle: ForeignExecutorHandle, delay: u32) {
let raw_ptr = Arc::into_raw(Arc::clone(&self.inner));
schedule_raw(handle, delay, Self::callback, raw_ptr as *const ());
if !schedule_raw(handle, delay, Self::callback, raw_ptr as *const ()) {
// If `schedule_raw()` failed, make sure to decrement the ref count since
// `Self::callback()` has not been scheduled to run.
unsafe {
// Note: specifying the Arc generic is a good safety measure. Things would go very
// bad if Rust inferred the wrong type.
Arc::<RunFutureInner<T, F>>::decrement_strong_count(raw_ptr);
};
}
}
extern "C" fn callback(data: *const ()) {
unsafe {
let inner = Arc::from_raw(data as *const RunFutureInner<T, F>);
let task = (*inner.task.get()).take().unwrap();
extern "C" fn callback(data: *const (), status_code: RustTaskCallbackCode) {
// No matter what, call `Arc::from_raw()` to balance the `Arc::into_raw()` call in
// `schedule_callback()`.
let inner = unsafe { Arc::from_raw(data as *const RunFutureInner<T, F>) };
// Only drive the future forward on `RustTaskCallbackCode::Success`.
if status_code == RustTaskCallbackCode::Success {
let task = unsafe { (*inner.task.get()).take().unwrap() };
if let Some(result) = run_task(task) {
let mut inner2 = inner.mutex.lock().unwrap();
inner2.result = Some(result);
@ -243,7 +328,7 @@ fn run_task<F: FnOnce() -> T + panic::UnwindSafe, T>(task: F) -> Option<T> {
}
#[cfg(test)]
pub use test::MockExecutor;
pub use test::MockEventLoop;
#[cfg(test)]
mod test {
@ -254,134 +339,147 @@ mod test {
};
use std::task::Wake;
static MOCK_EXECUTOR_INIT: Once = Once::new();
// Executor for testing that stores scheduled calls in a Vec
pub struct MockExecutor {
pub calls: &'static Mutex<Vec<(u32, Option<RustTaskCallback>, *const ())>>,
pub executor: Option<ForeignExecutor>,
/// Simulate an event loop / task queue / coroutine scope on the foreign side
///
/// This simply collects scheduled calls into a Vec for testing purposes.
///
/// Most of the MockEventLoop methods are `pub` since it's also used by the `rustfuture` tests.
pub struct MockEventLoop {
// Wrap everything in a mutex since we typically share access to MockEventLoop via an Arc
inner: Mutex<MockEventLoopInner>,
}
impl MockExecutor {
pub fn new() -> Self {
// Create a boxed call list and immediately leak it, this will be our mock executor
let calls = Box::leak(Box::new(Mutex::new(Vec::new())));
let executor = ForeignExecutor {
handle: unsafe { std::mem::transmute(calls as *const _) },
};
// Setup a callback to handle our handles
MOCK_EXECUTOR_INIT
pub struct MockEventLoopInner {
// calls that have been scheduled
calls: Vec<(u32, Option<RustTaskCallback>, *const ())>,
// has the event loop been shutdown?
is_shutdown: bool,
}
static FOREIGN_EXECUTOR_CALLBACK_INIT: Once = Once::new();
impl MockEventLoop {
pub fn new() -> Arc<Self> {
// Make sure we install a foreign executor callback that can deal with mock event loops
FOREIGN_EXECUTOR_CALLBACK_INIT
.call_once(|| uniffi_foreign_executor_callback_set(mock_executor_callback));
Self {
calls,
executor: Some(executor),
Arc::new(Self {
inner: Mutex::new(MockEventLoopInner {
calls: vec![],
is_shutdown: false,
}),
})
}
/// Create a new ForeignExecutorHandle
pub fn new_handle(self: &Arc<Self>) -> ForeignExecutorHandle {
// To keep the memory management simple, we simply leak an arc reference for this. We
// only create a handful of these in the tests so there's no need for proper cleanup.
ForeignExecutorHandle(Arc::into_raw(Arc::clone(self)) as *const ())
}
pub fn new_executor(self: &Arc<Self>) -> ForeignExecutor {
ForeignExecutor {
handle: self.new_handle(),
}
}
pub fn handle(&self) -> Option<ForeignExecutorHandle> {
self.executor.as_ref().map(|e| e.handle)
}
/// Get the current number of scheduled calls
pub fn call_count(&self) -> usize {
self.calls.lock().unwrap().len()
self.inner.lock().unwrap().calls.len()
}
/// Get the last scheduled call
pub fn last_call(&self) -> (u32, Option<RustTaskCallback>, *const ()) {
self.inner
.lock()
.unwrap()
.calls
.last()
.cloned()
.expect("no calls scheduled")
}
/// Run all currently scheduled calls
pub fn run_all_calls(&self) {
let mut calls = self.calls.lock().unwrap();
for (_delay, callback, data) in calls.drain(..) {
callback.unwrap()(data);
let mut inner = self.inner.lock().unwrap();
let is_shutdown = inner.is_shutdown;
for (_delay, callback, data) in inner.calls.drain(..) {
if !is_shutdown {
callback.unwrap()(data, RustTaskCallbackCode::Success);
} else {
callback.unwrap()(data, RustTaskCallbackCode::Cancelled);
}
}
}
pub fn schedule_raw(&self, delay: u32, callback: RustTaskCallback, data: *const ()) {
let handle = self.executor.as_ref().unwrap().handle;
schedule_raw(handle, delay, callback, data)
}
pub fn schedule<F: FnOnce() + Send + panic::UnwindSafe + 'static>(
&self,
delay: u32,
closure: F,
) {
self.executor.as_ref().unwrap().schedule(delay, closure)
}
pub fn run<F: FnOnce() -> T + Send + panic::UnwindSafe + 'static, T>(
&self,
delay: u32,
closure: F,
) -> impl Future<Output = T> {
self.executor.as_ref().unwrap().run(delay, closure)
}
pub fn drop_executor(&mut self) {
self.executor = None;
/// Shutdown the eventloop, causing scheduled calls and future calls to be cancelled
pub fn shutdown(&self) {
self.inner.lock().unwrap().is_shutdown = true;
}
}
impl Default for MockExecutor {
fn default() -> Self {
Self::new()
}
}
// Mock executor callback pushes calls to a ScheduledCalls
// `ForeignExecutorCallback` that we install for testing
extern "C" fn mock_executor_callback(
executor: ForeignExecutorHandle,
handle: ForeignExecutorHandle,
delay: u32,
task: Option<RustTaskCallback>,
task_data: *const (),
) {
unsafe {
let calls: *mut Mutex<Vec<(u32, Option<RustTaskCallback>, *const ())>> =
std::mem::transmute(executor);
calls
.as_ref()
.unwrap()
.lock()
.unwrap()
.push((delay, task, task_data));
) -> i8 {
let eventloop = handle.0 as *const MockEventLoop;
let mut inner = unsafe { (*eventloop).inner.lock().unwrap() };
if inner.is_shutdown {
ForeignExecutorCallbackResult::Cancelled as i8
} else {
inner.calls.push((delay, task, task_data));
ForeignExecutorCallbackResult::Success as i8
}
}
#[test]
fn test_schedule_raw() {
extern "C" fn callback(data: *const ()) {
extern "C" fn callback(data: *const (), _status_code: RustTaskCallbackCode) {
unsafe {
*(data as *mut u32) += 1;
}
}
let executor = MockExecutor::new();
let eventloop = MockEventLoop::new();
let value: u32 = 0;
assert_eq!(executor.call_count(), 0);
assert_eq!(eventloop.call_count(), 0);
executor.schedule_raw(0, callback, &value as *const u32 as *const ());
assert_eq!(executor.call_count(), 1);
schedule_raw(
eventloop.new_handle(),
0,
callback,
&value as *const u32 as *const (),
);
assert_eq!(eventloop.call_count(), 1);
assert_eq!(value, 0);
executor.run_all_calls();
assert_eq!(executor.call_count(), 0);
eventloop.run_all_calls();
assert_eq!(eventloop.call_count(), 0);
assert_eq!(value, 1);
}
#[test]
fn test_schedule() {
let executor = MockExecutor::new();
let eventloop = MockEventLoop::new();
let executor = eventloop.new_executor();
let value = Arc::new(AtomicU32::new(0));
assert_eq!(executor.call_count(), 0);
assert_eq!(eventloop.call_count(), 0);
let value2 = value.clone();
executor.schedule(0, move || {
value2.fetch_add(1, Ordering::Relaxed);
});
assert_eq!(executor.call_count(), 1);
assert_eq!(eventloop.call_count(), 1);
assert_eq!(value.load(Ordering::Relaxed), 0);
executor.run_all_calls();
assert_eq!(executor.call_count(), 0);
eventloop.run_all_calls();
assert_eq!(eventloop.call_count(), 0);
assert_eq!(value.load(Ordering::Relaxed), 1);
}
@ -398,19 +496,20 @@ mod test {
#[test]
fn test_run() {
let executor = MockExecutor::new();
let eventloop = MockEventLoop::new();
let executor = eventloop.new_executor();
let mock_waker = Arc::new(MockWaker::default());
let waker = Waker::from(mock_waker.clone());
let mut context = Context::from_waker(&waker);
assert_eq!(executor.call_count(), 0);
assert_eq!(eventloop.call_count(), 0);
let mut future = executor.run(0, move || "test-return-value");
assert_eq!(executor.call_count(), 1);
assert_eq!(eventloop.call_count(), 1);
assert_eq!(Pin::new(&mut future).poll(&mut context), Poll::Pending);
assert_eq!(mock_waker.wake_count.load(Ordering::Relaxed), 0);
executor.run_all_calls();
assert_eq!(executor.call_count(), 0);
eventloop.run_all_calls();
assert_eq!(eventloop.call_count(), 0);
assert_eq!(mock_waker.wake_count.load(Ordering::Relaxed), 1);
assert_eq!(
Pin::new(&mut future).poll(&mut context),
@ -420,15 +519,70 @@ mod test {
#[test]
fn test_drop() {
let mut executor = MockExecutor::new();
let eventloop = MockEventLoop::new();
let executor = eventloop.new_executor();
executor.schedule(0, || {});
assert_eq!(executor.call_count(), 1);
drop(executor);
// Calling drop should schedule a call with null task data.
assert_eq!(eventloop.call_count(), 1);
assert_eq!(eventloop.last_call().1, None);
}
executor.drop_executor();
assert_eq!(executor.call_count(), 2);
let calls = executor.calls.lock().unwrap();
let drop_call = calls.last().unwrap();
assert_eq!(drop_call.1, None);
// Test that cancelled calls never run
#[test]
fn test_cancelled_call() {
let eventloop = MockEventLoop::new();
let executor = eventloop.new_executor();
// Create a shared counter
let counter = Arc::new(AtomicU32::new(0));
// schedule increments using both `schedule()` and run()`
let counter_clone = Arc::clone(&counter);
executor.schedule(0, move || {
counter_clone.fetch_add(1, Ordering::Relaxed);
});
let counter_clone = Arc::clone(&counter);
let future = executor.run(0, move || {
counter_clone.fetch_add(1, Ordering::Relaxed);
});
// shutdown the eventloop before the scheduled call gets a chance to run.
eventloop.shutdown();
// `run_all_calls()` will cause the scheduled task callbacks to run, but will pass
// `RustTaskCallbackCode::Cancelled` to it. This drop the scheduled closure without executing
// it.
eventloop.run_all_calls();
assert_eq!(counter.load(Ordering::Relaxed), 0);
drop(future);
}
// Test that when scheduled calls are cancelled, the closures are dropped properly
#[test]
fn test_cancellation_drops_closures() {
let eventloop = MockEventLoop::new();
let executor = eventloop.new_executor();
// Create an Arc<> that we will move into the closures to test if they are dropped or not
let arc = Arc::new(0);
let arc_clone = Arc::clone(&arc);
executor.schedule(0, move || assert_eq!(*arc_clone, 0));
let arc_clone = Arc::clone(&arc);
let future = executor.run(0, move || assert_eq!(*arc_clone, 0));
// shutdown the eventloop and run the (cancelled) scheduled calls.
eventloop.shutdown();
eventloop.run_all_calls();
// try to schedule some more calls now that the loop has been shutdown
let arc_clone = Arc::clone(&arc);
executor.schedule(0, move || assert_eq!(*arc_clone, 0));
let arc_clone = Arc::clone(&arc);
let future2 = executor.run(0, move || assert_eq!(*arc_clone, 0));
// Drop the futures so they don't hold on to any references
drop(future);
drop(future2);
// All of these closures should have been dropped by now, there only remaining arc
// reference should be the original
assert_eq!(Arc::strong_count(&arc), 1);
}
}

Просмотреть файл

@ -116,8 +116,8 @@
//! [`RawWaker`]: https://doc.rust-lang.org/std/task/struct.RawWaker.html
use crate::{
rust_call_with_out_status, schedule_raw, FfiConverter, FfiDefault, ForeignExecutor,
ForeignExecutorHandle, RustCallStatus,
ffi::foreignexecutor::RustTaskCallbackCode, rust_call_with_out_status, schedule_raw,
FfiConverter, FfiDefault, ForeignExecutor, ForeignExecutorHandle, RustCallStatus,
};
use std::{
cell::UnsafeCell,
@ -216,18 +216,33 @@ where
fn schedule_do_wake(self: Pin<Arc<Self>>) {
unsafe {
let handle = self.executor.handle;
let raw_ptr = Arc::into_raw(Pin::into_inner_unchecked(self)) as *const ();
let raw_ptr = Arc::into_raw(Pin::into_inner_unchecked(self));
// SAFETY: The `into_raw()` / `from_raw()` contract guarantees that our executor cannot
// be dropped before we call `from_raw()` on the raw pointer. This means we can safely
// use its handle to schedule a callback.
schedule_raw(handle, 0, Self::wake_callback, raw_ptr);
if !schedule_raw(handle, 0, Self::wake_callback, raw_ptr as *const ()) {
// There was an error scheduling the callback, drop the arc reference since
// `wake_callback()` will never be called
//
// Note: specifying the `<Self>` generic is a good safety measure. Things would go
// very bad if Rust inferred the wrong type.
//
// However, the `Pin<>` part doesn't matter since its `repr(transparent)`.
Arc::<Self>::decrement_strong_count(raw_ptr);
}
}
}
extern "C" fn wake_callback(self_ptr: *const ()) {
unsafe {
Pin::new_unchecked(Arc::from_raw(self_ptr as *const Self)).do_wake();
};
extern "C" fn wake_callback(self_ptr: *const (), status_code: RustTaskCallbackCode) {
// No matter what, call `Arc::from_raw()` to balance the `Arc::into_raw()` call in
// `schedule_do_wake()`.
let task = unsafe { Pin::new_unchecked(Arc::from_raw(self_ptr as *const Self)) };
if status_code == RustTaskCallbackCode::Success {
// Only drive the future forward on `RustTaskCallbackCode::Success`.
// `RUST_TASK_CALLBACK_CANCELED` indicates the foreign executor has been cancelled /
// shutdown and we should not continue.
task.do_wake();
}
}
// Does the work for wake, we take care to ensure this always runs in a serialized fashion.
@ -359,7 +374,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::{try_lift_from_rust_buffer, MockExecutor};
use crate::{try_lift_from_rust_buffer, MockEventLoop};
use std::sync::Weak;
// Mock future that we can manually control using an Option<>
@ -396,36 +411,26 @@ mod tests {
// Bundles everything together so that we can run some tests
struct TestFutureEnvironment {
rust_future: Pin<Arc<TestRustFuture>>,
executor: MockExecutor,
foreign_result: Pin<Box<Option<MockForeignResult>>>,
}
impl TestFutureEnvironment {
fn new() -> Self {
fn new(eventloop: &Arc<MockEventLoop>) -> Self {
let foreign_result = Box::pin(None);
let foreign_result_ptr = &*foreign_result as *const Option<_> as *const ();
let executor = MockExecutor::new();
let rust_future = TestRustFuture::new(
MockFuture(None),
executor.handle().unwrap(),
eventloop.new_handle(),
mock_foreign_callback,
foreign_result_ptr,
);
Self {
executor,
rust_future,
foreign_result,
}
}
fn scheduled_call_count(&self) -> usize {
self.executor.call_count()
}
fn run_scheduled_calls(&self) {
self.executor.run_all_calls();
}
fn wake(&self) {
self.rust_future.clone().wake();
}
@ -445,47 +450,49 @@ mod tests {
#[test]
fn test_wake() {
let mut test_env = TestFutureEnvironment::new();
let eventloop = MockEventLoop::new();
let mut test_env = TestFutureEnvironment::new(&eventloop);
// Initially, we shouldn't have a result and nothing should be scheduled
assert!(test_env.foreign_result.is_none());
assert_eq!(test_env.scheduled_call_count(), 0);
assert_eq!(eventloop.call_count(), 0);
// wake() should schedule a call
test_env.wake();
assert_eq!(test_env.scheduled_call_count(), 1);
assert_eq!(eventloop.call_count(), 1);
// When that call runs, we should still not have a result yet
test_env.run_scheduled_calls();
eventloop.run_all_calls();
assert!(test_env.foreign_result.is_none());
assert_eq!(test_env.scheduled_call_count(), 0);
assert_eq!(eventloop.call_count(), 0);
// Multiple wakes should only result in 1 scheduled call
test_env.wake();
test_env.wake();
assert_eq!(test_env.scheduled_call_count(), 1);
assert_eq!(eventloop.call_count(), 1);
// Make the future ready, which should call mock_foreign_callback and set the result
test_env.complete_future(Ok(true));
test_env.run_scheduled_calls();
eventloop.run_all_calls();
let result = test_env
.foreign_result
.take()
.expect("Expected result to be set");
assert_eq!(result.value, 1);
assert_eq!(result.status.code, 0);
assert_eq!(test_env.scheduled_call_count(), 0);
assert_eq!(eventloop.call_count(), 0);
// Future wakes shouldn't schedule any calls
test_env.wake();
assert_eq!(test_env.scheduled_call_count(), 0);
assert_eq!(eventloop.call_count(), 0);
}
#[test]
fn test_error() {
let mut test_env = TestFutureEnvironment::new();
let eventloop = MockEventLoop::new();
let mut test_env = TestFutureEnvironment::new(&eventloop);
test_env.complete_future(Err("Something went wrong".into()));
test_env.wake();
test_env.run_scheduled_calls();
eventloop.run_all_calls();
let result = test_env
.foreign_result
.take()
@ -500,12 +507,12 @@ mod tests {
String::from("Something went wrong"),
)
}
assert_eq!(test_env.scheduled_call_count(), 0);
assert_eq!(eventloop.call_count(), 0);
}
#[test]
fn test_raw_clone_and_drop() {
let test_env = TestFutureEnvironment::new();
let test_env = TestFutureEnvironment::new(&MockEventLoop::new());
let waker = test_env.rust_future.make_waker();
let weak_ref = test_env.rust_future_weak();
assert_eq!(weak_ref.strong_count(), 2);
@ -522,7 +529,8 @@ mod tests {
#[test]
fn test_raw_wake() {
let test_env = TestFutureEnvironment::new();
let eventloop = MockEventLoop::new();
let test_env = TestFutureEnvironment::new(&eventloop);
let waker = test_env.rust_future.make_waker();
let weak_ref = test_env.rust_future_weak();
// `test_env` and `waker` both hold a strong reference to the `RustFuture`
@ -530,18 +538,54 @@ mod tests {
// wake_by_ref() should schedule a wake
waker.wake_by_ref();
assert_eq!(test_env.scheduled_call_count(), 1);
assert_eq!(eventloop.call_count(), 1);
// Once the wake runs, the strong could should not have changed
test_env.run_scheduled_calls();
eventloop.run_all_calls();
assert_eq!(weak_ref.strong_count(), 2);
// wake() should schedule a wake
waker.wake();
assert_eq!(test_env.scheduled_call_count(), 1);
assert_eq!(eventloop.call_count(), 1);
// Once the wake runs, the strong have decremented, since wake() consumes the waker
test_env.run_scheduled_calls();
eventloop.run_all_calls();
assert_eq!(weak_ref.strong_count(), 1);
}
// Test trying to create a RustFuture before the executor is shutdown.
//
// The main thing we're testing is that we correctly drop the Future in this case
#[test]
fn test_executor_shutdown() {
let eventloop = MockEventLoop::new();
eventloop.shutdown();
let test_env = TestFutureEnvironment::new(&eventloop);
let weak_ref = test_env.rust_future_weak();
// When we wake the future, it should try to schedule a callback and fail. This should
// cause the future to be dropped
test_env.wake();
drop(test_env);
assert!(weak_ref.upgrade().is_none());
}
// Similar run a similar test to the last, but simulate an executor shutdown after the future was
// scheduled, but before the callback is called.
#[test]
fn test_executor_shutdown_after_schedule() {
let eventloop = MockEventLoop::new();
let test_env = TestFutureEnvironment::new(&eventloop);
let weak_ref = test_env.rust_future_weak();
test_env.complete_future(Ok(true));
test_env.wake();
eventloop.shutdown();
eventloop.run_all_calls();
// Test that the foreign async side wasn't completed. Even though we could have
// driven the future to completion, we shouldn't have since the executor was shutdown
assert!(test_env.foreign_result.is_none());
// Also test that we've dropped all references to the future
drop(test_env);
assert!(weak_ref.upgrade().is_none());
}
}

Просмотреть файл

@ -25,7 +25,7 @@ mod metadata;
// `docs/uniffi-versioning.md` for details.
//
// Once we get to 1.0, then we'll need to update the scheme to something like 100 + major_version
pub const UNIFFI_CONTRACT_VERSION: u32 = 22;
pub const UNIFFI_CONTRACT_VERSION: u32 = 23;
/// Similar to std::hash::Hash.
///