美文网首页
线程安全性

线程安全性

作者: 磊_5d71 | 来源:发表于2018-11-02 08:47 被阅读0次
  • 当多个线程访问某个类时,不管运行环境时采用何种调度方式或者这些进程进行如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么这个类就是线程安全的。
  • 原子性:提供了互斥访问,同一时刻只能有一个线程对它进行操作
  • 可见性:一个线程对主内存的修改可以及时的被其他线程观察到
  • 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序。

原子性

图片.png

AtomicInteger

AtomicInteger类中的incrementAndGet方法,调用CompareAndSetInt 简称CAS
有native修饰说明是底层的类,不是用java实现的

  • ,AtomicLong的原理是依靠底层的cas来保障原子性的更新数据,在要添加或者减少的时候,会使用死循环不断地cas到特定的值,从而达到更新数据的目的。工作内存中的值与主存中进行比较。
  • 实现原理底层采用while
    循环方式,调用较多时可能会受到影响
package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@ThreadSafe
public class AtomicExample1 {


    //请求数1000
    public static int clientTotal = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;

    //count里面的值相当于工作内存,有可能和底层也就是主内存的值不一致
    public static AtomicInteger count = new AtomicInteger(0);


    private static void add(){
        //用到unsafe类
        count.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {

        //定义线程池ExecutorService接口
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量,传入并发线程数 final修饰不允许重新赋值
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定义计数器闭锁。传入请求总数
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            //通过匿名内部类方式
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //semaphore控制并发数量
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception",e);
                    }
                    //每次执行计数器减掉一个
                    countDownLatch.countDown();
                }

            });

        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count.get());
    }
}

LongAdder

  • LongAdder在AtomicLong的基础上将单点的更新压力分散到各个节点,在低并发的时候通过对base的直接更新可以很好的保障和AtomicLong的性能基本保持一致,而在高并发的时候通过分散提高了性能。
  • 缺点是LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差。
package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

@Slf4j
@ThreadSafe
public class AtomicExample3 {


    //请求数1000
    public static int clientTotal = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;

    //count里面的值相当于工作内存,有可能和底层也就是主内存的值不一致
    public static LongAdder count = new LongAdder();


    private static void add(){
        //用到unsafe类
        count.increment();
    }

    public static void main(String[] args) throws InterruptedException {

        //定义线程池ExecutorService接口
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量,传入并发线程数 final修饰不允许重新赋值
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定义计数器闭锁。传入请求总数
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            //通过匿名内部类方式
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //semaphore控制并发数量
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception",e);
                    }
                    //每次执行计数器减掉一个
                    countDownLatch.countDown();
                }

            });

        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);
    }
}

AtomicReference

package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0,2);
        count.compareAndSet(0,1);
        count.compareAndSet(1,3);
        count.compareAndSet(2,4);
        count.compareAndSet(3,5);
        log.info("count:{}",count.get());
    }

}

AtomicIntegerFieldUpdater

package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

@Slf4j
@ThreadSafe
public class AtomicExample5 {


    //这里必须要使用volatile修饰符,并且非static才可以。通过lombok添加注解
    @Getter
    public volatile int count = 100;

    private static AtomicIntegerFieldUpdater<AtomicExample5> update = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class,"count");

    public static void main(String[] args) {

        AtomicExample5 example5 = new AtomicExample5();

        if(update.compareAndSet(example5,100,120)){
            log.info("update success,{}",example5.getCount());
        }

        if(update.compareAndSet(example5,100,120)){
            log.info("update success,{}",example5.getCount());
        }else {
            log.info("update failed,{}",example5.getCount());
        }
    }
}

AtomicStampedReference

-ABA问题 线程1准备用CAS将变量的值由A替换为B,在此之前,线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。但实际上这时的现场已经和最初不同了,尽管CAS成功,但可能存在潜藏的问题, AtomicStampedReference可以解决此问题

AtomicBoolean

  • 让某一段代码只执行一次,绝对不会重复
package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

@Slf4j
@ThreadSafe
public class AtomicExample6 {

    //请求数5000
    public static int clientTotal = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;

    private static AtomicBoolean isHappened = new AtomicBoolean(false);

    public static void main(String[] args) throws InterruptedException {     //定义线程池ExecutorService接口
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量,传入并发线程数 final修饰不允许重新赋值
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定义计数器闭锁。传入请求总数
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            //通过匿名内部类方式
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //semaphore控制并发数量
                        semaphore.acquire();
                        test();
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception",e);
                    }
                    //每次执行计数器减掉一个
                    countDownLatch.countDown();
                }

            });

        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}",isHappened.get());
    }

    //让某一段代码只执行一次,绝对不会重复
    private static void test(){
        if (isHappened.compareAndSet(false, true)) {
            log.info("executed");
        }
        }
}

图片.png

synchronized

  • 修饰代码块:大括号括起来的代码,作用于调用的对象
  • 修饰方法:整个方法,作用于调用的对象
  • 修饰静态方法:整个静态方法,作用于所有对象
  • 修饰类:括号括起来的部分,作用于所有对象
    子类继承父类,父类的方法中含有synchronized关键字,子类继承此方法是不包含synchronized关键字的
package com.alan.concurrency.example.sync;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class SynchronizedExample1 {


    //修饰一个代码块。作用范围是大括号括起的代码
    public void test1(int   j){
        synchronized (this){
            for (int i = 0; i < 10; i++) {
                log.info("test1 {}- {}",j,i);
            }
        }
    }

    //修饰一个方法。作用范围是整个方法
    public synchronized void test2(int j){
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}",j,i);
        }

    }

    public static void main(String[] args) {

        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()->{
            example1.test2(1);
        });
        executorService.execute(()->{
            example2.test2( 2);
        });
    }
}
  • 修饰静态方法和修饰类
package com.alan.concurrency.example.sync;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class SynchronizedExample2 {


    //修饰一个类。作用范围是所有对象
    public void test1(int  j){
        synchronized (SynchronizedExample2.class){
            for (int i = 0; i < 10; i++) {
                log.info("test1 {}- {}",j,i);
            }
        }
    }

    //修饰一个静态方法。作用范围是所有对象
    public static synchronized void test2(int j){
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}",j,i);
        }

    }

    public static void main(String[] args) {

        SynchronizedExample2 example1 = new SynchronizedExample2();
        SynchronizedExample2 example2 = new SynchronizedExample2();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()->{
            example1.test1(1);
        });
        executorService.execute(()->{
            example2.test1( 2);
        });
    }

}

对比

图片.png

可见性

  • 导致共享变量在线程中不可见的原因
    1、线程交叉执行
    2、重排序结合线程交叉执行
    3、共享变量更新后的值没有在工作内存与主存间及时更新

可见性-synchronized方式

图片.png

可见性-volatile方式 (volatile不具备原子性)

通过加入内存屏障和禁止重排序优化来实现

  • 对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存

  • 对volatile变量读操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量。

  • volatile写


    图片.png
  • volatile读


    图片.png

    volatile 一般用于保证变量状态


    图片.png

volatile 也可以用于double check

有序性

  • Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,确会影响到多线程并发执行的正确性。
  • volatile、synchronized、lock

有序性原则

图片.png
图片.png
图片.png
图片.png
  • 总结


    图片.png

相关文章

  • EffectiveJava第十章第五节

    线程安全性的文档化 并非出现synchronized关键字就是线程安全性文档化了。实际上,一个类支持的线程安全性有...

  • String的线程安全

    线程安全性 说道有关string的线程安全性,大家想到的肯定时stringbuffer和stringbuilder...

  • Java并发编程 线程安全性

    什么是线程安全性 线程安全性:当多个线程访问某个类时,不管运行时采用何种调度方式或者这些线程将被如何交替执行,并且...

  • java并发编程实战2~3

    2 线程安全性 2.1 什么是线程安全性 当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何...

  • 谈谈并发编程中的线程安全性

    1. 线程安全性 在单线程程序中,我们并不需要去考虑线程的安全性。但是在多线程程序中,由于多个线程要共享相同的内存...

  • 线程安全性(一)

    参考线程安全性总结 CountDownLatchCountDownLatch 可以阻塞线程并保证线程在满足某种特定...

  • 高并发编程03 ~ 线程安全性

    这节我们讨论一个话题:线程安全性 一、概念 线程安全性:当多个线程同时访问某个资源的时候,不管环境采用何种调度方式...

  • 线程安全性详解

    线程安全性 线程安全性定义: 当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并...

  • 理解Java中的线程安全及处理方法

    1. 线程安全性 1.1. 继承方式VS实现方式(掌握) 当多线程并发访问同一个资源时,会导致线程出现安全性的原因...

  • Effective STL 第12条

    容器的线程安全性 不能对容器的线程安全性抱太大的期望 对于stl容器,大部分容器只能保证1)支持多线程同时读2)支...

网友评论

      本文标题:线程安全性

      本文链接:https://www.haomeiwen.com/subject/sojfxqtx.html