3.线程间通信

作者: 落叶飞逝的恋 | 来源:发表于2017-09-13 21:55 被阅读90次

    软件都是由不同的模块组成一个系统,从而模块与模块间的通信,线程与线程间通信是经常碰到的。下面我们介绍了关于线程间通信的几种技术方案

    1.wait/notify

    1.1不使用wait/notify的代码演示

    public class TpadTask {
        private List<Integer> list = new ArrayList<>();
    
        public void doAdd() {
            Integer num = new Random().nextInt(100);
            list.add(num);
        }
    
        public int size() {
            return list.size();
        }
    }
    
    public class ThreadA extends Thread {
        private TpadTask task;
    
        public ThreadA(TpadTask task) {
            super();
            this.task = task;
        }
    
        @Override
        public void run() {
            for(int i=0;i<10;i++){
                task.doAdd();
                System.out.println(i);
            }
        }
    }
    
    public class ThreadB extends Thread {
        private TpadTask task;
    
        public ThreadB(TpadTask task) {
            super();
            this.task = task;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    if (task.size() == 5) {
                        System.out.println("已达到5,b线程要退出");
                        throw new InterruptedException();
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
    
    public class Client {
        public static void main(String[] args) {
            TpadTask task = new TpadTask();
            ThreadA a = new ThreadA(task);
            ThreadB b = new ThreadB(task);
            a.start();
            b.start();
        }
    }
    

    说明:线程A是进行对list进行添加数据,线程B是对list的大小进行监控,如果达到5,则主动退出,抛出异常。

    • 结果
    0
    1
    2
    3
    4
    已达到5,b线程要退出
    5
    6
    7
    8
    9
    java.lang.InterruptedException
        at communication.ThreadB.run(ThreadB.java:21)
    

    缺点:

    • 线程B需要不停的循环来判断定条件,这样会浪费CPU资源。

    1.2 wait/notify机制

    • 1.wait()

    wait()的作用是使当前执行的线程进行等待,wait()方法是Object类的方法。该方法是将当前线程置入预执行队列,并在wait代码处停止执行,直到接收到通知或被中断执行。

    在调用wait()之前,线程必须获取该对象的对象级别锁,即只能在同步方法或者同步代码块中调用wait()方法。

    在执行wait()方法后,当前线程释放锁。从wait()返回前,线程与其他线程竞争重新获取锁。如果调用wait()没有持有锁,则会抛出异常。

    • wait方法的注释
    The current thread must own this object's monitor.
    
    • 2.notify()

    notify()的作用是使停止的线程继续运行。

    在调用notify()之前,线程必须获取该对象的对象级别锁,也只能在同步方法或者同步代码块调用

    执行notify()方法后,当前线程不会马上释放该对象锁。呈现wait的线程也不会马上获取到该对象锁。要等到执行notify()方法的线程将程序执行完。也就是退出synchronized代码块后,当前线程才会释放该对象锁。

    • 3.改造上述代码

    • TpadTask代码不变

    • ThreadA代码改造

    public class ThreadA extends Thread {
        private TpadTask task;
    
        private Object lock;
    
        public ThreadA(TpadTask task, Object obj) {
            super();
            this.task = task;
            this.lock = obj;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    synchronized (lock) {
                        task.doAdd();
                        System.out.println(i);
                        if (task.size() == 5) {
                            lock.wait();
                            System.out.println("接收到信息!!!");
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • ThreadB
    public class ThreadB extends Thread {
        private TpadTask task;
    
        private Object lock;
    
        public ThreadB(TpadTask task, Object obj) {
            super();
            this.task = task;
            this.lock = obj;
        }
    
        @Override
        public void run() {
            try {
                synchronized (lock) {
                    if (task.size() == 5) {
                       lock.notify();
                       System.out.println("已发出信号!!!");
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
    
    • 测试
    public class Client {
        public static void main(String[] args) throws InterruptedException {
            TpadTask task = new TpadTask();
            Object lock = new Object();
            ThreadA a = new ThreadA(task, lock);
            a.start();
            ThreadB b = new ThreadB(task, lock);
            b.start();
        }
    }
    
    • 结果
    0
    1
    2
    3
    4
    已发出信号!!!
    接收到信息!!!
    5
    6
    7
    8
    9
    
    • 4.线程状态切换
    线程状态切换
    • 5.wait(long timeout)

    等待一段时间,看是否有线程对锁进行唤醒。如果超过这个时间,则自动唤醒。

    2.通过管道流进行通信(pipeStream)

    jdk提供了4个类用于管道流通信。java.io.PipedInputStream、java.io.PipedOutputStream、java.io.PipedReader、java.io.PipedWriter

    它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。

    大致的流程是:我们在线程A中向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的PipedInputStream中,进而存储在PipedInputStream的缓冲中;此时,线程B通过读取PipedInputStream中的数据。就可以实现,线程A和线程B的通信。

    • PipedOutputStream、PipedInputStream代码演示

    • 新建生产消息线程

    public class Producer  extends Thread {
        private PipedOutputStream pos;
    
        public Producer(PipedOutputStream pos) {
            this.pos = pos;
        }
    
        @Override
        public void run() {
            super.run();
            try {
                pos.write("Hello".getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 新建接收消息线程
    public class Consumer extends Thread  {
        private PipedInputStream pis;
    
        public Consumer(PipedInputStream pis) {
            this.pis = pis;
        }
    
        @Override
        public void run() {
            super.run();
            byte[] b = new byte[100]; // 将数据保存在byte数组中
            try {
                int len = pis.read(b); // 从数组中得到实际大小。
                System.out.println(String.format("接收到信号:%s",new String(b, 0, len)));
                pis.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 测试
    public class Client {
        public static void main(String[] args) throws IOException, InterruptedException {
            PipedOutputStream pos = new PipedOutputStream();
            PipedInputStream pis = new PipedInputStream();
            try {
                pos.connect(pis);// 连接管道
    
                new Producer(pos).start();// 启动线程
    
                new Consumer(pis).start();// 启动线程
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 结果
    接收到信号:Hello
    

    pos.connect(pis)作用是将“管道输入流”和“管道输出流”关联起来。查看PipedWriter.java和PipedReader.java中connect()的源码;我们知道 out.connect(in); 等价于 in.connect(out)。当然JDK还支持管道通信字符流。

    3.Join

    主线程创建并启动子线程,如果子线程需要消耗大量的资源,主线程往往早于子线程执行完毕。而主线程需要等待子线程的结果,这时候使用join方法。

    • 例子
    public class CustomerThread extends Thread {
        public CustomerThread(){
            super();
        }
    
        @Override
        public void run() {
            System.out.println("我是子线程代码");
        }
    }
    
    public class Client {
        public static void main(String[] args) throws InterruptedException {
            CustomerThread a=new CustomerThread();
            a.start();
            System.out.println("我是主线程代码");
        }
    }
    
    • 输出
    我是主线程代码
    我是子线程代码
    

    主线程已经执行完毕了,才执行子线程,如果需要用到子线程的结果。那么只需要加一句join()。

    public class Client {
        public static void main(String[] args) throws InterruptedException {
            CustomerThread a=new CustomerThread();
            a.start();
            a.join();//join语句
            System.out.println("我是主线程代码");
        }
    }
    
    • 结果
    我是子线程代码
    我是主线程代码
    

    join的方法注释是:

    /**
     * Waits at most {@code millis} milliseconds for this thread to
     * die. A timeout of {@code 0} means to wait forever.
     */
     
    while (isAlive()) {
        wait(0);
    }
    

    使得所属的线程对象正常的执行,使得当前的线程进行无线等待,直到所属线程销毁后,再执行当前线程。

    在join过程中,如果当前线程对象被中断,则当前线程对象抛出异常。

    join(long)中的参数是等待的时间。例如join(2000),只等等2秒子线程的执行时间。超过2秒自动执行主线程代码。

    3.1join(long)与sleep(long)的区别

    join内部使用的是wait(long)来实现的。所以join()具有释放锁的特性。当前线程的锁被释放,那么其他线程就可以调用此线程的同步方法了。

    Thread.sleep(long)方法却不释放锁。

    4.ThreadLocal

    ThreadLocal并不是一个Thread,而是Thread的局部变量,也许把它命名为ThreadLocalVariable更容易让人理解一些。

    ThreadLocal是解决每个线程绑定属于自己的值。

    ThreadLocal是解决线程安全问题一个很好的思路,它通过为每个线程提供一个独立的变量副本。解决了变量并发访问的冲突问题。在很多情况下,ThreadLocal比直接使用synchronized同步机制解决线程安全问题更简单,更方便,且结果程序拥有更高的并发性。

    • 代码演示

    • 新建ThreadLocal公用类

    public class ThreadLocalVariable {
        public static ThreadLocal threadLocalVariable = new ThreadLocal();
    }
    
    • 新建ThreadA
    public class ThreadA extends Thread {
        @Override
        public void run() {
            try {
                for(int i=0;i<5;i++){
                    ThreadLocalVariable.threadLocalVariable.set(String.format("ThreadA:%s",i));
                    System.out.println(ThreadLocalVariable.threadLocalVariable.get());
                }
            }
            catch (Exception ex){
                ex.printStackTrace();
            }
        }
    }
    
    • 新建ThreadB
    public class ThreadB extends Thread {
        @Override
        public void run() {
            try {
                for(int i=0;i<5;i++){
                    ThreadLocalVariable.threadLocalVariable.set(String.format("ThreadB:%s",i));
                    System.out.println(ThreadLocalVariable.threadLocalVariable.get());
                }
            }
            catch (Exception ex){
                ex.printStackTrace();
            }
        }
    }
    
    • Client
    public class Client {
        public static void main(String[] args) {
            ThreadA a = new ThreadA();
            ThreadB b = new ThreadB();
            a.start();
            b.start();
            ThreadLocalVariable.threadLocalVariable.set("a");
            System.out.println(ThreadLocalVariable.threadLocalVariable.get());
    
        }
    }
    
    • 结果
    a
    ThreadA:0
    ThreadA:1
    ThreadA:2
    ThreadA:3
    ThreadA:4
    ThreadB:0
    ThreadB:1
    ThreadB:2
    ThreadB:3
    ThreadB:4
    

    每个线程都对threadLocalVariable进行设置值,但是取值的时候,都是取的各自的设置的值。

    没赋值前返回的值是null

    protected T initialValue() {
        return null;
    }
    

    4.1InheritableThreadLocal

    该类可以让子线程从父线程中取出值。而子线程从父类继承的值可以在子线程进行对值得修改

    相关文章

      网友评论

        本文标题:3.线程间通信

        本文链接:https://www.haomeiwen.com/subject/dbolsxtx.html