Talking Code: The Road to Optimizing Java IO Business Code

Author: 泊浮目 | Published 2019-07-01 20:45

    This article was first published on 泊浮目's column: https://segmentfault.com/blog/camile

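    Version  Date       Notes
    1.0      2019.4.27  First published
    1.1      2021.5.21  Title changed: "Starting from a Piece of Code: A Brief Look at the Java IO Interfaces" -> "Talking Code: The Road to Optimizing Java IO Business Code"
    1.1      2022.2.14  Some sections reorganized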

    1. Preface

    A while ago, on a day off, I was doing my usual hunt for bad code in the project and came across this:

        private Result sshSameExec(Session session, String cmd) {
            if (log.isDebugEnabled()) {
                log.debug("shell command: {}", cmd);
            }
            UserInfo ui = getUserInfo();
            session.setUserInfo(ui);
            int exitStatus = 0;
            StringBuilder builder = new StringBuilder();
            ChannelExec channel;
            InputStream in;
            InputStream err;
            try {
                session.connect(connectTimeout);
                channel = (ChannelExec) session.openChannel("exec");
                channel.setCommand(cmd);
                in = channel.getInputStream();
                err = channel.getErrStream();
                channel.connect();
            } catch (Exception e) {
                throw new CloudRuntimeException(e);
            }
    
            try {
                long lastRead = Long.MAX_VALUE;
                byte[] tmp = new byte[1024];
                while (true) {
                    while (in.available() > 0 || err.available() > 0) {
                        int i = 0;
                        if (in.available() > 0) {
                            i = in.read(tmp, 0, 1024);
                        } else if (err.available() > 0) {
                            i = err.read(tmp, 0, 1024);
                        }
                        if (i < 0) {
                            break;
                        }
                        lastRead = System.currentTimeMillis();
                        builder.append(new String(tmp, 0, i));
                    }
                    if (channel.isClosed()) {
                        if (in.available() > 0) {
                            continue;
                        }
                        exitStatus = channel.getExitStatus();
                        break;
                    }
                    if (System.currentTimeMillis() - lastRead > exeTimeout) {
                        break;
                    }
                }
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            } finally {
                channel.disconnect();
                session.disconnect();
            }
    
            if (0 != exitStatus) {
                return Result.createByError(ErrorData.builder()
                        .errorCode(ResultCode.EXECUTE_SSH_FAIL.getCode())
                        .detail(builder.toString())
                        .title(ResultCode.EXECUTE_SSH_FAIL.toString())
                        .build());
            } else {
                return Result.createBySuccess(builder.toString());
            }
        }
    

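    A quick explanation of this code: it connects to a machine over SSH and runs a command. To collect the command's output it runs a loop, and on each pass it reads up to 1024 bytes from the output as a byte stream.

    The code is a bit ugly, so I caught the scent of something to learn.

    First, consider how the two streams are consumed: even in a multi-core environment, this code can only consume one of them at a time. Second, the code is clumsy: it defines its own tmp buffer and pulls the data out 1024 bytes at a time.

    Before improving it, let's review the Java IO interface definitions.

    2. A review of the Java IO interfaces

    2.1 Low-level abstractions: InputStream and OutputStream

    Some readers may ask why these are called low-level abstractions. It is because they sit so close to the machine: computers process binary data, and these two types exist precisely to handle binary data streams.

    Let's take a quick look at both: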

    • InputStream
    /**
     * This abstract class is the superclass of all classes representing
     * an input stream of bytes.
     *
     * <p> Applications that need to define a subclass of <code>InputStream</code>
     * must always provide a method that returns the next byte of input.
     *
     * @author  Arthur van Hoff
     * @see     java.io.BufferedInputStream
     * @see     java.io.ByteArrayInputStream
     * @see     java.io.DataInputStream
     * @see     java.io.FilterInputStream
     * @see     java.io.InputStream#read()
     * @see     java.io.OutputStream
     * @see     java.io.PushbackInputStream
     * @since   JDK1.0
     */
    public abstract class InputStream implements Closeable {.....}
    
    • OutputStream
    /**
     * This abstract class is the superclass of all classes representing
     * an output stream of bytes. An output stream accepts output bytes
     * and sends them to some sink.
     * <p>
     * Applications that need to define a subclass of
     * <code>OutputStream</code> must always provide at least a method
     * that writes one byte of output.
     *
     * @author  Arthur van Hoff
     * @see     java.io.BufferedOutputStream
     * @see     java.io.ByteArrayOutputStream
     * @see     java.io.DataOutputStream
     * @see     java.io.FilterOutputStream
     * @see     java.io.InputStream
     * @see     java.io.OutputStream#write(int)
     * @since   JDK1.0
     */
    public abstract class OutputStream implements Closeable, Flushable {...}
    

    Both of them implement the Closeable interface, so when you use these classes directly, remember to call close() when you are done.
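    One way to make sure close() is always called is try-with-resources (Java 7 and later). The sketch below is illustrative only: CloseDemo, TrackingStream, and countBytes are made-up names, and an in-memory stream stands in for a real file or socket.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class CloseDemo {
    // Hypothetical helper stream that records whether close() was called.
    static class TrackingStream extends ByteArrayInputStream {
        boolean closed = false;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Counts the bytes in the stream; the stream is closed when the try block exits,
    // whether it exits normally or through an exception.
    static int countBytes(InputStream source) {
        try (InputStream in = source) {
            byte[] buf = new byte[4096];
            int total = 0;
            int n;
            while ((n = in.read(buf)) != -1) {
                total += n;
            }
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TrackingStream stream = new TrackingStream(new byte[] {1, 2, 3});
        System.out.println("bytes: " + countBytes(stream));
        System.out.println("closed: " + stream.closed);
    }
}
```

    Compared with a hand-written finally block, this form needs no null check and no nested try around close().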

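    Commonly used implementations of these two types include:

    • FileInputStream, FileOutputStream
    • DataInputStream, DataOutputStream
    • ObjectInputStream, ObjectOutputStream

    2.2 High-level abstractions: Writer and Reader

    Why call these high-level abstractions? Let's look at their Javadoc first: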

    • Writer
    /**
     * Abstract class for writing to character streams.  The only methods that a
     * subclass must implement are write(char[], int, int), flush(), and close().
     * Most subclasses, however, will override some of the methods defined here in
     * order to provide higher efficiency, additional functionality, or both.
     *
     * @see Writer
     * @see   BufferedWriter
     * @see   CharArrayWriter
     * @see   FilterWriter
     * @see   OutputStreamWriter
     * @see     FileWriter
     * @see   PipedWriter
     * @see   PrintWriter
     * @see   StringWriter
     * @see Reader
     *
     * @author      Mark Reinhold
     * @since       JDK1.1
     */
    
    public abstract class Writer implements Appendable, Closeable, Flushable {
    
    • Reader
    /**
     * Abstract class for reading character streams.  The only methods that a
     * subclass must implement are read(char[], int, int) and close().  Most
     * subclasses, however, will override some of the methods defined here in order
     * to provide higher efficiency, additional functionality, or both.
     *
     *
     * @see BufferedReader
     * @see   LineNumberReader
     * @see CharArrayReader
     * @see InputStreamReader
     * @see   FileReader
     * @see FilterReader
     * @see   PushbackReader
     * @see PipedReader
     * @see StringReader
     * @see Writer
     *
     * @author      Mark Reinhold
     * @since       JDK1.1
     */
    
    public abstract class Reader implements Readable, Closeable {
    

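    As you can see, these abstract classes work with characters. Characters are necessarily a higher level of abstraction than bytes, because characters are closer to the upper layer, that is, to humans.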
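    The difference matters in practice. The first listing turns each chunk of bytes into a String on its own, so a multi-byte character that straddles two chunks can be decoded incorrectly. The sketch below contrasts that with letting a Reader do the decoding. It is only an illustration: CharsVsBytes and its methods are made-up names, UTF-8 is an assumption (the original listing uses the platform default charset), and a tiny chunk size is used to force the split.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class CharsVsBytes {
    // Decodes every fixed-size chunk independently, like new String(tmp, 0, i) in a read loop.
    static String decodePerChunk(byte[] data, int chunkSize) {
        StringBuilder sb = new StringBuilder();
        for (int off = 0; off < data.length; off += chunkSize) {
            int len = Math.min(chunkSize, data.length - off);
            sb.append(new String(data, off, len, StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    // Lets a Reader decode the bytes; it carries incomplete characters over to the next read.
    static String decodeWithReader(byte[] data) {
        StringBuilder sb = new StringBuilder();
        try (Reader reader = new InputStreamReader(
                new ByteArrayInputStream(data), StandardCharsets.UTF_8)) {
            char[] buf = new char[2];
            int n;
            while ((n = reader.read(buf)) != -1) {
                sb.append(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two CJK characters, three UTF-8 bytes each (written as escapes to avoid source-encoding issues).
        String text = "\u4f60\u597d";
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        System.out.println("reader round-trips:    " + text.equals(decodeWithReader(data)));
        System.out.println("per-chunk round-trips: " + text.equals(decodePerChunk(data, 4)));
    }
}
```

    With a 4-byte chunk, the second character is cut in the middle, so the per-chunk result no longer equals the original text, while the Reader version does.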

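    2.3 Optimizing input and output: Buffered

    If we open a file directly with one of the implementations above (such as FileWriter, FileReader, FileInputStream, or FileOutputStream) and call read or write on it, each request is handled directly by the underlying OS. This can make a program much less efficient, because each such request often triggers disk access, network activity, or some other relatively expensive operation.

    To reduce this overhead, the Java platform implements buffered I/O streams. A buffered input stream reads data from a memory area known as a buffer; the native input API is called only when the buffer is empty. Similarly, a buffered output stream writes data to a buffer, and the native output API is called only when the buffer is full.

    There are four buffered stream classes for wrapping unbuffered streams: BufferedInputStream and BufferedOutputStream create buffered byte streams, while BufferedReader and BufferedWriter create buffered character streams.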
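    To make the effect visible, here is a small sketch that counts how often the underlying source is asked for data, with and without a BufferedInputStream in front of it. BufferingDemo, CountingStream, and drainByteByByte are hypothetical names, and an in-memory array stands in for a file or socket, so this shows call counts only, not real I/O cost.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class BufferingDemo {
    // Stand-in for an expensive source: counts every read request it receives.
    static class CountingStream extends FilterInputStream {
        int calls = 0;

        CountingStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            calls++;
            return super.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return super.read(b, off, len);
        }
    }

    // Reads one byte at a time until end of stream and returns the number of bytes read.
    static int drainByteByByte(InputStream in) {
        try {
            int count = 0;
            while (in.read() != -1) {
                count++;
            }
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = new byte[10_000];

        CountingStream direct = new CountingStream(new ByteArrayInputStream(data));
        drainByteByByte(direct);

        CountingStream source = new CountingStream(new ByteArrayInputStream(data));
        drainByteByByte(new BufferedInputStream(source));

        System.out.println("requests without buffering: " + direct.calls);
        System.out.println("requests with buffering:    " + source.calls);
    }
}
```

    The caller's loop is identical in both cases; only the wrapper changes, which is why the buffered classes are usually described as decorators.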

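    3. Getting down to optimization

    Earlier, we pointed out where this code falls short:

    • First, the way the two streams are consumed: even in a multi-core environment, only one of them can be consumed at a time.
    • Second, the code is clumsy: it defines its own tmp buffer and pulls the data out 1024 bytes at a time.

    So we can wrap each stream so that it can be consumed by its own thread, then use the high-level abstractions to adapt the byte stream, and finally decorate the result with a buffer.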

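    Next, let's look at ShellUtils, a utility class in ZStack. To save space, we only show its Structure view from the IDE:

    [Screenshot: Structure view of ShellUtils in the IDE]

    The core of the run method:

    [Screenshot: core of the run method]

    There we can see the StreamConsumer class. Here is its code: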

        private static class StreamConsumer extends Thread {
            final InputStream in;
            final PrintWriter out;
            final boolean flush;
    
            StreamConsumer(InputStream in, PrintWriter out, boolean flushEveryWrite) {
                this.in = in;
                this.out = out;
                flush = flushEveryWrite;
            }
    
            @Override
            public void run() {
                BufferedReader br = null;
                try {
                    br = new BufferedReader(new InputStreamReader(in));
                    String line;
                    while ( (line = br.readLine()) != null) {
                        out.println(line);
                        if (flush) {
                            out.flush();
                        }
                    }
                } catch (Exception e) {
                    logger.warn(e.getMessage(), e);
                } finally {
                    try {
                        if (br != null) {
                            br.close();
                        }
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    

    This code already reaches the state we were aiming for: consumption on a thread, and high-level abstractions.
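    For illustration, here is one way the same idea could be wired up so that both output streams are drained at the same time. This is a sketch under assumptions, not ZStack's code: ConcurrentDrain, drain, and drainBoth are made-up names, UTF-8 is assumed, and in-memory streams stand in for what channel.getInputStream() and channel.getErrStream() would return.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ConcurrentDrain {
    // Reads one stream line by line until EOF. Each call owns its StringBuilder,
    // so nothing mutable is shared between the two worker threads.
    static String drain(InputStream in) {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line).append('\n');
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sb.toString();
    }

    // Drains both streams concurrently and returns {stdout text, stderr text}.
    static String[] drainBoth(InputStream stdout, InputStream stderr) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> outTask = () -> drain(stdout);
            Callable<String> errTask = () -> drain(stderr);
            Future<String> out = pool.submit(outTask);
            Future<String> err = pool.submit(errTask);
            return new String[] {out.get(), err.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        InputStream out = new ByteArrayInputStream("line 1\nline 2\n".getBytes(StandardCharsets.UTF_8));
        InputStream err = new ByteArrayInputStream("warning\n".getBytes(StandardCharsets.UTF_8));
        String[] result = drainBoth(out, err);
        System.out.print("stdout:\n" + result[0]);
        System.out.print("stderr:\n" + result[1]);
    }
}
```

    Because each stream has its own reader thread, a command that writes heavily to one stream no longer holds up reading of the other.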

    3.1 Using Kotlin

    3.1.1 Kotlin IO

    Without further ado, here is the code:

    import java.io.InputStream
    import java.io.InputStreamReader
    
    class StreamGobbler(private val inputStream: InputStream, private var result: StringBuilder) : Runnable {
    
        override fun run() {
            InputStreamReader(inputStream).buffered().use {
                it.lines().forEach { r -> result.append(r) }
            }
        }
    }
    

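    Same familiar recipe. Let's walk through it line by line:

    1. Define a class whose constructor requires an InputStream and a StringBuilder. It implements Runnable, which means it can be run by a thread.
    2. Override run. The InputStream is adapted to an InputStreamReader, which means it can now produce a character stream; we then use Kotlin's buffered() extension to decorate it with a buffer.
    3. Read the buffer line by line and append each line to the result StringBuilder.
    4. Once everything has been read, we are done and the reader is closed (use takes care of closing it).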
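    One detail of step 3 is easy to miss: lines() returns each line without its line terminator, so appending the lines directly runs them together. The article does not say whether that is intended. The Java sketch below (LinesDemo and its methods are hypothetical names, UTF-8 is assumed) shows the difference between plain concatenation and joining with an explicit separator.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

class LinesDemo {
    // Appends every line as-is; the line breaks are not part of the lines and are lost.
    static String concatLines(InputStream in) {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            br.lines().forEach(sb::append);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sb.toString();
    }

    // Joins the lines with '\n' so the original line structure survives.
    static String joinLines(InputStream in) {
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return br.lines().collect(Collectors.joining("\n"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = "a\nb\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(concatLines(new ByteArrayInputStream(data))); // prints "ab"
        System.out.println(joinLines(new ByteArrayInputStream(data)));   // prints "a" and "b" on separate lines
    }
}
```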

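    3.1.2 Kotlin Coroutine

    Look at the figure above. We all know that kernel threads are scheduled by the OS, but when a thread gets its time slice and then calls into blocking IO, it can only sit there waiting, and the time is wasted.

    Coroutines can solve this problem: when a Job hangs, the thread can go do something else, sidestepping the block and making better use of the time slice.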
