为什么需要event loop
因为: 需要用event loop来实现异步IO(回调函数的方式)。
异步IO的好处在于可以单线程执行程序却不会被IO阻塞,而单线程使得我们不用担心线程安全问题 -- 每个函数在执行过程中不会被其他逻辑(协程)中断。
任何异步IO(基于回调函数)方式,无论是JavaScript, python的asyncio, ruby的event machine, 都是在执行一个单线程的event loop,其中JS隐式地执行event loop。
区分同步与异步:
# python:
# synchronous:
line = input() # blocked until input a line
print(line)
- 同步IO(== 阻塞) 步骤:
- 用户线程 主动发起系统调用
- 比如read(int fd, void *buf, size_t count) 读IO
- OS将此线程被挂起到此进程的等待队列
- IO读写完毕时, OS将对应被挂起的线程唤醒(放入进程的就绪队列)
- 比如read(int fd, void buf, size_t count) 完毕, OS将数据放入buf, 唤醒线程
- 用户线程 主动发起系统调用
// js:
// asynchronous IO:
axios.get("api.github.com")
.then(data => handle(data)) # by callback
immediatelyDoSomething() # non-blocking
- 异步IO(== 非阻塞 == 轮询) 步骤:
- 用户线程发起一次IO查询:
- 用户线程 发起系统调用, 检查IO读写是否就绪
- 例如 select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *timeout)
- 无论读写是否就绪, 系统调用都立即返回
- 例如 select()中, OS将读写就绪的*fd_readable, *fd_writable替换 *readfds, *writefds
- 用户线程 发起系统调用, 检查IO读写是否就绪
- 如果还有读写没就绪,用户线程需要重复步骤1查询
- 用户线程发起一次IO查询:
Event Loop 与 用户逻辑
无论是显式地还是隐式地,整个程序(线程)的执行顺序都是:
- 创建event loop对象
- 执行用户逻辑(或者把整个用户逻辑当作一个回调函数schedule到event loop对象的queue中)
- 其中用户代码通过setTimeout(), httpRequest() 等API, 往event loop对象的queue中存入callback, 整个过程是非阻塞的
- 执行event loop 死循环: 每次迭代中,
- 每个循环遍历并执行所有到期的callback
- 用优先队列存储timeout queue
- 系统调用查询并执行就绪的IO
- 可以用select, epoll, kqueue等系统调用,本质都是一次非阻塞的IO查询
- 每个循环遍历并执行所有到期的callback
这里所有IO共享一个IO queue, 因为unix-like(linux, macOS)操作系统中,所有IO都映射成了文件IO,IO读写的系统调用都一样。
实现 event loop
我们只要处理两种不同类型回调:
- timeout回调
- 用优先队列存储所有timeout回调,每个循环只遍历已经到期的callback
- IO读写回调
- 每个循环调用select查询一次IO
另外, 为了避免空循环消耗cpu, 每个循环 sleep 50ms.
以下使用python实现一个简单的event loop, 支持timeout/interval, stdio, httpGet:
import queue, time, select, os, socket
class EventLoop:
IDLE_SECONDS = 0.05
def __init__(self):
self.time_queue = queue.PriorityQueue()
self.io_read_queue = {} # fd -> callback
self.io_write_queue = {}
self.io_exception_queue = {}
def run(self):
while True:
self.handle_time()
self.poll_io()
time.sleep(self.IDLE_SECONDS) # save cpu usage
def handle_time(self):
now = time.time()
while not self.time_queue.empty():
timeout, callback = self.time_queue.get()
if now < timeout:
self.time_queue.put((timeout, callback))
break
callback()
def poll_io(self):
readable_fds, writable_fds, ex_fds = select.select(self.io_read_queue.keys(),
self.io_write_queue.keys(),
self.io_exception_queue.keys(),0)
for fd in readable_fds:
self.io_read_queue[fd](fd) # fd as callback argument
for fd in writable_fds:
self.io_write_queue[fd](fd)
def set_timeout(seconds, callback):
global_loop.time_queue.put((time.time()+seconds, callback))
def set_interval(seconds, callbacks):
next_run = [time.time()+seconds] # use array because python doesn't support closure..
def run_and_reschedule():
next_run[0] += seconds
global_loop.time_queue.put((next_run[0], run_and_reschedule))
callbacks()
global_loop.time_queue.put((next_run[0], run_and_reschedule))
def getlines(callback, fd=0): # fd 0 == stdin
def read(fd):
line = os.read(fd, 10000).decode()
callback(line)
global_loop.io_read_queue[fd] = read
def getHttp(host, port, path, callback):
s = socket.socket()
s.connect((host, port))
s.send(f"GET {path} HTTP/1.1\r\nHost:{host}\r\nUser-Agent: Mozilla/5.0\r\nConnection: close\r\n\r\n".encode())
buffer = []
def read_exhaust(fd):
part = os.read(fd, 4000).decode() # 4k buffer
buffer.append(part)
if not part or part[-4:] == "\r\n\r\n":
del global_loop.io_read_queue[s.fileno()]
s.close()
callback("".join(buffer))
global_loop.io_read_queue[s.fileno()] = read_exhaust
global_loop = EventLoop()
def main_test():
set_timeout(30, lambda : exit())
getHttp("baidu.com", 80, "/", lambda tcpdata: print(tcpdata))
getlines(lambda line: print(f"stdin: {line}"))
set_timeout(5, lambda : print("====after 5 seconds"))
set_timeout(2, lambda : print("====after 2 seconds"))
cnt = [1]
set_interval(1, lambda : print(f"----interval {cnt[-1]}") or cnt.append(cnt[-1]+1))
main_test()
global_loop.run()
网友评论