fanotify的API文档:http://man7.org/linux/man-pages/man7/fanotify.7.html
python fanotify代码样例:
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import os
import fanotify
import select
class file_notifer(object):
def __init__(self, path):
self.notifer_fd = fanotify.Init(
fanotify.FAN_CLASS_NOTIF, #接受event的优先级,系统发出event后,根据优先级,优先级最高的,最先接到
os.O_RDWR | os.O_LARGEFILE # flag os.O_RDWR 文件描述符self.notifer_fd以什么样的权限打开
)
print "notifer fd is %s " %(self.notifer_fd)
fanotify.Mark(
self.notifer_fd, # fanotify.Init返回的
fanotify.FAN_MARK_ADD, #对监听的文件的mark进行修改,可以是(ADD)添加,REMOVE(删除)等等
fanotify.FAN_ACCESS, #监听的事件
-1,
path #监听的文件路径
)
def start_notifer(self):
while True:
r, w, e = select.select([self.notifer_fd], [], []) #用select去监听event的发生
buf = os.read(self.notifer_fd, 4096) #关于os.read是否阻塞,其实是看读取的fd的类型的
while fanotify.EventOk(buf): #如果buf中有event
buf, event = fanotify.EventNext(buf) #读出 event
print "event.mask: ", event.mask
print "file modified %s" %(os.readlink("/proc/self/fd/%d" %event.fd))
if event.mask & fanotify.FAN_MODIFY:
print "file modified %s" %(os.readlink("/proc/self/fd/%d" %fanotify_envent.fd))
if __name__ == "__main__":
# notifer = file_notifer("/home/xxx/work/test/fanotify_sourcelist.py")
notifer = file_notifer("/var/lib/apt/periodic/update-success-stamp")
#notifer = file_notifer("/var/lib/apt/lists/test")
notifer.start_notifer()
测试发现一个问题,对于ubuntu下/var/lib/apt/periodic/update-success-stamp文件,不管怎么操作,都没有产生event,但是对于个人手动创建的文件,fanotify是可以监测到事件的,待进一步研究。
inotify的API文档:http://man7.org/linux/man-pages/man7/inotify.7@@man-pages.html
python inotify代码样例
#!/usr/bin/env python
# encoding:utf-8
import os
from pyinotify import WatchManager, Notifier, \
ProcessEvent,IN_DELETE, IN_CREATE,IN_MODIFY, IN_ACCESS, IN_ATTRIB
class EventHandler(ProcessEvent):
"""事件处理"""
def process_IN_CREATE(self, event):
print "Create file: %s " % os.path.join(event.path,event.name)
def process_IN_DELETE(self, event):
print "Delete file: %s " % os.path.join(event.path,event.name)
def process_IN_MODIFY(self, event):
print "Modify file: %s " % os.path.join(event.path,event.name)
def process_IN_ACCESS(self, event):
print "Access file: %s " % os.path.join(event.path, event.name)
def process_IN_ATTRIB(self, event):
print "Access file: %s " % os.path.join(event.path, event.name)
def FSMonitor(path='.'):
wm = WatchManager()
mask = IN_DELETE | IN_CREATE |IN_MODIFY |IN_ACCESS |IN_ATTRIB
notifier = Notifier(wm, EventHandler())
wm.add_watch(path, mask,rec=True)
print 'now starting monitor %s'%(path)
while True:
try:
notifier.process_events() #绑定处理event方法
if notifier.check_events(): #检查是否有有可读取的新event
notifier.read_events() #读取event,交给EventHandler处理
except KeyboardInterrupt:
notifier.stop()
break
if __name__ == "__main__":
FSMonitor("/var/lib/apt/periodic/update-success-stamp")
# FSMonitor("/home/xxx/work/test")
# FSMonitor("/var/lib/apt/lists")
需要说明的是:按照目前个人理解,EventHandler类必须继承_ProcessEvent类,然后在类里面重写对应event的处理方法。_ProcessEvent源码如下:
class _ProcessEvent:
"""
Abstract processing event class.
"""
def __call__(self, event):
"""
To behave like a functor the object must be callable.
This method is a dispatch method. Its lookup order is:
1. process_MASKNAME method
2. process_FAMILY_NAME method
3. otherwise calls process_default
@param event: Event to be processed.
@type event: Event object
@return: By convention when used from the ProcessEvent class:
- Returning False or None (default value) means keep on
executing next chained functors (see chain.py example).
- Returning True instead means do not execute next
processing functions.
@rtype: bool
@raise ProcessEventError: Event object undispatchable,
unknown event.
"""
stripped_mask = event.mask - (event.mask & IN_ISDIR)
maskname = EventsCodes.ALL_VALUES.get(stripped_mask)
if maskname is None:
raise ProcessEventError("Unknown mask 0x%08x" % stripped_mask)
# 1- look for process_MASKNAME
meth = getattr(self, 'process_' + maskname, None)
if meth is not None:
return meth(event)
# 2- look for process_FAMILY_NAME
meth = getattr(self, 'process_IN_' + maskname.split('_')[1], None) #匹配,处理对应的event
if meth is not None:
return meth(event)
# 3- default call method process_default
return self.process_default(event)
网友评论