美文网首页
Java并发

Java并发

作者: Vinson武 | 来源:发表于2020-01-19 16:24 被阅读0次

    使用线程

    Java中有三种使用线程的方法:

    • 实现 Runnable 接口;
    • 实现 Callable 接口;
    • 继承 Thread 类。

    实现 Runnable 和 Callable 接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,因此最后还需要通过 Thread 来调用。可以理解为任务是通过线程驱动从而执行的。

    1. 实现Runnable接口
    public class MyRunnable implements Runnable {
        @Override
        public void run() {
            // ...实现接口中的 run() 方法。
        }
    }
    //使用 Runnable 实例再创建一个 Thread 实例,然后调用 Thread 实例的 start() 方法来启动线程。
    public static void main(String[] args) {
        MyRunnable instance = new MyRunnable();
        Thread thread = new Thread(instance);
        thread.start();
    }
    
    1. 实现 Callable 接口

    ==与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask 进行封装。==

    public class MyCallable implements Callable<Integer> {
        @Override
        public Integer call() { //回调接口
            return 123;
        }
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyCallable mc = new MyCallable();
        FutureTask<Integer> ft = new FutureTask<>(mc); //FutureTask封装
        Thread thread = new Thread(ft);
        thread.start();
        System.out.println(ft.get());
    }
    
    1. 继承 Thread 类

    同样也是需要实现 run() 方法,因为 Thread 类也实现了 Runable 接口。

    当调用 start() 方法启动一个线程时,虚拟机会将该线程放入就绪队列中等待被调度,当一个线程被调度时会执行该线程的 run() 方法。

    public class MyThread extends Thread {
        public void run() {
            // ...也要实现run方法
        }
    }
    public static void main(String[] args) {
        MyThread mt = new MyThread();
        mt.start();
    }
    

    ==实现接口 VS 继承 Thread==

    实现接口会更好一些,因为:

    • Java 不支持多重继承,因此继承了 Thread 类就无法继承其它类,但是可以实现多个接口;
    • 类可能只要求可执行就行,继承整个 Thread 类开销过大。

    基础线程机制

    Executor

    == Executor 管理多个异步任务的执行,而无需程序员显式地管理线程的生命周期 ==。

    主要有三种 Executor:

    • CachedThreadPool:一个任务创建一个线程;
    • FixedThreadPool:所有任务只能使用固定大小的线程;
    • SingleThreadExecutor:相当于大小为 1 的 FixedThreadPool。
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            executorService.execute(new MyRunnable());
        }
        executorService.shutdown();
    }
    

    Deamon

    ==守护线程是程序运行时在后台提供服务的线程==,不属于程序中不可或缺的部分。

    当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。

    使用 setDaemon() 方法可以将一个线程设置为守护线程。

    public static void main(String[] args) {
        Thread thread = new Thread(new MyRunnable());
        thread.setDaemon(true); //将thread线程设为守护线程
    }
    

    sleep()

    Thread.sleep(millisec) 方法会==休眠当前正在执行的线程==,millisec 单位为毫秒。

    sleep() 可能会抛出 InterruptedException,因为异常不能跨线程传播回 main() 中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。

    public void run() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    

    yield()

    对静态方法 Thread.yield() 的调用==声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行==。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行

    public void run() {
        Thread.yield();
    }
    

    中断

    一个线程执行完毕之后会自动结束,如果在运行过程中发生异常也会提前结束。

    InterruptedException

    通过==调用一个线程的 interrupt() 来中断该线程==,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞

    interrupted()

    调用 interrupt() 方法会设置线程的中断标记,此时调用 interrupted() 方法会返回 true。因此可以在循环体中使用 interrupted() 方法来判断线程是否处于中断状态,从而提前结束线程。

    Executor 的中断操作

    调用 Executor 的 ==shutdown() 方法会等待线程都执行完毕之后再关闭==,但是如果调用的是 shutdownNow() 方法,则相当于调用每个线程的 interrupt() 方法。

    ==如果只想中断 Executor 中的一个线程,可以通过使用 submit() 方法来提交一个线程,它会返回一个 Future<?> 对象,通过调用该对象的 cancel(true) 方法就可以中断线程。==

    Future<?> future = executorService.submit(() -> {
        // ..
    });
    future.cancel(true);
    

    互斥同步

    Java 提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是 JVM 实现的 synchronized,而另一个是 JDK 实现的 ReentrantLock。

    ==原子类==

    JDK里面提供了很多atomic类,AtomicInteger,AtomicLong,AtomicBoolean等等。它们是通过CAS完成原子性。

    ==volatile 关键字==

    ==volatile仅仅用来保证该变量对所有线程的可见性,但不保证原子性==。 与锁相比,Volatile 变量是一种非常简单但同时又非常脆弱的同步机制,它在某些情况下将提供优于锁的性能和伸缩性。==如果严格遵循 volatile 的使用条件 —— 即变量真正独立于其他变量和自己以前的值 —— 在某些情况下可以使用 volatile 代替 synchronized 来简化代码==。然而,使用 volatile 的代码往往比使用锁的代码更加容易出错。

    为了优化性能,编译器和CPU可能对某些指令进行重排。java代码最终会被编译成汇编指令,而一条java语句可能对应多条汇编指令。为了优化性能,CPU和编译器会对这些指令重排,==volatile的变量在进行操作只会在尾部添加一个内存屏障==(Memory Barrier),lock addl $0x0,(%rsp)。 a) 确保一些特定操作执行的顺序; b) 影响一些数据的可见性(可能是某些指令执行后的结果)。 编译器和CPU可以在保证输出结果一样的情况下对指令重排序,使性能得到优化插入一个内存屏障,相当于告诉CPU和编译器先于这个命令的必须先执行,后于这个命令的必须后执行。内存屏障另一个作用是强制更新一次不同CPU的缓存。 例如,一个写屏障会把这个屏障前写入的数据刷新到缓存,这样任何试图读取该数据的线程将得到最新值,而不用考虑到底是被哪个cpu核心或者哪颗CPU执行的。所以一旦你完成写入,任何访问这个变量的线程将会得到最新的值。而且在你写入前,会保证所有之前发生的事已经发生,并且任何更新过的数据值也是可见的,因为内存屏障会把之前的写入值都刷新到缓存。

    ==synchronized==

    1. ==同步一个代码块==
    public void func() {
        synchronized (this) {
            // ...
        }
    }
    

    它==只作用于同一个对象==,如果调用两个对象上的同步代码块,就不会进行同步。

    于以下代码,使用 ExecutorService 执行了两个线程,由于调用的是同一个对象的同步代码块,因此这两个线程会进行同步,当一个线程进入同步语句块时,另一个线程就必须等待。

    public class SynchronizedExample {
    
        public void func1() {
            synchronized (this) {
                for (int i = 0; i < 10; i++) {
                    System.out.print(i + " ");
                }
            }
        }
    }
    
    public static void main(String[] args) {
        SynchronizedExample e1 = new SynchronizedExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> e1.func1());
        executorService.execute(() -> e1.func1());
    }
    
    输出 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
    

    对于以下代码,两个线程调用了不同对象的同步代码块,因此这两个线程就不需要同步。从输出结果可以看出,两个线程交叉执行。

    public static void main(String[] args) {
        SynchronizedExample e1 = new SynchronizedExample();
        SynchronizedExample e2 = new SynchronizedExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> e1.func1());
        executorService.execute(() -> e2.func1());
    }
    输出:0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
    
    1. ==同步一个实例方法==
    public synchronized void func () {
        // ...
    }
    

    它和同步代码块一样,作用于同一个对象。

    1. ==同步一个类==
    public void func() {
        synchronized (SynchronizedExample.class) {
            // ...
        }
    }
    

    ==作用于整个类==,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步。

    public class SynchronizedExample {
    
        public void func2() {
            synchronized (SynchronizedExample.class) {
                for (int i = 0; i < 10; i++) {
                    System.out.print(i + " ");
                }
            }
        }
    }
    public static void main(String[] args) {
        SynchronizedExample e1 = new SynchronizedExample();
        SynchronizedExample e2 = new SynchronizedExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> e1.func2());
        executorService.execute(() -> e2.func2());
    }
    输出:0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
    
    1. ==同步一个静态方法==
    public synchronized static void fun() {
        // ...
    }
    

    ==作用于整个类==。

    ==ReentrantLock==

    ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁。

    不同对象调用也会按顺序执行,如下例:

    public class LockExample {
    
        private Lock lock = new ReentrantLock();
    
        public void func() {
            lock.lock();
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.print(i + " ");
                }
            } finally {
                lock.unlock(); // 确保释放锁,从而避免发生死锁。
            }
        }
    }
    public static void main(String[] args) {
        LockExample lockExample = new LockExample();
        LockExample lockExample1 = new LockExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> lockExample.func());
        executorService.execute(() -> lockExample1.func());
    }
    输出:0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
    

    synchronized和ReentranLock比较

    1. 锁的实现

    ==synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的==。

    1. 性能

    新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。

    1. 等待可中断

    当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。
    ==ReentrantLock 可中断,而 synchronized 不行==

    1. 公平锁

    公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。
    ==synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的==。

    1. 锁绑定多个条件

    一个 ReentrantLock 可以同时绑定多个 Condition 对象。

    1. 锁释放

    ==synchronized不用担心没有释放锁而导致死锁问题,因为 JVM会确保锁的释放。而ReentrantLock一定要手动释放锁==。

    使用选择

    ==除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized==。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。

    死锁条件

    ==死锁发生的条件,以下四个同时满足时就会发生死锁==

    1. 互斥条件:任务使用的资源中至少有一个是不能共享的。
    2. 持有等待:至少有一个任务它必须持有一个资源且正在等待获取一个当前被别的任务持有的资源。
    3. 资源不能被任务抢占,任务必须把资源释放当作普通事件,不能抢占其他被占用的资源。
    4. 必须有循环等待,一个任务在等待其他任务所持有的资源,后者又在等待另一个资源,一直下去,直到有任务等待第一个任务的资源,形成闭环。

    要防止死锁,只需破坏上面其中一个条件即可。在程序中最容易的是破坏第4个条件,不要形成循环等待。

    乐观锁和悲观锁

    乐观锁和悲观锁都是一种并发控制的方法。

    • 乐观锁

    它假设多用户并发的事务在处理时不会彼此互相影响,各事务能够在不产生锁的情况下处理各自影响的 那部分数据。

    是对于数据冲突保持一种乐观态度,操作数据时不会对操作的数据进行加锁(这使得多个任务可以并行的对数据进行操作),只有到数据提交的时候才通过一种机制来验证数据是否存在冲突(一般实现方式是通过加版本号然后进行版本号的对比方式实现);乐观锁适用于多读的应用类型。

    特点:乐观锁是一种并发类型的锁,其本身不对数据进行加锁通而是通过业务实现锁的功能,不对数据进行加锁就意味着允许多个请求同时访问数据,同时也省掉了对数据加锁和解锁的过程,这种方式大大的提高了数据操作的性能;

    实现:在Java中java.util.==concurrent包下面的原子变量类就是使用了====乐观锁的一种实现方式CAS(比较并交换)实现的==。(比如AtomicInteger)

    算法:CAS 操作中包含三个操作数 —— 需要读写的内存位置(V)、进行比较的预期原值(A)和拟写入的新值(B)。如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B。否则处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前值。)CAS 有效地说明了“ 我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。 ”这其实和乐观锁的冲突检查+数据更新的原理是一样的

    CAS存在问题

    1. ==ABA问题==:比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且two进行了一些操作变成了B,然后two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后one操作成功。尽管线程one的CAS操作成功,但可能存在潜藏的问题。(比如栈操作)
      atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
    2. ==循环时间长开销大==:自旋CAS(不成功,就一直循环执行,直到成功)如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
    3. ==只能保证一个共享变量的原子操作==:
      当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁.从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
    • 悲观锁

    悲观锁是基于一种悲观的态度类来防止一切数据冲突,它是以一种预防的姿态在修改数据之前把数据锁住,然后再对数据进行读写,在它释放锁之前任何人都不能对其数据进行操作,直到前面一个人把锁释放后下一个人数据加锁才可对数据进行加锁,然后才可以对数据进行操作,一般数据库本身锁的机制都是基于悲观锁的机制实现的;

    特点:可以完全保证数据的独占性和正确性,因为每次请求都会先对数据进行加锁, 然后进行数据操作,最后再解锁,而加锁释放锁的过程会造成消耗,所以性能不高;

    实现:==Synchronize和ReentrantLock的加锁都是悲观锁==

    问题:==悲观锁机制存在以下问题==:

    1. 在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,==引起性能问题==。
    2. 一个线程持有锁会==导致其它所有需要此锁的线程挂起。==
    3. 如果一个优先级高的线程等待一个优先级低的线程释放锁会==导致优先级倒置==,引起性能风险。

    参考

    线程之间的协作

    当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。

    join()

    在线程中调用另一个线程的 join() 方法,会==将当前线程挂起,直到目标线程结束==。

    public class JoinExample {
    
        private class A extends Thread {
            @Override
            public void run() {
                System.out.println("A");
            }
        }
    
        private class B extends Thread {
    
            private A a;
    
            B(A a) {
                this.a = a;
            }
    
            @Override
            public void run() {
                try {
                    a.join(); //当前线程挂起,A执行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("B");
            }
        }
    
        public void test() {
            A a = new A();
            B b = new B(a);
            b.start();
            a.start();
        }
    }
    public static void main(String[] args) {
        JoinExample example = new JoinExample();
        example.test();
    }
    
    输出:
    A
    B
    

    wait() notify() notifyAll()

    ==调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程==。

    它们都属于 Object 的一部分,而不属于 Thread。

    只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException。

    ==使用 wait()挂起期间,线程会释放锁==。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。

    ==调用notify()方法后并不会立即释放object锁,会等待该线程执行完毕后释放Object锁==。

    ==wait() 和 sleep() 的区别==

    • wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法
    • wait() 会释放锁,sleep() 不会。

    await() signal() signalAll()

    java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。

    相比于 wait() 这种等待方式,await() 可以指定等待的条件,因此更加灵活。

    public class AwaitSignalExample {
    
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
    
        public void before() {
            lock.lock();
            try {
                System.out.println("before");
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
        public void after() {
            lock.lock();
            try {
                condition.await();
                System.out.println("after");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    

    线程状态

    一个线程只能处于一种状态,并且这里的线程状态特指 Java 虚拟机的线程状态,不能反映线程在特定操作系统下的状态。

    • 新建:创建后尚未启动。
    • 可运行:正在 Java 虚拟机中运行。但是在操作系统层面,它可能处于运行状态,也可能等待资源调度。
    • 阻塞:请求获取 monitor lock 从而进入 synchronized 函数或者代码块,但是其它线程已经占用了该 monitor lock,所以出于阻塞状态。
    • 无限期等待:等待其它线程显式地唤醒
      阻塞和等待的区别在于,阻塞是被动的,它是在等待获取 monitor lock。而等待是主动的,通过调用 Object.wait() 等方法进入。
    • 限期等待:无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。
    • 死亡:可以是线程结束任务之后自己结束,或者产生了异常而结束。

    并发工具包J.U.C

    java.util.concurrent(J.U.C)大大提高了并发性能。

    CountDownLatch

    用来控制一个或者多个线程等待多个线程

    维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。

    public class CountdownLatchExample {
    
        public static void main(String[] args) throws InterruptedException {
            final int totalThread = 10; 
            CountDownLatch countDownLatch = new CountDownLatch(totalThread);//计数初始值10
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < totalThread; i++) {
                executorService.execute(() -> {
                    System.out.print("run..");
                    countDownLatch.countDown();//计数减1
                });
            }
            countDownLatch.await(); //等待,到计数0才继续往下
            System.out.println("end");
            executorService.shutdown();
        }
    }
    //输出:run..run..run..run..run..run..run..run..run..run..end
    

    CyclicBarrier

    用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

    和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

    CyclicBarrier 和 CountdownLatch 的一个区别是:**CyclicBarrier 的计数器通过调用 reset() **方法可以循环使用****,所以它才叫做循环屏障。

    public class CyclicBarrierExample {
    
        public static void main(String[] args) {
            final int totalThread = 10;
            CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < totalThread; i++) {
                executorService.execute(() -> {
                    System.out.print("before..");
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    System.out.print("after..");
                });
            }
            executorService.shutdown();
        }
    }
    
    //before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
    
    

    Semaphore

    Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。

    FutureTask

    FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future 接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。

    FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务需要执行很长时间,那么就可以用 FutureTask 来封装这个任务,主线程在完成自己的任务之后再去获取结果。

    public class FutureTaskExample {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int result = 0;
                    for (int i = 0; i < 100; i++) {
                        Thread.sleep(10);
                        result += I;
                    }
                    return result;
                }
            });
    
            Thread computeThread = new Thread(futureTask);
            computeThread.start();
    
            Thread otherThread = new Thread(() -> {
                System.out.println("other task is running...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("other task is end...");
            });
            otherThread.start();
            System.out.println("main thread is running...");
            System.out.println(futureTask.get());
        }
    }
    //main thread is running...
    //other task is running...
    //other task is end...
    //4950
    

    BlockingQueue

    java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:

    • FIFO 队列 :LinkedBlockingQueue、ArrayBlockingQueue(固定长度)
    • 优先级队列 :PriorityBlockingQueue

    提供了阻塞的 take() 和 put() 方法:如果队列为空 take() 将阻塞,直到队列中有内容;如果队列为满 put() 将阻塞,直到队列有空闲位置。

    //使用 BlockingQueue 实现生产者消费者问题
    
    public class ProducerConsumer {
    
        private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
    
        private static class Producer extends Thread {
            @Override
            public void run() {
                try {
                    queue.put("product");//生产
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.print("produce..");
            }
        }
    
        private static class Consumer extends Thread {
    
            @Override
            public void run() {
                try {
                    String product = queue.take();//消费
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.print("consume..");
            }
        }
    }
    

    Java 内存模型

    主内存与工作内存

    处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。

    加入高速缓存带来了一个新的问题:缓存一致性。如果多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,需要一些协议来解决这个问题。

    image.png

    所有的变量都存储在主内存中,每个线程还有自己的工作内存,工作内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存副本拷贝。

    线程只能直接操作工作内存中的变量,不同线程之间的变量值传递需要通过主内存来完成。

    image.png

    内存间交互操作

    Java 内存模型定义了 8 个操作来完成主内存和工作内存的交互操作。

    image.png
    • read:把一个变量的值从主内存传输到工作内存中
    • load:在 read 之后执行,把 read 得到的值放入工作内存的变量副本中
    • use:把工作内存中一个变量的值传递给执行引擎
    • assign:把一个从执行引擎接收到的值赋给工作内存的变量
    • store:把工作内存的一个变量的值传送到主内存中
    • write:在 store 之后执行,把 store 得到的值放入主内存的变量中
    • lock:作用于主内存的变量
    • unlock

    内存模型三大特性

    1.==原子性==

    Java 内存模型保证了 read、load、use、assign、store、write、lock 和 unlock 操作具有原子性。例如对一个 int 类型的变量执行 assign 赋值操作,这个操作就是原子性的。但是 Java 内存模型允许虚拟机将没有被 volatile 修饰的 64 位数据(long,double)的读写操作划分为两次 32 位的操作来进行,即 load、store、read 和 write 操作可以不具备原子性。因此对 int 类型读写操作满足原子性只是说明 load、assign、store 这些单个操作具备原子性

    • AtomicInteger 等原子性类能保证多个线程修改的原子性。
    • 除了使用原子类之外,也可以使用 synchronized 互斥锁来保证操作的原子性。
    1. ==可见性==

    可见性指当一个线程修改了共享变量的值,其它线程能够立即得知这个修改。Java 内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的。

    ==主要有三种实现可见性的方式==:

    • volatile(这个不能保证原子性)
    • synchronized,对一个变量执行 unlock 操作之前,必须把变量值同步回主内存。
    • final,被 final 关键字修饰的字段在构造器中一旦初始化完成,并且没有发生 this 逃逸(其它线程通过 this 引用访问到初始化了一半的对象),那么其它线程就能看见 final 字段的值。
    1. ==有序性==

    有序性是指:==在本线程内观察,所有操作都是有序的。在一个线程观察另一个线程,所有操作都是无序的==,无序是因为发生了==指令重排序==。在 Java 内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性

    • volatile 关键字通过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障之前。

    • 也可以通过 synchronized 来保证有序性,它保证每个时刻只有一个线程执行同步代码,相当于是让线程顺序执行同步代码。

    线程安全

    多个线程不管以何种方式访问某个类,并且在主调代码中不需要进行同步,都能表现正确的行为。

    ==线程安全有以下几种实现方式==:

    1. ==不可变类型==

    不可变(Immutable)的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变,来满足线程安全。

    • final 关键字修饰的基本数据类型
    • String
    • 枚举类型
    • Number 部分子类,如 Long 和 Double 等数值包装类型,BigInteger 和 BigDecimal 等大数据类型。但同为 Number 的原子类 AtomicInteger 和 AtomicLong 则是可变的。
    • 对于集合类型,可以使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。
    1. ==互斥同步==

    synchronized 和 ReentrantLock。

    1. ==非阻塞同步==

    互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

    随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要将线程阻塞,因此这种同步操作称为非阻塞同步。

    • ==比较并交换CAS==

    乐观锁需要操作和冲突检测这两个步骤具备原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。==CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B==。
    实现例子: ==AtomicInteger等==

    存在问题: ABA

    如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会误认为它从来没有被改变过。
    J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。

    1. ==无同步方案==

    要保证线程安全,并不是一定就要进行同步。如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性。

    • 栈封闭

    多个线程访问==同一个方法的局部变量时==,不会出现线程安全问题,因为局部变量存储在虚拟机栈中,属于线程私有的。

    • 线程本地存储

    如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。

    可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。

    public class ThreadLocalExample {
        public static void main(String[] args) {
            ThreadLocal threadLocal = new ThreadLocal();
            Thread thread1 = new Thread(() -> {
                threadLocal.set(1);
                try {
                    Thread.sleep(1000); //睡眠1秒
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(threadLocal.get()); //1,thread2的设置不对thread1产生影响
                threadLocal.remove();
            });
            Thread thread2 = new Thread(() -> {
                threadLocal.set(2);
                threadLocal.remove();
            });
            thread1.start();
            thread2.start();
        }
    }
    

    实现原理:
    每个 Thread 都有一个ThreadLocal.ThreadLocalMap 对象。
    当调用一个 ThreadLocal 的 set(T value) 方法时,先得到当前线程的 ThreadLocalMap 对象,然后将 ThreadLocal->value 键值对插入到该 Map 中

    锁优化

    这里的锁优化主要是指 JVM 对 synchronized 的优化。

    • 自旋锁

    互斥同步进入阻塞状态的开销都很大,应该尽量避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。

    自旋锁虽然能避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。

    • 锁消除

    锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。

    锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除。

    对于一些看起来没有加锁的代码,其实隐式的加了很多锁。例如

    public static String concatString(String s1, String s2, String s3) {
        StringBuffer sb = new StringBuffer();
        sb.append(s1);
        sb.append(s2);
        sb.append(s3);
        return sb.toString();
    }
    

    每个 append() 方法中都有一个同步块。虚拟机观察变量 sb,很快就会发现它的动态作用域被限制在 concatString() 方法内部。也就是说,sb 的所有引用永远不会逃逸到 concatString() 方法之外,其他线程无法访问到它,因此可以进行消除。

    • 锁粗化

    如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。

    上一节的示例代码中连续的 append() 方法就属于这类情况。如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。对于上一节的示例代码就是扩展到第一个 append() 操作之前直至最后一个 append() 操作之后,这样只需要加锁一次就可以了。

    • 轻量级锁

    JDK 1.6 引入了偏向锁和轻量级锁,从而让锁拥有了四个状态:无锁状态(unlocked)、偏向锁状态(biasble)、轻量级锁状态(lightweight locked)和重量级锁状态(inflated)

    • 偏向锁

    偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要。

    多线程开发良好的实践

    • 给线程起个有意义的名字,这样可以方便找 Bug。

    • 缩小同步范围,从而减少锁争用。例如对于 synchronized,应该尽量使用同步块而不是同步方法。

    • 多用同步工具少用 wait() 和 notify()。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 这些同步类简化了编码操作,而用 wait() 和 notify() 很难实现复杂控制流;其次,这些同步类是由最好的企业编写和维护,在后续的 JDK 中还会不断优化和完善。

    • 使用 BlockingQueue 实现生产者消费者问题。

    • 多用并发集合少用同步集合,例如应该使用 ConcurrentHashMap 而不是 Hashtable。

    • 使用本地变量和不可变类来保证线程安全。

    • 使用线程池而不是直接创建线程,这是因为创建线程代价很高,线程池可以有效地利用有限的线程来启动任务。

    参考链接

    https://github.com/CyC2018/CS-Notes/blob/master/notes/Java%20并发.md

    相关文章

      网友评论

          本文标题:Java并发

          本文链接:https://www.haomeiwen.com/subject/tuwrzctx.html