虽然在终端日志的上报程序中,我接触到并第一次使用了事件循环(使用的是libev的python实现pyev,现在该模块改名为pyev-static):
# worker.py
class Worker:
...
def sib_hd(self, watcher, revents):
logging.......
# 使用日志记录
watcher.loop.stop(pyev.EVBREAK_ALL)
# 主动结束循环
def run(self):
self.loop = pyev.default_loop(debug=self.config.get('debug',False))
# 创建事件循环
sig_event = self.loop.signal(signal.SIGINT | signal.SIGTERM, self.sig_hd)
# 如果进程接收到了SIGINT SIGTERM则执行sig_hd方法(也就是‘优雅’地退出循环)
sig_event.start()
self.Server = Server(self.loop, self.config)
# Server是解析类,给传入的self.loop添加了 pyev的IO事件
self.loop.start()
# 启动事件循环
Server类中给loop添加的事件与对应行为的代码为:
class Server(EventEmitter):
# Server继承pyee的EventEmitter类
def __init__(self, loop, config):
...
self.loop = loop
self.config = config
self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp.bind(self.config.get('address'))
# 使用socket创建udp端口监听
self.udp_io = self.loop.io(self.udp, pyev.EV_READ, self.udp_hd)
self.udp_io.start()
# 给事件循环添加IO事件,当self.udp 触发 pyev.EV_READ事件, 则执行self.udp_hd方法
libev除了上文中出现的IO事件、SIGNAL系统信号事件外,还支持Child Watcher、Filestat Watcher(监听文件系统,个人猜想应该是利用inotify)、timer Watcher、periodic Watcher等
这个self.udp_hd 就是对接收包数据进行解码的方法。那么处理完了之后,也可以在事件循环中添加存储的方法,比如存储到本地数据库。这样就可以让某个事件的IO操作不再阻塞整个代码的运行。
在这个收集程序中,我并未考虑存储到数据库的问题。因为一方面这样会耦合日志收集和日志分析。而是使用logging模块,将解码后的日志以日志的形式存储在本地。再通过flume将日志收集至HDFS或Kafka.之后再进行处理。
这样便可提升代码的运行效率,并且相对比直接使用多线程编程,代码会更加简洁,也不需要考虑数据锁与创建线程开销的问题。但是,公司里的C语言工程师指出这样也可能会导致偶尔丢失数据的情况。不采用先缓存再处理,而是一边取一遍处理,若UDP数据过多,则网卡会主动抛弃udp包(网卡队列被吃满)
并且在python2下,实现事件循环,是需要第三方库的支持的,比如gevent,pyev,pyuv等。并且参与具体业务实现的库必须也支持事件循环的实现。比如我们在网上搜索gevent的代码示例,IO操作都用gevent.sleep(x)代替代。如果单纯写time.sleep(x)则会报错。这也从侧面暴露了这个问题。
网友评论