美文网首页
事件循环学习记录

事件循环学习记录

作者: wangrui927 | 来源:发表于2018-03-19 14:42 被阅读0次

虽然在终端日志的上报程序中,我接触到并第一次使用了事件循环(使用的是libev的python实现pyev,现在该模块改名为pyev-static):

# worker.py
class Worker:
    ...
    def sib_hd(self, watcher, revents):
        logging.......   
        # 使用日志记录
        watcher.loop.stop(pyev.EVBREAK_ALL)
        # 主动结束循环

    def run(self):
        self.loop = pyev.default_loop(debug=self.config.get('debug',False))
        # 创建事件循环
        sig_event = self.loop.signal(signal.SIGINT | signal.SIGTERM, self.sig_hd)
        # 如果进程接收到了SIGINT SIGTERM则执行sig_hd方法(也就是‘优雅’地退出循环)
        
        sig_event.start()
        
        self.Server = Server(self.loop, self.config)
        # Server是解析类,给传入的self.loop添加了 pyev的IO事件
        self.loop.start()
        # 启动事件循环

Server类中给loop添加的事件与对应行为的代码为:

class Server(EventEmitter):
# Server继承pyee的EventEmitter类
    def __init__(self, loop, config):
        ...
        self.loop = loop
        self.config = config
        self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.udp.bind(self.config.get('address'))
        # 使用socket创建udp端口监听
        self.udp_io = self.loop.io(self.udp, pyev.EV_READ, self.udp_hd)
        self.udp_io.start()
        # 给事件循环添加IO事件,当self.udp 触发 pyev.EV_READ事件, 则执行self.udp_hd方法  

libev除了上文中出现的IO事件、SIGNAL系统信号事件外,还支持Child Watcher、Filestat Watcher(监听文件系统,个人猜想应该是利用inotify)、timer Watcher、periodic Watcher等
这个self.udp_hd 就是对接收包数据进行解码的方法。那么处理完了之后,也可以在事件循环中添加存储的方法,比如存储到本地数据库。这样就可以让某个事件的IO操作不再阻塞整个代码的运行。
在这个收集程序中,我并未考虑存储到数据库的问题。因为一方面这样会耦合日志收集和日志分析。而是使用logging模块,将解码后的日志以日志的形式存储在本地。再通过flume将日志收集至HDFS或Kafka.之后再进行处理。

这样便可提升代码的运行效率,并且相对比直接使用多线程编程,代码会更加简洁,也不需要考虑数据锁与创建线程开销的问题。但是,公司里的C语言工程师指出这样也可能会导致偶尔丢失数据的情况。不采用先缓存再处理,而是一边取一遍处理,若UDP数据过多,则网卡会主动抛弃udp包(网卡队列被吃满)
并且在python2下,实现事件循环,是需要第三方库的支持的,比如gevent,pyev,pyuv等。并且参与具体业务实现的库必须也支持事件循环的实现。比如我们在网上搜索gevent的代码示例,IO操作都用gevent.sleep(x)代替代。如果单纯写time.sleep(x)则会报错。这也从侧面暴露了这个问题。

相关文章

网友评论

      本文标题:事件循环学习记录

      本文链接:https://www.haomeiwen.com/subject/zxfpqftx.html