美文网首页从多线程到分布式
从多线程到分布式(六)—— 用ThreadLocal避免竞争

从多线程到分布式(六)—— 用ThreadLocal避免竞争

作者: 吟游雪人 | 来源:发表于2023-08-13 11:15 被阅读0次

    ThreadLocal顾名思义,就是每个线程保存一份只有自己能访问的对象,避免了不同线程对同一个线程的竞争。所以核心在于如何隐藏一个对象的可见性,来保证只有指定的线程能访问到。反而言之,如果把ThreadLocal控制的对象发布出去,那么此对象也不再是线程安全的。如果一个线程一个对象,不同线程间不发布对象,那么这些对象也是线程安全的,没必要用ThreadLocal。所以ThreadLocal的应用场景一定就是需要控制在一定数量下的复用场景,注意复用一定要是场景相同,比如获取SQL连接的等价场景。或者一定要防止意外共享,显示的用ThreadLocal来保护。
    设计要点就是
    1.不共享。
    2.合理的资源回收时机。

    应用场景:

    1.池化技术,先从自己线程中找对象,然后再去公共的找,避免竞争。
    2.不同线程间传递技术,tomcat
    3.局部变量跨方法传递

    从Java语言实现的ThreadLocal来说,要理解hreadLocal本质上是一个key,不同线程可以用相同的key给自己保存不同的对象。
    所以保存不同的对象,需要创建新的ThreadLocal

    public class main {
        private static ThreadLocal<String> sThreadLocal = new ThreadLocal<>();
        public static void main(String args[]) {
            sThreadLocal.set("这是在主线程中");
            System.out.println("线程名字:" + Thread.currentThread().getName() + "---" + sThreadLocal.get());
            //线程a
            new Thread(new Runnable() {
                @Override
                public void run() {
                    sThreadLocal.set("这是在线程a中");
                    System.out.println("线程名字:" + Thread.currentThread().getName() + "---" + sThreadLocal.get());
                }
            }, "线程a").start();
        }
    }
    

    1.线程执行完后,意味着这些数据都不在需要。可以通过在线程实例上设置一个属性来做到,因为实例属性生命周期是一样的。

    2.可以保存多个变量,所以需要一个map结构。每个Thread,也就是每个线程实例内部维护有一个ThreadLocalMap。然后key怎么选择呢?可以选择字符串作为key,但是我们为什么选择ThreadLocal作为key呢?因为用对象作为key,可以给它加上get方法。

    那同一个ThreadLocal怎么和不同的Thread关联呢?用的是Thread.currentThread()。这个方法看似是一个静态方法,但是返回的Thread对象是每个线程都不一样的,所以可以通过这个核心方法来关联。 不同线程的多个对象,用同一个ThreadLocal来作为对外的公开key表示。

    class Thread {
    
      ThreadLocal.ThreadLocalMap threadLocals;  //作为线程类的实例变量,所以每个线程实例有一个自己的map
    
    }
    

    业务对象生命周期的设计

    ThreadLocalMap是为了保存业务对象而存在的。为了使得ThreadLocalMap的生命周期是和thread一样长,所以设计ThreadLocalMap是Thread的实例变量。如果thread声明周期过长,还要考虑当夜的业务生命周期是否完结,这样在可以比线程生命周期完结前就可以提前终止。
    业务生命周期可以通过控制ThreadLocal对象的生命周期来表示。比如把ThreadLocal设置为局部变量或者是静态变量。同时,因为ThreadLocal对象被回收,导致我们无法访问key去处理释放内存,所以一定要有一个机制去跟踪ThreadLocal对象被回收这个事件,也就是通过弱引用。弱引用可以视作一种事件通知机制。

    下面来看这个ThreadLocalMap的set方法是怎么实现的。可以看到数据是保存在Entry数组里。


    image.png image.png

    Key的弱引用来实现自动清理

    回归本质,ThreadLocalMap是用来存放对象的,在一次线程的执行栈中,存放数据后方便我们在任意的地方取得我们想要的值而不被其他线程干扰。ThreadLocalMap本身并没有为外界提供取出和存放数据的API,我们所能获得数据的方式只有通过ThreadLocal类提供的API来间接的从ThreadLocalMap取出数据,所以,当我们用不了key(ThreadLocal对象)的API也就无法从ThreadLocalMap里取出指定的数据。
    所以最好的方法是在A对象被回收后,系统自动回收对应的Entry对象,但是让Entry对象或其中的value对象做为弱引用都是非常不合理的。所以,让key(ThreadLocal对象)为弱引用,自动被垃圾回收,key就变为null了,下次,我们就可以通过Entry不为null,而key为null来判断该Entry对象该被清理掉了。(所以key 为null只是标记这个value可以安全回收了)

    ThreadLocal中一个设计亮点是ThreadLocalMap中的Entry结构的Key用到了弱引用。试想如果使用强引用,等于ThreadLocalMap中的所有数据都是与Thread的生命周期绑定,这样很容易出现因为大量线程持续活跃导致的内存泄漏。使用了弱引用的话,JVM触发GC回收弱引用后,ThreadLocal在下一次调用get()、set()、remove()方法就可以删除那些ThreadLocalMap中Key为null的值,起到了惰性删除释放内存的作用。

    至于ThreadLocalMap为什么不给外界提供API来操作数据,我觉得是因为这个Map对于一个线程只有一份,任何地方都在用,为了提供更方便的API和为了我们不破换其他框架保存到里面的数据,所以才用ThreadLocal同时作为key和API来操作数据

    ThreadLocalMap采用线性探测的方式解决Hash冲突的效率很低,如有大量不同的ThreadLocal对象放入map中时发送冲突。所以建议每个线程只存一个变量(一个ThreadLocal)就不存在Hash冲突的问题,如果一个线程要保存set多个变量,就需要创建多个ThreadLocal,多个ThreadLocal放入Map中时会极大的增加Hash冲突的可能。

    一些常见疑惑解答:
    存储在jvm的哪个区域
    问:线程私有,那么就是说ThreadLocal的实例和他的值是放到栈上咯?
    答:不是。还是在堆的。ThreadLocal对象也是对象,对象就在堆。只是JVM通过一些技巧将其可见性变成了线程可见。

    为什么用Entry数组而不是Entry对象
    ThreadLocalMap内部的table为什么是数组而不是单个对象呢?

    答:因为你业务代码能new好多个ThreadLocal对象,各司其职。但是在一次请求里,也就是一个线程里,ThreadLocalMap是同一个,而不是多个,不管你new几次ThreadLocal,ThreadLocalMap在一个线程里就一个,因为ThreadLocalMap的引用是在Thread里的,所以它里面的Entry数组存放的是一个线程里你new出来的多个ThreadLocal对象。

    ThreadLocal里的对象一定是线程安全的吗
    未必,如果在每个线程中ThreadLocal.set()进去的东西本来就是多线程共享的同一个对象,比如static对象,那么多个线程的ThreadLocal.get()获取的还是这个共享对象本身,还是有并发访问线程不安全问题。

    ThreadLocal的使用示例
    如果让我们自己来实现一个数据库连接池,最简单的办法就是用两个阻塞队列来实现,一个用于保存空闲数据库连接的队列 idle,另一个用于保存忙碌数据库连接的队列 busy;获取连接时将空闲的数据库连接从 idle 队列移动到 busy 队列,而关闭连接时将数据库连接从 busy 移动到 idle。

    // 忙碌队列
    BlockingQueue<Connection> busy;
    // 空闲队列
    BlockingQueue<Connection> idle;
    

    这种方案将并发问题委托给了阻塞队列,实现简单,但是性能并不是很理想。因为 Java SDK 中的阻塞队列是用锁实现的,而高并发场景下锁的争用对性能影响很大。而 ConcurrentBag 通过 ThreadLocal 做一次预分配,避免直接竞争共享资源,非常适合池化资源的分配。需要开一个单独的线程池。

    SynchronousQueue 主要用于线程之间传递数据。

    // 用于存储所有的数据库连接
    CopyOnWriteArrayList<T> sharedList;
    // 线程本地存储中的数据库连接
    ThreadLocal<List<Object>> threadList;
    // 等待数据库连接的线程数
    AtomicInteger waiters;
    // 分配数据库连接的工具
    SynchronousQueue<T> handoffQueue;
    

    当线程池创建了一个数据库连接时,通过调用 ConcurrentBag 的 add() 方法加入到 ConcurrentBag 中,下面是 add() 方法的具体实现,逻辑很简单,就是将这个连接加入到共享队列 sharedList 中,如果此时有线程在等待数据库连接,那么就通过 handoffQueue 将这个连接分配给等待的线程。

    // 将空闲连接添加到队列
    void add(final T bagEntry){
      // 加入共享队列
      sharedList.add(bagEntry);
      // 如果有等待连接的线程,
      // 则通过 handoffQueue 直接分配给等待的线程
      while (waiters.get() > 0
        && bagEntry.getState() == STATE_NOT_IN_USE
        && !handoffQueue.offer(bagEntry)) {
          yield();
      }
    }
    

    通过 ConcurrentBag 提供的 borrow() 方法,可以获取一个空闲的数据库连接,borrow() 的主要逻辑是:

    1. 首先查看线程本地存储是否有空闲连接,如果有,则返回一个空闲的连接;
    2. 如果线程本地存储中无空闲连接,则从共享队列中获取。
    3. 如果共享队列中也没有空闲的连接,则请求线程需要等待。
      需要注意的是,线程本地存储中的连接是可以被其他线程窃取的,所以需要用 CAS 方法防止重复分配。在共享队列中获取空闲连接,也采用了 CAS 方法防止重复分配。sharedlist和其他线程的threadlocal里有可能都有同一个连接,从前者取到连接,就相当于窃取了其他线程的threadLocal里的链接
    T borrow(long timeout, final TimeUnit timeUnit){
      // 先查看线程本地存储是否有空闲连接
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
        final Object entry = list.remove(i);
        final T bagEntry = weakThreadLocals
          ? ((WeakReference<T>) entry).get()
          : (T) entry;
        // 线程本地存储中的连接也可以被窃取,
        // 所以需要用 CAS 方法防止重复分配
        if (bagEntry != null
          && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
          return bagEntry;
        }
      }
      // 线程本地存储中无空闲连接,则从共享队列中获取
      final int waiting = waiters.incrementAndGet();
      try {
        for (T bagEntry : sharedList) {
          // 如果共享队列中有空闲连接,则返回
          if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
          }
        }
        // 共享队列中没有连接,则需要等待
        timeout = timeUnit.toNanos(timeout);
        do {
          final long start = currentTime();
          final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
          if (bagEntry == null
            || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
              return bagEntry;
          }
          // 重新计算等待时间
          timeout -= elapsedNanos(start);
        } while (timeout > 10_000);
        // 超时没有获取到连接,返回 null
        return null;
      } finally {
        waiters.decrementAndGet();
      }
    }
    

    相关文章

      网友评论

        本文标题:从多线程到分布式(六)—— 用ThreadLocal避免竞争

        本文链接:https://www.haomeiwen.com/subject/mhlzpdtx.html