美文网首页rust语言
Rust中轻量级I/O库——mio

Rust中轻量级I/O库——mio

作者: superLiS | 来源:发表于2018-07-19 18:54 被阅读63次

    关于mio

    mio是rust实现的一个轻量级的I/O库。其实现基本上就是对不同操作系统底层相关API的封装,抽象出统一的接口供上层使用。Linux下为epoll,Windows下为IOCP,OS X下为kqueue

    重要特性
    • 非阻塞TCP,UDP
    • I/O事件通知epoll,kqeue,IOCP实现
    • 运行时零分配
    • 平台可扩展
    用法

    其使用方法与Linux中epoll差不多,mio底层封装了epoll,使用步骤思路:

    1. 创建Poll
    2. 注册事件
    3. 事件循环等待与处理事件

    Poll定义如下:

    pub struct Poll {
        // Platform specific IO selector
        selector: sys::Selector,    #不同操作系统不同,在Linux下是epoll
    
        // Custom readiness queue
        readiness_queue: ReadinessQueue,
    
        // Use an atomic to first check if a full lock will be required. This is a
        // fast-path check for single threaded cases avoiding the extra syscall
        lock_state: AtomicUsize,
    
        // Sequences concurrent calls to `Poll::poll`
        lock: Mutex<()>,
    
        // Wakeup the next waiter
        condvar: Condvar,
    }
    

    mio提供可跨平台的sytem selector访问,不同平台如下表,都可调用相同的API。不同平台使用的API开销不尽相同。由于mio是基于readiness(就绪状态)的API,与Linux epoll相似,可以看到很多API在Linux上都可以一对一映射。相比之下,Windows IOCP是基于完成(completion-based)而非基于就绪的API,所以两者间会有较多桥接。 同时mio提供自身版本的TcpListener、TcpStream、UdpSocket,这些API封装了底层平台相关API,并设为非阻塞且实现Evented trait。

    OS Selector
    Linux epoll
    OS X, iOS kqueue
    Windows IOCP
    FreeBSD kqueue
    Android epoll

    mio实现的是一个单线程事件循环,并没有实现线程池及多线程事件循环,如果需要线程池及多线程事件循环等需要自己实现。


    mio代码示例

    代码示例1:
    //!  mio demo 1
    
    #[macro_use]
    extern crate log;
    extern crate simple_logger;
    extern crate mio;
    
    use mio::*;
    use mio::tcp::{TcpListener, TcpStream};
    use std::io::{Read,Write};
    
    fn main() {
    
        simple_logger::init().unwrap();
    
        // Setup some tokens to allow us to identify which event is for which socket.
        const SERVER: Token = Token(0);
        const CLIENT: Token = Token(1);
    
        let addr = "127.0.0.1:12345".parse().unwrap();
    
        // Setup the server socket
        let server = TcpListener::bind(&addr).unwrap();
    
        // Create a poll instance
        let poll = Poll::new().unwrap();
    
        // Start listening for incoming connections
        poll.register(&server, SERVER, Ready::readable(), PollOpt::edge()).unwrap();
    
        // Setup the client socket
        let sock = TcpStream::connect(&addr).unwrap();
    
        // Register the socket
        poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge()).unwrap();
    
    
        // Create storage for events
        let mut events = Events::with_capacity(1024);
    
        loop {
            poll.poll(&mut events, None).unwrap();
    
            for event in events.iter() {
                match event.token() {
                    SERVER => {
                        // Accept and drop the socket immediately, this will close
                        // the socket and notify the client of the EOF.
                        let (stream,addr) = server.accept().unwrap();
                        info!("Listener accept {:?}",addr);
                    },
                    CLIENT => {
                        // The server just shuts down the socket, let's just exit
                        // from our event loop.
                        info!("client response.");
                        return;
                    },
                    _ => unreachable!(),
                }
            }
        }
    }
    
    代码示例2:
    //! mio demo 2
    #[macro_use]
    extern crate log;
    extern crate simple_logger;
    extern crate mio;
    
    
    use mio::*;
    use mio::timer::{Timeout};
    use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder};
    use std::thread;
    use std::time::Duration;
    
    fn main() {
        simple_logger::init().unwrap();
    
        let mut event_loop=EventLoop::new().unwrap();
        let channel_sender=event_loop.channel();
    
        thread::spawn(move ||{
            channel_sender.send(IoMessage::Notify);
            thread::sleep_ms(5*1000);
            channel_sender.send(IoMessage::End);
        });
    
        let timeout = event_loop.timeout(Token(123), Duration::from_millis(3000)).unwrap();
    
        let mut handler=MioHandler::new();
        let _ = event_loop.run(&mut handler).unwrap();
    }
    
    
    pub enum IoMessage{
        Notify,
        End,
    }
    
    pub struct MioHandler{
    }
    
    impl MioHandler{
        pub fn new()->Self{
            MioHandler{}
        }
    }
    
    impl Handler for MioHandler {
        type Timeout = Token;
        type Message = IoMessage;
    
        /// Invoked when the socket represented by `token` is ready to be operated on.
        fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {
        }
    
        /// Invoked when a message has been received via the event loop's channel.
        fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
            match msg {
                IoMessage::Notify=>info!("channel notify"),
                IoMessage::End=>{
                    info!("shutdown eventloop.");
                    event_loop.shutdown();
                }
            }
        }
    
        /// Invoked when a timeout has completed.
        fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
            match timeout{
                Token(123)=>info!("time out."),
                Token(_)=>{},
            }
        }
    
        /// Invoked when `EventLoop` has been interrupted by a signal interrupt.
        fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {
        }
    
        /// Invoked at the end of an event loop tick.
        fn tick(&mut self, event_loop: &mut EventLoop<Self>) {
        }
    }
    
    

    这个示例演示了超时及channel,围绕EventLoop编程,其实与上一个例子没有什么不同,只是EventLoop对Poll做了封装。EventLoop定义如下:

    // Single threaded IO event loop.
    pub struct EventLoop<H: Handler> {
        run: bool,
        poll: Poll,     #可以看到EventLoop内部Poll字段
        events: Events,
        timer: Timer<H::Timeout>,
        notify_tx: channel::SyncSender<H::Message>,
        notify_rx: channel::Receiver<H::Message>,
        config: Config,
    }
    

    源码简析

    对涉及操作系统部分代码,以Linux操作系统为例。在Linux操作系统中,mio封装了epoll。后面会给出相应的代码。

    结合前面的代码示例给出相应的关键代码如下:

    // Single threaded IO event loop.
    pub struct EventLoop<H: Handler> {
        run: bool,
        poll: Poll,
        events: Events,
        timer: Timer<H::Timeout>,
        notify_tx: channel::SyncSender<H::Message>,
        notify_rx: channel::Receiver<H::Message>,
        config: Config,
    }
    
    pub trait Handler: Sized {
        type Timeout;
        type Message;
    
        /// Invoked when the socket represented by `token` is ready to be operated
        /// on. `events` indicates the specific operations that are
        /// ready to be performed.
        ///
        /// For example, when a TCP socket is ready to be read from, `events` will
        /// have `readable` set. When the socket is ready to be written to,
        /// `events` will have `writable` set.
        ///
        /// This function will only be invoked a single time per socket per event
        /// loop tick.
        fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {
        }
    
        /// Invoked when a message has been received via the event loop's channel.
        fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
        }
    
        /// Invoked when a timeout has completed.
        fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
        }
    
        /// Invoked when `EventLoop` has been interrupted by a signal interrupt.
        fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {
        }
    
        /// Invoked at the end of an event loop tick.
        fn tick(&mut self, event_loop: &mut EventLoop<Self>) {
        }
    }
    
    impl<H: Handler> EventLoop<H> {
    
        /// Constructs a new `EventLoop` using the default configuration values.
        /// The `EventLoop` will not be running.
        pub fn new() -> io::Result<EventLoop<H>> {
            EventLoop::configured(Config::default())
        }
    
        fn configured(config: Config) -> io::Result<EventLoop<H>> {
            // Create the IO poller
            let poll = Poll::new()?;
    
            let timer = timer::Builder::default()
                .tick_duration(config.timer_tick)
                .num_slots(config.timer_wheel_size)
                .capacity(config.timer_capacity)
                .build();
    
            // Create cross thread notification queue
            let (tx, rx) = channel::sync_channel(config.notify_capacity);  //这里创建的是同步管道,可配置同步管道内部的buffer queue bound size.
    
            // Register the notification wakeup FD with the IO poller
            poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?;
            poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?;
    
            Ok(EventLoop {
                run: true,
                poll: poll,
                timer: timer,
                notify_tx: tx,
                notify_rx: rx,
                config: config,
                events: Events::with_capacity(1024),
            })
        }
    
        /// Returns a sender that allows sending messages to the event loop in a
        /// thread-safe way, waking up the event loop if needed.
    
        /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated
        /// buffer size. The size can be changed by modifying
        /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#method.notify_capacity).
        /// When a message is sent to the EventLoop, it is first pushed on to the
        /// queue. Then, if the EventLoop is currently running, an atomic flag is
        /// set to indicate that the next loop iteration should be started without
        /// waiting.
        ///
        /// If the loop is blocked waiting for IO events, then it is woken up. The
        /// strategy for waking up the event loop is platform dependent. For
        /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe
        /// is used.
        ///
        /// The strategy of setting an atomic flag if the event loop is not already
        /// sleeping allows avoiding an expensive wakeup operation if at all possible.
        pub fn channel(&self) -> Sender<H::Message> {
            Sender::new(self.notify_tx.clone())
        }
    
        /// Schedules a timeout after the requested time interval. When the
        /// duration has been reached,
        /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked
        /// passing in the supplied token.
        ///
        /// Returns a handle to the timeout that can be used to cancel the timeout
        /// using [#clear_timeout](#method.clear_timeout).
        pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> {
            self.timer.set_timeout(delay, token)
        }
    
        /// If the supplied timeout has not been triggered, cancel it such that it
        /// will not be triggered in the future.
        pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool {
            self.timer.cancel_timeout(&timeout).is_some()
        }
    
        /// Tells the event loop to exit after it is done handling all events in the current iteration.
        pub fn shutdown(&mut self) {
            self.run = false;
        }
    
        /// Indicates whether the event loop is currently running. If it's not it has either
        /// stopped or is scheduled to stop on the next tick.
        pub fn is_running(&self) -> bool {
            self.run
        }
    
        /// Registers an IO handle with the event loop.
        pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
            where E: Evented
        {
            self.poll.register(io, token, interest, opt)
        }
    
        /// Re-Registers an IO handle with the event loop.
        pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
            where E: Evented
        {
            self.poll.reregister(io, token, interest, opt)
        }
    
        /// Keep spinning the event loop indefinitely, and notify the handler whenever
        /// any of the registered handles are ready.
        pub fn run(&mut self, handler: &mut H) -> io::Result<()> {
            self.run = true;
    
            while self.run {
                // Execute ticks as long as the event loop is running
                self.run_once(handler, None)?;
            }
    
            Ok(())
        }
    
        /// Deregisters an IO handle with the event loop.
        ///
        /// Both kqueue and epoll will automatically clear any pending events when closing a
        /// file descriptor (socket). In that case, this method does not need to be called
        /// prior to dropping a connection from the slab.
        ///
        /// Warning: kqueue effectively builds in deregister when using edge-triggered mode with
        /// oneshot. Calling `deregister()` on the socket will cause a TcpStream error.
        pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {
            self.poll.deregister(io)
        }
    
        /// Spin the event loop once, with a given timeout (forever if `None`),
        /// and notify the handler if any of the registered handles become ready
        /// during that time.
        pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> {
            trace!("event loop tick");
    
            // Check the registered IO handles for any new events. Each poll
            // is for one second, so a shutdown request can last as long as
            // one second before it takes effect.
            let events = match self.io_poll(timeout) {
                Ok(e) => e,
                Err(err) => {
                    if err.kind() == io::ErrorKind::Interrupted {
                        handler.interrupted(self);
                        0
                    } else {
                        return Err(err);
                    }
                }
            };
    
            self.io_process(handler, events);
            handler.tick(self);
            Ok(())
        }
    
        #[inline]
        fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
            self.poll.poll(&mut self.events, timeout)
        }
    
        // Process IO events that have been previously polled
        fn io_process(&mut self, handler: &mut H, cnt: usize) {
            let mut i = 0;
    
            trace!("io_process(..); cnt={}; len={}", cnt, self.events.len());
    
            // Iterate over the notifications. Each event provides the token
            // it was registered with (which usually represents, at least, the
            // handle that the event is about) as well as information about
            // what kind of event occurred (readable, writable, signal, etc.)
            while i < cnt {
                let evt = self.events.get(i).unwrap();
    
                trace!("event={:?}; idx={:?}", evt, i);
    
                match evt.token() {
                    NOTIFY => self.notify(handler),
                    TIMER => self.timer_process(handler),
                    _ => self.io_event(handler, evt)
                }
    
                i += 1;
            }
        }
    
        fn io_event(&mut self, handler: &mut H, evt: Event) {
            handler.ready(self, evt.token(), evt.readiness());
        }
    
        fn notify(&mut self, handler: &mut H) {
            for _ in 0..self.config.messages_per_tick {
                match self.notify_rx.try_recv() {
                    Ok(msg) => handler.notify(self, msg),
                    _ => break,
                }
            }
    
            // Re-register
            let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot());
        }
    
        fn timer_process(&mut self, handler: &mut H) {
            while let Some(t) = self.timer.poll() {
                handler.timeout(self, t);
            }
        }
    }
    
    pub struct Poll {
        // Platform specific IO selector
        selector: sys::Selector,
    
        // Custom readiness queue
        readiness_queue: ReadinessQueue,
    
        // Use an atomic to first check if a full lock will be required. This is a
        // fast-path check for single threaded cases avoiding the extra syscall
        lock_state: AtomicUsize,
    
        // Sequences concurrent calls to `Poll::poll`
        lock: Mutex<()>,
    
        // Wakeup the next waiter
        condvar: Condvar,
    }
    
    pub struct Selector {
        id: usize,
        epfd: RawFd,
    }
    
    impl Selector {
        pub fn new() -> io::Result<Selector> {
            let epfd = unsafe {
                // Emulate `epoll_create` by using `epoll_create1` if it's available
                // and otherwise falling back to `epoll_create` followed by a call to
                // set the CLOEXEC flag.
                dlsym!(fn epoll_create1(c_int) -> c_int);
    
                match epoll_create1.get() {
                    Some(epoll_create1_fn) => {
                        cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))?
                    }
                    None => {
                        let fd = cvt(libc::epoll_create(1024))?;
                        drop(set_cloexec(fd));
                        fd
                    }
                }
            };
    
            // offset by 1 to avoid choosing 0 as the id of a selector
            let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
    
            Ok(Selector {
                id: id,
                epfd: epfd,
            })
        }
    
        pub fn id(&self) -> usize {
            self.id
        }
    
        /// Wait for events from the OS
        pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
            let timeout_ms = timeout
                .map(|to| cmp::min(millis(to), i32::MAX as u64) as i32)
                .unwrap_or(-1);
    
            // Wait for epoll events for at most timeout_ms milliseconds
            evts.clear();
            unsafe {
                let cnt = cvt(libc::epoll_wait(self.epfd,
                                               evts.events.as_mut_ptr(),
                                               evts.events.capacity() as i32,
                                               timeout_ms))?;
                let cnt = cnt as usize;
                evts.events.set_len(cnt);
    
                for i in 0..cnt {
                    if evts.events[i].u64 as usize == awakener.into() {
                        evts.events.remove(i);
                        return Ok(true);
                    }
                }
            }
    
            Ok(false)
        }
    
        /// Register event interests for the given IO handle with the OS
        pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
            let mut info = libc::epoll_event {
                events: ioevent_to_epoll(interests, opts),
                u64: usize::from(token) as u64
            };
    
            unsafe {
                cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?;
                Ok(())
            }
        }
    
        /// Register event interests for the given IO handle with the OS
        pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
            let mut info = libc::epoll_event {
                events: ioevent_to_epoll(interests, opts),
                u64: usize::from(token) as u64
            };
    
            unsafe {
                cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?;
                Ok(())
            }
        }
    
        /// Deregister event interests for the given IO handle with the OS
        pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
            // The &info argument should be ignored by the system,
            // but linux < 2.6.9 required it to be not null.
            // For compatibility, we provide a dummy EpollEvent.
            let mut info = libc::epoll_event {
                events: 0,
                u64: 0,
            };
    
            unsafe {
                cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?;
                Ok(())
            }
        }
    }
    

    参考:

    【譯】Tokio 內部機制:從頭理解 Rust 非同步 I/O 框架

    使用mio开发web framework - base

    My Basic Understanding of mio and Asynchronous IO

    MIO for Rust

    mio-github

    相关文章

      网友评论

        本文标题:Rust中轻量级I/O库——mio

        本文链接:https://www.haomeiwen.com/subject/owgtmftx.html