美文网首页
Python-multiprocessing

Python-multiprocessing

作者: hustxujinkang | 来源:发表于2018-07-12 19:18 被阅读0次

    1.用法注释

    1.1 Process类

    import multiprocessing as mp


    def foo(q):
        """Put the string 'hello' on the queue `q`."""
        greeting = 'hello'
        q.put(greeting)


    if __name__ == '__main__':
        # Select the 'spawn' start method before any process is created.
        mp.set_start_method('spawn')
        channel = mp.Queue()
        worker = mp.Process(target=foo, args=(channel,))
        worker.start()
        # Read the child's message first, then wait for the child to finish.
        message = channel.get()
        print(message)
        worker.join()
    

    这里值得注意的是这个Queue,是可以用在多进程环境下的阻塞队列
    我们可以看一下Process类的实现

    class Process(object):
        '''
        Process objects represent activity that is run in a separate process

        The class is analogous to `threading.Thread`
        '''
        # Optional Popen override; when left as None, start() imports Popen
        # from the package's `forking` module instead.
        _Popen = None

        def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
            '''
            Record the callable to run and inherit settings from the
            current process.

            `group` must be None.  `target` is called by run() with `args`
            and `kwargs`.  `name` defaults to the class name followed by
            the colon-joined identity tuple.
            '''
            assert group is None, 'group argument must be None for now'
            # next() instead of the Python 2-only iterator method .next()
            count = next(_current_process._counter)
            self._identity = _current_process._identity + (count,)
            self._authkey = _current_process._authkey
            self._daemonic = _current_process._daemonic
            self._tempdir = _current_process._tempdir
            self._parent_pid = os.getpid()
            self._popen = None
            self._target = target
            # args/kwargs are copied, so the shared `{}` default is never
            # mutated through this object.
            self._args = tuple(args)
            self._kwargs = dict(kwargs)
            self._name = name or type(self).__name__ + '-' + \
                         ':'.join(str(i) for i in self._identity)

        def run(self):
            '''
            Method to be run in sub-process; can be overridden in sub-class
            '''
            if self._target:
                self._target(*self._args, **self._kwargs)

        def start(self):
            '''
            Start child process

            Asserts that the process has not been started before, that it
            is started from the process that created it, and that the
            current process is not daemonic.
            '''
            assert self._popen is None, 'cannot start a process twice'
            assert self._parent_pid == os.getpid(), \
                   'can only start a process object created by current process'
            assert not _current_process._daemonic, \
                   'daemonic processes are not allowed to have children'
            _cleanup()
            if self._Popen is not None:
                Popen = self._Popen
            else:
                from .forking import Popen
            self._popen = Popen(self)
            _current_process._children.add(self)

        def terminate(self):
            '''
            Terminate process; sends SIGTERM signal or uses TerminateProcess()
            '''
            self._popen.terminate()

        def join(self, timeout=None):
            '''
            Wait until child process terminates

            `timeout` is passed straight to the Popen object's wait().
            '''
            assert self._parent_pid == os.getpid(), 'can only join a child process'
            assert self._popen is not None, 'can only join a started process'
            res = self._popen.wait(timeout)
            # A non-None result from wait() means the child is no longer
            # tracked as one of the current process's children.
            if res is not None:
                _current_process._children.discard(self)

        def is_alive(self):
            '''
            Return whether process is alive
            '''
            if self is _current_process:
                return True
            assert self._parent_pid == os.getpid(), 'can only test a child process'
            if self._popen is None:
                return False
            self._popen.poll()
            return self._popen.returncode is None

        @property
        def name(self):
            '''
            Return the name of the process
            '''
            return self._name

        @name.setter
        def name(self, name):
            '''
            Set the name of the process; `name` must be a string
            '''
            # `basestring` does not exist on Python 3; `str` is the text type.
            assert isinstance(name, str), 'name must be a string'
            self._name = name

        @property
        def daemon(self):
            '''
            Return whether process is a daemon
            '''
            return self._daemonic

        @daemon.setter
        def daemon(self, daemonic):
            '''
            Set whether process is a daemon
            '''
            assert self._popen is None, 'process has already started'
            self._daemonic = daemonic

        @property
        def authkey(self):
            '''
            Return authorization key of process
            '''
            return self._authkey

        @authkey.setter
        def authkey(self, authkey):
            '''
            Set authorization key of process
            '''
            self._authkey = AuthenticationString(authkey)

        @property
        def exitcode(self):
            '''
            Return exit code of process or `None` if it has yet to stop
            '''
            if self._popen is None:
                return self._popen
            return self._popen.poll()

        @property
        def ident(self):
            '''
            Return identifier (PID) of process or `None` if it has yet to start
            '''
            if self is _current_process:
                return os.getpid()
            else:
                return self._popen and self._popen.pid

        pid = ident

        def __repr__(self):
            if self is _current_process:
                status = 'started'
            elif self._parent_pid != os.getpid():
                status = 'unknown'
            elif self._popen is None:
                status = 'initial'
            else:
                if self._popen.poll() is not None:
                    status = self.exitcode
                else:
                    status = 'started'

            # An integer status is an exit code; turn it into readable text.
            if type(status) is int:
                if status == 0:
                    status = 'stopped'
                else:
                    status = 'stopped[%s]' % _exitcode_to_name.get(status, status)

            daemon_suffix = ' daemon' if self._daemonic else ''
            return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
                                       status, daemon_suffix)

        ##

        def _bootstrap(self):
            '''
            Set up this object as the current process, call run() and
            return the resulting exit code.

            Returns 0 when run() completes, the integer argument of a
            SystemExit when there is one, and 1 otherwise.
            '''
            from . import util
            global _current_process

            try:
                self._children = set()
                self._counter = itertools.count(1)
                try:
                    # Replace stdin with the null device; failures here are
                    # deliberately ignored.
                    sys.stdin.close()
                    sys.stdin = open(os.devnull)
                except (OSError, ValueError):
                    pass
                _current_process = self
                util._finalizer_registry.clear()
                util._run_after_forkers()
                util.info('child process calling self.run()')
                try:
                    self.run()
                    exitcode = 0
                finally:
                    util._exit_function()
            except SystemExit as e:
                if not e.args:
                    exitcode = 1
                elif isinstance(e.args[0], int):
                    exitcode = e.args[0]
                else:
                    sys.stderr.write(str(e.args[0]) + '\n')
                    sys.stderr.flush()
                    exitcode = 1
            except:
                # Catch-all on purpose: any other failure is reported on
                # stderr and converted into exit code 1.
                exitcode = 1
                import traceback
                sys.stderr.write('Process %s:\n' % self.name)
                sys.stderr.flush()
                traceback.print_exc()

            util.info('process exiting with exitcode %d' % exitcode)
            return exitcode
    

    这里是参考threading的做法封装的一些接口,只不过join函数是基于waitpid实现的,而线程里面的join是调用的pthread库的pthread_join函数。从接口层面值得注意的是run和start函数,可以看到run方法很简单就是执行了一个函数,但是一般我们对于process object不会直接去调用run函数(这样就等于在当前进程执行了一个函数,没有意义),我们会调用start函数,由start函数负责在另一个进程里调度run函数,当然这里为了维持Process object的定义简单化,固定一个Process对象只能执行一次。换句话说,一个process对象的完整生命周期就是从start开始,到执行target函数完成退出或者中间被terminate。这对于子进程调用而言是十分重要的简化,大多数时候我们就是单纯的希望将某个耗时任务放到一个子进程里面进行,从而并行化计算任务,提高CPU利用率。

    另一个值得注意的class是context对象

    import os
    import sys
    import threading
    
    from . import process
    from . import reduction
    
    # Starts out empty: no names are exported directly from this module.
    __all__ = []            # things are copied from here to __init__.py
    
    #
    # Exceptions
    #
    
    class ProcessError(Exception):
        '''Base class of the exceptions defined in this module.'''


    class BufferTooShort(ProcessError):
        '''ProcessError subclass named for a buffer that is too short.'''


    class TimeoutError(ProcessError):
        '''ProcessError subclass named for a timeout.'''


    class AuthenticationError(ProcessError):
        '''ProcessError subclass named for an authentication failure.'''
    
    #
    # Base type for contexts
    #
    
    class BaseContext(object):
        '''Base type for contexts.

        Re-exports the module's exception types and provides factory
        methods that import their implementation lazily; most of them hand
        the context returned by `get_context()` to the object they create.
        '''

        ProcessError = ProcessError
        BufferTooShort = BufferTooShort
        TimeoutError = TimeoutError
        AuthenticationError = AuthenticationError

        current_process = staticmethod(process.current_process)
        active_children = staticmethod(process.active_children)

        def cpu_count(self):
            '''Return the CPU count reported by `os.cpu_count()`.

            Raises NotImplementedError when that count is None.
            '''
            count = os.cpu_count()
            if count is not None:
                return count
            raise NotImplementedError('cannot determine number of cpus')

        def Manager(self):
            '''Create a SyncManager for this context, start it and return it.

            The manager's methods such as `Lock()`, `Condition()` and
            `Queue()` can be used to create shared objects.
            '''
            from .managers import SyncManager
            manager = SyncManager(ctx=self.get_context())
            manager.start()
            return manager

        def Pipe(self, duplex=True):
            '''Return the result of `connection.Pipe(duplex)`.'''
            from .connection import Pipe as make_pipe
            return make_pipe(duplex)

        def Lock(self):
            '''Return a non-recursive lock bound to this context.'''
            from .synchronize import Lock
            ctx = self.get_context()
            return Lock(ctx=ctx)

        def RLock(self):
            '''Return a recursive lock bound to this context.'''
            from .synchronize import RLock
            ctx = self.get_context()
            return RLock(ctx=ctx)

        def Condition(self, lock=None):
            '''Return a condition object, optionally built on `lock`.'''
            from .synchronize import Condition
            ctx = self.get_context()
            return Condition(lock, ctx=ctx)

        def Semaphore(self, value=1):
            '''Return a semaphore created with `value`.'''
            from .synchronize import Semaphore
            ctx = self.get_context()
            return Semaphore(value, ctx=ctx)

        def BoundedSemaphore(self, value=1):
            '''Return a bounded semaphore created with `value`.'''
            from .synchronize import BoundedSemaphore
            ctx = self.get_context()
            return BoundedSemaphore(value, ctx=ctx)

        def Event(self):
            '''Return an event object bound to this context.'''
            from .synchronize import Event
            ctx = self.get_context()
            return Event(ctx=ctx)

        def Barrier(self, parties, action=None, timeout=None):
            '''Return a barrier built from `parties`, `action` and `timeout`.'''
            from .synchronize import Barrier
            ctx = self.get_context()
            return Barrier(parties, action, timeout, ctx=ctx)

        def Queue(self, maxsize=0):
            '''Return a `queues.Queue` created with `maxsize`.'''
            from .queues import Queue
            ctx = self.get_context()
            return Queue(maxsize, ctx=ctx)

        def JoinableQueue(self, maxsize=0):
            '''Return a `queues.JoinableQueue` created with `maxsize`.'''
            from .queues import JoinableQueue
            ctx = self.get_context()
            return JoinableQueue(maxsize, ctx=ctx)

        def SimpleQueue(self):
            '''Return a `queues.SimpleQueue` bound to this context.'''
            from .queues import SimpleQueue
            ctx = self.get_context()
            return SimpleQueue(ctx=ctx)

        def Pool(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
            '''Return a process pool object bound to this context.'''
            from .pool import Pool
            ctx = self.get_context()
            return Pool(processes, initializer, initargs, maxtasksperchild,
                        context=ctx)

        def RawValue(self, typecode_or_type, *args):
            '''Return the result of `sharedctypes.RawValue` for the arguments.'''
            from .sharedctypes import RawValue
            return RawValue(typecode_or_type, *args)

        def RawArray(self, typecode_or_type, size_or_initializer):
            '''Return the result of `sharedctypes.RawArray` for the arguments.'''
            from .sharedctypes import RawArray
            return RawArray(typecode_or_type, size_or_initializer)

        def Value(self, typecode_or_type, *args, lock=True):
            '''Return a synchronized shared object; `lock` is keyword-only.'''
            from .sharedctypes import Value
            ctx = self.get_context()
            return Value(typecode_or_type, *args, lock=lock, ctx=ctx)

        def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
            '''Return a synchronized shared array; `lock` is keyword-only.'''
            from .sharedctypes import Array
            ctx = self.get_context()
            return Array(typecode_or_type, size_or_initializer, lock=lock,
                         ctx=ctx)

        def freeze_support(self):
            '''Check whether this is a fake forked process in a frozen
            executable.  If so then run code specified by commandline and
            exit.  Does nothing unless running frozen on win32.
            '''
            if sys.platform != 'win32' or not getattr(sys, 'frozen', False):
                return
            from .spawn import freeze_support
            freeze_support()

        def get_logger(self):
            '''Return package logger -- if it does not already exist then
            it is created.
            '''
            from .util import get_logger as fetch_logger
            return fetch_logger()

        def log_to_stderr(self, level=None):
            '''Turn on logging and add a handler which prints to stderr.'''
            from .util import log_to_stderr as enable_stderr_logging
            return enable_stderr_logging(level)

        def allow_connection_pickling(self):
            '''Install support for sending connections and sockets
            between processes.
            '''
            # Undocumented.  In previous versions of multiprocessing the
            # only effect was to make socket objects inheritable on
            # Windows; here the call just imports the connection module.
            from . import connection

        def set_executable(self, executable):
            '''Set the path to a python.exe or pythonw.exe binary used to
            run child processes instead of sys.executable when using the
            'spawn' start method.  Useful for people embedding Python.
            '''
            from .spawn import set_executable as apply_executable
            apply_executable(executable)

        def set_forkserver_preload(self, module_names):
            '''Set list of module names to try to load in forkserver
            process.  This is really just a hint.
            '''
            from .forkserver import set_forkserver_preload as apply_preload
            apply_preload(module_names)

        def get_context(self, method=None):
            '''Return this context, or the concrete context for `method`.

            Raises ValueError when `method` names no known context.
            '''
            if method is None:
                return self
            try:
                chosen = _concrete_contexts[method]
            except KeyError:
                raise ValueError('cannot find context for %r' % method) from None
            else:
                chosen._check_available()
                return chosen

        def get_start_method(self, allow_none=False):
            '''Return this context's `_name`; `allow_none` is not used here.'''
            return self._name

        def set_start_method(self, method, force=False):
            '''Always raise ValueError: a concrete context cannot change.'''
            raise ValueError('cannot set start method of concrete context')

        @property
        def reducer(self):
            '''Controls how objects will be reduced to a form that can be
            shared with other processes.'''
            module_namespace = globals()
            return module_namespace.get('reduction')

        @reducer.setter
        def reducer(self, reduction):
            # Rebinds the module-level name `reduction`.
            module_namespace = globals()
            module_namespace['reduction'] = reduction

        def _check_available(self):
            '''Hook called by get_context(); the base version does nothing.'''
    
    #
    # Type of default context -- underlying context can be set at most once
    #
    
    class Process(process.BaseProcess):
        '''Process type whose Popen is looked up through the default context.'''
        _start_method = None

        @staticmethod
        def _Popen(process_obj):
            '''Delegate to `_Popen` of the Process class of the context
            returned by the default context's `get_context()`.'''
            active_ctx = _default_context.get_context()
            return active_ctx.Process._Popen(process_obj)
    
    class DefaultContext(BaseContext):
        '''Default context whose underlying context is chosen lazily.

        Until a start method is set, the first use falls back to the
        context passed to the constructor.
        '''
        Process = Process

        def __init__(self, context):
            '''Remember `context` as the fallback; nothing is selected yet.'''
            self._actual_context = None
            self._default_context = context

        def get_context(self, method=None):
            '''Return the selected context, or the concrete one for `method`.

            With no `method`, the fallback context becomes the selected one
            if none has been chosen so far.
            '''
            if method is not None:
                return super().get_context(method)
            if self._actual_context is None:
                self._actual_context = self._default_context
            return self._actual_context

        def set_start_method(self, method, force=False):
            '''Select the context for `method`.

            Raises RuntimeError if a context is already selected and
            `force` is false.  With `force` set and `method` None the
            selection is cleared instead.
            '''
            already_selected = self._actual_context is not None
            if already_selected and not force:
                raise RuntimeError('context has already been set')
            if method is None and force:
                self._actual_context = None
            else:
                self._actual_context = self.get_context(method)

        def get_start_method(self, allow_none=False):
            '''Return the `_name` of the selected context.

            When nothing is selected: return None if `allow_none` is true,
            otherwise select the fallback context first.
            '''
            current = self._actual_context
            if current is None:
                if allow_none:
                    return None
                current = self._actual_context = self._default_context
            return current._name

        def get_all_start_methods(self):
            '''Return the list of start method names for this platform.'''
            if sys.platform == 'win32':
                return ['spawn']
            methods = ['fork', 'spawn']
            if reduction.HAVE_SEND_HANDLE:
                methods.append('forkserver')
            return methods

    # Every attribute of DefaultContext that does not start with an underscore.
    DefaultContext.__all__ = [
        attr for attr in dir(DefaultContext) if attr[0] != '_'
    ]
    
    #
    # Context types for fixed start method
    #
    
    if sys.platform != 'win32':

        class ForkProcess(process.BaseProcess):
            '''Process type for the 'fork' start method.'''
            _start_method = 'fork'

            @staticmethod
            def _Popen(process_obj):
                '''Build a `popen_fork.Popen` for `process_obj`.'''
                from .popen_fork import Popen as popen_cls
                return popen_cls(process_obj)

        class SpawnProcess(process.BaseProcess):
            '''Process type for the 'spawn' start method.'''
            _start_method = 'spawn'

            @staticmethod
            def _Popen(process_obj):
                '''Build a `popen_spawn_posix.Popen` for `process_obj`.'''
                from .popen_spawn_posix import Popen as popen_cls
                return popen_cls(process_obj)

        class ForkServerProcess(process.BaseProcess):
            '''Process type for the 'forkserver' start method.'''
            _start_method = 'forkserver'

            @staticmethod
            def _Popen(process_obj):
                '''Build a `popen_forkserver.Popen` for `process_obj`.'''
                from .popen_forkserver import Popen as popen_cls
                return popen_cls(process_obj)

        class ForkContext(BaseContext):
            '''Context fixed to the 'fork' start method.'''
            _name = 'fork'
            Process = ForkProcess

        class SpawnContext(BaseContext):
            '''Context fixed to the 'spawn' start method.'''
            _name = 'spawn'
            Process = SpawnProcess

        class ForkServerContext(BaseContext):
            '''Context fixed to the 'forkserver' start method.'''
            _name = 'forkserver'
            Process = ForkServerProcess

            def _check_available(self):
                '''Raise ValueError unless `reduction.HAVE_SEND_HANDLE` is set.'''
                if reduction.HAVE_SEND_HANDLE:
                    return
                raise ValueError('forkserver start method not available')

        # One instance per start method, keyed by the context's own name;
        # 'fork' is the context the default one falls back to.
        _concrete_contexts = {
            ctx._name: ctx
            for ctx in (ForkContext(), SpawnContext(), ForkServerContext())
        }
        _default_context = DefaultContext(_concrete_contexts['fork'])

    else:

        class SpawnProcess(process.BaseProcess):
            '''Process type for the 'spawn' start method.'''
            _start_method = 'spawn'

            @staticmethod
            def _Popen(process_obj):
                '''Build a `popen_spawn_win32.Popen` for `process_obj`.'''
                from .popen_spawn_win32 import Popen as popen_cls
                return popen_cls(process_obj)

        class SpawnContext(BaseContext):
            '''Context fixed to the 'spawn' start method.'''
            _name = 'spawn'
            Process = SpawnProcess

        # On win32 'spawn' is the only entry and also the fallback.
        _concrete_contexts = {
            ctx._name: ctx
            for ctx in (SpawnContext(),)
        }
        _default_context = DefaultContext(_concrete_contexts['spawn'])
    
    #
    # Force the start method
    #
    
    def _force_start_method(method):
        '''Point the default context straight at the concrete context
        registered under `method`; raises KeyError for an unknown name.'''
        forced_ctx = _concrete_contexts[method]
        _default_context._actual_context = forced_ctx
    
    #
    # Check that the current thread is spawning a child process
    #
    
    # Per-thread storage for the Popen object recorded by set_spawning_popen().
    _tls = threading.local()


    def get_spawning_popen():
        '''Return the Popen recorded for this thread, or None if none is set.'''
        try:
            return _tls.spawning_popen
        except AttributeError:
            return None


    def set_spawning_popen(popen):
        '''Record `popen` as this thread's spawning Popen.'''
        setattr(_tls, 'spawning_popen', popen)


    def assert_spawning(obj):
        '''Raise RuntimeError unless this thread has a spawning Popen recorded.

        The error message names the type of `obj`.
        '''
        if get_spawning_popen() is not None:
            return
        type_name = type(obj).__name__
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type_name
            )
    

    从外部接口来看,Python的多进程库几乎是想复刻多线程的使用体验,如果没有特殊需求的话,一般只要按这种理念即可很好的使用多进程编程。

    相关文章

      网友评论

          本文标题:Python-multiprocessing

          本文链接:https://www.haomeiwen.com/subject/asrlpftx.html