The Stream trait is similar to Future but can yield multiple values before completing, similar to the Iterator trait from the standard library:
Stream trait 类似于 Future,但在完成之前可以产生多个值,类似于标准库中的迭代器特征:
trait Stream {
/// The type of the value yielded by the stream.
type Item;
/// Attempt to resolve the next item in the stream.
/// Retuns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
/// is ready, and `Poll::Ready(None)` if the stream has completed.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
One common example of a Stream is the Receiver for the channel type from the futures crate. It will yield Some(val) every time a value is sent from the Sender end, and will yield None once the Sender has been dropped and all pending messages have been received:
Stream 的一个常见示例是futures crate中channel 类型的接收器。每次从发送方端发送值时,它都会产生Some(val),并且在发送方被删除并且接收到所有挂起的消息后,它将不产生任何值:
async fn send_recv() {
const BUFFER_SIZE: usize = 10;
let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
drop(tx);
// `StreamExt::next` is similar to `Iterator::next`, but returns a
// type that implements `Future<Output = Option<T>>`.
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
Iteration and Concurrency
Similar to synchronous Iterators, there are many different ways to iterate over and process the values in a Stream. There are combinator-style methods such as map, filter, and fold, and their early-exit-on-error cousins try_map, try_filter, and try_fold.
与同步迭代器类似,有许多不同的方法来迭代和处理Stream中的值。有一些组合样式的方法,如map、filter和fold,它们的早期退出错误类似于try_map、try_filter和try_fold。
Unfortunately, for loops are not usable with Streams, but for imperative-style code, while let and the next/try_next functions can be used:
不幸的是,for循环不能用于流,而是用于命令式代码,而let和next/try_next函数可以用于:
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // for `next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // for `try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
However, if we're just processing one element at a time, we're potentially leaving behind opportunity for concurrency, which is, after all, why we're writing async code in the first place. To process multiple items from a stream concurrently, use the for_each_concurrent and try_for_each_concurrent methods:
然而,如果我们一次只处理一个元素,那么我们就有可能留下并发的机会,这毕竟是我们首先编写异步代码的原因。要同时处理流中的多个项,请使用for-each-concurrent和try-for-each-concurrent方法:
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
网友评论