美文网首页
[Rust-async-book]--5--The Stream

[Rust-async-book]--5--The Stream

作者: buddyCoder | 来源:发表于2020-03-09 14:25 被阅读0次

The Stream trait is similar to Future but can yield multiple values before completing, similar to the Iterator trait from the standard library:
Stream trait 类似于 Future,但在完成之前可以产生多个值,类似于标准库中的迭代器特征:

trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Retuns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

One common example of a Stream is the Receiver for the channel type from the futures crate. It will yield Some(val) every time a value is sent from the Sender end, and will yield None once the Sender has been dropped and all pending messages have been received:
Stream 的一个常见示例是futures crate中channel 类型的接收器。每次从发送方端发送值时,它都会产生Some(val),并且在发送方被删除并且接收到所有挂起的消息后,它将不产生任何值:

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

Iteration and Concurrency

Similar to synchronous Iterators, there are many different ways to iterate over and process the values in a Stream. There are combinator-style methods such as map, filter, and fold, and their early-exit-on-error cousins try_map, try_filter, and try_fold.
与同步迭代器类似,有许多不同的方法来迭代和处理Stream中的值。有一些组合样式的方法,如map、filter和fold,它们的早期退出错误类似于try_map、try_filter和try_fold。

Unfortunately, for loops are not usable with Streams, but for imperative-style code, while let and the next/try_next functions can be used:
不幸的是,for循环不能用于流,而是用于命令式代码,而let和next/try_next函数可以用于:

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

However, if we're just processing one element at a time, we're potentially leaving behind opportunity for concurrency, which is, after all, why we're writing async code in the first place. To process multiple items from a stream concurrently, use the for_each_concurrent and try_for_each_concurrent methods:
然而,如果我们一次只处理一个元素,那么我们就有可能留下并发的机会,这毕竟是我们首先编写异步代码的原因。要同时处理流中的多个项,请使用for-each-concurrent和try-for-each-concurrent方法:

async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

相关文章

  • [Rust-async-book]--5--The Stream

    The Stream trait is similar to Future but can yield multi...

  • 5--The Words

    整个城市在这个时刻静了下来,于是我听到了久违的手表指针发出的的嘀嗒声,不禁想起了以前那些挑灯奋战的夜晚,那些所谓的...

  • JAVA8新特性: Stream-集合流操作

    Stream类全路径为:java.util.stream.Stream Stream简介 Stream原理 Str...

  • Stream流

    一、创建流 Arrays.stream Stream.of Collection.stream Stream.it...

  • JDK8新特性之Stream流

    是什么是Stream流 java.util.stream.Stream Stream流和传统的IO流,它们都叫流,...

  • 2018-04-03

    Stream初探 一:stream.Readable & stream.Writable 1:模拟实现 strea...

  • 异步 Stream

    flutter 异步方式 Future Async/await Stream Stream stream 是一个事...

  • Java8新特性

    java.util.stream.Stream 注意事项 使用java.util.stream.Collector...

  • 2019-02-02——Java8 Stream

    Stream分为两种: 串行流——stream() 并行流——parallelStream() Stream的特性...

  • 中级11 - Java 8 Sream

    1. 创建 Stream Collection.stream Stream.of String.chars Int...

网友评论

      本文标题:[Rust-async-book]--5--The Stream

      本文链接:https://www.haomeiwen.com/subject/ffmudhtx.html