Gunicorn Source Code Analysis (Part 2): The Worker Process

Author: 喵帕斯0_0 | Published 2018-10-16 22:49

    gunicorn.workers implements the different types of worker processes: single-process, multi-threaded, coroutine-based, and so on.

    Directory layout of gunicorn.workers:

    workers/
    ├── __init__.py
    ├── _gaiohttp.py
    ├── base.py
    ├── base_async.py
    ├── gaiohttp.py
    ├── geventlet.py
    ├── ggevent.py
    ├── gthread.py
    ├── gtornado.py
    ├── sync.py
    └── workertmp.py
    

    The files we will focus on are:

    1. base.py: the base class
    2. gthread.py: the single-process, multi-threaded worker
    3. sync.py: the single-process, single-threaded worker
    4. workertmp.py: the tmp file mechanism the master uses to monitor worker processes

    The remaining files follow much the same pattern.

    Worker

    Below is a simplified outline of the Worker class.

    class Worker(object):
        SIGNALS = [getattr(signal, "SIG%s" % x)
                for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()] # 支持的信号
    
        PIPE = []
    
        def __init__(self, age, ppid, sockets, app, timeout, cfg, log)
        def __str__(self)
        def notify(self)
        def run(self)
        def init_process(self)
        def load_wsgi(self)                  # loads the app that implements the WSGI protocol, e.g. a Flask app
        def init_signals(self)
        def handle_usr1(self, sig, frame)
        def handle_exit(self, sig, frame)
        def handle_quit(self, sig, frame)
        def handle_abort(self, sig, frame)
        def handle_error(self, req, client, addr, exc)
        def handle_winch(self, sig, fname)
    
Worker call flow
        def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
            """\
            This is called pre-fork so it shouldn't do anything to the
            current process. If there's a need to make process wide
            changes you'll want to do that in ``self.init_process()``.
            """
            self.age = age
            self.pid = "[booting]"
            self.ppid = ppid
            self.sockets = sockets
            self.app = app                      
        self.timeout = timeout  # worker timeout
        self.cfg = cfg              # configuration
        # state flags
        self.booted = False  # has the worker booted
        self.aborted = False  # has the worker been aborted
    
            self.reloader = None  
    
            self.nr = 0
            jitter = randint(0, cfg.max_requests_jitter)
            self.max_requests = cfg.max_requests + jitter or sys.maxsize
        self.alive = True           # is the worker alive
        self.log = log                # logger object
        self.tmp = WorkerTmp(cfg)  # the worker tmp file
    

    __init__() is fairly simple: it stores the relevant arguments, such as cfg and app, as attributes of the Worker object, and creates a tmp file. The parent process checks that file's timestamp to determine whether the child process is still alive.
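    One line in __init__() worth pausing on is self.max_requests = cfg.max_requests + jitter or sys.maxsize. A tiny standalone sketch of that idiom, with made-up values (not gunicorn code):

    import sys
    from random import randint

    max_requests = 0            # 0 is gunicorn's default and means "never restart the worker"
    max_requests_jitter = 0
    jitter = randint(0, max_requests_jitter)

    # 0 + 0 is falsy, so the expression falls back to sys.maxsize,
    # i.e. the worker effectively never restarts because of request count
    print(max_requests + jitter or sys.maxsize)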

    def notify(self):
            """\
            Your worker subclass must arrange to have this method called
            once every ``self.timeout`` seconds. If you fail in accomplishing
            this task, the master process will murder your workers.
            """
            self.tmp.notify()
    

    notify() calls WorkerTmp.notify() to update the timestamp of the corresponding tmp file.

        def init_process(self):
            """\
            If you override this method in a subclass, the last statement
            in the function should be to call this method with
            super(MyWorkerClass, self).init_process() so that the ``run()``
            loop is initiated.
            """
    
            # set environment variables
            if self.cfg.env:
                for k, v in self.cfg.env.items():
                    os.environ[k] = v
            # set the process owner (uid/gid)
            util.set_owner_process(self.cfg.uid, self.cfg.gid,
                                   initgroups=self.cfg.initgroups)
    
            # Reseed the random number generator
            util.seed()
    
            # For waking ourselves up
            self.PIPE = os.pipe()
            for p in self.PIPE:
                util.set_non_blocking(p)
                util.close_on_exec(p)
    
            # Prevent fd inheritance
            # close_on_exec marks the fd so it is not inherited when a child process is spawned
            for s in self.sockets:
                util.close_on_exec(s)
            util.close_on_exec(self.tmp.fileno())
    
            self.wait_fds = self.sockets + [self.PIPE[0]]
    
            self.log.close_on_exec()
    
            # install the signal handlers
            self.init_signals()
    
            # start the reloader
            if self.cfg.reload:
                def changed(fname):
                    self.log.info("Worker reloading: %s modified", fname)
                    self.alive = False
                    self.cfg.worker_int(self)
                    time.sleep(0.1)
                    sys.exit(0)
    
                reloader_cls = reloader_engines[self.cfg.reload_engine]
                self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
                                             callback=changed)
                self.reloader.start()
    
            self.load_wsgi()
            self.cfg.post_worker_init(self) 
    
            # Enter main run loop
            self.booted = True
            self.run()            # main loop
    

    init_process() is the entry point of a worker process: it is the method called when the worker starts. The official recommendation is that any subclass overriding it should call the parent implementation as its last statement. The method does the following:

    1. Set the process owner and group information;
    2. Create a pipe for the process; the Worker uses it to record the signals that interrupt it, collecting them instead of handling them directly and processing them later in the main loop;
    3. Collect the file descriptors to listen on and mark them as non-inheritable by child processes;
    4. Install the signal handlers;
    5. If reload is enabled, set up automatic restart when the code changes;
    6. Load the app object that implements the WSGI protocol;
    7. Enter the main loop, run(), shown below.
        def run(self):
            """\
            This is the mainloop of a worker process. You should override
            this method in a subclass to provide the intended behaviour
            for your particular evil schemes.
            """
            raise NotImplementedError()
    

    The Worker class does not implement run(); each subclass provides the concrete logic.
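    To make the contract concrete, here is a minimal, hypothetical subclass (not part of gunicorn) that follows the pattern the docstrings describe: init_process() calls the parent implementation as its last statement, and run() keeps calling notify() so the master does not kill the worker:

    import time

    from gunicorn.workers.base import Worker

    class MinimalWorker(Worker):
        """Illustrative only; the real workers live in gunicorn/workers/."""

        def init_process(self):
            # per-process setup would go here (thread pool, event loop, ...),
            # then hand control back to the base class, which calls run()
            super(MinimalWorker, self).init_process()

        def run(self):
            while self.alive:
                # touch the tmp file at least once every self.timeout seconds,
                # otherwise the master will consider this worker dead
                self.notify()
                # a real worker would block on select()/accept() over self.sockets here
                time.sleep(1.0)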

    Next, let's look at the WorkerTmp class.

    WorkerTmp

    # -*- coding: utf-8 -
    #
    # This file is part of gunicorn released under the MIT license.
    # See the NOTICE for more information.
    
    import os
    import platform
    import tempfile
    
    from gunicorn import util
    
    PLATFORM = platform.system()
    IS_CYGWIN = PLATFORM.startswith('CYGWIN')
    
    class WorkerTmp(object):
    
        def __init__(self, cfg):
            old_umask = os.umask(cfg.umask)
            fdir = cfg.worker_tmp_dir
            if fdir and not os.path.isdir(fdir):
                raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir)
            fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir)
    
            # allows the process to write to the file
            util.chown(name, cfg.uid, cfg.gid)
            os.umask(old_umask)
    
            # unlink the file so we don't leak temporary files
            try:
                if not IS_CYGWIN:
                    # even though the file is unlinked here, the already-open fd can still access its
                    # contents; the data is only reclaimed when the fd is closed, because of this unlink
                    util.unlink(name)
                self._tmp = os.fdopen(fd, 'w+b', 1)
            except:
                os.close(fd)
                raise
    
            self.spinner = 0
    
        def notify(self):
            self.spinner = (self.spinner + 1) % 2
            os.fchmod(self._tmp.fileno(), self.spinner)         # chmod bumps the file's ctime (the timestamp the master checks)
    
        def last_update(self):
            return os.fstat(self._tmp.fileno()).st_ctime
    
        def fileno(self):
            return self._tmp.fileno()
    
        def close(self):
            return self._tmp.close()
    

    The WorkerTmp class creates a temporary file; the child process keeps updating that file's timestamp, and the parent process periodically checks the timestamp to decide whether the child is still alive.
    WorkerTmp.__init__() creates the temporary file, keeps its file descriptor, and then unlinks the file so that nothing is left behind if the child exits without cleaning up; even after the unlink, the already-open file descriptor can still access the file.
    WorkerTmp.notify() updates the file's change time (ctime) by toggling its permission bits.
    WorkerTmp.last_update() returns the time of the last such update (the file's st_ctime).
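    To see why this matters on the master side, here is a simplified sketch of how the tmp file timestamp can be consumed; gunicorn's Arbiter does something similar in murder_workers(), but the helper below is purely illustrative:

    import time

    def find_stale_workers(workers, timeout):
        """Return (pid, worker) pairs whose tmp file has not been touched for `timeout` seconds.

        `workers` is assumed to be a dict mapping pid -> Worker, where each
        Worker exposes its WorkerTmp instance as `worker.tmp`.
        """
        stale = []
        for pid, worker in workers.items():
            try:
                idle = time.time() - worker.tmp.last_update()
            except (OSError, ValueError):
                continue
            if idle > timeout:
                stale.append((pid, worker))   # candidates to be signalled (SIGABRT, then SIGKILL)
        return stale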

    Finally, let's look at one concrete worker implementation, ThreadWorker.

    ThreadWorker

    This class implements the parent's Worker.run() and overrides a few other methods.

        def init_process(self):
            """Initialization."""
            self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
            self.poller = selectors.DefaultSelector()       # the best selector the platform provides
            self._lock = RLock()                            # a re-entrant lock
            super(ThreadWorker, self).init_process()
    

    The initialization does the following:

    1. Create a thread pool via concurrent.futures.ThreadPoolExecutor; the number of threads comes from the threads setting;
    2. Use selectors.DefaultSelector() to get the best I/O multiplexing mechanism available on the platform, e.g. epoll on Linux and kqueue on macOS; the module hides the platform details behind a uniform interface (see the small sketch below);
    3. Create a re-entrant lock;
    4. Call the parent's init_process(), which in turn calls run().
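    Before diving into run(), here is a minimal standalone sketch of the selectors pattern it relies on: register a callback in the data slot, then invoke it when the fd becomes readable (the listening socket and address below are made up):

    import selectors
    import socket

    sel = selectors.DefaultSelector()            # epoll on Linux, kqueue on BSD/macOS, ...

    def on_readable(sock):
        conn, addr = sock.accept()               # called when the listener is readable
        print("new connection from", addr)
        conn.close()

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", 8000))
    listener.listen()
    listener.setblocking(False)
    sel.register(listener, selectors.EVENT_READ, on_readable)   # callback stored in `data`

    while True:                                  # same shape as ThreadWorker's main loop
        for key, _ in sel.select(timeout=1.0):
            key.data(key.fileobj)                # key.data is the callback we registered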
        def run(self):
            """The worker's main loop:
                1. notify the parent process that we are still alive
                2. poll for I/O events
                3. handle the events that are ready
                4. if the parent process has died, break out of the loop
                5. murder requests that have exceeded the keep-alive timeout
            """
    
            # init listeners, add them to the event loop
            for sock in self.sockets:
                sock.setblocking(False)                                             # non-blocking
                # a race condition during graceful shutdown may make the listener
                # name unavailable in the request handler so capture it once here
                server = sock.getsockname()
                acceptor = partial(self.accept, server)  # partial application of self.accept
                self.poller.register(sock, selectors.EVENT_READ, acceptor)          # register(fileobj, events, data=None): the callback is stored in data
    
            while self.alive:                                                       # main loop
                # notify the arbiter we are alive
                self.notify()                                                       # touch the tmp file so the master knows we are alive
    
                # can we accept more connections?
                if self.nr_conns < self.worker_connections:                         # stay under the concurrency limit
                    # wait for an event
                    events = self.poller.select(1.0)                                # wait up to 1s for ready fds
                    for key, _ in events:
                        callback = key.data                                         # the callback stored in data
                        callback(key.fileobj)
    
                    # check (but do not wait) for finished requests;
                    # futures.wait takes an iterable of futures, and timeout=0 means do not block
                    result = futures.wait(self.futures, timeout=0,
                            return_when=futures.FIRST_COMPLETED)
                else:
                    # wait for a request to finish
                    result = futures.wait(self.futures, timeout=1.0,                # block until one finishes
                            return_when=futures.FIRST_COMPLETED)
    
                # clean up finished requests
                for fut in result.done:
                    self.futures.remove(fut)
    
                if not self.is_parent_alive():  # detected by checking whether the ppid has changed
                    break
    
                # handle keepalive timeouts
                self.murder_keepalived()
    
            self.tpool.shutdown(False)
            self.poller.close()
    
            for s in self.sockets:
                s.close()
    
            futures.wait(self.futures, timeout=self.cfg.graceful_timeout)       # wait at most graceful_timeout for in-flight requests
    

    The run() method does the following:

    1. Update the tmp file's timestamp;
    2. Collect the connections that are ready;
    3. If the concurrency limit allows it, dispatch a thread to handle the request;
    4. If the parent process has stopped working, prepare to exit the main loop;
    5. Murder keep-alive connections that have been held longer than the keep-alive timeout.

    Here is how a freshly arrived request is handled:

        def _wrap_future(self, fs, conn):
            """Track the future and register a callback for when it completes.

            Arguments:
                fs   -- the Future returned by the thread pool
                conn -- the TConn wrapping the client connection
            """
    
            fs.conn = conn
            self.futures.append(fs)
            fs.add_done_callback(self.finish_request)
    
        def enqueue_req(self, conn):
            """Hand the connection off to a worker thread.

            Arguments:
                conn -- the TConn wrapping the client connection
            """
    
            conn.init()
            # submit the connection to a worker
            fs = self.tpool.submit(self.handle, conn)
            self._wrap_future(fs, conn)
    
        def accept(self, server, listener):
            """Callback invoked when a listening socket becomes readable.

            Arguments:
                server   -- the listener's bound address (captured via partial)
                listener -- the listening socket
            """
    
            try:
                sock, client = listener.accept()
                # initialize the connection object
                conn = TConn(self.cfg, sock, client, server)
                self.nr_conns += 1                                      # one more connection in flight
                # enqueue the job
                self.enqueue_req(conn)
            except EnvironmentError as e:
                if e.errno not in (errno.EAGAIN,
                        errno.ECONNABORTED, errno.EWOULDBLOCK):
                    raise
    
    1. listener.accept() returns a socket connected to the client;
    2. the socket (wrapped in a TConn) is handed to self.handle() on a pool thread;
    3. a callback is registered to run once the thread finishes, as sketched below.
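    The submit-then-callback mechanics are standard concurrent.futures usage; a minimal standalone sketch (handler, finish_request, and the payload are made-up stand-ins for ThreadWorker.handle, ThreadWorker.finish_request, and the TConn):

    from concurrent import futures

    def handler(payload):
        # stand-in for ThreadWorker.handle(conn): the real work runs in a pool thread
        return payload.upper()

    def finish_request(fut):
        # stand-in for ThreadWorker.finish_request: runs once the future completes
        print("done:", fut.result())

    pool = futures.ThreadPoolExecutor(max_workers=2)
    fs = pool.submit(handler, "hello")      # what enqueue_req() does with self.handle
    fs.add_done_callback(finish_request)    # what _wrap_future() registers
    pool.shutdown(wait=True)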

    The interesting part of self.handle() is its call to self.handle_request(), so let's look directly at what self.handle_request() does:

        def handle_request(self, req, conn):
            """The core request handler."""
    
            environ = {}
            resp = None
            try:
                self.cfg.pre_request(self, req)
                request_start = datetime.now()
                resp, environ = wsgi.create(req, conn.sock, conn.client,
                        conn.server, self.cfg)
                environ["wsgi.multithread"] = True
                self.nr += 1
                if self.alive and self.nr >= self.max_requests:
                    self.log.info("Autorestarting worker after current request.")
                    resp.force_close()
                    self.alive = False
    
                if not self.cfg.keepalive:
                    resp.force_close()
                elif len(self._keep) >= self.max_keepalived:
                    resp.force_close()
    
                respiter = self.wsgi(environ, resp.start_response)
                try:
                    if isinstance(respiter, environ['wsgi.file_wrapper']):
                        resp.write_file(respiter)
                    else:
                        for item in respiter:
                            resp.write(item)
    
                    resp.close()
                    request_time = datetime.now() - request_start
                    self.log.access(resp, req, environ, request_time)
                finally:
                    if hasattr(respiter, "close"):
                        respiter.close()
    
                if resp.should_close():
                    self.log.debug("Closing connection.")
                    return False
            except EnvironmentError:
                # pass to next try-except level
                util.reraise(*sys.exc_info())
            except Exception:
                if resp and resp.headers_sent:
                    # If the requests have already been sent, we should close the
                    # connection to indicate the error.
                    self.log.exception("Error handling request")
                    try:
                        conn.sock.shutdown(socket.SHUT_RDWR)
                        conn.sock.close()
                    except EnvironmentError:
                        pass
                    raise StopIteration()
                raise
            finally:
                try:
                    self.cfg.post_request(self, req, environ, resp)
                except Exception:
                    self.log.exception("Exception in post_request hook")
    
            return True
    

    Apart from the configuration- and environment-related handling, the key line is respiter = self.wsgi(environ, resp.start_response): it invokes the app that implements the WSGI protocol and writes the result back to the client.
    This is the heart of request handling: any framework that conforms to the WSGI protocol can be plugged into Gunicorn this way.
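    For reference, a minimal WSGI application of the kind self.wsgi ends up being (it stands in for Flask or any other WSGI framework; the module name in the command below is made up):

    def app(environ, start_response):
        # `environ` is the CGI-style request dict gunicorn builds in wsgi.create();
        # `start_response` corresponds to resp.start_response in handle_request()
        body = b"Hello, world!\n"
        start_response("200 OK", [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(len(body))),
        ])
        return [body]   # handle_request() iterates over this and writes each chunk

    # run it with the threaded worker discussed above, e.g.:
    #   gunicorn -w 2 -k gthread --threads 4 myapp:app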

    The overall ThreadWorker processing flow is shown in the figure below:

    [Figure: Gunicorn ThreadWorker request-handling flow]
