美文网首页RUST编程
006 Rust 异步编程,Stream 介绍

006 Rust 异步编程,Stream 介绍

作者: 令狐壹冲 | 来源:发表于2020-07-06 22:10 被阅读0次

    Stream 介绍

    StreamFuture类似,但是Future对应的是一个item的状态的变化,而Stream则是类似于iterator,在结束之前能够得到多个值。或者我们可以简单的理解为,Stream是由一系列的Future组成,我们可以从Stream读取各个Future的结果,直到Stream结束。

    Stream trait定义

    定义如下:

    trait Stream {
        type Item;
    
        fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker)
            -> Poll<Option<Self::Item>>;
    }
    
    

    poll_next函数有三种可能的返回值,分别如下:

    • Poll::Pending 说明下一个值还没有就绪,仍然需要等待。
    • Poll::Ready(Some(val)) 已经就绪,成功返回一个值,程序可以通过调用poll_next再获取下一个值。
    • Poll::Ready(None) 表示Stream已经结束,不应该在调用poll_next

    迭代

    和同步的Iterator类似,Stream可以迭代处理其中的值,如使用map, filter, fold, try_map, try_filter, and try_fold等。但是Stream不支持使用for,而while letnext/try_next则是允许的。 例子如下:

    async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
        use futures::stream::StreamExt; // for `next`
        let mut sum = 0;
        while let Some(item) = stream.next().await {
            sum += item;
        }
        sum
    }
    
    async fn sum_with_try_next(
        mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
    ) -> Result<i32, io::Error> {
        use futures::stream::TryStreamExt; // for `try_next`
        let mut sum = 0;
        while let Some(item) = stream.try_next().await? {
            sum += item;
        }
        Ok(sum)
    }
    
    

    并发

    上面的使用的迭代处理,如果我们要并发的处理流,则应该使用for_each_concurrenttry_for_each_concurrent,示例如下:

    async fn jump_around(
        mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
    ) -> Result<(), io::Error> {
        use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
        const MAX_CONCURRENT_JUMPERS: usize = 100;
    
        stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
            jump_n_times(num).await?;
            report_n_jumps(num).await?;
            Ok(())
        }).await?;
    
        Ok(())
    }
    
    

    参考资料

    Rust异步编程

    相关文章

      网友评论

        本文标题:006 Rust 异步编程,Stream 介绍

        本文链接:https://www.haomeiwen.com/subject/djhnqktx.html