зеркало из https://github.com/mozilla/gecko-dev.git
169 строки
4.4 KiB
Rust
169 строки
4.4 KiB
Rust
#![feature(test)]
|
|
|
|
extern crate futures;
|
|
extern crate test;
|
|
|
|
use futures::{Async, Poll, AsyncSink};
|
|
use futures::executor;
|
|
use futures::executor::{Notify, NotifyHandle};
|
|
|
|
use futures::sink::Sink;
|
|
use futures::stream::Stream;
|
|
|
|
use futures::sync::mpsc::unbounded;
|
|
use futures::sync::mpsc::channel;
|
|
use futures::sync::mpsc::Sender;
|
|
use futures::sync::mpsc::UnboundedSender;
|
|
|
|
|
|
use test::Bencher;
|
|
|
|
fn notify_noop() -> NotifyHandle {
|
|
struct Noop;
|
|
|
|
impl Notify for Noop {
|
|
fn notify(&self, _id: usize) {}
|
|
}
|
|
|
|
const NOOP : &'static Noop = &Noop;
|
|
|
|
NotifyHandle::from(NOOP)
|
|
}
|
|
|
|
/// Single producer, single consumer
|
|
#[bench]
|
|
fn unbounded_1_tx(b: &mut Bencher) {
|
|
b.iter(|| {
|
|
let (tx, rx) = unbounded();
|
|
|
|
let mut rx = executor::spawn(rx);
|
|
|
|
// 1000 iterations to avoid measuring overhead of initialization
|
|
// Result should be divided by 1000
|
|
for i in 0..1000 {
|
|
|
|
// Poll, not ready, park
|
|
assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(¬ify_noop(), 1));
|
|
|
|
UnboundedSender::unbounded_send(&tx, i).unwrap();
|
|
|
|
// Now poll ready
|
|
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(¬ify_noop(), 1));
|
|
}
|
|
})
|
|
}
|
|
|
|
/// 100 producers, single consumer
|
|
#[bench]
|
|
fn unbounded_100_tx(b: &mut Bencher) {
|
|
b.iter(|| {
|
|
let (tx, rx) = unbounded();
|
|
|
|
let mut rx = executor::spawn(rx);
|
|
|
|
let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
|
|
|
|
// 1000 send/recv operations total, result should be divided by 1000
|
|
for _ in 0..10 {
|
|
for i in 0..tx.len() {
|
|
assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(¬ify_noop(), 1));
|
|
|
|
UnboundedSender::unbounded_send(&tx[i], i).unwrap();
|
|
|
|
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(¬ify_noop(), 1));
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
#[bench]
|
|
fn unbounded_uncontended(b: &mut Bencher) {
|
|
b.iter(|| {
|
|
let (tx, mut rx) = unbounded();
|
|
|
|
for i in 0..1000 {
|
|
UnboundedSender::unbounded_send(&tx, i).expect("send");
|
|
// No need to create a task, because poll is not going to park.
|
|
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
|
|
}
|
|
})
|
|
}
|
|
|
|
|
|
/// A Stream that continuously sends incrementing number of the queue
|
|
struct TestSender {
|
|
tx: Sender<u32>,
|
|
last: u32, // Last number sent
|
|
}
|
|
|
|
// Could be a Future, it doesn't matter
|
|
impl Stream for TestSender {
|
|
type Item = u32;
|
|
type Error = ();
|
|
|
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
|
match self.tx.start_send(self.last + 1) {
|
|
Err(_) => panic!(),
|
|
Ok(AsyncSink::Ready) => {
|
|
self.last += 1;
|
|
assert_eq!(Ok(Async::Ready(())), self.tx.poll_complete());
|
|
Ok(Async::Ready(Some(self.last)))
|
|
}
|
|
Ok(AsyncSink::NotReady(_)) => {
|
|
Ok(Async::NotReady)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/// Single producers, single consumer
|
|
#[bench]
|
|
fn bounded_1_tx(b: &mut Bencher) {
|
|
b.iter(|| {
|
|
let (tx, rx) = channel(0);
|
|
|
|
let mut tx = executor::spawn(TestSender {
|
|
tx: tx,
|
|
last: 0,
|
|
});
|
|
|
|
let mut rx = executor::spawn(rx);
|
|
|
|
for i in 0..1000 {
|
|
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll_stream_notify(¬ify_noop(), 1));
|
|
assert_eq!(Ok(Async::NotReady), tx.poll_stream_notify(¬ify_noop(), 1));
|
|
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(¬ify_noop(), 1));
|
|
}
|
|
})
|
|
}
|
|
|
|
/// 100 producers, single consumer
|
|
#[bench]
|
|
fn bounded_100_tx(b: &mut Bencher) {
|
|
b.iter(|| {
|
|
// Each sender can send one item after specified capacity
|
|
let (tx, rx) = channel(0);
|
|
|
|
let mut tx: Vec<_> = (0..100).map(|_| {
|
|
executor::spawn(TestSender {
|
|
tx: tx.clone(),
|
|
last: 0
|
|
})
|
|
}).collect();
|
|
|
|
let mut rx = executor::spawn(rx);
|
|
|
|
for i in 0..10 {
|
|
for j in 0..tx.len() {
|
|
// Send an item
|
|
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll_stream_notify(¬ify_noop(), 1));
|
|
// Then block
|
|
assert_eq!(Ok(Async::NotReady), tx[j].poll_stream_notify(¬ify_noop(), 1));
|
|
// Recv the item
|
|
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(¬ify_noop(), 1));
|
|
}
|
|
}
|
|
})
|
|
}
|