美文网首页
LongAccumulator 和 DoubleAccumula

LongAccumulator 和 DoubleAccumula

作者: blank_white | 来源:发表于2020-07-19 20:58 被阅读0次

    关键字:Accumulator 没能获得预期值,BinaryOperator 表达式如何写,Accumulator 出错

    以下都以 Double 类型的说明

    基本使用

    LongAccumulator 和 DoubleAccumulator 是 Java 提供的用于为多线程提供原子值的一个类,可以视作一个能保证线程安全的 long 类型和 double 值来用

            DoubleBinaryOperator doubleBinaryOperator = (a, b) -> {            
                return a * b;
            };
            // 创建一个 DoubleAccumulator 并设置初始值
            DoubleAccumulator doubleAccumulator = new DoubleAccumulator(doubleBinaryOperator,2f);
    
            // 根据 上面提供的 operator 运算,这里也就是  doubleBinaryOperator提供的值=2*3 变为6
            doubleAccumulator.accumulate(3);
            // 返回 doubleAccumlator 提供的值
            int aaaaaa=doubleAccumulator.get();
    
    参数 DoubleBinaryOperator

    在 Java核心技术,关于LongAccumulator 和 DoubleAccumulator 的介绍中,

    DoubleAccumulator 的实现方式,简单的说就是在内部提供了一组变量

    identity ,a1, a2 ,a3 ···
    

    调用 doubleAccumulator.accumulate(x)

    • 线程不冲突的时候 , 直接 identity = doubleBinaryOperator (identity ,x)

    • 线程冲突的时候,根据线程选择一个变量 a1= doubleBinaryOperator (a1,x),若 a1 没有被使用过,直接把 x 赋值给 a1 作为 a1 的初始值

    在需要doubleAccumulator 提供值得时候也就是 调用 doubleAccumulator.get(); 的时候

    提供 identity  op  a1 op a2 op a3 ...  
    即
    result = identity;
    result= doubleBinaryOperator  (result,a1);
    result= doubleBinaryOperator  (result,a2);
    result= doubleBinaryOperator  (result,a3);
    ····
    
    return  result;
    

    根据 以上的逻辑,那么 DoubleBinaryOperator 的实现的运算规则,是收到一定限制的

    如果 运算规则 是 a*b 
    
    未产生线程冲突下
    doubleAccumulator.accumulate(p1);
    doubleAccumulator.accumulate(p2);
    
    identity = identity*p1;
    identity = identity*p2;
    
    计算 result 的时候相当于
    result =identity *p1 *p2;
    
    线程冲突,使用 a1 
    a1=p1;
    a1=p1*p2;
    计算 result 的时候相当于
    result=identity* (p1*p2);
    ------------------------------------------------------------
    如果 运算规则 是 a-b 
    
    未产生线程冲突下
    doubleAccumulator.accumulate(p1);
    doubleAccumulator.accumulate(p2);
    
    identity = identity-p1;
    identity = identity-p2;
    
    计算 result 的时候相当于
    result =identity -p1 -p2;
    
    线程冲突,使用 a1 // 这个 a1 其实就是源码中的 cell
    a1=p1;
    a1=p1-p2;
    计算 result 的时候相当于
    result=identity- (p1-p2);
    
    result值为 identity-p1+p2
    
    并不会获得我们期望的数值
    --------------------------------------
    所以一定要考虑好如何写 运算规则
    
    源码

    DoubleAccumulator.class

    
    
         public DoubleAccumulator(DoubleBinaryOperator accumulatorFunction,
                                 double identity) {
            this.function = accumulatorFunction;
             // 构造方法,在这里可以看到 初始值得赋予
            base = this.identity = Double.doubleToRawLongBits(identity);
        }
    
    
         /**
         * Updates with the given value.
         *
         * @param x the value
         */
        public void accumulate(double x) {
            Cell[] as; long b, v, r; int m; Cell a;
            if ((as = cells) != null ||
                (r = Double.doubleToRawLongBits
                 (function.applyAsDouble
                  (Double.longBitsToDouble(b = base), x))) != b  && !casBase(b, r)) {
                // as 非空的时候直接查数组,看有没有线程对应的 cell 
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended =
                      (r = Double.doubleToRawLongBits
                       (function.applyAsDouble
                        (Double.longBitsToDouble(v = a.value), x))) == v ||
                      a.cas(v, r)))
                    // 找不到的时候就进入此方法 为线程构建一个它对应的 cell,此方法实现见下端
                    doubleAccumulate(x, function, uncontended);
            }
        }
    
    
        /**
         * Returns the current value.  The returned value is <em>NOT</em>
         * an atomic snapshot; invocation in the absence of concurrent
         * updates returns an accurate result, but concurrent updates that
         * occur while the value is being calculated might not be
         * incorporated.
         *
         * @return the current value
         */
        public double get() {
            Cell[] as = cells; Cell a;
            double result = Double.longBitsToDouble(base);
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        // 把 as 遍历,挨个做 applyAsDouble ,就是自己写的 运算规则
                        result = function.applyAsDouble
                            (result, Double.longBitsToDouble(a.value));
                }
            }
            return result;
        }
    
    
        final void doubleAccumulate(double x, DoubleBinaryOperator fn,boolean wasUncontended) {
            // 省略的其他代码
            // 在这里可以看到,新建 Cell 的时候,就是直接 把 x 赋做初值 ,对应上面 a1=p1;
            Cell r = new Cell(Double.doubleToRawLongBits(x));
    }
    

    测试

    测试思路:

    两组使用相同的初始值 ,和运算法则

    组 1 ,使用 DoubleAccumulator,用两个线程 ,总共循环计算 2*times 次

    组 2 ,使用 普通 double 类型,单线程 ,循环 计算 2*times 次,这个计算结果一定是我们期望的

    调整 times 的数值,当 times 较小的时候,不容易产生同步问题,就是一直在使用 identity 做计算,可以看到 最后两组计算结果相同

    当 times 较大时,线程冲突,使用了其他变量,可以看到 最后两组计算结果不相同

            // 初始值
            Double x=2.0d;
    
            // 每个线程循环次数
            int times=500000;
            // 赋一个差值。让两个线程执行不同次数
            int chazhi=-490000;
            // 运算方式
            DoubleBinaryOperator doubleBinaryOperator = (a, b) -> {
                //return (a*b>999999||a*b<0.0000011)?a:a*b;
                return a - b;
            };
            DoubleAccumulator doubleAccumulator = new DoubleAccumulator(doubleBinaryOperator,2.0d);
    
    
    
            new Thread(()->{
                for (int i = 0;i<times+chazhi; i++) {
    
    //                if (Double.isNaN(doubleAccumulator.get())||Double.isInfinite(doubleAccumulator.get())){
    //                    System.out.println("t1 停止了");
    //                    break;
    //                }
    
    //                if (doubleAccumulator.get()>99999f){
    //                    System.out.println(doubleAccumulator.get()+"------t1 跳过");
    //                    continue;
    //                }
    
                    doubleAccumulator.accumulate(x);
                    //System.out.println("t1第"+i+"次:"+doubleAccumulator.get());
                }
                System.out.println("t1 结束循环了 "+doubleAccumulator.get());
            }).start();
    
            new Thread(()->{
                for (int i = 0;i<times-chazhi; i++) {
    //                if (Double.isNaN(doubleAccumulator.get())||Double.isInfinite(doubleAccumulator.get())){
    //
    //                    System.out.println("t2 停止了");
    //                    break;
    //                }
    
    //                if (doubleAccumulator.get()<0.0001f){
    //                    System.out.println(doubleAccumulator.get()+"------t2 跳过");
    //                    continue;
    //                }
    
                    doubleAccumulator.accumulate(x);
                    //System.out.println("t2第"+i+"次:"+doubleAccumulator.get());
                }
                System.out.println("t2 结束循环了 "+doubleAccumulator.get());
            }).start();
    
    
    
            Double a= 2.0d;
            for (int i = 0; i < times*2; i++) {
                a=doubleBinaryOperator.applyAsDouble(a,x);
            }
    
            Thread.sleep(3000);
            System.out.println("a的值----------- "+ BigDecimal.valueOf(a));
            System.out.println("l的值----------- "+BigDecimal.valueOf(doubleAccumulator.get()));
    

    运算规则为 a+b 的时候,无论如何 a , l 值相同

    运算规则为 a-b 的时候

    尝试在 doubleAccumulate(x, function, uncontended); 这个位置打断点

    就会发现,只要在此位置触发了断点, a 和 l 的值就出现不同,因为使用了 cell ,这种运算规则在这种逻辑下不能满足我们的预期,最后计算结果的时候就出错了

    相关文章

      网友评论

          本文标题:LongAccumulator 和 DoubleAccumula

          本文链接:https://www.haomeiwen.com/subject/ehcgfktx.html