美文网首页
vnpy源码阅读-3.EventEngine

vnpy源码阅读-3.EventEngine

作者: Leernh | 来源:发表于2021-11-29 10:56 被阅读0次
    from collections import defaultdict  # 字典默认类型
    from queue import Empty, Queue  # 队列
    from threading import Thread  # 多线程
    from time import sleep
    from typing import Any, Callable, List  # 类型提示
    

    先看第一个导入的模块,defaultdict类初始化时传入一个类型list, str, int, dict,当字典中找不到对应的key时返回传入的类型,看示例

    d = defaultdict(dict)
    d[1] = 'one'
    
    print(d[1])
    print(d[2])
    # return
    one
    {}
    

    导入的包了解了,接着看代码吧

    Evnet

    class Event:
    
        def __init__(self, type: str, data: Any = None):
            self.type: str = type
            self.data: Any = data
    

    定义一个Evnet的类,有type和data参数

    init

    def __init__(self, interval: int = 1):
            """
            默认情况下,计时器事件每1秒生成一次
            """
            self._interval = interval  # 时间间隔
            self._queue = Queue()  # FIFO队列,先进先出
            self._active = False    # 开关
            self._thread = Thread(target=self._run)  # run方法线程
            self._timer = Thread(target=self._run_timer)  #run_timer方法线程
            self._handlers = defaultdict(list)  # 处理列表 {'type': []}
            self._general_handlers = []  # 通用处理列表
    

    初始化的参数中,interval和timer用不到可以不管,其他的看注释能明白作用

    _run

        def _run(self) -> None:
            """
            从队列中获取事件,然后对其进行处理。
            Get event from queue and then process it.
            """
            while self._active:
                try:
                    event = self._queue.get(block=True, timeout=1)  # 从队列取出事件进行处理,先进先出
                    self._process(event)
                except Empty:
                    pass
    

    EventEngine启动后,会不停从队列_queue中get,当_queue不为空时就把队列中等待的任务取出交给_process方法去处理

    _process

        def _process(self, event: Event) -> None:
            """
            First ditribute event to those handlers registered listening
            to this type.
    
            Then distrubute event to those general handlers which listens
            to all types.
            """
            if event.type in self._handlers:
                [handler(event) for handler in self._handlers[event.type]]
    
            if self._general_handlers:
                [handler(event) for handler in self._general_handlers]
    

    根据event.type分配到不同的方法去处理

    _run_timer

        def _run_timer(self) -> None:
            """
            按间隔(秒)睡眠,然后生成计时器事件。
            Sleep by interval second(s) and then generate a timer event.
            """
            while self._active:
                sleep(self._interval)
                event = Event(EVENT_TIMER)
                self.put(event)
    

    可以暂时不用管,用不上

    register

        def register(self, type: str, handler: HandlerType) -> None:  # handler是一个方法
            """
            为特定事件类型注册新的处理程序函数。每一个事件类型只能注册一次函数。 
            Register a new handler function for a specific event type. Every
            function can only be registered once for each event type.
            """
            handler_list = self._handlers[type]
            if handler not in handler_list:
                handler_list.append(handler)
    

    参数里HandlerType的定义,callable是一个回调函数,可以看成func([event], None)

    HandlerType = Callable[[Event], None]
    

    当我们知道handler是一个方法,再看下订阅tick示例,就清晰多了。当我们注册一个关键字'tick',self._handlers中没有’tick'时,则返回一个[],接着就会把on_tick方法注册进去,此时self._handlers = {'tick': [on_tick]}。
    下面代码中event_engine.put(event)后,event被添加到_queue,并在_run方法被get到_process方法,最后找到self._handlers['tick']对应的回调函数,到此事件结束。

    def on_tick(event):
        # 处理tick数据
         pass
    
    if __name__ == '__main__':
        event_engine = EventEngine()
        event_engine.start()
        event_engine.register('tick', on_tick)
    
        tick_data = get_tick(symbol)  # {'open': 1, 'close':0.8}
        event = Event(type='tick', data=tick_data)
        event_engine.put(event) 
    

    register_general

        def register_general(self, handler: HandlerType) -> None:
            """
            Register a new handler function for all event types. Every
            function can only be registered once for each event type.
            """
            if handler not in self._general_handlers:
                self._general_handlers.append(handler)
    

    用法和register差不多,只是少了type,暂时没找到用处,一般都是用的register

    代码中还有start、put、unregister等方法,因为阅读难度不大就不拿出来了。

    总结

    EvnetEngine的用法大致就是:
    接收到事件(如订阅、点击、接收数据)-->推送到事件引擎-->引擎从队列中取出事件交给_process分配-->_process根据register注册的对应类型分配处理方法

    遗留问题

    1.callable 回调函数的用法
    2.register什么时候进行注册?
    3.register_general要怎么用?
    4.queue源码阅读
    5.thread源码阅读

    相关文章

      网友评论

          本文标题:vnpy源码阅读-3.EventEngine

          本文链接:https://www.haomeiwen.com/subject/ugicxrtx.html