Dashixiong's Python Source Code Study Notes (40): Python Multithreading

Author: superkmi | Published: 2021-11-05 08:53

    Dashixiong's Python Source Code Study Notes (39): Python's Multithreading Mechanism (1)
    Dashixiong's Python Source Code Study Notes (41): Python's Multithreading Mechanism (3)

    II. The _thread and threading packages

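    • The most basic multithreading interface Python provides is _thread, which is implemented in C.
    • The threading package provides a higher-level multithreading interface and is implemented in Python.
    • We start with the lower-level _thread interface: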
    Modules\_threadmodule.c
    
    static PyMethodDef thread_methods[] = {
        {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
         METH_VARARGS, start_new_doc},
        {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
         METH_VARARGS, start_new_doc},
        {"allocate_lock",           (PyCFunction)thread_PyThread_allocate_lock,
         METH_NOARGS, allocate_doc},
        {"allocate",                (PyCFunction)thread_PyThread_allocate_lock,
         METH_NOARGS, allocate_doc},
        {"exit_thread",             (PyCFunction)thread_PyThread_exit_thread,
         METH_NOARGS, exit_doc},
        {"exit",                    (PyCFunction)thread_PyThread_exit_thread,
         METH_NOARGS, exit_doc},
        {"interrupt_main",          (PyCFunction)thread_PyThread_interrupt_main,
         METH_NOARGS, interrupt_doc},
        {"get_ident",               (PyCFunction)thread_get_ident,
         METH_NOARGS, get_ident_doc},
        {"_count",                  (PyCFunction)thread__count,
         METH_NOARGS, _count_doc},
        {"stack_size",              (PyCFunction)thread_stack_size,
         METH_VARARGS, stack_size_doc},
        {"_set_sentinel",           (PyCFunction)thread__set_sentinel,
         METH_NOARGS, _set_sentinel_doc},
        {NULL,                      NULL}           /* sentinel */
    };
    
    • Notice that some interfaces in _threadmodule.c appear twice under different names, for example allocate_lock and allocate; both entries map to the same C function, thread_PyThread_allocate_lock.
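The aliasing described above can be observed from Python. The following is a minimal sketch, assuming an interpreter whose `_thread` module still exposes the legacy `allocate` name; the `getattr` fallback is an assumption added here so the snippet also runs on builds that lack it.

```python
import _thread

# `allocate` is the legacy alias registered in thread_methods; fall back to
# allocate_lock on builds that do not expose it (assumption for portability).
allocate = getattr(_thread, "allocate", _thread.allocate_lock)

lock_a = _thread.allocate_lock()
lock_b = allocate()

# Both names are served by the same C function, so both calls return
# distinct, unlocked objects of the same lock type.
same_type = type(lock_a) is type(lock_b)
print(same_type, lock_a.locked(), lock_b.locked())
```

Each call returns a fresh lock, so the two objects are independent even though the same C function produces them.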

    III. Creating a Python thread

    • Looking at thread_methods, the interface for creating a thread maps to the thread_PyThread_start_new_thread function:
    Modules\_threadmodule.c
    
    static PyObject *
    thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
    {
        PyObject *func, *args, *keyw = NULL;
        struct bootstate *boot;
        unsigned long ident;
    
        ... ...
        boot = PyMem_NEW(struct bootstate, 1);
        if (boot == NULL)
            return PyErr_NoMemory();
        boot->interp = PyThreadState_GET()->interp;
        boot->func = func;
        boot->args = args;
        boot->keyw = keyw;
        boot->tstate = _PyThreadState_Prealloc(boot->interp);
        if (boot->tstate == NULL) {
            PyMem_DEL(boot);
            return PyErr_NoMemory();
        }
        Py_INCREF(func);
        Py_INCREF(args);
        Py_XINCREF(keyw);
        PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
        ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
        ...
        return PyLong_FromUnsignedLong(ident);
    }
    
    • In thread_PyThread_start_new_thread, the virtual machine creates a thread in three main steps:

    1. Create and initialize boot, which holds all the information the new thread needs:

       boot->interp = PyThreadState_GET()->interp;
       boot->func = func;
       boot->args = args;
       boot->keyw = keyw;
       boot->tstate = _PyThreadState_Prealloc(boot->interp);
    

    2. Initialize Python's multithreading environment:

       PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
    

    3. Create a native operating-system thread, passing boot as its argument:

       ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
       ...
       return PyLong_FromUnsignedLong(ident);
    
    • Note that boot->interp stores Python's PyInterpreterState object. This object carries global information such as the module pool, and all threads share that global information.
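Seen from Python, the func, args and keyw fields stored in boot correspond to the three arguments of `_thread.start_new_thread`, and the returned integer is the ident produced by the C function. A minimal sketch follows; `worker`, `results` and `done` are hypothetical names chosen for the illustration.

```python
import _thread

results = []
done = _thread.allocate_lock()
done.acquire()  # held until the worker signals completion

def worker(a, b, scale=1):
    # Runs in the new native thread started by the C layer.
    try:
        results.append((a + b) * scale)
    finally:
        done.release()  # a _thread lock may be released by another thread

# func, args and keyw end up in the bootstate struct on the C side.
ident = _thread.start_new_thread(worker, (1, 2), {"scale": 10})

done.acquire()  # block until the worker has finished
print(ident, results)
```

The lock is used here only because `_thread` has no join operation; the higher-level threading package wraps this pattern.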
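    1. Setting up the multithreading environment
    • The core of setting up the multithreading environment is creating the GIL. The GIL is a _gil_runtime_state struct: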
    struct _gil_runtime_state {
        /* microseconds (the Python API uses seconds, though) */
        unsigned long interval;
        /* Last PyThreadState holding / having held the GIL. This helps us
           know whether anyone else was scheduled after we dropped the GIL. */
        _Py_atomic_address last_holder;
        /* Whether the GIL is already taken (-1 if uninitialized). This is
           atomic because it can be read without any lock taken in ceval.c. */
        _Py_atomic_int locked;
        /* Number of GIL switches since the beginning. */
        unsigned long switch_number;
        /* This condition variable allows one or several threads to wait
           until the GIL is released. In addition, the mutex also protects
           the above variables. */
        PyCOND_T cond;
        PyMUTEX_T mutex;
    #ifdef FORCE_SWITCHING
        /* This condition variable helps the GIL-releasing thread wait for
           a GIL-awaiting thread to be scheduled and take the GIL. */
        PyCOND_T switch_cond;
        PyMUTEX_T switch_mutex;
    #endif
    };
    
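    Field          Meaning
    interval       How long a thread may hold the GIL before a waiting thread requests it; defaults to 5000 microseconds (5 ms)
    last_holder    The last PyThreadState (thread) that held the GIL
    locked         Whether the GIL is taken; -1 means uninitialized. It is atomic because ceval.c reads it without holding any lock
    switch_number  Total number of GIL switches since the GIL was created
    cond           Lets one or more threads wait until the GIL is released
    mutex          Protects the variables above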
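The interval field is stored in microseconds, while the Python-level API works in seconds, as the comment in the struct points out. A small sketch using `sys.getswitchinterval` and `sys.setswitchinterval` shows the conversion; the variable names are illustrative only.

```python
import sys

original = sys.getswitchinterval()   # value in seconds

# 0.005 s corresponds to the DEFAULT_INTERVAL of 5000 microseconds.
sys.setswitchinterval(0.005)
interval_us = round(sys.getswitchinterval() * 1_000_000)

sys.setswitchinterval(original)      # restore whatever was configured before
print(interval_us)
```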
    • When Python initializes the interpreter, it creates an uninitialized GIL:
    ceval.c
    
    void
    _PyEval_Initialize(struct _ceval_runtime_state *state)
    {
        state->recursion_limit = Py_DEFAULT_RECURSION_LIMIT;
        _Py_CheckRecursionLimit = Py_DEFAULT_RECURSION_LIMIT;
        _gil_initialize(&state->gil);
    }
    
    Python\ceval_gil.h
    
    #define DEFAULT_INTERVAL 5000
    
    static void _gil_initialize(struct _gil_runtime_state *state)
    {
        _Py_atomic_int uninitialized = {-1};
        state->locked = uninitialized;
        state->interval = DEFAULT_INTERVAL;
    }
    
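    • Before the multithreading mechanism is activated, Python does not really support multithreading beyond the uninitialized GIL described above. This is because Python lets the user activate the mechanism: the multithreading environment is only actually initialized once the user calls _thread.start_new_thread. (According to the C API documentation, since Python 3.7 Py_Initialize() also calls PyEval_InitThreads(), in which case the call described here returns early.)
    • As we know, start_new_thread maps to the thread_PyThread_start_new_thread function, which calls PyEval_InitThreads to initialize the multithreading environment: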
    ceval.c
    
    void
    PyEval_InitThreads(void)
    {
        if (gil_created())
            return;
        create_gil();
        take_gil(PyThreadState_GET());
        _PyRuntime.ceval.pending.main_thread = PyThread_get_thread_ident();
        if (!_PyRuntime.ceval.pending.lock)
            _PyRuntime.ceval.pending.lock = PyThread_allocate_lock();
    }
    
    • PyEval_InitThreads uses gil_created, an atomic load, to check whether the GIL has already been initialized:
    ceval.c
    
    static int gil_created(void)
    {
        return (_Py_atomic_load_explicit(&_PyRuntime.ceval.gil.locked,
                                         _Py_memory_order_acquire)
                ) >= 0;
    }
    
    • If it has not been initialized, create_gil initializes the GIL:
    ceval.c
    
    static void create_gil(void)
    {
        MUTEX_INIT(_PyRuntime.ceval.gil.mutex);
    #ifdef FORCE_SWITCHING
        MUTEX_INIT(_PyRuntime.ceval.gil.switch_mutex);
    #endif
        COND_INIT(_PyRuntime.ceval.gil.cond);
    #ifdef FORCE_SWITCHING
        COND_INIT(_PyRuntime.ceval.gil.switch_cond);
    #endif
        _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder, 0);
        _Py_ANNOTATE_RWLOCK_CREATE(&_PyRuntime.ceval.gil.locked);
        _Py_atomic_store_explicit(&_PyRuntime.ceval.gil.locked, 0,
                                  _Py_memory_order_release);
    }
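The interplay of _gil_initialize, gil_created and create_gil amounts to a lazy, run-once initialization keyed on the three states of locked (-1, 0, 1). The following toy model is only a sketch of that control flow; the class `LazyGil` and its `create_calls` counter are hypothetical and exist purely for illustration.

```python
UNINITIALIZED, UNLOCKED, LOCKED = -1, 0, 1

class LazyGil:
    """Hypothetical stand-in for the `locked` field of _gil_runtime_state."""

    def __init__(self):
        self.locked = UNINITIALIZED   # what _gil_initialize stores
        self.create_calls = 0         # counts how often "create_gil" ran

    def created(self):
        # Mirrors gil_created(): any value >= 0 means the GIL exists.
        return self.locked >= 0

    def init_threads(self):
        # Mirrors PyEval_InitThreads(): return early if the GIL exists,
        # otherwise create it and take it for the calling thread.
        if self.created():
            return
        self.create_calls += 1
        self.locked = UNLOCKED        # create_gil() stores 0
        self.locked = LOCKED          # take_gil() stores 1

gil = LazyGil()
before = gil.created()
gil.init_threads()
gil.init_threads()                    # second call is a no-op
print(before, gil.created(), gil.create_calls)
```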
    
    • After the GIL is initialized, it is acquired through the take_gil function:
    ceval.c
    
    static void take_gil(PyThreadState *tstate)
    {
        int err;
        if (tstate == NULL)
            Py_FatalError("take_gil: NULL tstate");
    
        err = errno;
        MUTEX_LOCK(_PyRuntime.ceval.gil.mutex);
    
        if (!_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked))
            goto _ready;
    
        while (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked)) {
            int timed_out = 0;
            unsigned long saved_switchnum;
    
            saved_switchnum = _PyRuntime.ceval.gil.switch_number;
            COND_TIMED_WAIT(_PyRuntime.ceval.gil.cond, _PyRuntime.ceval.gil.mutex,
                            INTERVAL, timed_out);
            /* If we timed out and no switch occurred in the meantime, it is time
               to ask the GIL-holding thread to drop it. */
            if (timed_out &&
                _Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked) &&
                _PyRuntime.ceval.gil.switch_number == saved_switchnum) {
                SET_GIL_DROP_REQUEST();
            }
        }
    _ready:
    #ifdef FORCE_SWITCHING
        /* This mutex must be taken before modifying
           _PyRuntime.ceval.gil.last_holder (see drop_gil()). */
        MUTEX_LOCK(_PyRuntime.ceval.gil.switch_mutex);
    #endif
        /* We now hold the GIL */
        _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.locked, 1);
        _Py_ANNOTATE_RWLOCK_ACQUIRED(&_PyRuntime.ceval.gil.locked, /*is_write=*/1);
    
        if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(
                        &_PyRuntime.ceval.gil.last_holder))
        {
            _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder,
                                     (uintptr_t)tstate);
            ++_PyRuntime.ceval.gil.switch_number;
        }
    
    #ifdef FORCE_SWITCHING
        COND_SIGNAL(_PyRuntime.ceval.gil.switch_cond);
        MUTEX_UNLOCK(_PyRuntime.ceval.gil.switch_mutex);
    #endif
        if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request)) {
            RESET_GIL_DROP_REQUEST();
        }
        if (tstate->async_exc != NULL) {
            _PyEval_SignalAsyncExc();
        }
    
        MUTEX_UNLOCK(_PyRuntime.ceval.gil.mutex);
        errno = err;
    }
    
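    • take_gil first acquires the mutex and checks whether some thread currently holds the GIL.
    • If the GIL is taken, the thread waits on the condition variable (cond); waiting suspends the thread and releases the mutex.
    • When the wait on the condition variable (cond) returns, the mutex is automatically re-acquired.
    • In most cases the thread wakes because the condition has been met; the while loop guards against spurious wakeups where the condition does not yet hold.
    • If the wait times out and no thread switch happened in the meantime, SET_GIL_DROP_REQUEST asks the thread holding the GIL to release it.
    • Once the GIL is free, locked is set to 1 to take it. If the current thread state differs from last_holder, it is stored in last_holder and the switch count (switch_number) is incremented by 1.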
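The steps above can be sketched with a `threading.Condition`, which bundles a condition variable with its mutex. This is a toy model under stated assumptions, not CPython's implementation: `ToyGIL`, `worker` and the polling of `drop_request` are hypothetical, and the FORCE_SWITCHING handshake is left out.

```python
import threading

class ToyGIL:
    """Toy model of the take_gil / drop_gil handshake (illustration only)."""

    def __init__(self, interval=0.005):
        self.interval = interval            # seconds
        self.cond = threading.Condition()   # condition variable plus mutex
        self.locked = False
        self.last_holder = None
        self.switch_number = 0
        self.drop_request = False

    def take(self, me):
        with self.cond:                     # lock the mutex
            while self.locked:
                saved = self.switch_number
                # wait() releases the mutex while blocked and re-acquires
                # it before returning; it returns False on timeout.
                signalled = self.cond.wait(self.interval)
                if (not signalled and self.locked
                        and self.switch_number == saved):
                    self.drop_request = True   # ask the holder to let go
            self.locked = True
            if self.last_holder != me:
                self.last_holder = me
                self.switch_number += 1
            self.drop_request = False

    def drop(self):
        with self.cond:
            self.locked = False
            self.cond.notify()

def worker(gil, name, steps, log):
    gil.take(name)
    for _ in range(steps):
        log.append(name)
        if gil.drop_request:    # stands in for the eval loop's periodic check
            gil.drop()
            gil.take(name)
    gil.drop()

gil, log = ToyGIL(), []
threads = [threading.Thread(target=worker, args=(gil, n, 200, log))
           for n in ("A", "B")]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(log), gil.switch_number)
```

Because a thread that drops the toy GIL may immediately take it again before the waiter wakes up, the waiter can time out more than once; the real code addresses this with the switch_cond and switch_mutex pair under FORCE_SWITCHING.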
    • After acquiring the GIL, the thread id is obtained through PyThread_get_thread_ident, whose implementation depends on the compiler and platform:
    Python\thread_nt.h
    
    unsigned long
    PyThread_get_thread_ident(void)
    {
        if (!initialized)
            PyThread_init_thread();
    
        return GetCurrentThreadId();
    }
    
    Python\thread_pthread.h
    
    unsigned long
    PyThread_get_thread_ident(void)
    {
        volatile pthread_t threadid;
        if (!initialized)
            PyThread_init_thread();
        threadid = pthread_self();
        return (unsigned long) threadid;
    }
    
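    • Before the main thread's id is returned, initialized is checked. This flag records whether the native thread support provided by the underlying platform has been set up; if it has not, PyThread_init_thread performs that one-time initialization.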
    Python\thread.c
    
    void
    PyThread_init_thread(void)
    {
    #ifdef Py_DEBUG
       const char *p = Py_GETENV("PYTHONTHREADDEBUG");
    
       if (p) {
           if (*p)
               thread_debug = atoi(p);
           else
               thread_debug = 1;
       }
    #endif /* Py_DEBUG */
       if (initialized)
           return;
       initialized = 1;
       dprintf(("PyThread_init_thread called\n"));
       PyThread__init_thread();
    }
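At the Python level the value returned by PyThread_get_thread_ident is what `_thread.get_ident()` and `threading.get_ident()` report. A brief sketch, with `caller_ident`, `child_idents` and `record_ident` as hypothetical names:

```python
import _thread
import threading

caller_ident = _thread.get_ident()   # ident of the thread running this code
child_idents = []

def record_ident():
    # Executed inside the child thread, so this is the child's ident.
    child_idents.append(threading.get_ident())

t = threading.Thread(target=record_ident)
t.start()
t.join()

print(caller_ident, child_idents[0], t.ident)
```

Idents are only guaranteed to be unique among threads that are alive at the same time, so a value may be reused after a thread exits.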
    
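    • Finally, PyEval_InitThreads allocates the lock stored in _PyRuntime.ceval.pending.lock by calling PyThread_allocate_lock (the GIL's own mutex was already set up by MUTEX_INIT in create_gil). What PyThread_allocate_lock does depends on the platform: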
    Python\thread_nt.h
    
    PyThread_type_lock
    PyThread_allocate_lock(void)
    {
        PNRMUTEX aLock;
    
        dprintf(("PyThread_allocate_lock called\n"));
        if (!initialized)
            PyThread_init_thread();
    
        aLock = AllocNonRecursiveMutex() ;
    
        dprintf(("%lu: PyThread_allocate_lock() -> %p\n", PyThread_get_thread_ident(), aLock));
    
        return (PyThread_type_lock) aLock;
    }
    
    Python\thread_pthread.h
    
    PyThread_type_lock
    PyThread_allocate_lock(void)
    {
        pthread_lock *lock;
        int status, error = 0;
    
        dprintf(("PyThread_allocate_lock called\n"));
        if (!initialized)
            PyThread_init_thread();
    
        lock = (pthread_lock *) PyMem_RawMalloc(sizeof(pthread_lock));
        if (lock) {
            memset((void *)lock, '\0', sizeof(pthread_lock));
            lock->locked = 0;
    
            status = pthread_mutex_init(&lock->mut,
                                        pthread_mutexattr_default);
            CHECK_STATUS_PTHREAD("pthread_mutex_init");
            /* Mark the pthread mutex underlying a Python mutex as
               pure happens-before.  We can't simply mark the
               Python-level mutex as a mutex because it can be
               acquired and released in different threads, which
               will cause errors. */
            _Py_ANNOTATE_PURE_HAPPENS_BEFORE_MUTEX(&lock->mut);
    
            status = pthread_cond_init(&lock->lock_released,
                                       pthread_condattr_default);
            CHECK_STATUS_PTHREAD("pthread_cond_init");
    
            if (error) {
                PyMem_RawFree((void *)lock);
                lock = 0;
            }
        }
    
        dprintf(("PyThread_allocate_lock() -> %p\n", lock));
        return (PyThread_type_lock) lock;
    }
    
    • The lock struct that gets created also differs by platform:
    Python\thread_nt.h
    
    typedef struct _NRMUTEX
    {
       PyMUTEX_T cs;
       PyCOND_T cv;
       int locked;
    } NRMUTEX;
    typedef NRMUTEX *PNRMUTEX;
    
    Python\thread_pthread.h
    
    typedef struct {
       char             locked; /* 0=unlocked, 1=locked */
       /* a <cond, mutex> pair to handle an acquire of a locked lock */
       pthread_cond_t   lock_released;
       pthread_mutex_t  mut;
    } pthread_lock;
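Both structs pair a locked flag with a mutex and a condition variable, which gives the lock two properties visible from Python: it is not re-entrant, and, as the comment in PyThread_allocate_lock notes, it can be acquired in one thread and released in another. A minimal sketch, with variable names chosen for the illustration:

```python
import _thread
import threading

lock = _thread.allocate_lock()

first = lock.acquire()         # succeeds: the lock was free
second = lock.acquire(False)   # non-blocking attempt while already held

# Release from a different thread than the one that acquired the lock.
releaser = threading.Thread(target=lock.release)
releaser.start()
releaser.join()

after_release = lock.locked()
print(first, second, after_release)
```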
    

          Article link: https://www.haomeiwen.com/subject/gshzaltx.html