美文网首页
大师兄的Python源码学习笔记(四十一): Python的多线

大师兄的Python源码学习笔记(四十一): Python的多线

作者: superkmi | 来源:发表于2021-11-12 09:30 被阅读0次

    大师兄的Python源码学习笔记(四十): Python的多线程机制(二)
    大师兄的Python源码学习笔记(四十二): Python的多线程机制(四)

    四、创建线程

    1. 创建子线程
    • 在建立多线程环境后,Python会开始创建底层平台的原生线程,也可以称为子进程。
    • 这还要从调用thread_PyThread_start_new_thread的主线程开始:
    Modules\_threadmodule.c
    
    static PyObject *
    thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
    {
        PyObject *func, *args, *keyw = NULL;
        struct bootstate *boot;
        unsigned long ident;
    
        if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                               &func, &args, &keyw))
            return NULL;
       ... ...
        boot = PyMem_NEW(struct bootstate, 1);
        if (boot == NULL)
            return PyErr_NoMemory();
        boot->interp = PyThreadState_GET()->interp;
        boot->func = func;
        boot->args = args;
        boot->keyw = keyw;
        boot->tstate = _PyThreadState_Prealloc(boot->interp);
        if (boot->tstate == NULL) {
            PyMem_DEL(boot);
            return PyErr_NoMemory();
        }
        Py_INCREF(func);
        Py_INCREF(args);
        Py_XINCREF(keyw);
        PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
        ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
        if (ident == PYTHREAD_INVALID_THREAD_ID) {
            PyErr_SetString(ThreadError, "can't start new thread");
            Py_DECREF(func);
            Py_DECREF(args);
            Py_XDECREF(keyw);
            PyThreadState_Clear(boot->tstate);
            PyMem_DEL(boot);
            return NULL;
        }
        return PyLong_FromUnsignedLong(ident);
    }
    
    • 主线程在创建多线程环境后,调用PyThread_start_new_thread创建子线程:
    Python\thread_nt.h
    
    unsigned long
    PyThread_start_new_thread(void (*func)(void *), void *arg)
    {
        HANDLE hThread;
        unsigned threadID;
        callobj *obj;
    
        dprintf(("%lu: PyThread_start_new_thread called\n",
                 PyThread_get_thread_ident()));
        if (!initialized)
            PyThread_init_thread();
    
        obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj));
        if (!obj)
            return PYTHREAD_INVALID_THREAD_ID;
        obj->func = func;
        obj->arg = arg;
        PyThreadState *tstate = PyThreadState_GET();
        size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
        hThread = (HANDLE)_beginthreadex(0,
                          Py_SAFE_DOWNCAST(stacksize, Py_ssize_t, unsigned int),
                          bootstrap, obj,
                          0, &threadID);
        if (hThread == 0) {
            /* I've seen errno == EAGAIN here, which means "there are
             * too many threads".
             */
            int e = errno;
            dprintf(("%lu: PyThread_start_new_thread failed, errno %d\n",
                     PyThread_get_thread_ident(), e));
            threadID = (unsigned)-1;
            HeapFree(GetProcessHeap(), 0, obj);
        }
        else {
            dprintf(("%lu: PyThread_start_new_thread succeeded: %p\n",
                     PyThread_get_thread_ident(), (void*)hThread));
            CloseHandle(hThread);
        }
        return threadID;
    }
    
    
    • 观察PyThread_start_new_thread函数的参数:

    func:函数t_bootstrap。
    arg: boot,也就是保存了线程信息的bootstate结构。

    • PyThread_start_new_thread实际将funcarg打包到一个类型为callobj的结构体中:
    Python\thread_nt.h
    
    /*
     * Thread support.
     */
    
    typedef struct {
        void (*func)(void*);
        void *arg;
    } callobj;
    
    • 有一点值得注意,_beginthreadex是Win32下用于创建线程的API。
    Include\10.0.18362.0\ucrt\process.h
    
    _Success_(return != 0)
    _ACRTIMP uintptr_t __cdecl _beginthreadex(
        _In_opt_  void*                    _Security,
        _In_      unsigned                 _StackSize,
        _In_      _beginthreadex_proc_type _StartAddress,
        _In_opt_  void*                    _ArgList,
        _In_      unsigned                 _InitFlag,
        _Out_opt_ unsigned*                _ThrdAddr
        );
    
    • 这是一个关键的转折,因为在此之前,我们一直在主线程的执行路径上;而现在我们通过_beginthreadex创建了一个子线程,并将之前打包的callobj结构体obj作为参数传递给了子线程。
    • 梳理Python当前的状态:
    • Python当前实际上由两个Win32下的原生线程组成,一个是执行Python程序时操作系统创建的主线程;另一个是通过_beginthreadex创建的子线程。
    • 主线程在在执行PyEval_InitThreads的过程中,获得了GIL,并将自己挂起等待子线程。
    • 子线程的线程过程是bootstrap,为了访问Python解释器,必须首先获得GIL
    Python\thread_nt.h
    
    /* thunker to call adapt between the function type used by the system's
    thread start function and the internally used one. */
    static unsigned __stdcall
    bootstrap(void *call)
    {
        callobj *obj = (callobj*)call;
        void (*func)(void*) = obj->func;
        void *arg = obj->arg;
        HeapFree(GetProcessHeap(), 0, obj);
        func(arg);
        return 0;
    }
    
    • bootstrap中,子线程完成了三个动作:

    1. 获得线程id;
    2. 唤醒主线程;
    3. 调用t_bootstrap

    • 主线程之所以需要等待子线程,是因为主线程调用的PyThread_start_new_thread需要返回所创建子线程的线程id,一旦在子线程中获得了线程id,就会设法唤醒主线程。
    • 到这里,主线程和子线程开始分道扬镳,主线程在返回子线程id并获得GIL后,会继续执行后续字节码指令;而子线程则将进入t_bootstrap,最终进入等待GIL的状态。
    Modules\_threadmodule.c
    
    static void
    t_bootstrap(void *boot_raw)
    {
        struct bootstate *boot = (struct bootstate *) boot_raw;
        PyThreadState *tstate;
        PyObject *res;
    
        tstate = boot->tstate;
        tstate->thread_id = PyThread_get_thread_ident();
        _PyThreadState_Init(tstate);
        PyEval_AcquireThread(tstate);
        tstate->interp->num_threads++;
        res = PyObject_Call(boot->func, boot->args, boot->keyw);
        if (res == NULL) {
            if (PyErr_ExceptionMatches(PyExc_SystemExit))
                PyErr_Clear();
            else {
                PyObject *file;
                PyObject *exc, *value, *tb;
                PySys_WriteStderr(
                    "Unhandled exception in thread started by ");
                PyErr_Fetch(&exc, &value, &tb);
                file = _PySys_GetObjectId(&PyId_stderr);
                if (file != NULL && file != Py_None)
                    PyFile_WriteObject(boot->func, file, 0);
                else
                    PyObject_Print(boot->func, stderr, 0);
                PySys_WriteStderr("\n");
                PyErr_Restore(exc, value, tb);
                PyErr_PrintEx(0);
            }
        }
        else
            Py_DECREF(res);
        Py_DECREF(boot->func);
        Py_DECREF(boot->args);
        Py_XDECREF(boot->keyw);
        PyMem_DEL(boot_raw);
        tstate->interp->num_threads--;
        PyThreadState_Clear(tstate);
        PyThreadState_DeleteCurrent();
        PyThread_exit_thread();
    }
    
    • 子线程从这里开始了与主线程对GIL的竞争:
    • 首先子线程通过PyEval_AcquireThread申请GIL
    Python\ceval.c
    
    void
    PyEval_AcquireThread(PyThreadState *tstate)
    {
       if (tstate == NULL)
           Py_FatalError("PyEval_AcquireThread: NULL new thread state");
       /* Check someone has called PyEval_InitThreads() to create the lock */
       assert(gil_created());
       take_gil(tstate);
       if (PyThreadState_Swap(tstate) != NULL)
           Py_FatalError(
               "PyEval_AcquireThread: non-NULL old thread state");
    }
    
    • 接下来子线程通过PyObject_Call调用字节码执行引擎:
    Objects\call.c
    
    PyObject *
    PyObject_Call(PyObject *callable, PyObject *args, PyObject *kwargs)
    {
       ternaryfunc call;
       PyObject *result;
    
       /* PyObject_Call() must not be called with an exception set,
          because it can clear it (directly or indirectly) and so the
          caller loses its exception */
       assert(!PyErr_Occurred());
       assert(PyTuple_Check(args));
       assert(kwargs == NULL || PyDict_Check(kwargs));
    
       if (PyFunction_Check(callable)) {
           return _PyFunction_FastCallDict(callable,
                                           &PyTuple_GET_ITEM(args, 0),
                                           PyTuple_GET_SIZE(args),
                                           kwargs);
       }
       else if (PyCFunction_Check(callable)) {
           return PyCFunction_Call(callable, args, kwargs);
       }
       else {
           call = callable->ob_type->tp_call;
           if (call == NULL) {
               PyErr_Format(PyExc_TypeError, "'%.200s' object is not callable",
                            callable->ob_type->tp_name);
               return NULL;
           }
    
           if (Py_EnterRecursiveCall(" while calling a Python object"))
               return NULL;
    
           result = (*call)(callable, args, kwargs);
    
           Py_LeaveRecursiveCall();
    
           return _Py_CheckFunctionResult(callable, result, NULL);
       }
    }
    
    • 传递进PyObject_Callboot->func是一个PyFunctionObject对象,对应线程执行的方法。
    • PyObject_Call结束后,子线程将释放GIL,并完成销毁线程的所有扫尾工作。
    • t_bootstrap代码上看,子线程应该全部执行完成,才会通过PyThreadState_DeleteCurrent释放GIL
    • 但实际情况正如前面章节提到的,Python会定时激活线程的调度机制,在子线程和主线程之间不断切换,从而真正实现多线程机制。

    相关文章

      网友评论

          本文标题:大师兄的Python源码学习笔记(四十一): Python的多线

          本文链接:https://www.haomeiwen.com/subject/egsrzltx.html