美文网首页
Java多线程技能(三) 线程间通信

Java多线程技能(三) 线程间通信

作者: 资深养猪大户 | 来源:发表于2018-10-11 20:47 被阅读0次

    1、等待/通知机制

    线程间通讯可以采用while语句轮询检测某一条件,缺点是线程主动操作、读取、判断同一个变量,此方法浪费CPU资源。

    1.1 等待/通知机制的实现

    • wait()方法和notify()方法

    1、要在同步方法或同步代码块中调用,即必须在调用前先获取该对象锁,若没有会抛出IllegalMonitorStateException。
    2、执行wait()方法后,线程停止运行并释放锁,后续代码不执行;
    3、notify()方法用来通知呈wait状态的线程,随机挑选一个对其发送通知并使其转为就绪状态,获取对象锁并执行后续代码。执行notify()方法不会立即释放锁,需要等待线程将程序执行完成。
    4、notifyAll()方法可以通知所有在等待同一资源的线程全部进入就绪状态。

    public class MyList
    {
      //此处必须将变量定义为static,方法定义为public方便访问
      private static List list = new ArrayList();
      public static void add()
      {
        list.add("myString");
      }
      public static int size()
      {
        return list.size();
      }
    }
    
    public class ThreadA extends Thread
    {
      private Object lock;
      public ThreadA(Object lock)
      {
        super();
        this.lock = lock;
      }
    
      @Override
      public void run()
      {  
        try
        {  
          synchronized(lock)
          {
            if(MyList.size() != 5)
            {
              System.out.println("wait begin");
              lock.wait();
              System.out.println("wait end");
            }
          }
        }
        catch(InterruptedException e)
        {  
          e.printStackTrace();
        }
      }
    }
    
    public class ThreadB extends Thread
    {
      private Object lock;
      public ThreadB(Object lock)
      {
        super();
        this.lock = lock;
      }
    
      @Override
      public void run()
      {  
        try
        {  
          synchronized(lock)
          {
            for(int i=0; i < 10; I++)
            {
              MyList.add();
              if(MyList.size() == 5)
              {
                System.out.println("notify begin");
                lock.notify();
                System.out.println("notify end");
              }
              System.out.println("add 第" +  (i+1)  + "个元素");
            }
          }
        }
        catch(InterruptedException e)
        {  
          e.printStackTrace();
        }
      }
    }
    
    public class Run
    {
      public static void main(String[] args)
      {
        try
        {
          Object lock = new Object();
          ThreadA a = new ThreadA(lock);
          a.start();
          Thread.sleep(50);
          ThreadB b = new ThreadB(lock);
          b.start();
        }
        catch(InterruptedException e)
        {  
          e.printStackTrace();
        }
      }
    }
    
    • 每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列,就绪队列存储将要获取锁的线程,阻塞队列存储被阻塞的线程。
    • wait()方法被执行后,锁被自动释放,执行完notify()方法锁不会立即释放,等待notify()方法所在的代码块代码全部执行完成。
    • wait(long)方法:等待某一段时间内是否有线程对锁进行唤醒,超过这个时间线程会自动唤醒。

    1.2 生产者/消费者模式的实现

    • 一生产者赋值,一消费者读取
    public class Consumer
    {
      private Object lock;
      public Consumer(Object lock)
      {
        this.lock = lock;
      }
      public void getValue()
      {
        try
        {
          synchronized (lock)
          {
             if(Value.getValue().equals(""))
             {
                lock.wait();
             }
             System.out.println("Consumer get " + Value.getValue());
             Value.setValue("");
             lock.notify();
           }
         }
         catch (InterruptedException e)
         {
             e.printStackTrace();
         }
      }
    }
    
    public class Produce
    {
       private Object lock;
       public Produce(Object lock)
       {
         this.lock = lock;
       }
       public void setValue()
       {
         try
         {
            synchronized (lock)
            {
               if(!Value.getValue().equals(""))
               {
                    lock.wait();
               }
               String value = System.currentTimeMillis()+"";
               System.out.println("Produce set " + value);
               Value.setValue(value);
               lock.notify();
             }
          }
          catch (InterruptedException e)
          {
             e.printStackTrace();
          }
       }
    }
    public class ConsumerThread extends Thread
    {
        private Consumer consumer;
        public ConsumerThread(Consumer consumer)
        {
            super();
            this.consumer = consumer;
        }
        @Override
        public void run()
        {
            while(true)
            {
                consumer.getValue();
            }
        }
    }
    public class ProduceThread extends Thread
    {
        private Produce produce;
        public ProduceThread(Produce produce)
        {
            super();
            this.produce = produce;
        }
        @Override
        public void run()
        {
            while(true)
            {
                produce.setValue();
            }
        }
    }
    
    public class Value
    {
        //设置初值
        private static String value = "";
        public static void setValue(String valueSet)
        {
            value = valueSet;
        }
        public static String getValue()
        {
            return value;
        }
    }
    
    public class Main
    {
       public static void main(String[] args)
       {
          Object lock = new Object();
    
         Produce produce = new Produce(lock);
         ProduceThread produceThread = new ProduceThread(produce);
    
         Consumer consumer = new Consumer(lock);
         ConsumerThread consumerThread = new ConsumerThread(consumer);
    
         produceThread.start();
         consumerThread.start();
        }
    }
    

    运作机制:
    以value值作为切入点,当value值为""时,消费者线程为wait状态,生产者获取锁对value赋值,完成赋值后notify并释放锁,消费者被唤醒,获取生产者给value赋的值,并将value值设置回"",notify并释放锁。

    • 多生产者/多消费者实现
      假死:因为线程notify的对象有可能是异类,也有可能是同类,连续多次唤醒同类,这就有可能导致所有的线程都进入waiting状态,从而任务无法正常执行即假死。
      解决方法:使用notifyAll()方法
    public class Stack
    {
      private List list = new ArrayList();
      synchronized public void push()
      {
        try
        {
          //此处由if改为while,条件改变时无法得到及时响应
          //唤醒多个wait状态的线程,pop中的remove出错。
          while(list.size() == 1)
          {
             this.wait();
          }
          list.add("anyString"));
          this.notify();
          System.out.println("push " + list.size());
        }
        catch(InterruptedException e)
        {
          e.printStackTrace();
        }
      }
    
      synchronized public void pop()
      {
        try
        {
          while(list.size() != 1)
          {
             this.wait();
          }
          list.remove(0);
          //通知同类和异类
          this.notifyAll();
          System.out.println("pop" + list.size());
        }
      }
    }
    

    *交叉备份实现

    public class BackUpTool
    {
        private boolean backToA = false;
        synchronized public void backUpA()
        {
            try
            {
                while (backToA == false)
                {
                    wait();
                }
                System.out.println("Backup to A");
                backToA = false;
                notifyAll();
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
        synchronized public void backUpB()
        {
            try
            {
                while (backToA == true)
                {
                    wait();
                }
                System.out.println("Backup to B");
                backToA = true;
                notifyAll();
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
    
    public class ThreadA extends Thread
    {
        private BackUpTool backUpTool;
        public ThreadA(BackUpTool tool)
        {
            super();
            this.backUpTool = tool;
        }
        @Override
        public void run()
        {
            backUpTool.backUpA();
        }
    }
    
    public class ThreadB extends Thread
    {
        private BackUpTool backUpTool;
        public ThreadB(BackUpTool tool)
        {
            super();
            this.backUpTool = tool;
        }
        @Override
        public void run()
        {
            backUpTool.backUpB();
        }
    }
    public class Run
    {
        public static void main(String[] args)
        {
            BackUpTool backUpTool = new BackUpTool();
    
            for(int i = 0; i < 20; i++)
            {
                ThreadA threadA = new ThreadA(backUpTool);
                ThreadB threadB = new ThreadB(backUpTool);
                threadA.start();
                threadB.start();
            }
        }
    }
    

    2、通过管道进行线程间通信

    管道流(pipeStream)是一种特殊的流,用于不同线程间直接传送数据。一个线程发送数据到输出管道,另一个线程从输入管道读取数据。
    字节流 PipedInputStream、PipedOutputStream
    字符流 PipedReader、PipedWriter

    import java.io.IOException;
    import java.io.PipedOutputStream;
    public class WriteData
    {
        public void wirteMethod(PipedOutputStream out)
        {
            try
            {
                System.out.println("Write:");
                for(int i=0; i<300;i++)
                {
                    String outData = "" + (i+1);
                    //0到299写入到输出管道
                    out.write(outData.getBytes());
                    System.out.print(outData);
                }
                System.out.println();
                out.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }
    
    import java.io.PipedOutputStream;
    public class ThreadWrite extends Thread
    {
        private WriteData writeData;
        private PipedOutputStream out;
        public ThreadWrite(WriteData writeData, PipedOutputStream out)
        {
            super();
            this.writeData = writeData;
            this.out = out;
        }
        @Override
        public void run()
        {
            writeData.wirteMethod(out);
        }    
    }
    
    import java.io.IOException;
    import java.io.PipedInputStream;
    public class ReadData
    {
        public void readMethod(PipedInputStream input)
        {
            try
            {
                System.out.println("Read:");
                byte[] byteArray = new byte[20];
                //到输入管道里读取数据到byteArray数组中
                //返回数组大小
                int readLength = input.read(byteArray);
                while(readLength != -1)
                {
                    String newData = new String(byteArray,0,readLength);
                    System.out.println(newData);
                    readLength = input.read(byteArray);
                }
                System.out.println();
                input.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }
    
    import java.io.PipedInputStream;
    public class ThreadReader extends Thread
    {
        private ReadData readData;
        private PipedInputStream input;
        public ThreadReader(ReadData readData, PipedInputStream input)
        {
            super();
            this.readData = readData;
            this.input = input;
        }
        @Override
        public void run()
        {
            readData.readMethod(input);
        }
    }
    
    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    public class Run
    {
        public static void main(String[] args)
        {
            try
            {
                WriteData writeData = new WriteData();
                ReadData readData = new ReadData();
                PipedInputStream inputStream = new PipedInputStream();
                PipedOutputStream outputStream = new PipedOutputStream();
                inputStream.connect(outputStream);
    
                ThreadReader threadReader = new ThreadReader(readData, inputStream);
                ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
                threadWrite.start();
                Thread.sleep(2000);
                threadReader.start();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            catch (InterruptedException ex)
            {
                ex.printStackTrace();
            }
        }
    }
    

    3、join()方法

    3.1 join()方法使用方式

    class Run
    {
      public static void main(String[] args)
      {
        Thread thread = new Thread();
        thread.start();
        //加上join语句可以保证在thread线程结束后再执行打印end操作
        thread.join();
        System.out.println("end");
      }
    }
    

    方法join()的作用是使所属的线程对象thread正常执行完run()方法中的任务,而使当前的线程main进行无限期的阻塞,等待线程thread销毁后再继续执行线程main后面的代码。

    3.2 join(long)与sleep(long)的区别

    方法join(long)的功能在内部使用wait(long)来实现的,因此具有释放锁的特点;而sleep(long)方法不是释放锁。

    public final synchronized void join(long millis)
        throws InterruptedException {
            long base = System.currentTimeMillis();
            long now = 0;
    
            if (millis < 0) {
                throw new IllegalArgumentException("timeout value is negative");
            }
    
            if (millis == 0) {
                while (isAlive()) {
                    wait(0);
                }
            } else {
                while (isAlive()) {
                    long delay = millis - now;
                    if (delay <= 0) {
                        break;
                    }
                    wait(delay);
                    now = System.currentTimeMillis() - base;
                }
            }
        }
    

    4、类ThreadLocal作用

    每个线程绑定自己的值,ThreadLocal类可以存储每个线程的私有数据。每个线程可以获取到各自设置的值,互不影响,隔离性。

    public class Tools
    {
      public static ThreadLocal t1 = new ThreadLocal();
    }
    
    public class ThreadA extends Thread
    {
       @Override
       public void run()
       {
          try
          {
            for(int i = 0; i < 100; i++)
            {
              Tools.t1.set("ThreadA" + (i+1));
              System.out.println("ThreadA get " + Tools.t1.get());
              Thread.sleep(1000);
            }
          }
          catch (InterruptedException ex)
          {
             ex.printStackTrace();
          }
       }
    }
    
    public class ThreadB extends Thread
    {
       @Override
       public void run()
       {
          try
          {
            for(int i = 0; i < 100; i++)
            {
              Tools.t1.set("ThreadB" + (i+1));
              System.out.println("ThreadB get " + Tools.t1.get());
              Thread.sleep(1000);
            }
          }
          catch (InterruptedException ex)
          {
             ex.printStackTrace();
          }
       }
    }
    public class Run
    {
        public static void main(String[] args)
        {
          try
          {
           ThreadA threadA = new Thread();
           ThreadB threadB = new Thread();
           threadA.start();
           threadB.start();
           for(int i = 0; i < 100; i++)
           {
             Tools.t1.set("Main" + (i+1));
             System.out.println("Main get " + Tools.t1.get());
             Thread.sleep(1000);
           }
          }
          catch (InterruptedException ex)
          {
             ex.printStackTrace();
          }
        }
    }
    

    InteritableThreadLocal类可以让子线程从父类线程中取得值。

    相关文章

      网友评论

          本文标题:Java多线程技能(三) 线程间通信

          本文链接:https://www.haomeiwen.com/subject/jsswoftx.html