美文网首页
深入理解Python的TLS机制和Threading.local

深入理解Python的TLS机制和Threading.local

作者: 云爬虫技术研究笔记 | 来源:发表于2019-03-23 00:23 被阅读0次

    1.背景介绍

    我之前写过一个关于PythonTLS机制的浅浅析,大家可以参考这个文章,首先,我们再来熟悉熟悉什么是TLS机制。

    1.1 Thread Local Storage(线程局部存储)

    这个概念最早是相对于全局变量来说的,就是我们在编程的时候,会涉及到希望所有线程都能够共享访问同一个变量,在 Python/Go/C 中,我们就可以定义一个全局变量,这样Global Variable 对多个线程就是可见的,因为同一个进程所有线程共享地址空间,大家都可以操作。例如,一个全局的配置变量或单实例对象,所有线程就可以很方便访问了,但是仅仅这样有一个前提,就是这个变量的并发操作必须是幂等的,读写不影响我们程序的正确性。但是往往多线程共同操作一个全局变量,就会影响程序的正确性,因此我们必须枷锁,比如经典的并发加操作。

    import threading
    count = 0
    lock = threading.RLock()
    def inc():
        global count, lock
        with lock:
            count += 1
    

    上面那个例子很多博客用来做ThreadLocal变量的讲解,实际上我觉得是有误导的,不恰当的。因为这种共享变量,你必须枷锁,因为他的目的就是为了大家一起去更新一个共享变量,多线程环境下必须枷锁。就算你使用ThreadLocal替换也没用,ThreadLocal能替换这个Count变量让所有线程单独存储一份么,不满足需求。你单独存一份,更改之后还得把结果再次写回到全局变量去更新,那写会的过程还是得枷锁。除非使用Golang中的单Channel更新机制,才能避免枷锁。

    所以ThreadLocal变量使用强调的侧重点不在这里,更多的是在编程范式上面。其实就是有些时候,我们某个变量类型很多函数或者类都需要用,但是我又不想写死在代码里,每次传递参数都要传递这个类或者变量,因为一旦这个类发生类型上的变化,可能对于静态类型的语言,很多地方就得修改参数,而且这种变量一直在程序代码的参数传递中层层出现,你如果写过代码就会有感觉,有时候你设计的函数API好像一层层的得把一个参数传递进去,即使某些层好像用不到这个参数。

    def getMysqlConn(passwd, db, host="localhost", port=3306, user="root", charset='utf8'):
        conn = MySQLdb.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset)
        return conn
    
    def func1(zzz, passwd, db, host="localhost", port=3306, user="root", charset='utf8'):
        conn = getMysqlConn(passwd, db, host, port, user, charset)
        ...
    
    def func2(xxx,yyy,zzz, passwd, db, host="localhost", port=3306, user="root", charset='utf8'):
        ...
        func1(zzz,passwd,db,host,port,user,charset)
    

    上面的代码你可能会疯掉。那么你可能就考虑想把这个参数提出来,当成全局变量算了,哪一层用到了直接用就好了,不能让他无缘无故的不停的被当成局部变量传参。文章Alternatives to global variables and passing the same value over a long chain of calls描述了这个问题,但是这个时候出现的问题就是,可能其他代码线程会不可控的更改这个变量,导致你的程序发生未知错误。你把这种参数变成全局的暴露出来,那么基于的假设就是该参数不会被随意修改!一旦这个假设崩塌,你的程序可能会发生灾难后果。这不符合软件设计的开闭原则。所以我们使用TLS技术化解这种矛盾。

    那么我们就设计了一种方案,就是有这样一种变量,他是全局的,但是每个线程在访问的时候都会存储一份成为自己的局部变量,修改就不会相互影响了。比如 Linux/UnixC 程序库 libc的全局变量errno, 这个其实就是TLS的例子。当系统调用从内核空间返回用户空间时,如果系统调用出错,那么便设置errno的值为一个负值,这样就不需要每次在函数内部定义局部变量。但是当多线程的概念和技术被提出后,这套机制就不再适用了,可以使用局部变量,但是不太可能去更改已有的代码了,比较好的解决方案是让每个线程都有自己的errno。实际上,现在的C库函数不是把出错代码写入全局量errno,而是通过一个函数__errno_location()获取一个地址,再把出错代码写入该地址,其意图就是让不同的线程使用不同的出错代码存储地点,而errno,现在一般已经变成了一个宏定义。每一个线程都会维护自己的一份,修改不影响其他线程。

    这是不是意味着ThreadLocal对象不用枷锁了? 其实这个ThreadLocal和同步没有关系,他仅仅是提供了一种方便每个线程快速访问变量的方式,但是如果这个对象本身有些共享状态需要大家一起维护(比如Count++),你就必须枷锁,尽管每个线程操作的是ThreadLocal副本。维基百科上有以下原话:

    A second use case would be multiple threads accumulating information into a global variable. To avoid a race condition, every access to this global variable would have to be protected by a mutex. Alternatively, each thread might accumulate into a thread-local variable (that, by definition, cannot be read from or written to from other threads, implying that there can be no race conditions). Threads then only have to synchronise a final accumulation from their own thread-local variable into a single, truly global variable.

    比如我们写了一个共享的Manager类,这个类可能是用来做数据库连接,网络连接或者其他的做底层管理功能。我们有很多线程需要使用这个Manager的某些功能,并且这种类不是用来表示一种状态,供所有线程并发修改其状态并将最终修改的结果表现在该类上面(上面count的例子)。Manager只是可以提供给线程使用某些功能,然后每个线程可以把这个Manager复制一份成为自己的局部变量,自己可以随意修改,但是不会影响到其他线程,因为是复制的一份。但是如果你需要让管理器记录所有的连接操作次数,那么多线程对立面的某些变量访问比如Count就需要枷锁了。

    2.TLS 在Python中的运用和实现

    2.1 简单使用

    ThreadLocal不仅仅可以解决全局变量访问冲突,其实还有其他好处,在PEP266中有提到,ThreadLocal变量是可以减少指令加速运算的,因为全局变量往往需要更多的指令(需要for loop)来做查询访问,而ThreadLocal 之后,有了索引表,直接可以一条指令找到这个对象。

    import threading
    
    userName = threading.local()
    
    def SessionThread(userName_in):
        userName.val = userName_in
        print(userName.val)   
    
    Session1 = threading.Thread(target=SessionThread("User1"))
    Session2 = threading.Thread(target=SessionThread("User2"))
    
    # start the session threads
    Session1.start()
    Session2.start()
    # wait till the session threads are complete
    Session1.join()
    Session2.join()
    

    上述Threadlocal的实现原理类似有一个全局的词典,词典的key是线程id,value就是共享的全局变量的副本。每次访问全局变量的时候,你访问到的其实是副本,只是Python使用黑魔法帮我们屏蔽了这个userName.val 的访问细节,其实他访问的是词典中的对应线程所拥有的对象副本。

    2.2 实现源码分析

    __all__ = ["local"]
    class _localbase(object):
        __slots__ = '_local__key', '_local__args', '_local__lock'
    
        def __new__(cls, *args, **kw):
            # 新建一个类对象
            self = object.__new__(cls)
            # 在主线程中初始化这个这个全局对象的某些属性,比如 `_local__key`, 这个key以后会用作其他线程使用全局变量副本的查询依据,以后每个线程都会根据这个key来查找自己的局部副本数据
            key = '_local__key', 'thread.local.' + str(id(self))
            object.__setattr__(self, '_local__key', key)
            object.__setattr__(self, '_local__args', (args, kw))
            # 多线程会并发设置全局变量的属性,这时候会并发访问设置属性,因此需要一把全局锁,进行互斥操作
            object.__setattr__(self, '_local__lock', RLock())
    
            if (args or kw) and (cls.__init__ is object.__init__):
                raise TypeError("Initialization arguments are not supported")
    
            # We need to create the thread dict in anticipation of
            # __init__ being called, to make sure we don't call it
            # again ourselves.
            dict = object.__getattribute__(self, '__dict__')
            current_thread().__dict__[key] = dict
    
            return self
    
    def _patch(self):
        # 拿到全局的key
        key = object.__getattribute__(self, '_local__key')
        # 在当前线程中根据key找到线程的私有数据副本,并替换掉 ThreadLocal自己的__dict__属性。如果没有,就创建一个,并添加
        d = current_thread().__dict__.get(key)
        if d is None:
            d = {}
            # 线程还没得私有数据副本,创建一个并加入线程自己的属性中
            current_thread().__dict__[key] = d
            # 替换ThreadLocal的__dict__为当前线程的私有数据词典d
            object.__setattr__(self, '__dict__', d)
    
            # we have a new instance dict, so call out __init__ if we have
            # one
            # 这段的意思其实是,如果原来的全局变量ThreadLocal 本身有一些其他的属性和数据,那么直接替换掉一个新dict之后,以前的数据就丢失了,这里我们必须初始化以前的数据到新dict中
            cls = type(self)
            if cls.__init__ is not object.__init__:
                args, kw = object.__getattribute__(self, '_local__args')
                cls.__init__(self, *args, **kw)
        else:
            object.__setattr__(self, '__dict__', d)
    
    class local(_localbase):
    
        def __getattribute__(self, name):
            lock = object.__getattribute__(self, '_local__lock')
            lock.acquire()
            try:
                _patch(self)
                return object.__getattribute__(self, name)
            finally:
                lock.release()
    
        def __setattr__(self, name, value):
            if name == '__dict__':
                raise AttributeError(
                    "%r object attribute '__dict__' is read-only"
                    % self.__class__.__name__)
            # 拿到早已经在主线程设置的共享的一把锁
            lock = object.__getattribute__(self, '_local__lock')
            lock.acquire()
            try:
                _patch(self)# 关键代码,这个patch会导致 Threadlocal 这个数据的__dict__直接被换成了所在线程自己的私有数据, Python 里面有很多这种patch的替换手段,就是直接把基础库的某些功能和函数直接替换成了第三方库的比如monkey patch
                # 再次设置属性的时候,设置的__dict__ 其实不是 Threadlocal 自己的属性了,是而是当前所在线程的__dict__的某一个key-value 副本数据的value,这个value是一个dict
                # object 的setattr默认行为其实就是在自己的__dict__对象中添加一对key-pair,但是现在他的__dict__已经更换成所在线程的一个数据副本词典了,黑魔法替换就在这里
                return object.__setattr__(self, name, value)
            finally:
                lock.release()
    
        def __delattr__(self, name):
            if name == '__dict__':
                raise AttributeError(
                    "%r object attribute '__dict__' is read-only"
                    % self.__class__.__name__)
            lock = object.__getattribute__(self, '_local__lock')
            lock.acquire()
            try:
                _patch(self)
                return object.__delattr__(self, name)
            finally:
                lock.release()
    
        def __del__(self):
            import threading
    
            key = object.__getattribute__(self, '_local__key')
    
            try:
                # We use the non-locking API since we might already hold the lock
                # (__del__ can be called at any point by the cyclic GC).
                threads = threading._enumerate()
            except:
                # If enumerating the current threads fails, as it seems to do
                # during shutdown, we'll skip cleanup under the assumption
                # that there is nothing to clean up.
                return
    
            for thread in threads:
                try:
                    __dict__ = thread.__dict__
                except AttributeError:
                    # Thread is dying, rest in peace.
                    continue
    
                if key in __dict__:
                    try:
                        del __dict__[key]
                    except KeyError:
                        pass # didn't have anything in this thread
    
    from threading import current_thread, RLock
    
    data = local()
    print (data.__dict__)
    def t(x):
        global data
        data.x = x
        data.y = 1
        print (current_thread().__dict__)
        print (data.__dict__)
    
    t1 = threading.Thread(target=t, args = (777,))
    t2 = threading.Thread(target=t, args = (888,))
    print current_thread().__dict__
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(data.__dict__)
    

    关键技术就在patch上面,Python 里面有很多这种patch的替换手段,就是直接把基础库的某些功能和函数直接替换成了第三方库的比如monkey patch. 再次设置属性的时候,设置的 __dict__ 其实不是ThreadLocal自己的,是而是当前所在线程的__dict__ 的某一个key-value 副本数据,key 就是线程访问的某个TLS变量生成的(一个线程可以有很多TLS变量,每个有不同的key),value是一个dict. objectsetattr默认行为其实就是在自己的__dict__对象中添加一对key-pair,但是现在他的__dict__已经更换成所在线程的一个数据副本词典dict了,黑魔法替换就在这里.

    下面的例子展示了Python黑魔法的一个替换词典的方式,可以运行看看

    class A:
        def substitute(self, d):
            object.__setattr__(self, '__dict__', d)
    a = A()
    a.y  = 3
    old_dict = a.__dict__
    print(old_dict)
    d = {'x':1}
    a.substitute(d)
    print(a.__dict__)
    a.y = 777
    print(a.__dict__)
    print(d)
    
    ### OUTPUT
    {'y': 3}
    {'x': 1}
    {'x': 1, 'y': 777}
    {'x': 1, 'y': 777}
    

    如果A本身已经含有一些数据,那就不能简单的直接复制了,还需要初始化以前的数据填充新的词典,这也是在源码中看到的。

    from threading import current_thread
    class A:
        def __new__(cls, *args, **kw):
            self = object.__new__(cls)
            setattr(cls, '_local__args', (args, kw))
            return self
    
        def __init__(self, *args, **kw):
            self.shared_x = kw["shared_x"]
            self.shared_y = kw["shared_y"]
        def substitute(self, d):
            object.__setattr__(self, '__dict__', d)
            cls = type(self)
            if cls.__init__ is not object.__init__:
                print("7---------------")
                args, kw = getattr(self, '_local__args')
                cls.__init__(self, *args, **kw)
    
    a = A(shared_x=111, shared_y=222)
    a.y  = 3
    old_dict = a.__dict__
    print(old_dict)
    d = {'x':1}
    a.substitute(d)
    print(a.__dict__)
    a.y = 777
    print(a.__dict__)
    print(d)
    print(old_dict)
    

    下图就是访问每个线程访问过程,实际上操作的是线程自己的私有数据副本。同时需要注意的还是那句话,使用 ThreadLocal对象不意味着你的程序不需要再枷锁,比如这个 ThreadLocal 对象可能又引用了其他共享状态的对象,那么就要对这个共享状态对象的操作进行枷锁实现同步和互斥。

    ThreadLocal 实现过程

    3 TLS 在Java 中的运用和实现

    3.1 简单使用

    public class ThreadLocalExample {
    
        public static class MyRunnable implements Runnable {
    
            private ThreadLocal threadLocal = new ThreadLocal();
    
            @Override
            public void run() {
                threadLocal.set((int) (Math.random() * 100D));
                try {
                Thread.sleep(2000);
                } catch (InterruptedException e) {
    
                }
                System.out.println(threadLocal.get());
            }
        }
    
        public static void main(String[] args) {
             MyRunnable sharedRunnableInstance = new MyRunnable();
             Thread thread1 = new Thread(sharedRunnableInstance);
             Thread thread2 = new Thread(sharedRunnableInstance);
             thread1.start();
             thread2.start();
        }
    }
    

    3.2 源码实现

    有了Python版本的分析,Java版本就不再多做解释,感兴趣的可以看看源码,实现原理肯定都是大同小异,只是语言上的差异,导致 Java 不可能像Python这种动态类型语言一样灵活。

    需要每个线程都维护一个 key-value 集合数据结构,记录每个线程访问到的 TLS 变量副本,这样每个线程可以根据 key 来找到相应的 TLS副本数据,对副本数据进行真实的操作,而不是TLS全局变量或者静态类(Java中)。在Python中直接很简单的使用了动态数据绑定的词典数据结构,在Java中稍显麻烦,需要实现一个类似Map的结构,ThreadLocal.get() 方法其实本质上也是和Python中一样,先获取当前线程自己的ThreadLocalMap对象(就是每个线程维护的TLS key-value集合啦)。再从ThreadLocalMap对象中找出当前的ThreadLocal变量副本,和HashMap一样的采用了链地址法的hash结构。可以参考文章Java 多线程(7): ThreadLocal 的应用及原理。Java 里一般是采用泛型规定你共享的变量类型,然后每个线程维护该变量的副本。

    4. 小结

    TLS技术的使用和属性:

    • 解决多线程编程中的对同一变量的访问冲突的一种技术,TLS会为每一个线程维护一个和该线程绑定的变量的副本。而不是无止尽的传递局部参数的方式编程。
    • 每一个线程都拥有自己的变量副本,并不意味着就一定不会对TLS变量中某些操作枷锁了。
    • Java平台java.lang.ThreadLocalPython 中的threading.local()都是TLS技术的一种实现,。
    • TLS使用的缺陷是,如果你的线程都不退出,那么副本数据可能一直不被GC回收,会消耗很多资源,比如线程池中,线程都不退出,使用TLS需要非常小心。

    TLS技术的实现原理:
    需要每个线程都维护一个 key-value集合数据结构,记录每个线程访问到的 TLS变量副本,这样每个线程可以根据 key来找到相应的 TLS副本数据,对副本数据进行真实的操作,而不是TLS全局变量或者静态类(Java中).

    TLS变量自己会根据当前调用他的Thread对象,根据Thread对象得到该线程维护的 TLS 副本集合,然后进一步根据当前TLS的key,查到到key对一个的TLS副本数据。这样就给每个线程造成一种假象,以为大家可以同时更新一个全局共享变量或者静态类对象。

    相关文章

      网友评论

          本文标题:深入理解Python的TLS机制和Threading.local

          本文链接:https://www.haomeiwen.com/subject/tdzdvqtx.html