美文网首页程序员
理解与实现(by Python)JS event loop

理解与实现(by Python)JS event loop

作者: davidhuangdw | 来源:发表于2018-12-30 01:43 被阅读15次

    为什么需要event loop

    因为: 需要用event loop来实现异步IO(回调函数的方式)。
    异步IO的好处在于可以单线程执行程序却不会被IO阻塞,而单线程使得我们不用担心线程安全问题 -- 每个函数在执行过程中不会被其他逻辑(协程)中断。
    任何异步IO(基于回调函数)方式,无论是JavaScript, python的asyncio, ruby的event machine, 都是在执行一个单线程的event loop,其中JS隐式地执行event loop。

    区分同步与异步:

    # python:
    # synchronous:
    line = input()      # blocked until input a line
    print(line)
    
    • 同步IO(== 阻塞) 步骤:
      1. 用户线程 主动发起系统调用
        • 比如read(int fd, void *buf, size_t count) 读IO
      2. OS将此线程被挂起到此进程的等待队列
      3. IO读写完毕时, OS将对应被挂起的线程唤醒(放入进程的就绪队列)
        • 比如read(int fd, void buf, size_t count) 完毕, OS将数据放入buf, 唤醒线程
    // js:
    // asynchronous IO:
    axios.get("api.github.com")
         .then(data => handle(data))   # by callback
    immediatelyDoSomething()  # non-blocking
    
    • 异步IO(== 非阻塞 == 轮询) 步骤:
      1. 用户线程发起一次IO查询:
        1. 用户线程 发起系统调用, 检查IO读写是否就绪
          • 例如 select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *timeout)
        2. 无论读写是否就绪, 系统调用都立即返回
          • 例如 select()中, OS将读写就绪的*fd_readable, *fd_writable替换 *readfds, *writefds
      2. 如果还有读写没就绪,用户线程需要重复步骤1查询

    Event Loop 与 用户逻辑

    无论是显式地还是隐式地,整个程序(线程)的执行顺序都是:

    1. 创建event loop对象
    2. 执行用户逻辑(或者把整个用户逻辑当作一个回调函数schedule到event loop对象的queue中)
      • 其中用户代码通过setTimeout(), httpRequest() 等API, 往event loop对象的queue中存入callback, 整个过程是非阻塞的
    3. 执行event loop 死循环: 每次迭代中,
      1. 每个循环遍历并执行所有到期的callback
        • 用优先队列存储timeout queue
      2. 系统调用查询并执行就绪的IO
        • 可以用select, epoll, kqueue等系统调用,本质都是一次非阻塞的IO查询

    这里所有IO共享一个IO queue, 因为unix-like(linux, macOS)操作系统中,所有IO都映射成了文件IO,IO读写的系统调用都一样。

    实现 event loop

    我们只要处理两种不同类型回调:

    1. timeout回调
      • 用优先队列存储所有timeout回调,每个循环只遍历已经到期的callback
    2. IO读写回调
      • 每个循环调用select查询一次IO

    另外, 为了避免空循环消耗cpu, 每个循环 sleep 50ms.

    以下使用python实现一个简单的event loop, 支持timeout/interval, stdio, httpGet:

    import queue, time, select, os, socket
    
    
    class EventLoop:
        IDLE_SECONDS = 0.05
    
        def __init__(self):
            self.time_queue = queue.PriorityQueue()
            self.io_read_queue = {}           # fd -> callback
            self.io_write_queue = {}
            self.io_exception_queue = {}
    
        def run(self):
            while True:
                self.handle_time()
                self.poll_io()
                time.sleep(self.IDLE_SECONDS)        # save cpu usage
    
        def handle_time(self):
            now = time.time()
            while not self.time_queue.empty():
                timeout, callback = self.time_queue.get()
                if now < timeout:
                    self.time_queue.put((timeout, callback))
                    break
                callback()
    
        def poll_io(self):
            readable_fds, writable_fds, ex_fds = select.select(self.io_read_queue.keys(),
                                                               self.io_write_queue.keys(),
                                                               self.io_exception_queue.keys(),0)
            for fd in readable_fds:
                self.io_read_queue[fd](fd)      # fd as callback argument
            for fd in writable_fds:
                self.io_write_queue[fd](fd)
    
    
    def set_timeout(seconds, callback):
        global_loop.time_queue.put((time.time()+seconds, callback))
    
    def set_interval(seconds, callbacks):
        next_run = [time.time()+seconds]      # use array because python doesn't support closure..
        def run_and_reschedule():
            next_run[0] += seconds
            global_loop.time_queue.put((next_run[0], run_and_reschedule))
            callbacks()
        global_loop.time_queue.put((next_run[0], run_and_reschedule))
    
    def getlines(callback, fd=0):                   # fd 0 == stdin
        def read(fd):
            line = os.read(fd, 10000).decode()
            callback(line)
        global_loop.io_read_queue[fd] = read
    
    def getHttp(host, port, path, callback):
        s = socket.socket()
        s.connect((host, port))
        s.send(f"GET {path} HTTP/1.1\r\nHost:{host}\r\nUser-Agent: Mozilla/5.0\r\nConnection: close\r\n\r\n".encode())
        buffer = []
        def read_exhaust(fd):
            part = os.read(fd, 4000).decode()   # 4k buffer
            buffer.append(part)
            if not part or part[-4:] == "\r\n\r\n":
                del global_loop.io_read_queue[s.fileno()]
                s.close()
                callback("".join(buffer))
        global_loop.io_read_queue[s.fileno()] = read_exhaust
    
    
    global_loop = EventLoop()
    
    def main_test():
        set_timeout(30, lambda : exit())
        getHttp("baidu.com", 80, "/", lambda tcpdata: print(tcpdata))
        getlines(lambda line: print(f"stdin: {line}"))
        set_timeout(5, lambda : print("====after 5 seconds"))
        set_timeout(2, lambda : print("====after 2 seconds"))
        cnt = [1]
        set_interval(1, lambda : print(f"----interval {cnt[-1]}") or cnt.append(cnt[-1]+1))
    
    main_test()
    global_loop.run()
    

    相关文章

      网友评论

        本文标题:理解与实现(by Python)JS event loop

        本文链接:https://www.haomeiwen.com/subject/iommlqtx.html