美文网首页
Rust修行之Future篇-part3

Rust修行之Future篇-part3

作者: 黑天鹅学院 | 来源:发表于2020-02-28 10:19 被阅读0次

    本文翻译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 3 - The reactor

    介绍

    在这一篇文章中,我们将深入reactor内部的实现原理。在前面的文章中,我们多次使用reactor来执行Future,但是我们并不关心内部的实现。

    Reactor与Loop

    简单的说,reactor就是一个Loop,为了解释这个概念,可以参考一个老套的例子。

    你向心仪的女孩发了封邮件,邀请她陪你一起看电影,发完邮件后,你肯定会十分忐忑,会不停的查邮箱,一遍又一遍的查,直到得到回复。

    rust的reactor的运行原理类似于这个过程。将Future提交给reactor之后,reactor将不断的检查该Future,直到这个Future运行结束,或者是出现错误。reactor执行Future是通过调用poll函数来完成的,每一个Future都需要实现该函数,具体来说,就是返回一个Poll<T,E>结构体。事实上,reactor并不会无止境的调用poll函数,我们用一个例子来进行分析。

    从零开始实现Future

    为了认识reactor,我们从零实现一个Future,也就是手动实现Futuretrait。我们实现的Future功能很简单,在超时之后返回。

    我们的定义WaitForIt如下:

    
    #[derive(Debug)]
    
    struct WaitForIt {
    
        message: String,
    
        until: DateTime<Utc>,
    
        polls: u64,
    
    }
    
    

    这个结构体保存有超时时间,一个用户自定义的字符串,以及已经被调用的次数。我们实现的new函数如下:

    
    impl WaitForIt {
    
        pub fn new(message: String, delay: Duration) -> WaitForIt {
    
            WaitForIt {
    
                polls: 0,
    
                message: message,
    
                until: Utc::now() + delay,
    
            }
    
        }
    
    }
    
    

    这个new函数将初始化一个WaitForIt实例。

    现在,我们开始实现Futuretrait,要做的事情也就是实现poll函数。

    
    impl Future for WaitForIt {
    
        type Item = String;
    
        type Error = Box<Error>;
    
        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    
            let now = Utc::now();
    
            if self.until < now {
    
                Ok(Async::Ready(
    
                    format!("{} after {} polls!", self.message, self.polls),
    
                ))
    
            } else {
    
                self.polls += 1;
    
                println!("not ready yet --> {:?}", self);
    
                Ok(Async::NotReady)
    
            }
    
        }
    
    }
    
    

    来看这几行:

    
    type Item = String;
    
    type Error = Box<Error>;
    
    

    在rust里面,这样的类型被叫做关联类型, 意思就是,Future在将来完成时返回的值(或者错误)。在我们的例子中,WaitForIt最终返回一个String,或者是Box<Error>

    看poll的函数的定义:

    
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    
    

    在这个定义中,Self::ItemSelf::Error是两个关联类型的占位符,函数的定义与下面等价:

    
    fn poll(&mut self) -> Poll<String, Box<Error>>
    
    

    逻辑部分如下:

    
    let now = Utc::now();
    
    if self.until < now {
    
      // Tell reactor we are ready!
    
    } else {
    
      // Tell reactor we are not ready! Come back later!
    
    }
    
    

    在poll函数中,我们怎么告诉reactor当前Future的执行状态呢,换句话说,reactor怎么直到这个Future已经完成了呢?方法很简单,我们通过Ok枚举携带Async::NotReady表征Future未完成,通过Ok枚举携带Async::Ready表征Future已完成。

    poll函数改造如下:

    
    impl Future for WaitForIt {
    
        type Item = String;
    
        type Error = Box<Error>;
    
        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    
            let now = Utc::now();
    
            if self.until < now {
    
                Ok(Async::Ready(
    
                    format!("{} after {} polls!", self.message, self.polls),
    
                ))
    
            } else {
    
                self.polls += 1;
    
                println!("not ready yet --> {:?}", self);
    
                Ok(Async::NotReady)
    
            }
    
        }
    
    }
    
    

    我们在main函数中创建reactor来执行Future。

    
    fn main() {
    
        let mut reactor = Core::new().unwrap();
    
        let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
    
        println!("wfi_1 == {:?}", wfi_1);
    
        let ret = reactor.run(wfi_1).unwrap();
    
        println!("ret == {:?}", ret);
    
    }
    
    

    我们预期Future在1秒钟之后完成,运行结果如下:

    
    Running `target/debug/tst_fut_create`
    
    wfi_1 == WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 0 }
    
    not ready yet --> WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 1 }
    
    

    运行结果貌似不符合预期,仅运行了一次就停住了,但是并没有产生额外的CPU消耗,这是为什么呢?

    fcpu

    在rust中,一个Future poll函数提交给reactor后,视为这个Future停放在了这个reactor中,而reactor并不会再次调用这个poll函数,除非显式的告知需要被再次调用。在我们的例子中,reactor会立即调用WaitForIt中的poll函数,但是返回值是Async::NotReady,所以这个poll函数将会被停放在这个reactor中。如果没有相应的机制告诉reactor解除停放,那么poll函数将永远不会被再次调用。在这个过程中,reactor处于空闲状态,并不会额外消耗CPU。由于没有去直接查询运行是否完成的状态,所以这种方式的效率很高。在上面的邮件例子中,我们可以让邮箱在收到回复之后通知我们,这样就没有必要不断的去查邮箱了。

    另一个更有意义的例子是网络收包过程,在不确定报文什么时候到达的情况下,我们可以阻塞线程等待报文到来,也可以在等待的过程中做其他的事情。

    解除停放

    有些时候,我们需要解除poll函数的停放,应该怎样修改WaitForIt的实现呢?有很多外部事件可以用来解除停放,比如键盘事件或者网络报文到达,在我们的例子中,我们需要手动触发。

    
    futures::task::current().notify();
    
    

    Future修改如下:

    
    impl Future for WaitForIt {
    
        type Item = String;
    
        type Error = Box<Error>;
    
        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    
            let now = Utc::now();
    
            if self.until < now {
    
                Ok(Async::Ready(
    
                    format!("{} after {} polls!", self.message, self.polls),
    
                ))
    
            } else {
    
                self.polls += 1;
    
                println!("not ready yet --> {:?}", self);
    
                futures::task::current().notify();
    
                Ok(Async::NotReady)
    
            }
    
        }
    
    }
    
    

    现在可以可以看到,Future不会停下来。

    frun

    代码运行结束后,poll函数在1秒内被调用了超过50k次。这是严重的资源浪费,所以,应该仅在事件明确发生的时候,才应该解除停放。

    fcpu1

    到目前为止,我们的loop是单线程的,如果有需要,可以使用多线程来运行Future。

    Joining

    reactor有一个有用的特性是可以并行运行多个Future,通过这种方式,我们可以更加高效的利用单线程loop,如果一个Future被停放了,另一个Future将获得执行机会。

    我们复用WaitForIt,定义两个Future,然后并发的执行这两个Future:

    
    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
    
    println!("wfi_1 == {:?}", wfi_1);
    
    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));
    
    println!("wfi_2 == {:?}", wfi_2);
    
    

    我们通过futures::future::join_all来并发执行Future,join_all的输入是一个Future迭代器,我们先创建一个vector:

    
    let v = vec![wfi_1, wfi_2];
    
    

    然后创建联合:

    
    let sel = join_all(v);
    
    

    完整的代码如下:

    
    fn main() {
    
        let mut reactor = Core::new().unwrap();
    
        let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
    
        println!("wfi_1 == {:?}", wfi_1);
    
        let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));
    
        println!("wfi_2 == {:?}", wfi_2);
    
        let v = vec![wfi_1, wfi_2];
    
        let sel = join_all(v);
    
        let ret = reactor.run(sel).unwrap();
    
        println!("ret == {:?}", ret);
    
    }
    
    

    运行结果如下:

    frun1

    关键点是两个请求是交错的,第一个Future被调用后,第二个Future被调用,然后是第一个,接着是第二个,依此类推,直到两个Future最终完成。

    Select

    futuretrait有很多辅助性的函数,除了join_all之外,还有一个是selectselect函数运行两个Future,返回第一个完成的Future。这种方法在实现超时时很有用,我们举一个例子:

    
    fn main() {
    
        let mut reactor = Core::new().unwrap();
    
        let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
    
        println!("wfi_1 == {:?}", wfi_1);
    
        let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(2));
    
        println!("wfi_2 == {:?}", wfi_2);
    
        let v = vec![wfi_1, wfi_2];
    
        let sel = select_all(v);
    
        let ret = reactor.run(sel).unwrap();
    
        println!("ret == {:?}", ret);
    
    }
    
    

    尾声

    在下一篇文章中,我们将介绍更加有实际意义的future,不消耗额外的CPU资源,同时更加符合reactor的要求。

    相关文章

      网友评论

          本文标题:Rust修行之Future篇-part3

          本文链接:https://www.haomeiwen.com/subject/tlekhhtx.html