gecko-dev/third_party/rust/tokio-current-thread/tests/current_thread.rs

838 строки
21 KiB
Rust

extern crate futures;
extern crate tokio_current_thread;
extern crate tokio_executor;
use tokio_current_thread::{block_on_all, CurrentThread};
use std::any::Any;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::thread;
use std::time::Duration;
use futures::future::{self, lazy};
use futures::task;
// This is not actually unused --- we need this trait to be in scope for
// the tests that sue TaskExecutor::current().execute(). The compiler
// doesn't realise that.
#[allow(unused_imports)]
use futures::future::Executor as _futures_Executor;
use futures::prelude::*;
use futures::sync::oneshot;
mod from_block_on_all {
use super::*;
fn test<F: Fn(Box<Future<Item = (), Error = ()>>) + 'static>(spawn: F) {
let cnt = Rc::new(Cell::new(0));
let c = cnt.clone();
let msg = tokio_current_thread::block_on_all(lazy(move || {
c.set(1 + c.get());
// Spawn!
spawn(Box::new(lazy(move || {
c.set(1 + c.get());
Ok::<(), ()>(())
})));
Ok::<_, ()>("hello")
}))
.unwrap();
assert_eq!(2, cnt.get());
assert_eq!(msg, "hello");
}
#[test]
fn spawn() {
test(tokio_current_thread::spawn)
}
#[test]
fn execute() {
test(|f| {
tokio_current_thread::TaskExecutor::current()
.execute(f)
.unwrap();
});
}
}
#[test]
fn block_waits() {
let (tx, rx) = oneshot::channel();
thread::spawn(|| {
thread::sleep(Duration::from_millis(1000));
tx.send(()).unwrap();
});
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
block_on_all(rx.then(move |_| {
cnt.set(1 + cnt.get());
Ok::<_, ()>(())
}))
.unwrap();
assert_eq!(1, cnt2.get());
}
#[test]
fn spawn_many() {
const ITER: usize = 200;
let cnt = Rc::new(Cell::new(0));
let mut tokio_current_thread = CurrentThread::new();
for _ in 0..ITER {
let cnt = cnt.clone();
tokio_current_thread.spawn(lazy(move || {
cnt.set(1 + cnt.get());
Ok::<(), ()>(())
}));
}
tokio_current_thread.run().unwrap();
assert_eq!(cnt.get(), ITER);
}
mod does_not_set_global_executor_by_default {
use super::*;
fn test<F: Fn(Box<Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>(
spawn: F,
) {
block_on_all(lazy(|| {
spawn(Box::new(lazy(|| ok()))).unwrap_err();
ok()
}))
.unwrap()
}
#[test]
fn spawn() {
use tokio_executor::Executor;
test(|f| tokio_executor::DefaultExecutor::current().spawn(f))
}
#[test]
fn execute() {
test(|f| tokio_executor::DefaultExecutor::current().execute(f))
}
}
mod from_block_on_future {
use super::*;
fn test<F: Fn(Box<Future<Item = (), Error = ()>>)>(spawn: F) {
let cnt = Rc::new(Cell::new(0));
let mut tokio_current_thread = CurrentThread::new();
tokio_current_thread
.block_on(lazy(|| {
let cnt = cnt.clone();
spawn(Box::new(lazy(move || {
cnt.set(1 + cnt.get());
Ok(())
})));
Ok::<_, ()>(())
}))
.unwrap();
tokio_current_thread.run().unwrap();
assert_eq!(1, cnt.get());
}
#[test]
fn spawn() {
test(tokio_current_thread::spawn);
}
#[test]
fn execute() {
test(|f| {
tokio_current_thread::TaskExecutor::current()
.execute(f)
.unwrap();
});
}
}
struct Never(Rc<()>);
impl Future for Never {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::NotReady)
}
}
mod outstanding_tasks_are_dropped_when_executor_is_dropped {
use super::*;
fn test<F, G>(spawn: F, dotspawn: G)
where
F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
{
let mut rc = Rc::new(());
let mut tokio_current_thread = CurrentThread::new();
dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone())));
drop(tokio_current_thread);
// Ensure the daemon is dropped
assert!(Rc::get_mut(&mut rc).is_some());
// Using the global spawn fn
let mut rc = Rc::new(());
let mut tokio_current_thread = CurrentThread::new();
tokio_current_thread
.block_on(lazy(|| {
spawn(Box::new(Never(rc.clone())));
Ok::<_, ()>(())
}))
.unwrap();
drop(tokio_current_thread);
// Ensure the daemon is dropped
assert!(Rc::get_mut(&mut rc).is_some());
}
#[test]
fn spawn() {
test(tokio_current_thread::spawn, |rt, f| {
rt.spawn(f);
})
}
#[test]
fn execute() {
test(
|f| {
tokio_current_thread::TaskExecutor::current()
.execute(f)
.unwrap();
},
// Note: `CurrentThread` doesn't currently implement
// `futures::Executor`, so we'll call `.spawn(...)` rather than
// `.execute(...)` for now. If `CurrentThread` is changed to
// implement Executor, change this to `.execute(...).unwrap()`.
|rt, f| {
rt.spawn(f);
},
);
}
}
#[test]
#[should_panic]
fn nesting_run() {
block_on_all(lazy(|| {
block_on_all(lazy(|| ok())).unwrap();
ok()
}))
.unwrap();
}
mod run_in_future {
use super::*;
#[test]
#[should_panic]
fn spawn() {
block_on_all(lazy(|| {
tokio_current_thread::spawn(lazy(|| {
block_on_all(lazy(|| ok())).unwrap();
ok()
}));
ok()
}))
.unwrap();
}
#[test]
#[should_panic]
fn execute() {
block_on_all(lazy(|| {
tokio_current_thread::TaskExecutor::current()
.execute(lazy(|| {
block_on_all(lazy(|| ok())).unwrap();
ok()
}))
.unwrap();
ok()
}))
.unwrap();
}
}
#[test]
fn tick_on_infini_future() {
let num = Rc::new(Cell::new(0));
struct Infini {
num: Rc<Cell<usize>>,
}
impl Future for Infini {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
self.num.set(1 + self.num.get());
task::current().notify();
Ok(Async::NotReady)
}
}
CurrentThread::new()
.spawn(Infini { num: num.clone() })
.turn(None)
.unwrap();
assert_eq!(1, num.get());
}
mod tasks_are_scheduled_fairly {
use super::*;
struct Spin {
state: Rc<RefCell<[i32; 2]>>,
idx: usize,
}
impl Future for Spin {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
let mut state = self.state.borrow_mut();
if self.idx == 0 {
let diff = state[0] - state[1];
assert!(diff.abs() <= 1);
if state[0] >= 50 {
return Ok(().into());
}
}
state[self.idx] += 1;
if state[self.idx] >= 100 {
return Ok(().into());
}
task::current().notify();
Ok(Async::NotReady)
}
}
fn test<F: Fn(Spin)>(spawn: F) {
let state = Rc::new(RefCell::new([0, 0]));
block_on_all(lazy(|| {
spawn(Spin {
state: state.clone(),
idx: 0,
});
spawn(Spin {
state: state,
idx: 1,
});
ok()
}))
.unwrap();
}
#[test]
fn spawn() {
test(tokio_current_thread::spawn)
}
#[test]
fn execute() {
test(|f| {
tokio_current_thread::TaskExecutor::current()
.execute(f)
.unwrap();
})
}
}
mod and_turn {
use super::*;
fn test<F, G>(spawn: F, dotspawn: G)
where
F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
{
let cnt = Rc::new(Cell::new(0));
let c = cnt.clone();
let mut tokio_current_thread = CurrentThread::new();
// Spawn a basic task to get the executor to turn
dotspawn(&mut tokio_current_thread, Box::new(lazy(move || Ok(()))));
// Turn once...
tokio_current_thread.turn(None).unwrap();
dotspawn(
&mut tokio_current_thread,
Box::new(lazy(move || {
c.set(1 + c.get());
// Spawn!
spawn(Box::new(lazy(move || {
c.set(1 + c.get());
Ok::<(), ()>(())
})));
Ok(())
})),
);
// This does not run the newly spawned thread
tokio_current_thread.turn(None).unwrap();
assert_eq!(1, cnt.get());
// This runs the newly spawned thread
tokio_current_thread.turn(None).unwrap();
assert_eq!(2, cnt.get());
}
#[test]
fn spawn() {
test(tokio_current_thread::spawn, |rt, f| {
rt.spawn(f);
})
}
#[test]
fn execute() {
test(
|f| {
tokio_current_thread::TaskExecutor::current()
.execute(f)
.unwrap();
},
// Note: `CurrentThread` doesn't currently implement
// `futures::Executor`, so we'll call `.spawn(...)` rather than
// `.execute(...)` for now. If `CurrentThread` is changed to
// implement Executor, change this to `.execute(...).unwrap()`.
|rt, f| {
rt.spawn(f);
},
);
}
}
mod in_drop {
use super::*;
struct OnDrop<F: FnOnce()>(Option<F>);
impl<F: FnOnce()> Drop for OnDrop<F> {
fn drop(&mut self) {
(self.0.take().unwrap())();
}
}
struct MyFuture {
_data: Box<Any>,
}
impl Future for MyFuture {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
Ok(().into())
}
}
fn test<F, G>(spawn: F, dotspawn: G)
where
F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
{
let mut tokio_current_thread = CurrentThread::new();
let (tx, rx) = oneshot::channel();
dotspawn(
&mut tokio_current_thread,
Box::new(MyFuture {
_data: Box::new(OnDrop(Some(move || {
spawn(Box::new(lazy(move || {
tx.send(()).unwrap();
Ok(())
})));
}))),
}),
);
tokio_current_thread.block_on(rx).unwrap();
tokio_current_thread.run().unwrap();
}
#[test]
fn spawn() {
test(tokio_current_thread::spawn, |rt, f| {
rt.spawn(f);
})
}
#[test]
fn execute() {
test(
|f| {
tokio_current_thread::TaskExecutor::current()
.execute(f)
.unwrap();
},
// Note: `CurrentThread` doesn't currently implement
// `futures::Executor`, so we'll call `.spawn(...)` rather than
// `.execute(...)` for now. If `CurrentThread` is changed to
// implement Executor, change this to `.execute(...).unwrap()`.
|rt, f| {
rt.spawn(f);
},
);
}
}
#[test]
fn hammer_turn() {
use futures::sync::mpsc;
const ITER: usize = 100;
const N: usize = 100;
const THREADS: usize = 4;
for _ in 0..ITER {
let mut ths = vec![];
// Add some jitter
for _ in 0..THREADS {
let th = thread::spawn(|| {
let mut tokio_current_thread = CurrentThread::new();
let (tx, rx) = mpsc::unbounded();
tokio_current_thread.spawn({
let cnt = Rc::new(Cell::new(0));
let c = cnt.clone();
rx.for_each(move |_| {
c.set(1 + c.get());
Ok(())
})
.map_err(|e| panic!("err={:?}", e))
.map(move |v| {
assert_eq!(N, cnt.get());
v
})
});
thread::spawn(move || {
for _ in 0..N {
tx.unbounded_send(()).unwrap();
thread::yield_now();
}
});
while !tokio_current_thread.is_idle() {
tokio_current_thread.turn(None).unwrap();
}
});
ths.push(th);
}
for th in ths {
th.join().unwrap();
}
}
}
#[test]
fn turn_has_polled() {
let mut tokio_current_thread = CurrentThread::new();
// Spawn oneshot receiver
let (sender, receiver) = oneshot::channel::<()>();
tokio_current_thread.spawn(receiver.then(|_| Ok(())));
// Turn once...
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
// Should've polled the receiver once, but considered it not ready
assert!(res.has_polled());
// Turn another time
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
// Should've polled nothing, the receiver is not ready yet
assert!(!res.has_polled());
// Make the receiver ready
sender.send(()).unwrap();
// Turn another time
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
// Should've polled the receiver, it's ready now
assert!(res.has_polled());
// Now the executor should be empty
assert!(tokio_current_thread.is_idle());
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
// So should've polled nothing
assert!(!res.has_polled());
}
// Our own mock Park that is never really waiting and the only
// thing it does is to send, on request, something (once) to a oneshot
// channel
struct MyPark {
sender: Option<oneshot::Sender<()>>,
send_now: Rc<Cell<bool>>,
}
struct MyUnpark;
impl tokio_executor::park::Park for MyPark {
type Unpark = MyUnpark;
type Error = ();
fn unpark(&self) -> Self::Unpark {
MyUnpark
}
fn park(&mut self) -> Result<(), Self::Error> {
// If called twice with send_now, this will intentionally panic
if self.send_now.get() {
self.sender.take().unwrap().send(()).unwrap();
}
Ok(())
}
fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> {
self.park()
}
}
impl tokio_executor::park::Unpark for MyUnpark {
fn unpark(&self) {}
}
#[test]
fn turn_fair() {
let send_now = Rc::new(Cell::new(false));
let (sender, receiver) = oneshot::channel::<()>();
let (sender_2, receiver_2) = oneshot::channel::<()>();
let (sender_3, receiver_3) = oneshot::channel::<()>();
let my_park = MyPark {
sender: Some(sender_3),
send_now: send_now.clone(),
};
let mut tokio_current_thread = CurrentThread::new_with_park(my_park);
let receiver_1_done = Rc::new(Cell::new(false));
let receiver_1_done_clone = receiver_1_done.clone();
// Once an item is received on the oneshot channel, it will immediately
// immediately make the second oneshot channel ready
tokio_current_thread.spawn(receiver.map_err(|_| unreachable!()).and_then(move |_| {
sender_2.send(()).unwrap();
receiver_1_done_clone.set(true);
Ok(())
}));
let receiver_2_done = Rc::new(Cell::new(false));
let receiver_2_done_clone = receiver_2_done.clone();
tokio_current_thread.spawn(receiver_2.map_err(|_| unreachable!()).and_then(move |_| {
receiver_2_done_clone.set(true);
Ok(())
}));
// The third receiver is only woken up from our Park implementation, it simulates
// e.g. a socket that first has to be polled to know if it is ready now
let receiver_3_done = Rc::new(Cell::new(false));
let receiver_3_done_clone = receiver_3_done.clone();
tokio_current_thread.spawn(receiver_3.map_err(|_| unreachable!()).and_then(move |_| {
receiver_3_done_clone.set(true);
Ok(())
}));
// First turn should've polled both and considered them not ready
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
assert!(res.has_polled());
// Next turn should've polled nothing
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
assert!(!res.has_polled());
assert!(!receiver_1_done.get());
assert!(!receiver_2_done.get());
assert!(!receiver_3_done.get());
// After this the receiver future will wake up the second receiver future,
// so there are pending futures again
sender.send(()).unwrap();
// Now the first receiver should be done, the second receiver should be ready
// to be polled again and the socket not yet
let res = tokio_current_thread.turn(None).unwrap();
assert!(res.has_polled());
assert!(receiver_1_done.get());
assert!(!receiver_2_done.get());
assert!(!receiver_3_done.get());
// Now let our park implementation know that it should send something to sender 3
send_now.set(true);
// This should resolve the second receiver directly, but also poll the socket
// and read the packet from it. If it didn't do both here, we would handle
// futures that are woken up from the reactor and directly unfairly and would
// favour the ones that are woken up directly.
let res = tokio_current_thread.turn(None).unwrap();
assert!(res.has_polled());
assert!(receiver_1_done.get());
assert!(receiver_2_done.get());
assert!(receiver_3_done.get());
// Don't send again
send_now.set(false);
// Now we should be idle and turning should not poll anything
assert!(tokio_current_thread.is_idle());
let res = tokio_current_thread.turn(None).unwrap();
assert!(!res.has_polled());
}
#[test]
fn spawn_from_other_thread() {
let mut current_thread = CurrentThread::new();
let handle = current_thread.handle();
let (sender, receiver) = oneshot::channel::<()>();
thread::spawn(move || {
handle
.spawn(lazy(move || {
sender.send(()).unwrap();
Ok(())
}))
.unwrap();
});
let _ = current_thread.block_on(receiver).unwrap();
}
#[test]
fn spawn_from_other_thread_unpark() {
use std::sync::mpsc::channel as mpsc_channel;
let mut current_thread = CurrentThread::new();
let handle = current_thread.handle();
let (sender_1, receiver_1) = oneshot::channel::<()>();
let (sender_2, receiver_2) = mpsc_channel::<()>();
thread::spawn(move || {
let _ = receiver_2.recv().unwrap();
handle
.spawn(lazy(move || {
sender_1.send(()).unwrap();
Ok(())
}))
.unwrap();
});
// Ensure that unparking the executor works correctly. It will first
// check if there are new futures (there are none), then execute the
// lazy future below which will cause the future to be spawned from
// the other thread. Then the executor will park but should be woken
// up because *now* we have a new future to schedule
let _ = current_thread
.block_on(
lazy(move || {
sender_2.send(()).unwrap();
Ok(())
})
.and_then(|_| receiver_1),
)
.unwrap();
}
#[test]
fn spawn_from_executor_with_handle() {
let mut current_thread = CurrentThread::new();
let handle = current_thread.handle();
let (tx, rx) = oneshot::channel();
current_thread.spawn(lazy(move || {
handle
.spawn(lazy(move || {
tx.send(()).unwrap();
Ok(())
}))
.unwrap();
Ok::<_, ()>(())
}));
current_thread.run();
rx.wait().unwrap();
}
fn ok() -> future::FutureResult<(), ()> {
future::ok(())
}