美文网首页FS全栈计划
从0开始学线程并发(二)——线程通信

从0开始学线程并发(二)——线程通信

作者: MaxZing | 来源:发表于2019-05-20 17:24 被阅读0次

    线程通讯

    每个线程都有自己的内存空间(栈),这个空间存在于线程代码运行开始直至结束,期间若线程不与其他线程交互配合,几乎就没有价值。而交互配合,便涉及到线程间的通讯。暂且不表通讯前的安全问题,先熟悉下
    有哪些常用的通信方式。

    等待通知

    这个模型很简单,A线程与B线程协作,当A线程修改某个数据后,B线程感知到了这个数据变化后,进行响应的操作。这个模型常见的就是生产者和消费者,数据起始于一个线程,又完结于另一个线程。解耦且可伸缩。

    不过生产消费可能不是等待通知哦

    Java中等待,通知自己实现的话,最简单的就是循环+sleep。最近遇到的上古项目中就有,以下为伪代码

    while(networkNotReady()){
        log.debug("等待网络层启动完毕");
        Thread.sleep(1000);
    }
    log.info("network initialize finish!,heartbeat start");
    heartbeat.start();
    

    这段代码明显会存在问题,首先sleep时间写死的,所以无法保证及时性,其次,sleep时间过长效率慢,过短会导致线程频繁切换,白白消耗资源。

    Java为了解决这个问题,在Object类中,加入了几个有意思的方法

    /**
     * 将
     */
    
    wait()
    notify()
    wait(long timeout)
    wait(long timeout, int nanos)
    notifyAll()
    

    其左右如下

    Thread-A执行到一个逻辑后让A对象进行等待A.wait(), Thread-B执行完某个逻辑后调用A.notify()之后,会唤醒Thread-A继续工作,这样就避免了Sleep的时间过长或者过短,导致资源分配不平衡。

    但是会遇到新的线程安全的问题,如果多个线程执行A.wait() ,多个线程执行A.notify(),由于并行的关系,可能不是先后发生的顺序,notify发生在wait前一点意义都没有。或者A在条件未达成前提前被唤醒。这都问题,为了解决这些问题,一般要求遵循以下规则。

    注意:
    1.调用wait或notify、notifyAll需要对对象加锁
    2.wait的线程被唤醒后仍需要检查运行条件是否满足,不满足可以继续进入WAITING状态

    管道pip

    看书上一共介绍了4个类,PipedOutputStream,PipedInputStream,PipedReader,PipedWriter很少使用,很少有资料介绍。和操作系统有很大关系。这里列出用法

    public class PipedStreamExample {
        public static void main(String[] args) throws IOException, InterruptedException {
            
            final PipedInputStream pipedInputStream=new PipedInputStream();
            final PipedOutputStream pipedOutputStream=new PipedOutputStream();
            
            /*Connect pipe*/
            pipedInputStream.connect(pipedOutputStream);
            
            /*Thread for writing data to pipe*/
            Thread pipeWriter=new Thread(new Runnable() {
                @Override
                public void run() {
                                    /*输出A-Z字母*/
                    for (int i = 65; i < 91; i++) {
                        try {
                            pipedOutputStream.write(i);
                            Thread.sleep(500);
                        } catch (IOException | InterruptedException e) {
                            e.printStackTrace();
                        }
                    }       
                }
            });
            
            /*Thread for reading data from pipe*/
            Thread pipeReader=new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 65; i < 91; i++) {
                        try {
                            System.out.print((char)pipedInputStream.read());
                            Thread.sleep(1000);
                        } catch (InterruptedException | IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            /*Start thread*/
            pipeWriter.start();
            pipeReader.start();
            
            /*Join Thread*/
            pipeWriter.join();
            pipeReader.join();
            
            /*Close stream*/
            pipedOutputStream.close();
            pipedInputStream.close();
            
        }
    }
    
    public class PipedReaderWriterExample {
       public static void main(String[] args) throws Exception {
          final PipedReader pipedReader = new PipedReader();
          final PipedWriter pipedWriter = new PipedWriter();
    
          // Connect pipe
          pipedReader.connect(pipedWriter);
    
          // Writing data to pipe
          Thread writerThread = new Thread(new Runnable() {
             @Override
             public void run() {
                try {
                   for (int i = 65; i <= 70; i++) {
                      pipedWriter.write((char) i);
                      Thread.sleep(500);
                   }
                   pipedWriter.close();
                } catch (IOException | InterruptedException e) {
                   e.printStackTrace();
                }
             }
          });
    
          // Reading data from pipe
          Thread readerThread = new Thread(new Runnable() {
             @Override
             public void run() {
                try {
                   int i;
                   while ((i = pipedReader.read()) != -1) {
                      System.out.println((char) i);
                      Thread.sleep(1000);
                   }
                   pipedReader.close();
                } catch (IOException | InterruptedException e) {
                   e.printStackTrace();
                }
             }
          });
    
          // Start thread
          writerThread.start();
          readerThread.start();
       }
    }
    

    ThreadJoin

    前面的例子中就有用到Join,join的意义:当前线程立即等待,直到
    join的线程返回为止,说白了,就是插队当前线程。

    join支持设定超时时间,如果超过时间未返回的话,那么就会从超时方法返回。

    PS:ThreadLocal

    线程变量,就是Key-Value结构,与线程绑定的,每个线程只能访问到自己线程存储在内的数据,例子如下:

    public class ThreadLocalTest {
    
    
        public static void main(String[] args) throws InterruptedException {
            ThreadLocal<Long> localTest = new ThreadLocal<>();
            long startTime = System.currentTimeMillis();
            localTest.set(startTime);
            System.out.println("main.set" + localTest.get());
    
            Thread t1 = new Thread(() -> {
                try {
                    Thread.sleep(1000);
                    long time = System.currentTimeMillis();
                    System.out.println("t1.set" + time);
                    localTest.set(time);
    
                    Thread.sleep(500);
                    System.out.println("t1.get" + localTest.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            });
    
            Thread t2 = new Thread(() -> {
                try {
                    Thread.sleep(500);
                    long time = System.currentTimeMillis();
                    System.out.println("t2.set" + time);
                    localTest.set(time);
                    Thread.sleep(1000);
                    System.out.println("t2.get" + localTest.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            });
    
            t1.start();
            t2.start();
    
            t1.join();
            t2.join();
    
            System.out.println("main.get" + localTest.get());
    
    
        }
    }
    

    执行结果:


    其他线程set的值,并不会影响当前线程存储的值。内部ThreadLocalMap处理不当还会造成内存泄露。

    所以:

    ThreadLocal并不解决变量共享的问题,而是提供了线程本地的实例。每个使用该变量的线程都会初始化一个完全独立的实例副本。


    ps:看书暂未看到Future的模式,后面看到再补充

    参考

    书:《Java并发编程的艺术》第四章
    代码例子:https://www.boraji.com

    喜欢请点个赞
    转载请注明出处:https://www.jianshu.com/u/4915ed24d1e3
    如有错误,请务必指正!谢谢!
    我的博客:https://xzing.github.io/

    相关文章

      网友评论

        本文标题:从0开始学线程并发(二)——线程通信

        本文链接:https://www.haomeiwen.com/subject/rlinpqtx.html