watchdog Source Code Analysis


Author: 落羽归尘 | Published 2019-09-21 12:00

    Introduction

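    watchdog is a Python library for monitoring file system changes: creating or deleting files and directories, modifying file contents, renaming files or directories, and so on. Each kind of change is an event, and you can define your own method to run when that event arrives.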

    Basic usage

    import time
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    
    # Custom handler class
    class MyHandler(FileSystemEventHandler):
        def on_modified(self, event):
            print("File modified: %s" % event.src_path)
    
    
    if __name__ == "__main__":
        path = "."
        event_handler = MyHandler()
        observer = Observer()
        observer.schedule(event_handler, path, recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()
    

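    In the example above, whenever a file under the current directory (or, because recursive=True, under any of its subdirectories) is modified, on_modified is called.
    Below we analyze how this works under the hood.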

    How watchdog works

    The analysis below is based on Windows.
    In the example, our custom handler class inherits from FileSystemEventHandler, whose source is:

    class FileSystemEventHandler(object):
        """
        Base file system event handler that you can override methods from.
        """
    
        def dispatch(self, event):
            """Dispatches events to the appropriate methods.
    
            :param event:
                The event object representing the file system event.
            :type event:
                :class:`FileSystemEvent`
            """
            self.on_any_event(event)
            _method_map = {
                EVENT_TYPE_MODIFIED: self.on_modified,
                EVENT_TYPE_MOVED: self.on_moved,
                EVENT_TYPE_CREATED: self.on_created,
                EVENT_TYPE_DELETED: self.on_deleted,
            }
            event_type = event.event_type
            _method_map[event_type](event)
    
        def on_any_event(self, event):
            """Catch-all event handler.
    
            :param event:
                The event object representing the file system event.
            :type event:
                :class:`FileSystemEvent`
            """
    
        def on_moved(self, event):
            """Called when a file or a directory is moved or renamed.
    
            :param event:
                Event representing file/directory movement.
            :type event:
                :class:`DirMovedEvent` or :class:`FileMovedEvent`
            """
    
        def on_created(self, event):
            """Called when a file or directory is created.
    
            :param event:
                Event representing file/directory creation.
            :type event:
                :class:`DirCreatedEvent` or :class:`FileCreatedEvent`
            """
    
        def on_deleted(self, event):
            """Called when a file or directory is deleted.
    
            :param event:
                Event representing file/directory deletion.
            :type event:
                :class:`DirDeletedEvent` or :class:`FileDeletedEvent`
            """
    
        def on_modified(self, event):
            """Called when a file or directory is modified.
    
            :param event:
                Event representing file/directory modification.
            :type event:
                :class:`DirModifiedEvent` or :class:`FileModifiedEvent`
            """
    
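    • dispatch receives an incoming event, first calls on_any_event, then looks up the method for event.event_type and calls it; the remaining methods handle the individual event types.
    • Besides FileSystemEventHandler, a custom handler can inherit from other classes such as RegexMatchingEventHandler (itself a subclass of FileSystemEventHandler), which lets you supply regular expressions that select the files and directories to react to. There are other handler classes as well; see events.py in the source for details.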
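The dispatch-table idea in FileSystemEventHandler.dispatch, and the filtering that RegexMatchingEventHandler adds on top of it, can be sketched without watchdog installed. This is a minimal illustration under stated assumptions: SimpleEvent, SketchHandler, RegexFilterHandler and PyModifiedLogger are hypothetical names; the event-type constants are plain strings named after watchdog's; and the regex filter is simplified (the real RegexMatchingEventHandler also takes options such as ignore_regexes and ignore_directories).

```python
import re

# String constants named after watchdog's EVENT_TYPE_* values (defined here, not imported)
EVENT_TYPE_MOVED = "moved"
EVENT_TYPE_DELETED = "deleted"
EVENT_TYPE_CREATED = "created"
EVENT_TYPE_MODIFIED = "modified"


class SimpleEvent:
    """Hypothetical stand-in for FileSystemEvent: only a type and a path."""

    def __init__(self, event_type, src_path):
        self.event_type = event_type
        self.src_path = src_path


class SketchHandler:
    """Same dispatch-table idea as FileSystemEventHandler.dispatch."""

    def dispatch(self, event):
        self.on_any_event(event)  # catch-all hook runs for every event
        method_map = {
            EVENT_TYPE_MODIFIED: self.on_modified,
            EVENT_TYPE_MOVED: self.on_moved,
            EVENT_TYPE_CREATED: self.on_created,
            EVENT_TYPE_DELETED: self.on_deleted,
        }
        method_map[event.event_type](event)

    # Default hooks do nothing; subclasses override the ones they need.
    def on_any_event(self, event):
        pass

    def on_moved(self, event):
        pass

    def on_created(self, event):
        pass

    def on_deleted(self, event):
        pass

    def on_modified(self, event):
        pass


class RegexFilterHandler(SketchHandler):
    """Hypothetical, simplified take on RegexMatchingEventHandler."""

    def __init__(self, regexes):
        self.regexes = [re.compile(r) for r in regexes]

    def dispatch(self, event):
        # Only events whose path matches at least one regex reach the hooks.
        if any(r.match(event.src_path) for r in self.regexes):
            super().dispatch(event)


class PyModifiedLogger(RegexFilterHandler):
    def __init__(self):
        super().__init__([r".*\.py$"])
        self.seen = []

    def on_modified(self, event):
        self.seen.append(event.src_path)


handler = PyModifiedLogger()
handler.dispatch(SimpleEvent(EVENT_TYPE_MODIFIED, "src/app.py"))  # matches, on_modified runs
handler.dispatch(SimpleEvent(EVENT_TYPE_MODIFIED, "notes.txt"))   # filtered out
handler.dispatch(SimpleEvent(EVENT_TYPE_CREATED, "src/new.py"))   # matches, but on_created is a no-op
print(handler.seen)  # ['src/app.py']
```

Overriding dispatch in the filtering subclass keeps the per-event-type hooks untouched, so a user subclass only implements the hooks it cares about.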

    In the example, after instantiating MyHandler we create observer = Observer(). On Windows, Observer is WindowsApiObserver,
    whose source is:

    class WindowsApiObserver(BaseObserver):
        """
        Observer thread that schedules watching directories and dispatches
        calls to event handlers.
        """
    
        def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
            BaseObserver.__init__(self, emitter_class=WindowsApiEmitter,
                                  timeout=timeout)
    
    • Note the argument emitter_class=WindowsApiEmitter; keep it in mind, we will come back to it.
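The statement that Observer() is WindowsApiObserver on Windows comes from watchdog.observers choosing a platform-specific class at import time. The function below is a rough, hypothetical sketch of that selection based on sys.platform (observer_class_name is an illustrative name, not a watchdog API). It only returns class names and leaves out the fallbacks the real module performs, for example falling back to PollingObserver when a native backend cannot be imported.

```python
import sys


def observer_class_name(name=sys.platform):
    """Roughly which Observer class watchdog would pick for a platform string.

    Simplified sketch: no imports are attempted and no fallbacks are modeled.
    """
    if name.startswith("win"):
        return "WindowsApiObserver"  # built on ReadDirectoryChangesW
    if name.startswith("darwin"):
        return "FSEventsObserver"
    if name.startswith("linux"):
        return "InotifyObserver"
    if name.startswith(("dragonfly", "freebsd", "netbsd", "openbsd")):
        return "KqueueObserver"
    return "PollingObserver"  # generic fallback: periodically re-scans directories


print(observer_class_name())         # depends on the machine running it
print(observer_class_name("win32"))  # WindowsApiObserver
```

Each of these observer classes pairs a shared observer base with its own emitter class, which is exactly what the emitter_class argument above expresses.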

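    Because of space constraints, only the key parts of BaseObserver and its base classes EventDispatcher and BaseThread are shown here: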

    class BaseThread(threading.Thread):
        """ Convenience class for creating stoppable threads. """
    
        def __init__(self):
            threading.Thread.__init__(self)
            if has_attribute(self, 'daemon'):
                self.daemon = True
            else:
                self.setDaemon(True)
            self._stopped_event = Event()
    
            if not has_attribute(self._stopped_event, 'is_set'):
                self._stopped_event.is_set = self._stopped_event.isSet
    
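        def should_keep_running(self):
            """Determines whether the thread should continue running."""
            return not self._stopped_event.is_set()
    
        def on_thread_stop(self):
            pass
    
        def stop(self):
            """Signals the thread to stop."""
            self._stopped_event.set()
            self.on_thread_stop()
    
        def on_thread_start(self):
            pass
    
        def start(self):
            self.on_thread_start()
            threading.Thread.start(self)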
    
    class EventDispatcher(BaseThread):
        def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
            BaseThread.__init__(self)
            self._event_queue = EventQueue()
            self._timeout = timeout
    
        @property
        def timeout(self):
            """Event queue block timeout."""
            return self._timeout
    
        @property
        def event_queue(self):
            """The event queue which is populated with file system events
            by emitters and from which events are dispatched by a dispatcher
            thread."""
            return self._event_queue
    
        def dispatch_events(self, event_queue, timeout):
            pass
    
        def run(self):
            while self.should_keep_running():
                try:
                    self.dispatch_events(self.event_queue, self.timeout)
                except queue.Empty:
                    continue
    
    class BaseObserver(EventDispatcher):
        """Base observer."""
    
        def __init__(self, emitter_class, timeout=DEFAULT_OBSERVER_TIMEOUT):
            EventDispatcher.__init__(self, timeout)
            self._emitter_class = emitter_class
            self._lock = threading.RLock()
            self._watches = set()
            self._handlers = dict()
            self._emitters = set()
            self._emitter_for_watch = dict()
    
        def _add_emitter(self, emitter):
            self._emitter_for_watch[emitter.watch] = emitter
            self._emitters.add(emitter)
    
        def _add_handler_for_watch(self, event_handler, watch):
            if watch not in self._handlers:
                self._handlers[watch] = set()
            self._handlers[watch].add(event_handler)
    
        @property
        def emitters(self):
            """Returns event emitter created by this observer."""
            return self._emitters
    
        def start(self):
            for emitter in self._emitters.copy():
                try:
                    emitter.start()
                except Exception:
                    self._remove_emitter(emitter)
                    raise
            super(BaseObserver, self).start()
    
        def schedule(self, event_handler, path, recursive=False):
            with self._lock:
                watch = ObservedWatch(path, recursive)
                self._add_handler_for_watch(event_handler, watch)
    
                # If we don't have an emitter for this watch already, create it.
                if self._emitter_for_watch.get(watch) is None:
                    emitter = self._emitter_class(event_queue=self.event_queue,
                                                  watch=watch,
                                                  timeout=self.timeout)
                    self._add_emitter(emitter)
                    if self.is_alive():
                        emitter.start()
                self._watches.add(watch)
            return watch
    
        def add_handler_for_watch(self, event_handler, watch):
            with self._lock:
                self._add_handler_for_watch(event_handler, watch)
    
        def dispatch_events(self, event_queue, timeout):
            event, watch = event_queue.get(block=True, timeout=timeout)
    
            with self._lock:
                # To allow unschedule/stop and safe removal of event handlers
                # within event handlers itself, check if the handler is still
                # registered after every dispatch.
                for handler in list(self._handlers.get(watch, [])):
                    if handler in self._handlers.get(watch, []):
                        handler.dispatch(event)
            event_queue.task_done()
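The comment inside dispatch_events describes a small idiom: iterate over a snapshot (list(...)) of the handler set and re-check membership before each call, so that a handler may remove handlers while events are being dispatched. The sketch below uses hypothetical names (MutualRemover, make_registry) and a registry shaped like self._handlers, where two handlers each remove the other when called, and compares three ways of looping.

```python
class MutualRemover:
    """Hypothetical handler that removes its partner from the registry when dispatched."""

    def __init__(self, name, registry, watch, calls):
        self.name = name
        self.registry = registry
        self.watch = watch
        self.calls = calls
        self.partner = None

    def dispatch(self, event):
        self.calls.append(self.name)
        self.registry[self.watch].discard(self.partner)


def make_registry(calls):
    # Shaped like BaseObserver._handlers: {watch: set of handlers}
    registry = {"w": set()}
    a = MutualRemover("a", registry, "w", calls)
    b = MutualRemover("b", registry, "w", calls)
    a.partner, b.partner = b, a
    registry["w"].update({a, b})
    return registry


def dispatch_snapshot_recheck(registry, watch, event):
    # watchdog's approach: loop over a copy, skip handlers removed mid-loop
    for handler in list(registry.get(watch, [])):
        if handler in registry.get(watch, []):
            handler.dispatch(event)


def dispatch_snapshot_only(registry, watch, event):
    # Copy without re-check: a handler removed mid-loop still gets called
    for handler in list(registry.get(watch, [])):
        handler.dispatch(event)


def dispatch_live_set(registry, watch, event):
    # No copy: mutating the set while iterating it raises RuntimeError
    for handler in registry.get(watch, []):
        handler.dispatch(event)


calls_recheck, calls_snapshot, calls_live = [], [], []
dispatch_snapshot_recheck(make_registry(calls_recheck), "w", "evt")
dispatch_snapshot_only(make_registry(calls_snapshot), "w", "evt")
try:
    dispatch_live_set(make_registry(calls_live), "w", "evt")
    live_error = None
except RuntimeError as exc:
    live_error = exc

print(len(calls_recheck), len(calls_snapshot), type(live_error).__name__)  # 1 2 RuntimeError
```

Set iteration order is arbitrary, which is why the example uses two handlers that remove each other: whichever runs first, the re-checking loop calls exactly one of them.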
    
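    1. The inheritance chain is WindowsApiObserver -> BaseObserver -> EventDispatcher -> BaseThread, so the observer is ultimately a threading.Thread subclass.
    2. Instantiating WindowsApiObserver only runs the __init__ chain: BaseThread.__init__ marks the thread as a daemon and creates the stop event, EventDispatcher.__init__ creates the event queue, and BaseObserver.__init__ stores emitter_class. No thread is started yet. The thread starts when observer.start() is called; its run method, inherited from EventDispatcher, then loops on self.dispatch_events(self.event_queue, self.timeout) until stop() is called. As the source shows, dispatch_events takes an (event, watch) pair off the queue, blocking for at most timeout seconds, and calls handler.dispatch(event) for every handler registered for that watch, that is, the dispatch method of our custom handler. So whenever an event arrives, it gets dispatched.
    3. observer.schedule(event_handler, path, recursive=True) wraps the path in an ObservedWatch, registers the handler for it, and, if there is no emitter for that watch yet, creates one from emitter_class (the WindowsApiEmitter mentioned above) that shares the observer's event queue. If the observer is already running, schedule starts the emitter immediately. In our example schedule is called before start, so the emitter is started by BaseObserver.start(), which starts every emitter before starting the observer thread itself. Each emitter runs in its own thread and repeatedly calls queue_events(timeout), which watches the directory and, for each change, puts an (event, watch) pair into the shared event queue.
    4. Once an event is in the queue, the observer thread from step 2 picks it up and dispatches it.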
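Steps 1 to 4 amount to a producer-consumer arrangement around a shared queue. The sketch below reproduces that flow with only the standard library, under stated assumptions: StoppableThread, FakeEmitter, MiniObserver and Recorder are hypothetical simplifications, not watchdog classes; FakeEmitter replays a fixed list of changes instead of calling the Windows API; and watches are plain strings rather than ObservedWatch objects. Method names (schedule, start, stop, queue_event, queue_events, dispatch_events, should_keep_running) mirror the ones discussed above.

```python
import functools
import queue
import threading


class StoppableThread(threading.Thread):  # loosely modeled on watchdog's BaseThread
    def __init__(self):
        super().__init__(daemon=True)
        self._stopped_event = threading.Event()

    def should_keep_running(self):
        return not self._stopped_event.is_set()

    def stop(self):
        self._stopped_event.set()


class FakeEmitter(StoppableThread):  # hypothetical producer, stands in for WindowsApiEmitter
    def __init__(self, event_queue, watch, timeout, changes=()):
        super().__init__()
        self._event_queue, self.watch, self.timeout = event_queue, watch, timeout
        self._pending = list(changes)  # replayed instead of calling an OS API

    def queue_event(self, event):
        self._event_queue.put((event, self.watch))  # (event, watch), as dispatch_events unpacks

    def queue_events(self, timeout):
        if self._pending:
            self.queue_event(self._pending.pop(0))
        else:
            self._stopped_event.wait(timeout)  # nothing to report: idle until stop() or timeout

    def run(self):
        while self.should_keep_running():
            self.queue_events(self.timeout)


class MiniObserver(StoppableThread):  # hypothetical consumer, stands in for BaseObserver
    def __init__(self, emitter_class, timeout=0.1):
        super().__init__()
        self.event_queue, self.timeout = queue.Queue(), timeout
        self._emitter_class, self._lock = emitter_class, threading.RLock()
        self._handlers, self._emitter_for_watch = {}, {}

    def schedule(self, event_handler, watch):
        with self._lock:
            self._handlers.setdefault(watch, set()).add(event_handler)
            if watch not in self._emitter_for_watch:
                emitter = self._emitter_class(event_queue=self.event_queue,
                                              watch=watch, timeout=self.timeout)
                self._emitter_for_watch[watch] = emitter
                if self.is_alive():  # observer already running: start the producer now
                    emitter.start()
        return watch

    def start(self):
        for emitter in list(self._emitter_for_watch.values()):  # producers first
            emitter.start()
        super().start()  # then the dispatcher thread itself

    def stop(self):
        for emitter in self._emitter_for_watch.values():
            emitter.stop()
        super().stop()

    def dispatch_events(self, event_queue, timeout):
        event, watch = event_queue.get(block=True, timeout=timeout)  # may raise queue.Empty
        with self._lock:
            for handler in list(self._handlers.get(watch, [])):
                handler.dispatch(event)
        event_queue.task_done()

    def run(self):
        while self.should_keep_running():
            try:
                self.dispatch_events(self.event_queue, self.timeout)
            except queue.Empty:
                continue


class Recorder:  # hypothetical handler: records events, signals once all have arrived
    def __init__(self, expected):
        self.expected, self.received, self.done = expected, [], threading.Event()

    def dispatch(self, event):
        self.received.append(event)
        if len(self.received) == self.expected:
            self.done.set()


changes = [("created", "a.txt"), ("modified", "a.txt")]
recorder = Recorder(expected=len(changes))
observer = MiniObserver(emitter_class=functools.partial(FakeEmitter, changes=changes))
observer.schedule(recorder, "demo_dir")  # before start(), as in the article's example
observer.start()
recorder.done.wait(timeout=5)
observer.stop()
observer.join()
print(recorder.received)  # [('created', 'a.txt'), ('modified', 'a.txt')]
```

Passing emitter_class as a factory (here through functools.partial) lets the same observer logic drive a different producer, which is the role emitter_class=WindowsApiEmitter plays in WindowsApiObserver.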

    Summary

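    • WindowsApiEmitter watches for file changes in a loop and puts every event it detects into the event queue.
    • The observer loops over the event queue and, for each event, calls the dispatch method of the registered handlers. Its schedule method creates the emitter threads, which start immediately if the observer is already running and otherwise when observer.start() is called. WindowsApiEmitter is the Windows emitter; other platforms use their own emitters.
    • Each platform's observer is bound to its own emitter class through the emitter_class argument.
    • This producer-consumer design, with emitter threads producing events and the observer thread consuming and dispatching them, is worth studying and reusing.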


          Article link: https://www.haomeiwen.com/subject/ernsuctx.html