Bug 1773399 - Update tokio-stream to 0.1.9. r=emilio

Differential Revision: https://phabricator.services.mozilla.com/D148728
This commit is contained in:
Mike Hommey 2022-06-09 20:33:48 +00:00
Родитель d7460d2746
Коммит 21f74c6178
18 изменённых файлов: 330 добавлений и 28 удалений

4
Cargo.lock сгенерированный
Просмотреть файл

@ -5381,9 +5381,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.8"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
dependencies = [
"futures-core",
"pin-project-lite",

Просмотреть файл

@ -1478,7 +1478,7 @@ version = "1.8.0"
criteria = "safe-to-run"
[[unaudited.tokio-stream]]
version = "0.1.8"
version = "0.1.9"
criteria = "safe-to-run"
[[unaudited.tokio-util]]

Различия файлов скрыты, потому что одна или несколько строк слишком длинны

20
third_party/rust/tokio-stream/CHANGELOG.md поставляемый
Просмотреть файл

@ -1,3 +1,23 @@
# 0.1.9 (June 4, 2022)
- deps: upgrade `tokio-util` dependency to `0.7.x` ([#3762])
- stream: add `StreamExt::map_while` ([#4351])
- stream: add `StreamExt::then` ([#4355])
- stream: add cancel-safety docs to `StreamExt::next` and `try_next` ([#4715])
- stream: expose `Elapsed` error ([#4502])
- stream: expose `Timeout` ([#4601])
- stream: implement `Extend` for `StreamMap` ([#4272])
- sync: add `Clone` to `RecvError` types ([#4560])
[#3762]: https://github.com/tokio-rs/tokio/pull/3762
[#4272]: https://github.com/tokio-rs/tokio/pull/4272
[#4351]: https://github.com/tokio-rs/tokio/pull/4351
[#4355]: https://github.com/tokio-rs/tokio/pull/4355
[#4502]: https://github.com/tokio-rs/tokio/pull/4502
[#4560]: https://github.com/tokio-rs/tokio/pull/4560
[#4601]: https://github.com/tokio-rs/tokio/pull/4601
[#4715]: https://github.com/tokio-rs/tokio/pull/4715
# 0.1.8 (October 29, 2021)
- stream: add `From<Receiver<T>>` impl for receiver streams ([#4080])

33
third_party/rust/tokio-stream/Cargo.toml поставляемый
Просмотреть файл

@ -11,19 +11,29 @@
[package]
edition = "2018"
rust-version = "1.49"
name = "tokio-stream"
version = "0.1.8"
version = "0.1.9"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "Utilities to work with `Stream` and `tokio`.\n"
description = """
Utilities to work with `Stream` and `tokio`.
"""
homepage = "https://tokio.rs"
documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream"
categories = ["asynchronous"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
[package.metadata.docs.rs]
all-features = true
rustc-args = ["--cfg", "docsrs"]
rustdoc-args = ["--cfg", "docsrs"]
rustdoc-args = [
"--cfg",
"docsrs",
]
rustc-args = [
"--cfg",
"docsrs",
]
[dependencies.futures-core]
version = "0.3.0"
@ -35,8 +45,9 @@ version = "1.8.0"
features = ["sync"]
[dependencies.tokio-util]
version = "0.6.3"
version = "0.7.0"
optional = true
[dev-dependencies.async-stream]
version = "0.3"
@ -49,7 +60,10 @@ version = "1"
[dev-dependencies.tokio]
version = "1.2.0"
features = ["full", "test-util"]
features = [
"full",
"test-util",
]
[features]
default = ["time"]
@ -57,5 +71,8 @@ fs = ["tokio/fs"]
io-util = ["tokio/io-util"]
net = ["tokio/net"]
signal = ["tokio/signal"]
sync = ["tokio/sync", "tokio-util"]
sync = [
"tokio/sync",
"tokio-util",
]
time = ["tokio/time"]

2
third_party/rust/tokio-stream/LICENSE поставляемый
Просмотреть файл

@ -1,4 +1,4 @@
Copyright (c) 2021 Tokio Contributors
Copyright (c) 2022 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated

4
third_party/rust/tokio-stream/src/lib.rs поставляемый
Просмотреть файл

@ -10,7 +10,6 @@
unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
@ -78,6 +77,9 @@ pub mod wrappers;
mod stream_ext;
pub use stream_ext::{collect::FromStream, StreamExt};
cfg_time! {
pub use stream_ext::timeout::{Elapsed, Timeout};
}
mod empty;
pub use empty::{empty, Empty};

Просмотреть файл

@ -1,3 +1,4 @@
use core::future::Future;
use futures_core::Stream;
mod all;
@ -27,6 +28,9 @@ use fuse::Fuse;
mod map;
use map::Map;
mod map_while;
use map_while::MapWhile;
mod merge;
use merge::Merge;
@ -39,17 +43,20 @@ use skip::Skip;
mod skip_while;
use skip_while::SkipWhile;
mod try_next;
use try_next::TryNext;
mod take;
use take::Take;
mod take_while;
use take_while::TakeWhile;
mod then;
use then::Then;
mod try_next;
use try_next::TryNext;
cfg_time! {
mod timeout;
pub(crate) mod timeout;
use timeout::Timeout;
use tokio::time::Duration;
mod throttle;
@ -106,6 +113,12 @@ pub trait StreamExt: Stream {
/// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
/// crate.
///
/// # Cancel safety
///
/// This method is cancel safe. The returned future only
/// holds onto a reference to the underlying stream,
/// so dropping it will never lose a value.
///
/// # Examples
///
/// ```
@ -142,6 +155,12 @@ pub trait StreamExt: Stream {
/// an [`Option<Result<T, E>>`](Option), making for easy use
/// with the [`?`](std::ops::Try) operator.
///
/// # Cancel safety
///
/// This method is cancel safe. The returned future only
/// holds onto a reference to the underlying stream,
/// so dropping it will never lose a value.
///
/// # Examples
///
/// ```
@ -197,6 +216,93 @@ pub trait StreamExt: Stream {
Map::new(self, f)
}
/// Map this stream's items to a different type for as long as determined by
/// the provided closure. A stream of the target type will be returned,
/// which will yield elements until the closure returns `None`.
///
/// The provided closure is executed over all elements of this stream as
/// they are made available, until it returns `None`. It is executed inline
/// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
/// the underlying stream will not be polled again.
///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::map_while`] method in the
/// standard library.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
///
/// let stream = stream::iter(1..=10);
/// let mut stream = stream.map_while(|x| {
/// if x < 4 {
/// Some(x + 3)
/// } else {
/// None
/// }
/// });
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// assert_eq!(stream.next().await, None);
/// # }
/// ```
fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
where
F: FnMut(Self::Item) -> Option<T>,
Self: Sized,
{
MapWhile::new(self, f)
}
/// Maps this stream's items asynchronously to a different type, returning a
/// new stream of the resulting type.
///
/// The provided closure is executed over all elements of this stream as
/// they are made available, and the returned future is executed. Only one
/// future is executed at the time.
///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the existing `then` methods in the
/// standard library.
///
/// Be aware that if the future is not `Unpin`, then neither is the `Stream`
/// returned by this method. To handle this, you can use `tokio::pin!` as in
/// the example below or put the stream in a `Box` with `Box::pin(stream)`.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
///
/// async fn do_async_work(value: i32) -> i32 {
/// value + 3
/// }
///
/// let stream = stream::iter(1..=3);
/// let stream = stream.then(do_async_work);
///
/// tokio::pin!(stream);
///
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// # }
/// ```
fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
where
F: FnMut(Self::Item) -> Fut,
Fut: Future,
Self: Sized,
{
Then::new(self, f)
}
/// Combine two streams into one by interleaving the output of both as it
/// is produced.
///

Просмотреть файл

@ -66,17 +66,17 @@ where
use Poll::Ready;
loop {
let mut me = self.as_mut().project();
let me = self.as_mut().project();
let item = match ready!(me.stream.poll_next(cx)) {
Some(item) => item,
None => {
return Ready(U::finalize(sealed::Internal, &mut me.collection));
return Ready(U::finalize(sealed::Internal, me.collection));
}
};
if !U::extend(sealed::Internal, &mut me.collection, item) {
return Ready(U::finalize(sealed::Internal, &mut me.collection));
if !U::extend(sealed::Internal, me.collection, item) {
return Ready(U::finalize(sealed::Internal, me.collection));
}
}
}

52
third_party/rust/tokio-stream/src/stream_ext/map_while.rs поставляемый Normal file
Просмотреть файл

@ -0,0 +1,52 @@
use crate::Stream;
use core::fmt;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
pin_project! {
/// Stream for the [`map_while`](super::StreamExt::map_while) method.
#[must_use = "streams do nothing unless polled"]
pub struct MapWhile<St, F> {
#[pin]
stream: St,
f: F,
}
}
impl<St, F> fmt::Debug for MapWhile<St, F>
where
St: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MapWhile")
.field("stream", &self.stream)
.finish()
}
}
impl<St, F> MapWhile<St, F> {
pub(super) fn new(stream: St, f: F) -> Self {
MapWhile { stream, f }
}
}
impl<St, F, T> Stream for MapWhile<St, F>
where
St: Stream,
F: FnMut(St::Item) -> Option<T>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let me = self.project();
let f = me.f;
me.stream.poll_next(cx).map(|opt| opt.and_then(f))
}
fn size_hint(&self) -> (usize, Option<usize>) {
let (_, upper) = self.stream.size_hint();
(0, upper)
}
}

Просмотреть файл

@ -8,6 +8,13 @@ use pin_project_lite::pin_project;
pin_project! {
/// Future for the [`next`](super::StreamExt::next) method.
///
/// # Cancel safety
///
/// This method is cancel safe. It only
/// holds onto a reference to the underlying stream,
/// so dropping it will never lose a value.
///
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Next<'a, St: ?Sized> {

83
third_party/rust/tokio-stream/src/stream_ext/then.rs поставляемый Normal file
Просмотреть файл

@ -0,0 +1,83 @@
use crate::Stream;
use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
pin_project! {
/// Stream for the [`then`](super::StreamExt::then) method.
#[must_use = "streams do nothing unless polled"]
pub struct Then<St, Fut, F> {
#[pin]
stream: St,
#[pin]
future: Option<Fut>,
f: F,
}
}
impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
where
St: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Then")
.field("stream", &self.stream)
.finish()
}
}
impl<St, Fut, F> Then<St, Fut, F> {
pub(super) fn new(stream: St, f: F) -> Self {
Then {
stream,
future: None,
f,
}
}
}
impl<St, F, Fut> Stream for Then<St, Fut, F>
where
St: Stream,
Fut: Future,
F: FnMut(St::Item) -> Fut,
{
type Item = Fut::Output;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> {
let mut me = self.project();
loop {
if let Some(future) = me.future.as_mut().as_pin_mut() {
match future.poll(cx) {
Poll::Ready(item) => {
me.future.set(None);
return Poll::Ready(Some(item));
}
Poll::Pending => return Poll::Pending,
}
}
match me.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(item)) => {
me.future.set(Some((me.f)(item)));
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let future_len = if self.future.is_some() { 1 } else { 0 };
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = upper.and_then(|upper| upper.checked_add(future_len));
(lower, upper)
}
}

Просмотреть файл

@ -9,6 +9,12 @@ use pin_project_lite::pin_project;
pin_project! {
/// Future for the [`try_next`](super::StreamExt::try_next) method.
///
/// # Cancel safety
///
/// This method is cancel safe. It only
/// holds onto a reference to the underlying stream,
/// so dropping it will never lose a value.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryNext<'a, St: ?Sized> {

Просмотреть файл

@ -585,6 +585,15 @@ where
}
}
impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
fn extend<T>(&mut self, iter: T)
where
T: IntoIterator<Item = (K, V)>,
{
self.entries.extend(iter);
}
}
mod rand {
use std::cell::Cell;

Просмотреть файл

@ -14,11 +14,11 @@ use std::task::{Context, Poll};
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct BroadcastStream<T> {
inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>,
}
/// An error returned from the inner stream of a [`BroadcastStream`].
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum BroadcastStreamRecvError {
/// The receiver lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the channel.

Просмотреть файл

@ -49,7 +49,7 @@ use tokio::sync::watch::error::RecvError;
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct WatchStream<T> {
inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>,
}
async fn make_future<T: Clone + Send + Sync>(

Просмотреть файл

@ -1,10 +1,10 @@
#![cfg(feature = "full")]
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
use tokio::time::{self, sleep, Duration};
use tokio_stream::{self, StreamExt};
use tokio_test::*;
use futures::StreamExt as _;
use futures::stream;
async fn maybe_sleep(idx: i32) -> i32 {
if idx % 2 == 0 {

Просмотреть файл

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
use tokio::time;
use tokio_stream::StreamExt;