美文网首页
线程安全性

线程安全性

作者: soda精分菌 | 来源:发表于2018-12-09 23:40 被阅读0次

    定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

    原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操作

    可见性:一个线程对主内存的修改可以及时的被其他线程观察到

    有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

    Atomic包 jdk里

    Atomic:cas(compareAndSwap)

    通过查询

    Unsafe.compareAndSwapInt

    可以了解到底层的思想

    AtomicLong原理

    就像我们所知道的那样,AtomicLong的原理是依靠底层的cas来保障原子性的更新数据,在要添加或者减少的时候,会使用死循环不断地cas到特定的值,从而达到更新数据的目的。那么LongAdder又是使用到了什么原理?难道有比cas更加快速的方式?

    LongAdder原理

    首先我们来看一下LongAdder有哪些方法?

    1.png

    可以看到和AtomicLong基本类似,同样有增加、减少等操作,那么如何实现原子的增加呢?

    2.png

    我们可以看到一个Cell的类,那这个类是用来干什么的呢?

    3.png

    我们可以看到Cell类的内部是一个volatile的变量,然后更改这个变量唯一的方式通过cas。我们可以猜测到LongAdder的高明之处可能在于将之前单个节点的并发分散到各个节点的,这样从而提高在高并发时候的效率。

    下面我们来验证我们的观点,我们接着看上图的add方法,如果cell数组不为空,那么就尝试更新base元素,如果更新成功,那么就直接返回。base元素在这里起到了一个什么作用呢?可以保障的是在低并发的时候和AtomicLong一样的直接对基础元素进行更新。

    而如果cell为空或者更新base失败,我们看接下来的那个if判断,即如果as不为空并且成功更新对应节点的数据,则返回,否则就会进入longAccumulate()方法。

    图有点大,无法截图,直接贴源码

    for(;;) {
    
            Cell[]as; Cell a;intn;longv;
    
           if((as= cells) !=null&& (n =as.length) >0) {
    
               if((a =as[(n -1) & h]) ==null) {//如果对应位置没有数据,那么直接插入元素
    
                   if(cellsBusy ==0) {      // Try to attach new Cell
    
                        Cell r =newCell(x);  // Optimistically create
    
                       if(cellsBusy ==0&& casCellsBusy()) {
    
                            boolean created =false;
    
                           try{              // Recheck under lock
    
                                Cell[] rs;intm, j;
    
                               if((rs = cells) !=null&&
    
                                    (m = rs.length) >0&&
    
                                    rs[j = (m -1) & h] ==null) {
    
                                    rs[j] = r;
    
                                    created =true;
    
                                }
    
                            }finally{
    
                                cellsBusy =0;
    
                            }
    
                           if(created)
    
                               break;
    
                           continue;          // Slot is now non-empty
    
                        }
    
                    }
    
                    collide =false;
    
                }
    
               elseif(!wasUncontended)    //标示冲突标志位 ,进行重试 CAS already known to fail
    
                    wasUncontended =true;     // Continue after rehash
    
               elseif(a.cas(v = a.value, ((fn ==null) ? v + x :
    
                                             fn.applyAsLong(v, x))))
    
                   break;
    
               elseif(n >= NCPU || cells !=as)
    
                    collide =false;           // At max size or stale,如果已经cell数组的大小已经超过了CPU核数,那么再扩容没意义了,直接重试,或者有别的线程扩容导致变更了数组,设置标示位,进行重试,,避免一失败就扩容
    
               elseif(!collide)
    
                    collide =true;
    
               elseif(cellsBusy ==0&& casCellsBusy()) {//开始扩容
    
                   try{
    
                       if(cells ==as) {     // Expand table unless stale
    
                            Cell[] rs =newCell[n <<1];
    
                           for(inti =0; i < n; ++i)
    
                                rs[i] =as[i];
    
                            cells = rs;
    
                        }
    
                    }finally{
    
                        cellsBusy =0;
    
                    }
    
                    collide =false;
    
                   continue;                  // Retry with expanded table
    
                }
    
                h = advanceProbe(h);//扩容完成以后,重新初始化要更新的索引值,尽量保障可以更新成功
    
            }
    
           elseif(cellsBusy ==0&& cells ==as&&//初始化casCellsBusy()) {
    
                boolean init =false;
    
               try{                          // Initialize table
    
                   if(cells ==as) {
    
                        Cell[] rs =newCell[2];
    
                        rs[h &1] =newCell(x);
    
                        cells = rs;
    
                        init =true;
    
                    }
    
                }finally{
    
                    cellsBusy =0;
    
                }
    
               if(init)
    
                   break;
    
            }
    
           elseif(casBase(v =base, ((fn ==null) ? v + x :
    
                                        fn.applyAsLong(v, x))))
    
               break;                         // Fall back on using base
    
        }
    

    上面的代码主要有三个分支:

    1. 如果数组不为空

    2. 数据为空,则初始化

    3. 前面都更新失败了,尝试更新base数据

    cellBusy是一个标示元素,只有当修改cell数组大小或者插入元素的时候才会修改。分支二、分支三都很简单,我们来重点分析一下分支一。

    当要更新的位置没有元素的时候,首先cas标志位,防止扩容以及插入元素,然后插入数据。如果成功直接返回,否则标示发生了冲突,然后重试。如果对应的位置有元素则更新,如果更新失败,进行判断是否数组的大小已经超过了cpu的核数,如果大于的话,则意味着扩容没有意义。直接重试。否则进行扩容,扩容完成后,重新设置要更新的位置,尽可能保证下一次更新成功。

    我们来看一下如何统计计数。

    4.png

    当计数的时候,将base和各个cell元素里面的值进行叠加,从而得到计算总数的目的。这里的问题是在计数的同时如果修改cell元素,有可能导致计数的结果不准确。

    总结:

    LongAdder在AtomicLong的基础上将单点的更新压力分散到各个节点,在低并发的时候通过对base的直接更新可以很好的保障和AtomicLong的性能基本保持一致,而在高并发的时候通过分散提高了性能。

    缺点是LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差,比如生成序列号,全局的AtomicLong更好

    相关文章

      网友评论

          本文标题:线程安全性

          本文链接:https://www.haomeiwen.com/subject/wfelhqtx.html