美文网首页
[Rust-async-book]--2--Under the

[Rust-async-book]--2--Under the

作者: buddyCoder | 来源:发表于2020-03-06 09:01 被阅读0次

    In this section, we'll cover the underlying structure of how Futures and asynchronous tasks are scheduled. If you're only interested in learning how to write higher-level code that uses existing Future types and aren't interested in the details of how Future types work, you can skip ahead to the async/await chapter. However, several of the topics discussed in this chapter are useful for understanding how async/await code works, understanding the runtime and performance properties of async/await code, and building new asynchronous primitives. If you decide to skip this section now, you may want to bookmark it to revisit in the future.
    在本节中,我们将介绍如何调度Futures 任务和异步任务的底层结构。如果您只对学习如何编写使用现有Future types 的高级代码感兴趣,而对Future types如何工作的细节不感兴趣,那么您可以跳到async/await一章。然而,本章讨论的几个主题对于理解async/await 代码如何工作、理解async/await 代码的运行时和性能属性以及构建新的异步原语都很有用。如果您现在决定跳过此部分,您可能希望将其添加到书签,以便将来重新访问。

    Now, with that out of the way, let's talk about the Future trait.
    现在,有了这些,我们来谈谈Future trait。

    The Future Trait

    The Future trait is at the center of asynchronous programming in Rust. A Future is an asynchronous computation that can produce a value (although that value may be empty, e.g. ()). A simplified version of the future trait might look something like this:
    在Rust中,Future trait是异步编程的核心。Future是一个异步计算,它可以产生一个值(尽管这个值可能是空的,例如())。future trait 的简化版本可能是这样的:

    trait SimpleFuture {
        type Output;
        fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
    }
    
    enum Poll<T> {
        Ready(T),
        Pending,
    }
    
    

    Futures can be advanced by calling the poll function, which will drive the future as far towards completion as possible. If the future completes, it returns Poll::Ready(result). If the future is not able to complete yet, it returns Poll::Pending and arranges for the wake() function to be called when the Future is ready to make more progress. When wake() is called, the executor driving the Future will call poll again so that the Future can make more progress.

    Futures可以通过调用poll function来提前,这将使future尽可能地接近完成。如果future完成,它将返回Poll::Ready(result)。如果future还不能完成,它将返回Poll::Pending,并 Future 准备取得更多 progress 调用wake()函数。当调用wake()时,驱动Future的executor 将再次调用poll,以便Future可以取得更多的progress。

    Without wake(), the executor would have no way of knowing when a particular future could make progress, and would have to be constantly polling every future. With wake(), the executor knows exactly which futures are ready to be polled.

    如果没有wake(),executor 将无法知道某个特定的 future 何时可以取得 progress,并且必须不断轮询每个future 。通过使用wake(),executor 可以准确地知道哪些 future 即将轮询。

    For example, consider the case where we want to read from a socket that may or may not have data available already. If there is data, we can read it in and return Poll::Ready(data), but if no data is ready, our future is blocked and can no longer make progress. When no data is available, we must register wake to be called when data becomes ready on the socket, which will tell the executor that our future is ready to make progress. A simple SocketRead future might look something like this:

    例如,考虑这样一种情况:我们想要从一个可能已经有数据也可能没有数据的 socket 中读取数据。如果有数据,我们可以读入它并返回Poll::Ready(data),但是如果没有数据,我们的future 就会受阻,无法再取得progress。当没有数据可用时,我们必须在socket 上注册当数据准备好时调用的wake(),这将告诉executor 我们的 future 已经准备好取得 progress。一个简单的 SocketRead future 可能是这样的:

    pub struct SocketRead<'a> {
        socket: &'a Socket,
    }
    
    impl SimpleFuture for SocketRead<'_> {
        type Output = Vec<u8>;
    
        fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
            if self.socket.has_data_to_read() {
                // The socket has data-- read it into a buffer and return it.
                Poll::Ready(self.socket.read_buf())
            } else {
                // The socket does not yet have data.
                //
                // Arrange for `wake` to be called once data is available.
                // When data becomes available, `wake` will be called, and the
                // user of this `Future` will know to call `poll` again and
                // receive data.
                self.socket.set_readable_callback(wake);
              //等待 
               Poll::Pending  
            }
        }
    }
    

    This model of Futures allows for composing together multiple asynchronous operations without needing intermediate allocations. Running multiple futures at once or chaining futures together can be implemented via allocation-free state machines, like this:
    这种 Futures 模型允许组合多个异步操作,而不需要中间分配。一次运行多个Futures 或将Futures 链接在一起可以通过无分配状态机实现,如下所示:

    /// A SimpleFuture that runs two other futures to completion concurrently.
    ///一个简单的Future同时运行另外两个Future。
    /// Concurrency is achieved via the fact that calls to `poll` each future
    /// may be interleaved, allowing each future to advance itself at its own pace.
    ///并发性是通过调用每个future的“轮询”来实现的
    ///可能交错,让每个future以自己的步伐前进。
    
    pub struct Join<FutureA, FutureB> {
        // Each field may contain a future that should be run to completion.
        // If the future has already completed, the field is set to `None`.
        // This prevents us from polling a future after it has completed, which
        // would violate the contract of the `Future` trait.
      //每个field 可能包含一个应该运行到完成的future 。
      //如果future 已经完成,则field 设置为“None”。
      //这使我们无法在future  完成后再轮询它
      //会违反“Future”特质的合约。
        a: Option<FutureA>,
        b: Option<FutureB>,
    }
    
    impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
    where
        FutureA: SimpleFuture<Output = ()>,
        FutureB: SimpleFuture<Output = ()>,
    {
        type Output = ();
        fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
            // Attempt to complete future `a`.
            if let Some(a) = &mut self.a {
                if let Poll::Ready(()) = a.poll(wake) {
                    self.a.take();
                }
            }
    
            // Attempt to complete future `b`.
            if let Some(b) = &mut self.b {
                if let Poll::Ready(()) = b.poll(wake) {
                    self.b.take();
                }
            }
    
            if self.a.is_none() && self.b.is_none() {
                // Both futures have completed-- we can return successfully
                Poll::Ready(())
            } else {
                // One or both futures returned `Poll::Pending` and still have
                // work to do. They will call `wake()` when progress can be made.
                Poll::Pending
            }
        }
    }
    
    

    This shows how multiple futures can be run simultaneously without needing separate allocations, allowing for more efficient asynchronous programs. Similarly, multiple sequential futures can be run one after another, like this:
    这说明了如何在不需要单独分配的情况下同时运行多个futures ,从而实现更高效的异步程序。类似地,多个连续futures 可以一个接一个地运行,就像这样:

    /// A SimpleFuture that runs two futures to completion, one after another.
    //
    // Note: for the purposes of this simple example, `AndThenFut` assumes both
    // the first and second futures are available at creation-time. The real
    // `AndThen` combinator allows creating the second future based on the output
    // of the first future, like `get_breakfast.and_then(|food| eat(food))`.
    pub struct AndThenFut<FutureA, FutureB> {
        first: Option<FutureA>,
        second: FutureB,
    }
    
    impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
    where
        FutureA: SimpleFuture<Output = ()>,
        FutureB: SimpleFuture<Output = ()>,
    {
        type Output = ();
        fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
            if let Some(first) = &mut self.first {
                match first.poll(wake) {
                    // We've completed the first future-- remove it and start on
                    // the second!
                    Poll::Ready(()) => self.first.take(),
                    // We couldn't yet complete the first future.
                    Poll::Pending => return Poll::Pending,
                };
            }
            // Now that the first future is done, attempt to complete the second.
            self.second.poll(wake)
        }
    }
    

    These examples show how the Future trait can be used to express asynchronous control flow without requiring multiple allocated objects and deeply nested callbacks. With the basic control-flow out of the way, let's talk about the real Future trait and how it is different.

    这些示例展示了如何使用Future trait来表示异步控制流,而不需要多个已分配的对象和深度嵌套的回调。有了基本的控制流,让我们来谈谈真正的Future trait 和它是如何不同的。

    trait Future {
        type Output;
        fn poll(
            // Note the change from `&mut self` to `Pin<&mut Self>`:
            self: Pin<&mut Self>,
            // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
            cx: &mut Context<'_>,
        ) -> Poll<Self::Output>;
    }
    

    The first change you'll notice is that our self type is no longer &mut self, but has changed to Pin<&mut Self>. We'll talk more about pinning in a later section, but for now know that it allows us to create futures that are immovable. Immovable objects can store pointers between their fields, e.g. struct MyFut { a: i32, ptr_to_a: *const i32 }. Pinning is necessary to enable async/await.
    你会注意到的第一个变化是我们的self类型不再是&mut self,而是变成了Pin<&mut self >。我们将在后面的章节中讨论更多关于 pinning 的内容,但是现在我们知道它允许我们创建不可移动的futures 。不可移动的对象可以在其字段之间存储指针,例如 struct MyFut {a: i32, ptr_to_a: *const i32} 。要启用async/await,必须进行Pinning 。

    Secondly, wake: fn() has changed to &mut Context<'_>. In SimpleFuture, we used a call to a function pointer (fn()) to tell the future executor that the future in question should be polled. However, since fn() is just a function pointer, it can't store any data about which Future called wake.

    其次,"wake: fn()" 已经改为" &mut Context<'_>"。在SimpleFuture中,我们使用对函数指针(fn())的调用来告诉future executor 应该轮询有question 的future 。但是,由于fn()只是一个函数指针,所以它不能存储关于哪个Future调用wake的任何数据。

    In a real-world scenario, a complex application like a web server may have thousands of different connections whose wakeups should all be managed separately. The Context type solves this by providing access to a value of type Waker, which can be used to wake up a specific task.
    在真实的场景中,像web服务器这样的复杂应用程序可能有数千个不同的连接,这些连接的唤醒应该分别进行管理。Context 类型通过提供对类型为Waker的值的访问来解决这个问题,该值可用于唤醒特定的任务。

    Task Wakeups with Waker

    It's common that futures aren't able to complete the first time they are polled. When this happens, the future needs to ensure that it is polled again once it is ready to make more progress. This is done with the Waker type.
    很常见的是,futures 无法完成第一次polled。当这种情况发生时,future 需要确保在它准备好取得更多progress时再次轮询。这是使用Waker类型完成的。

    Each time a future is polled, it is polled as part of a "task". Tasks are the top-level futures that have been submitted to an executor.
    每次对future 进行polled时,都将其作为“task”的一部分进行polled 。Tasks 是提交给executor 的顶级 futures 。

    Waker provides a wake() method that can be used to tell the executor that the associated task should be awoken. When wake() is called, the executor knows that the task associated with the Waker is ready to make progress, and its future should be polled again.
    Waker提供了一个wake()方法,该方法可用于告诉 executor 应该唤醒相关的任务。调用wake()时,executor 知道与Waker关联的任务已经准备好取得 progress,应该再次 polled 它的 future 。

    Waker also implements clone() so that it can be copied around and stored.
    Waker还实现了clone(),这样就可以对其进行复制和存储。

    Let's try implementing a simple timer future using Waker.
    让我们尝试使用Waker实现一个简单的timer future。

    Applied: Build a Timer

    For the sake of the example, we'll just spin up a new thread when the timer is created, sleep for the required time, and then signal the timer future when the time window has elapsed.
    对于本例,我们将在创建计时器时启动一个新线程,休眠所需的时间,然后在时间窗口结束时通知计时器的future 。
    Here are the imports we'll need to get started:
    这里是我们需要开始的导入:

    
    use {
        std::{
            future::Future,
            pin::Pin,
            sync::{Arc, Mutex},
            task::{Context, Poll, Waker},
            thread,
            time::Duration,
        },
    };
    

    Let's start by defining the future type itself. Our future needs a way for the thread to communicate that the timer has elapsed and the future should complete. We'll use a shared Arc<Mutex<..>> value to communicate between the thread and the future.
    让我们从定义 future 类型本身开始。我们的future 需要一种方式,使线程能够告知计时器 future 已经完成,我们将使用共享的Arc<Mutex<..>>值在线程和future 之间进行通信。

    pub struct TimerFuture {
        shared_state: Arc<Mutex<SharedState>>,
    }
    
    /// Shared state between the future and the waiting thread
    struct SharedState {
        /// Whether or not the sleep time has elapsed
        completed: bool,
    
        /// The waker for the task that `TimerFuture` is running on.
        /// The thread can use this after setting `completed = true` to tell
        /// `TimerFuture`'s task to wake up, see that `completed = true`, and
        /// move forward.
        waker: Option<Waker>,
    }
    

    Now, let's actually write the Future implementation!
    现在,让我们写下Future 的实现!

    impl Future for TimerFuture {
        type Output = ();
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Look at the shared state to see if the timer has already completed.
    
            let mut shared_state = self.shared_state.lock().unwrap();
            if shared_state.completed {
                Poll::Ready(())
            } else {
    
                // Set waker so that the thread can wake up the current task
                // when the timer has completed, ensuring that the future is polled
                // again and sees that `completed = true`.
                //
                // It's tempting to do this once rather than repeatedly cloning
                // the waker each time. However, the `TimerFuture` can move between
                // tasks on the executor, which could cause a stale waker pointing
                // to the wrong task, preventing `TimerFuture` from waking up
                // correctly.
                //
                // N.B. it's possible to check for this using the `Waker::will_wake`
                // function, but we omit that here to keep things simple.
              //设置waker ,以便线程可以在计时器完成时唤醒当前任务,确保再次轮询未来并看    
             //到“completed=true”。
             // 这样做一次很有诱惑力,而不是每次都重复克隆唤醒器。但是,“TimerFuture”可以在执行器上的任务之
             // 间移动,这可能会导致过时的唤醒器指向错误的任务,从而阻止“TimerFuture”正确唤醒。
             //N、 B.可以使用“Waker::will_wake”函数检查这个,但是我们在这里省略了这个以保持简单
                shared_state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
    
    

    Pretty simple, right? If the thread has set shared_state.completed = true, we're done! Otherwise, we clone the Waker for the current task and pass it to shared_state.waker so that the thread can wake the task back up.
    很简单,对吧?如果线程已将shared_state.completed设置为true,则完成!否则,我们将克隆当前任务的唤醒器并将其传递给shared_state.Waker,以便线程可以唤醒任务。

    Importantly, we have to update the Waker every time the future is polled because the future may have moved to a different task with a different Waker. This will happen when futures are passed around between tasks after being polled.

    重要的是,每次对future 进行polled 时,我们都必须更新Waker程序,因为future 可能已转移到具有不同唤醒程序的不同任务。这将发生在polled后在任务之间传递future 时。

    Finally, we need the API to actually construct the timer and start the thread:
    最后,我们需要API来实际构造定时器和启动线程:

    impl TimerFuture {
       /// Create a new `TimerFuture` which will complete after the provided
       /// timeout.
      ///创建一个新的“TimerFuture”,它将在提供的超时后完成。
       pub fn new(duration: Duration) -> Self {
           let shared_state = Arc::new(Mutex::new(SharedState {
               completed: false,
               waker: None,
           }));
    
           // Spawn the new thread 生成新的线程
           let thread_shared_state = shared_state.clone();
           thread::spawn(move || {
               thread::sleep(duration);
               let mut shared_state = thread_shared_state.lock().unwrap();
               // Signal that the timer has completed and wake up the last
               // task on which the future was polled, if one exists.
              //表示计时器已完成并唤醒对未来进行轮询的最后一个任务(如果存在)。
               shared_state.completed = true;
               if let Some(waker) = shared_state.waker.take() {
                   waker.wake()
               }
           });
    
           TimerFuture { shared_state }
       }
    }
    

    Woot! That's all we need to build a simple timer future. Now, if only we had an executor to run the future on...
    呜!这就是我们需要建立一个简单的计时器 future。现在,如果我们有一个 executor 来管理 the future 。。。

    Applied: Build an Executor

    Rust's Futures are lazy: they won't do anything unless actively driven to completion. One way to drive a future to completion is to .await it inside an async function, but that just pushes the problem one level up: who will run the futures returned from the top-level async functions? The answer is that we need a Future executor.
    Rust的Futures 是 lazy的:除非积极推动完成,否则他们不会做任何事情。驱动Futures 完成的一种方法是在async function中.await它,但这只是将问题推高了一层:谁将运行从顶级异步函数返回的futures ?答案是我们需要一个Future executor。

    Future executors take a set of top-level Futures and run them to completion by calling poll whenever the Future can make progress. Typically, an executor will poll a future once to start off. When Futures indicate that they are ready to make progress by calling wake(), they are placed back onto a queue and poll is called again, repeating until the Future has completed.
    Future executors 将获取一组顶级Futures ,并在Future 可以取得progress 时通过调用poll来运行它们直到完成。通常情况下, 一个 executor 会对 future进行一次poll以开始。当Futures通过调用wake()指示它们准备好要取得progress 时,它们将被放回队列中,并再次调用poll,重复直到Future完成。

    In this section, we'll write our own simple executor capable of running a large number of top-level futures to completion concurrently.
    在本节中,我们将编写自己的简单 executor,它能够同时运行大量顶级futures。

    For this example, we depend on the futures crate for the ArcWake trait, which provides an easy way to construct a Waker.
    在这个例子中,我们依赖于ArcWake特性的futures crate ,它提供了一种构造Waker的简单方法。

    [package]
    name = "xyz"
    version = "0.1.0"
    authors = ["XYZ Author"]
    edition = "2018"
    
    [dependencies]
    futures = "0.3"
    

    Next, we need the following imports at the top of src/main.rs:
    接下来,我们需要src/main.rs顶部的以下导入:

    use {
        futures::{
            future::{FutureExt, BoxFuture},
            task::{ArcWake, waker_ref},
        },
        std::{
            future::Future,
            sync::{Arc, Mutex},
            sync::mpsc::{sync_channel, SyncSender, Receiver},
            task::{Context, Poll},
            time::Duration,
        },
        // The timer we wrote in the previous section:
        timer_future::TimerFuture,
    };
    

    Our executor will work by sending tasks to run over a channel. The executor will pull events off of the channel and run them. When a task is ready to do more work (is awoken), it can schedule itself to be polled again by putting itself back onto the channel.
    我们的executor 将通过发送任务来运行一个通道channel。executor 将从通道中提取事件并运行它们。当一个任务准备好做更多的工作(被唤醒)时,它可以通过将自己放回 channel来安排自己再次被 polled 。

    In this design, the executor itself just needs the receiving end of the task channel. The user will get a sending end so that they can spawn new futures. Tasks themselves are just futures that can reschedule themselves, so we'll store them as a future paired with a sender that the task can use to requeue itself.
    在这种设计中,executor 本身只需要任务通道的接收端。用户将得到一个发送端,这样他们就可以产生新的futures。任务本身只是可以重新调度自己的futures ,因此我们将它们存储为与发送者配对的futures ,任务可以使用发送者来重新排队。

    // Task executor that receives tasks off of a channel and runs them.
    //从通道接收任务并运行它们的任务执行器。
    struct Executor {
        ready_queue: Receiver<Arc<Task>>,
    }
    
    /// `Spawner` spawns new futures onto the task channel.
    /// `Spawner` 生成者 在 任务通道上生成新的 futures 。
    #[derive(Clone)]
    struct Spawner {
        task_sender: SyncSender<Arc<Task>>,
    }
    
    /// A future that can reschedule itself to be polled by an `Executor`.
    /// 一个可以重新安排自己接受“Executor”进行polled 的future ,其实就是一个任务。
    struct Task {
        /// In-progress future that should be pushed to completion.
        ///
        /// The `Mutex` is not necessary for correctness, since we only have
        /// one thread executing tasks at once. However, Rust isn't smart
        /// enough to know that `future` is only mutated from one thread,
        /// so we need use the `Mutex` to prove thread-safety. A production
        /// executor would not need this, and could use `UnsafeCell` instead.
       ///在进行中的future ,应该将其推送到完成位置。
      ///正确性不必要使用“Mutex”互斥锁,因为我们一次只能有一个线程执行任务。
      ///然而,Rust还不够聪明,无法知道“future”只是从一个线程变异而来,
      ///因此我们需要使用“Mutex”来证明线程的安全性。
     /// 生产executor 不需要这些,而是可以使用“UnsafeCell” 代替。
    
        future: Mutex<Option<BoxFuture<'static, ()>>>,
    
        /// Handle to place the task itself back onto the task queue.
       ///  将任务本身放回任务队列的句柄。
        task_sender: SyncSender<Arc<Task>>,
    }
    
    fn new_executor_and_spawner() -> (Executor, Spawner) {
        // Maximum number of tasks to allow queueing in the channel at once.
        // This is just to make `sync_channel` happy, and wouldn't be present in
        // a real executor.
       //允许同时在通道中排队的最大任务数。
       //这只是为了让“sync_channel”开心,不会出现在真正的 executor 身上。
        const MAX_QUEUED_TASKS: usize = 10_000;
        let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
        (Executor { ready_queue }, Spawner { task_sender })
    }
    

    Let's also add a method to spawner to make it easy to spawn new futures. This method will take a future type, box it and put it in a FutureObj, and create a new Arc<Task> with it inside which can be enqueued onto the executor.
    我们还可以为spawner添加一个方法,以便于生成新的futures。此方法将获取一个futures类型,将其装箱并放入一个FutureObj中,并使用它创建一个新的Arc<Task>,该Arc<Task>可以在executor上排队。

    impl Spawner {
        fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
            let future = future.boxed();
            let task = Arc::new(Task {
                future: Mutex::new(Some(future)),
                task_sender: self.task_sender.clone(),
            });
            self.task_sender.send(task).expect("too many tasks queued");
        }
    }
    

    To poll futures, we'll need to create a Waker. As discussed in the task wakeups section, Wakers are responsible for scheduling a task to be polled again once wake is called. Remember that Wakers tell the executor exactly which task has become ready, allowing them to poll just the futures that are ready to make progress. The easiest way to create a new Waker is by implementing the ArcWake trait and then using the waker_ref or .into_waker() functions to turn an Arc<impl ArcWake> into a Waker. Let's implement ArcWake for our tasks to allow them to be turned into Wakers and awoken:

    要 poll futures,我们需要创建一个Waker。正如在任务唤醒部分所讨论的,Wakers 负责在调用唤醒后调度要再次轮询的任务。记住,Wakers 准确地告诉 executor 哪些任务已经准备好了,允许他们只轮询准备好要取得progress 的 futures。创建新Wakers 的最简单方法是实现ArcWake特性,然后使用Waker_ref or.into_Waker()函数将Arc<impl ArcWake>转换为Waker。让我们为我们的任务实现ArcWake,让它们变成Waker并被唤醒:

    impl ArcWake for Task {
        fn wake_by_ref(arc_self: &Arc<Self>) {
            // Implement `wake` by sending this task back onto the task channel
            // so that it will be polled again by the executor.
            let cloned = arc_self.clone();
            arc_self.task_sender.send(cloned).expect("too many tasks queued");
        }
    }
    

    When a Waker is created from an Arc<Task>, calling wake() on it will cause a copy of the Arc to be sent onto the task channel. Our executor then needs to pick up the task and poll it. Let's implement that:
    从Arc<Task>创建Waker 时,对其调用wake()将导致将Arc的副本发送到任务通道。然后,我们的executor 需要pick up 任务并进行poll。让我们实现这一点:

    impl Executor {
        fn run(&self) {
            while let Ok(task) = self.ready_queue.recv() {
                // Take the future, and if it has not yet completed (is still Some),
                // poll it in an attempt to complete it.
                //获取future,如果它还没有完成(仍然是Some),那么就对它进行poll,试图完成它。
                let mut future_slot = task.future.lock().unwrap();
                if let Some(mut future) = future_slot.take() {
                    // Create a `LocalWaker` from the task itself
                   //从任务本身创建一个“LocalWaker”
                    let waker = waker_ref(&task);
                    let context = &mut Context::from_waker(&*waker);
                    // `BoxFuture<T>` is a type alias for
                    // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                    // We can get a `Pin<&mut dyn Future + Send + 'static>`
                    // from it by calling the `Pin::as_mut` method.
                    // `BoxFuture<T>` 是 `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.的类型别名
                    // 我们可以得到一个 `Pin<&mut dyn Future + Send + 'static>` 通过调用 `Pin::as_mut` 方法.
                    if let Poll::Pending = future.as_mut().poll(context) {
                        // We're not done processing the future, so put it
                        // back in its task to be run again in the future.
                        //我们还没有处理好future,所以把它回到它的任务中,以便将来再次运行。
                        *future_slot = Some(future);
                    }
                }
            }
        }
    }
    

    Congratulations! We now have a working futures executor. We can even use it to run async/.await code and custom futures, such as the TimerFuture we wrote earlier:
    祝贺 !我们现在有一个工作的futures executor。我们甚至可以使用它来运行async/.await代码和自定义futures,例如我们之前编写的TimerFuture:

    fn main() {
        let (executor, spawner) = new_executor_and_spawner();
    
        // Spawn a task to print before and after waiting on a timer.
        spawner.spawn(async {
            println!("howdy!");
            // Wait for our timer future to complete after two seconds.
            TimerFuture::new(Duration::new(2, 0)).await;
            println!("done!");
        });
    
        // Drop the spawner so that our executor knows it is finished and won't
        // receive more incoming tasks to run.
        drop(spawner);
    
        // Run the executor until the task queue is empty.
        // This will print "howdy!", pause, and then print "done!".
        executor.run();
    }
    
    

    Executors and System IO

    In the previous section on The Future Trait, we discussed this example of a future that performed an asynchronous read on a socket:
    在关于 [The Future Trait]的上一节中,我们讨论了在 socket 上执行异步读取的future 示例:

    pub struct SocketRead<'a> {
        socket: &'a Socket,
    }
    
    impl SimpleFuture for SocketRead<'_> {
        type Output = Vec<u8>;
    
        fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
            if self.socket.has_data_to_read() {
                // The socket has data-- read it into a buffer and return it.
                Poll::Ready(self.socket.read_buf())
            } else {
                // The socket does not yet have data.
                //
                // Arrange for `wake` to be called once data is available.
                // When data becomes available, `wake` will be called, and the
                // user of this `Future` will know to call `poll` again and
                // receive data.
                self.socket.set_readable_callback(wake);
                Poll::Pending
            }
        }
    }
    

    This future will read available data on a socket, and if no data is available, it will yield to the executor, requesting that its task be awoken when the socket becomes readable again. However, it's not clear from this example how the Socket type is implemented, and in particular it isn't obvious how the set_readable_callback function works. How can we arrange for wake() to be called once the socket becomes readable? One option would be to have a thread that continually checks whether socket is readable, calling wake() when appropriate. However, this would be quite inefficient, requiring a separate thread for each blocked IO future. This would greatly reduce the efficiency of our async code.
    这个future 将读取socket上的可用数据,如果没有可用数据,它将让步于executor,请求在socket 再次变为可读时唤醒其任务。但是,从这个例子中不清楚Socket类型是如何实现的,特别是set_readable_callback函数是如何工作的。如何安排在 Socket 可读之后调用wake()?一种选择是让一个线程持续检查socket是否可读,在适当的时候调用wake()。然而,这将是非常低效的,需要为每个被阻塞的IO未来单独的线程。这将大大降低异步代码的效率。

    In practice, this problem is solved through integration with an IO-aware system blocking primitive, such as epoll on Linux, kqueue on FreeBSD and Mac OS, IOCP on Windows, and ports on Fuchsia (all of which are exposed through the cross-platform Rust crate mio). These primitives all allow a thread to block on multiple asynchronous IO events, returning once one of the events completes. In practice, these APIs usually look something like this:

    实际上,这个问题是通过与一个IO感知的系统阻塞原语(如Linux上的epoll、FreeBSD和Mac OS上的kqueue、Windows上的IOCP和Fuchsia上的端口(所有端口都通过跨平台Rust crate mio公开)集成来解决的。这些 primitives 都允许线程阻塞多个异步IO事件,并在其中一个事件完成后返回。实际上,这些api通常如下所示:

    struct IoBlocker {
        ...
    }
    
    struct Event {
        // An ID uniquely identifying the event that occurred and was listened for.
        id: usize,
    
        // A set of signals to wait for, or which occurred.
        signals: Signals,
    }
    
    impl IoBlocker {
        /// Create a new collection of asynchronous IO events to block on.
        fn new() -> Self { ... }
    
        /// Express an interest in a particular IO event.
        fn add_io_event_interest(
            &self,
    
            /// The object on which the event will occur
            io_object: &IoObject,
    
            /// A set of signals that may appear on the `io_object` for
            /// which an event should be triggered, paired with
            /// an ID to give to events that result from this interest.
            event: Event,
        ) { ... }
    
        /// Block until one of the events occurs.
        fn block(&self) -> Event { ... }
    }
    
    let mut io_blocker = IoBlocker::new();
    io_blocker.add_io_event_interest(
        &socket_1,
        Event { id: 1, signals: READABLE },
    );
    io_blocker.add_io_event_interest(
        &socket_2,
        Event { id: 2, signals: READABLE | WRITABLE },
    );
    let event = io_blocker.block();
    
    // prints e.g. "Socket 1 is now READABLE" if socket one became readable.
    println!("Socket {:?} is now {:?}", event.id, event.signals);
    

    Futures executors can use these primitives to provide asynchronous IO objects such as sockets that can configure callbacks to be run when a particular IO event occurs. In the case of our SocketRead example above, the Socket::set_readable_callback function might look like the following pseudocode:

    Futures executors 可以使用这些 primitives 来提供异步IO对象,例如sockets ,这些sockets 可以配置回调,以便在特定IO事件发生时运行。在上面的SocketRead示例中,Socket::set_readable_callback 函数可能看起来像以下伪代码:

    impl Socket {
        fn set_readable_callback(&self, waker: Waker) {
            // `local_executor` is a reference to the local executor.
            // this could be provided at creation of the socket, but in practice
            // many executor implementations pass it down through thread local
            // storage for convenience.
           //`local_executor是对本地executor的引用,这可以在创建 socket 时提供,
           //但实际上许多executor实现为了方便起见,会将其传递到线程本地存储。
            let local_executor = self.local_executor;
    
            // Unique ID for this IO object.
            let id = self.id;
    
            // Store the local waker in the executor's map so that it can be called
            // once the IO event arrives.
             //将本地 waker 存储在executor 的映射中,以便在IO事件到达时调用它。
            local_executor.event_map.insert(id, waker);
            local_executor.add_io_event_interest(
                &self.socket_file_descriptor,
                Event { id, signals: READABLE },
            );
        }
    }
    

    We can now have just one executor thread which can receive and dispatch any IO event to the appropriate Waker, which will wake up the corresponding task, allowing the executor to drive more tasks to completion before returning to check for more IO events (and the cycle continues...).

    我们现在只能有一个 executor 线程,它可以接收任何IO事件并将其发送到相应的Waker,该Waker将唤醒相应的任务,允许 executor 在返回以检查更多IO事件(循环继续…)之前驱动更多任务完成。

    复制粘贴的翻译不易啊,给个小红心吧。表示路过一下也不错哦。

    相关文章

      网友评论

          本文标题:[Rust-async-book]--2--Under the

          本文链接:https://www.haomeiwen.com/subject/rzhvkhtx.html