美文网首页
Java并发(3)-- 线程间通信&生产者消费者问题和哲学家就餐

Java并发(3)-- 线程间通信&生产者消费者问题和哲学家就餐

作者: kolibreath | 来源:发表于2020-03-04 10:33 被阅读0次
    守望先锋 猎空

    本文主要分两个章节,先对线程间通信机制的介绍,然后通过对生产者问题和哲学家问题的解决对线程的基础部分收尾

    1. 线程间通信机制
      1.1 使用同步机制
      1.2 使用轮询机制
      1.3 使用wait/notify
      1.4 使用Lock/Condition
    2. 两个经典问题
      2.1 哲学家问题死锁的解决
      2.2 生产者消费者问题

    线程间通信机制

    同步机制

    使用关键字volatilesynchronized ,前面几篇文章已经说明了这个问题,这里不再重复

    使用轮询机制

    public class SpinLockTest {
    
    
        private static CountDownLatch latch = new CountDownLatch(100);
        private AtomicReference<Thread> ref = new AtomicReference<>();
    
        public void lock() {
            Thread currentThread = Thread.currentThread();
            while (!ref.compareAndSet(null, currentThread)) {
            }
        }
    
        public void unLock() {
            Thread thread = Thread.currentThread();
            ref.compareAndSet(thread,null);
        }
    
    
        public static void main(String args[]) {
            ExecutorService service = Executors.newCachedThreadPool();
            SpinLockTest test = new SpinLockTest();
            int count[] = {0};
            for (int i = 0; i < 100 ; i++) {
                service.execute(new Thread(() -> {
                    test.lock();
                    count[0] ++;
                    test.unLock();
                    latch.countDown();
                }));
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(count[0]);
        }
    
    }
    

    我们使用了CountDownLatch做一个100次的倒计时,如果倒计时0时,结束阻塞。理想情况下,100个线程应该会让最后的结果变成100,而结果和我们预料的一致,假设第一个被调度的线程为A,ref.compareAndSet()返回true(当前是nullexpect的是也是nullref的值被设置成currentThread的值)。当A线程没有unlock()时,如果来一个B线程,不满足while中CAS的条件,开始while循环,B线程会一直询问有锁吗,有锁吗......直到A线程unlock为止。

    B线程询问有锁吗?

    我们这个例子中也实现了一个自旋锁:一个线程从在阻塞到切换成为别的线程的过程,如果只是执行简单的任务的话,切换线程上下文的时间反而比执行任务的时间还要长。所以我们可以采取自旋锁的方法进行线程的同步。

    使用wait/notify

    wait()notify()是定义在Object上的native方法,具体的内容有赖于各个平台的实现。

    wait/notfity具体使用

    1. wait()和notify()
      wait()函数调用之后线程被挂起。调用了notify()notifyAll()之后会唤醒一个等待这个对象锁的线程,但是只有当退出对象锁的区域才行。
      对象调用notify()之后只会有一个线程去竞争锁,notifyAll()会让所有等待这个对象锁的线程去竞争锁。
    2. 具体使用
      Java中给出了一个使用wait()很明确的套路,就是使用这样的一个结构:
    synchronized(object){
      //某种条件
      while(condition){
          //do something  
          wait();
         //do something else
      }
    }
    

    首先记住以下原则:

    1. wait()notify()方法必须定义在synchronized方法块中
    2. wait()通常情况下放在while块中,这主要是因为虚假唤醒问题
      下面看一段例子:
    public class LockReleaseTest {
    
        private static Object object = new Object();
    
        private static class A extends  Thread{
    
            private Object object;
            public A(Object object){
                this.object = object;
            }
            public void run(){
                synchronized (object) {
                    while (!Thread.interrupted()) {
                        try {
                            //让A线程直接wait
                            System.out.println("A进入同步代码块");
                            //wait 将线程挂起 从哪里跌倒从哪里爬起来 如果唤醒了 从这里继续运行
                            object.wait();
                            System.out.println("线程A获得了锁");
    
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                System.out.println("A退出同步代码块 退出run()");
            }
        }
    
        private static class B extends  Thread{
    
            private Object object;
            public B(Object object){
                this.object = object;
            }
            public void run() {
                synchronized (object) {
                    while (!Thread.interrupted()) {
                        System.out.println("B进入同步代码块");
                        object.notify();
                        System.out.println("B通知A 从挂起中醒来,但是没有释放锁");
                        try {
                            TimeUnit.MILLISECONDS.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("线程B退出同步代码块");
                    }
                    System.out.println("线程B释放了锁");
                }
            }
        }
    
        public static void main(String args[]) throws InterruptedException {
            A a = new A(object);
            B b = new B(object);
    
            a.start();
            b.start();
        }
    }
    
    

    A,B分别是两个线程,他们都通过一个公共的object进行同步(通过构造函数传入的),运行之后的结果如下所示:

    A进入同步代码块
    B进入同步代码块
    B通知A 从挂起中醒来,但是没有释放锁
    线程B退出同步代码块
    线程B释放了锁
    线程A获得了锁
    A退出同步代码块 退出run()
    

    A进入同步代码块,但是调用了wait()函数之后,线程A就挂起了。但是B线程却可以正常运行。这说明即使A线程调用了wait(),函数没有退出run()但是A线程还是放弃了锁,并且被B线程获得。此时object.notify()运行,却没有让A线程立即恢复,只有当B线程休眠结束并且退出同步代码块,A线程才能继续运行,这就解释了上面的问题,notify()执行之后没有立刻释放锁,只能等待解释同步代码块。
    调用wait()方法的时候一定是获得了同步锁的,如果没有在synchronized块中调用wait()方法将抛出异常。

    使用Lock 和 Condition

    个人认为LockCondition是比设计在Object上的wait()notify()更容易理解的api,所有使用wait&notify的地方都还可以使用Lock&Condition处理。

    生产者消费者问题

    生产着消费者问题的场景是:消费者消费生产者生产出并且放在队列里面的产品,如果产品用完了消费者需要等待,如果队列满了,生产者等待。

    1. 先使用wait&notify完成:
    public class ProducerAndConsumer1 {
    
    
        private static final Queue<Content> contents = new LinkedList<>();
        static class Content{
            private String start;
            private String end;
            private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};
    
            public Content(){
                int index = new Random().nextInt(places.length);
                this.start = this.end = places[index];
            }
            public String toString(){
                return " start " + start + " end " + end;
            }
        }
    
    
        @SuppressWarnings("Duplicates")
        static class Producer implements Runnable{
    
            private int maxCount;
    
            public Producer(int maxCount){
                this.maxCount = maxCount;
            }
    
            public void run(){
                while(true){
                    synchronized (contents){
                      //使用while + wait的语义: 判断是否还要继续等待
                        while(contents.size() == maxCount){
                            System.out.println("The queue is full");
                            try {
                                contents.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        try {
                            //模拟生产
                            TimeUnit.MILLISECONDS.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
                        Content content = new Content();
                        contents.add(content);
                        System.out.println("produced "+content);
                        contents.notifyAll();
                    }
                }
            }
        }
    
        @SuppressWarnings("Duplicates")
        static class Consumer implements  Runnable{
            private int maxCount;
            public Consumer(int maxCount){
                this.maxCount = maxCount;
            }
    
            public void run(){
                while(true){
                    synchronized (contents){
                        //使用while + wait的语义: 判断是否还要继续等待
                        while(contents.size() == 0){
                            System.out.println("The queue is empty");
                            try {
                                contents.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        Content content =contents.poll();
                        try {
                            //模拟消费
                            TimeUnit.MILLISECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("consumed " + content);
                        contents.notifyAll();
                    }
                }
            }
        }
    
    
        public static void main(String args[]) {
            ExecutorService service = Executors.newCachedThreadPool();
            int maxCount = 5;
            //3个生产者 3个消费者
    
            for (int i = 0; i < 3 ; i++) {
                service.execute(new Producer(maxCount));
                service.execute(new Consumer(maxCount));
            }
    
        }
    }
    
    1. 使用Lock&Condition解决
    public class ProducerAndConsumer2 {
    
        private static final Queue<Content> contents = new LinkedList<>();
    
        private static final Lock lock = new ReentrantLock();
        private static final Condition fullQueue = lock.newCondition();
        private static final Condition emptyQueue = lock.newCondition();
    
        static class Content{
            private String start;
            private String end;
            private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};
    
            public Content(){
                int index = new Random().nextInt(places.length);
                this.start = this.end = places[index];
            }
            public String toString(){
                return " start " + start + " end " + end;
            }
        }
    
    
        static class Producer implements Runnable{
    
            private int maxCount;
    
            public Producer(int maxCount){
                this.maxCount = maxCount;
            }
    
            public void run() {
                while (true) {
                    lock.lock();
                    //使用while + wait的语义: 判断是否还要继续等待
                    while (contents.size() == maxCount) {
                        System.out.println("The queue is full");
                        try {
                            fullQueue.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        //模拟生产
                        TimeUnit.MILLISECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    Content content = new Content();
                    contents.add(content);
                    System.out.println("produced " + content);
                    fullQueue.signalAll();
                    emptyQueue.signalAll();
    
                    lock.unlock();
                }
    
            }
        }
    
        @SuppressWarnings("Duplicates")
        static class Consumer implements  Runnable{
            private int maxCount;
            public Consumer(int maxCount){
                this.maxCount = maxCount;
            }
    
            public void run() {
                while (true) {
                    lock.lock();
                    //使用while + wait的语义: 判断是否还要继续等待
                    while (contents.isEmpty()) {
                        System.out.println("The queue is empty");
                        try {
                            emptyQueue.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Content content = contents.poll();
                    try {
                        //模拟消费
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("consumed " + content);
                    fullQueue.signalAll();
                    emptyQueue.signalAll();
    
                    lock.unlock();
                }
            }
        }
    

    Lock&Condition的用法和上文中相同。

    1. 使用阻塞队列来完成生产着消费者问题
      使用阻塞队列能够很好地帮我们托管同步的问题:
    public class ProducerAndConsumer {
    
    
        private static final int maxCount = 10;
        private static final BlockingQueue<Content> queue = new LinkedBlockingDeque<>(maxCount);
    
        static class Content {
            private String start;
            private String end;
            private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};
    
            public Content() {
                int index = new Random().nextInt(places.length);
                this.start = this.end = places[index];
            }
    
            public String toString() {
                return " start " + start + " end " + end;
            }
        }
    
    
        @SuppressWarnings("Duplicates")
        static class Producer implements Runnable {
    
            public void run() {
                while (true) {
                    try {
                        //模拟生产
                        TimeUnit.MILLISECONDS.sleep(1000);
                        Content content = new Content();
                        queue.put(content);
                        System.out.println("produced " + content);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
    
            }
        }
    
        @SuppressWarnings("Duplicates")
        static class Consumer implements Runnable {
            public void run() {
                while (true) {
    
                    try {
                        //模拟消费
                        TimeUnit.MILLISECONDS.sleep(500);
                        Content content = queue.take();
                        System.out.println("consumed " + content);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            }
        }
    
        public static void main(String args[]) {
            ExecutorService service = Executors.newCachedThreadPool();
            for (int i = 0; i < 5 ; i++) {
                service.execute(new Producer());
                service.execute(new Consumer());
            }
        }
    }
    

    解决哲学家就餐问题

    哲学家就餐问题的一种解法是,可以让最后一个人拿起的筷子固定就可以解决:

    public class DeadLockTest {
        //通过哲学家问题演示一个思索的情况
    
        public static  class Chopstick {
            private boolean taken = false;
    
            public synchronized void take() throws InterruptedException {
                //反复检查是否已经被拿走 如果拿走,就算了
                while (taken) {
                    wait();
                }
                taken = true;
            }
    
            public synchronized void drop(){
                taken = false;
                notifyAll();
            }
        }
    
        public static class Philosopher implements Runnable {
    
            private Chopstick left;
            private Chopstick right;
    
            private final int id;
            private final int ponderFactor;
            private Random rand = new Random(47);
    
            public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {
                this.ponderFactor = ponder;
                this.left = left;
                this.right = right;
                id = ident;
            }
    
            public void pause() throws InterruptedException {
                if (ponderFactor == 0) return;
                TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250));
            }
    
            public String toString(){
                return "Philosopher" + id;
            }
            @Override
            public void run() {
                try {
                    while (!Thread.interrupted()) {
                        System.out.println(this + " " + "thinking");
                        pause();
                        //哲学家开始就餐
                        System.out.println(this + " " + "grabbing right");
                        right.take();
    
                        System.out.println(this + " " + "grabbing left" );
                        left.take();
    
                        System.out.println(this + " " + "eating");
                        pause();
    
                        right.drop();
                        left.drop();
                    }
                } catch (InterruptedException e) {
                    System.out.println(this + " " + "exiting via interrupt");
                }
            }
        }
    
        public static void main(String args[]) throws InterruptedException {
            int ponder = 0;
            int size = 5;
            ExecutorService exec = Executors.newCachedThreadPool();
            Chopstick[] chopsticks = new Chopstick[size];
            for (int i = 0; i < size ; i++) {
                chopsticks[i] = new Chopstick();
            }
    
            for (int i = 0; i < size ; i++) {
                //会发生死锁
    //            exec.execute(new Philosopher(chopsticks[i], chopsticks[(i +1) % size], i , ponder));
                //死锁的解决方式
                if(i < (size - 1)){
                    exec.execute(new Philosopher(chopsticks[i], chopsticks[(i +1) % size], i , ponder));
                }else{
                    exec.execute(new Philosopher(chopsticks[0], chopsticks[i], i , ponder));
                }
            }
    
            //如果发生死锁就回卡住!
            TimeUnit.SECONDS.sleep(30);
            exec.shutdownNow();
        }
    }
    
    

    参考内容
    使用线程间通信机制解决问题
    Java 中线程间通信机制
    阻塞队列

    读 《Thinking in Java》有感,遂记之

    相关文章

      网友评论

          本文标题:Java并发(3)-- 线程间通信&生产者消费者问题和哲学家就餐

          本文链接:https://www.haomeiwen.com/subject/xhevkhtx.html