美文网首页JAVA进阶
使用commons-exec执行外部命令

使用commons-exec执行外部命令

作者: 毛小力 | 来源:发表于2018-09-18 17:55 被阅读0次

    Apache commons-exec对Process进行了封装,提供了如下功能:

    1. 为Process的stdin, stdout, stderr重定向流,而不是File
    2. 并发向Process stdin写入数据、读取Process stdout和stderr的数据,避免进程阻塞
    3. 超时终止Process
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-exec</artifactId>
        <version>1.3</version>
    </dependency>
    

    1. DefaultExecutor

    DefaultExecutor是启动子进程的执行类。

    // 设置工作目录
    public void setWorkingDirectory(final File dir);
    
    // 设置流处理器
    public void setStreamHandler(final ExecuteStreamHandler streamHandler);
    
    // 执行命令
    public int execute(final CommandLine command);
    public int execute(final CommandLine command, final Map<String, String> environment);
    public void execute(final CommandLine command, final ExecuteResultHandler handler);
    public void execute(final CommandLine command, final Map<String, String> environment, 
        final ExecuteResultHandler handler);
    
    • 实际的执行方法
    private int executeInternal(final CommandLine command, final Map<String, String> environment,
            final File dir, final ExecuteStreamHandler streams) throws IOException {
    
        setExceptionCaught(null);
    
        // launch():使用Runtime.exec创建Process
        // Runtime.getRuntime().exec(cmd.toStrings(), EnvironmentUtils.toStrings(env), workingDir);
        final Process process = this.launch(command, environment, dir);
    
        // 将用户提供的流与进程的流连接起来
        try {
            streams.setProcessInputStream(process.getOutputStream());
            streams.setProcessOutputStream(process.getInputStream());
            streams.setProcessErrorStream(process.getErrorStream());
        } catch (final IOException e) {
            process.destroy();
            throw e;
        }
    
        // 启动线程,从输入流复制数据写入输出流
        streams.start();
    
        try {
    
            // add the process to the list of those to destroy if the VM exits
            if (this.getProcessDestroyer() != null) {
              this.getProcessDestroyer().add(process);
            }
    
            // associate the watchdog with the newly created process
            if (watchdog != null) {
                watchdog.start(process);
            }
    
            int exitValue = Executor.INVALID_EXITVALUE;
    
            try {
                exitValue = process.waitFor();
            } catch (final InterruptedException e) {
                process.destroy();
            }
            finally {
                // see http://bugs.sun.com/view_bug.do?bug_id=6420270
                // see https://issues.apache.org/jira/browse/EXEC-46
                // Process.waitFor should clear interrupt status when throwing InterruptedException
                // but we have to do that manually
                Thread.interrupted();
            }            
    
            if (watchdog != null) {
                watchdog.stop();
            }
    
            try {
                streams.stop();
            }
            catch (final IOException e) {
                setExceptionCaught(e);
            }
    
            closeProcessStreams(process);
    
            if (getExceptionCaught() != null) {
                throw getExceptionCaught();
            }
    
            if (watchdog != null) {
                try {
                    watchdog.checkException();
                } catch (final IOException e) {
                    throw e;
                } catch (final Exception e) {
                    throw new IOException(e.getMessage());
                }
            }
    
            if (this.isFailure(exitValue)) {
                throw new ExecuteException("Process exited with an error: " + exitValue, exitValue);
            }
    
            return exitValue;
        } finally {
            // remove the process to the list of those to destroy if the VM exits
            if (this.getProcessDestroyer() != null) {
              this.getProcessDestroyer().remove(process);
            }
        }
    }
    

    2. CommandLine

    封装命令和参数。

    // 设置命令,无参数
    public CommandLine(final String executable);
    // 设置命令文件,无参数
    public CommandLine(final File executable);
    
    // 设置参数
    public CommandLine addArguments(final String[] addArguments);
    // 设置未分拆的参数
    public CommandLine addArguments(final String addArguments);
    // 设置替换值,替换参数中${}的占位变量
    public void setSubstitutionMap(final Map<String, ?> substitutionMap);
    
    // 直接解析命令行:第一个元素是命令,其余是参数
    public static CommandLine parse(final String line);
    public static CommandLine parse(final String line, final Map<String, ?> substitutionMap);
    

    3. ExecuteStreamHandler

    处理进程的输入流、输出流、错误流。

    // os为process.getOutputStream(),连接着进程stdin
    void setProcessInputStream(OutputStream os) throws IOException;
    
    // is为process.getInputStream(),连接着进程stdout
    void setProcessOutputStream(InputStream is) throws IOException;
    
    // is为process.getErrorStream(),连接着进程stderr
    void setProcessErrorStream(InputStream is) throws IOException;
    
    // 启动处理
    void start() throws IOException;
    
    // 停止处理
    void stop() throws IOException;
    

    4. PumpStreamHandler

    ExecuteStreamHandler的实现类
    启动三个线程:
    从用户提供的输入流input复制数据,到与进程stdin连接的输出流
    从与进程stdout连接的输入流,复制数据到用户提供的输出流out
    从与进程stderr连接的输入流,复制数据到用户提供的输出流err

    用户提供的输入流需要手动关闭,提供的输出流若是PipedOutputStream则会被自动关闭。

    // 设置进程的stdin、stdout、stderr
    public PumpStreamHandler(final OutputStream out, final OutputStream err, final InputStream input);
    
    // 设置进程的stdout、stderr
    public PumpStreamHandler(final OutputStream out, final OutputStream err);
    
    // 设置进程的stdout、stderr
    public PumpStreamHandler(final OutputStream outAndErr);
    
    // 设置为System.out, System.err
    public PumpStreamHandler();
    

    4.1 处理进程stdin

    用户提供一个输入流input,进程提供一个与stdin连接的输出流os
    启动一个线程,不断的从input复制数据到os,这样input就成为了进程的stdin
    input关闭或出错后,复制结束,关闭os

    // input是用户提供的输入流
    // os是进程提供的与stdin连接的输出流
    public void setProcessInputStream(final OutputStream os) {
        if (input != null) {
            if (input == System.in) {
                inputThread = createSystemInPump(input, os);
            } else {
                inputThread = createPump(input, os, true);
            }
        } else { // 无需输入流
            try {
                os.close();
            } catch (final IOException e) {
                final String msg = "Got exception while closing output stream";
                DebugUtils.handleException(msg, e);
            }
        }
    }
    

    创建线程和数据泵,连接输入流is和输出流os,线程启动后将不断从输入流复制数据到输出流。

    protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
        final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper");
        result.setDaemon(true);
        return result;
    }
    

    输入流与输出流之间的数据泵StreamPumper

    public class StreamPumper implements Runnable {
        // is,输入流;os,输出流;
        // closeWhenExhausted,输入流无数据是是否关闭输出流
        public StreamPumper(final InputStream is, final OutputStream os,
                final boolean closeWhenExhausted);
    
        public void run() {
            synchronized (this) {
                // Just in case this object is reused in the future
                finished = false;
            }
    
            final byte[] buf = new byte[this.size];
    
            int length;
            try {
                // 从输入流复制数据写入输出流
                while ((length = is.read(buf)) > 0) {
                    os.write(buf, 0, length);
                }
            } catch (final Exception e) {
                // nothing to do - happens quite often with watchdog
            } finally {
                if (closeWhenExhausted) {
                    try {
                        os.close();
                    } catch (final IOException e) {
                        final String msg = "Got exception while closing exhausted output stream";
                        DebugUtils.handleException(msg ,e);
                    }
                }
                synchronized (this) {
                    finished = true;
                    notifyAll(); // 通知等待泵结束的线程
                }
            }
        }
    
        // 阻塞等待泵结束
        public synchronized void waitFor() throws InterruptedException;
    }
    

    4.2 处理进程stdout

    用户提供一个输出流out,进程提供一个与stdout连接的输入流is
    启动一个线程,不断的从is复制数据到out,这样out就成为了进程的stdout
    is关闭或出错后,复制结束,关闭out

    // out是用户提供的输出流
    // is是进程提供的与stdout连接的输出流
    public void setProcessOutputStream(final InputStream is) {
        if (out != null) {
            createProcessOutputPump(is, out);
        }
    }
    
    protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
        outputThread = createPump(is, os);
    }
    
    protected Thread createPump(final InputStream is, final OutputStream os) {
        final boolean closeWhenExhausted = os instanceof PipedOutputStream ? true : false;
        return createPump(is, os, closeWhenExhausted);
    }
    

    4.3 处理进程stderr

    用户提供一个输出流err,进程提供一个与stderr连接的输入流is
    启动一个线程,不断的从is复制数据到err,这样err就成为了进程的stderr
    is关闭或出错后,复制结束,关闭err

    public void setProcessErrorStream(final InputStream is) {
        if (err != null) {
            createProcessErrorPump(is, err);
        }
    }
    
    protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
        errorThread = createPump(is, os);
    }
    

    4.4 启动与停止

    • 启动线程,开始复制数据
    public void start() {
        if (outputThread != null) {
            outputThread.start();
        }
        if (errorThread != null) {
            errorThread.start();
        }
        if (inputThread != null) {
            inputThread.start();
        }
    }
    
    • 等待线程结束,flush残留数据
    public void stop() throws IOException {
        // inputStreamPumper是处理System.in的泵
        if (inputStreamPumper != null) {
            inputStreamPumper.stopProcessing();
        }
    
        // 超时等待线程结束
        stopThread(outputThread, stopTimeout);
        stopThread(errorThread, stopTimeout);
        stopThread(inputThread, stopTimeout);
    
        // flush输出流
        if (err != null && err != out) {
            try {
                err.flush();
            } catch (final IOException e) {
                final String msg = "Got exception while flushing the error stream : " + e.getMessage();
                DebugUtils.handleException(msg, e);
            }
        }
    
        if (out != null) {
            try {
                out.flush();
            } catch (final IOException e) {
                final String msg = "Got exception while flushing the output stream";
                DebugUtils.handleException(msg, e);
            }
        }
    
        if (caught != null) {
            throw caught;
        }
    }
    

    5. 超时终止ExecuteWatchdog

    执行器启动时,另起一个线程进行异步计时;
    进程提前执行完毕,则通知计时器终止;
    若计时器先计时结束,则通知执行器超时,执行器终止进程;

    创建时指定超时时间:

    public ExecuteWatchdog(final long timeout) {
        this.killedProcess = false;
        this.watch = false;
        this.hasWatchdog = timeout != INFINITE_TIMEOUT; // -1表示不超时
        this.processStarted = false;
        if (this.hasWatchdog) {
            this.watchdog = new Watchdog(timeout); // 新建Watchdog进行计时
            this.watchdog.addTimeoutObserver(this); // ExecuteWatchdog作为WatchDog的观察者,计时超时后得到通知
        }
        else {
            this.watchdog = null;
        }
    }
    

    提交给DefaultExecutor:

    public void setWatchdog(final ExecuteWatchdog watchDog);
    

    DefaultExecutor执行时启动ExecuteWatchdog,执行完毕停止:

    private int executeInternal(...) {
        ...
        if (watchdog != null) {
            watchdog.start(process);
        }
        ...
        if (watchdog != null) {
            watchdog.stop();
        }
    }
    

    ExecuteWatchdog启动:

    public synchronized void start(final Process processToMonitor) {
        if (processToMonitor == null) {
            throw new NullPointerException("process is null.");
        }
        if (this.process != null) {
            throw new IllegalStateException("Already running.");
        }
        this.caught = null;
        this.killedProcess = false;
        this.watch = true;
        this.process = processToMonitor;
        this.processStarted = true;
        this.notifyAll();
        if (this.hasWatchdog) {
            watchdog.start();  // 启动WatchDog计时
        }
    }
    

    WatchDog启动:

    public synchronized void start() {
        stopped = false;
        final Thread t = new Thread(this, "WATCHDOG"); // 启动线程进行异步计时
        t.setDaemon(true);
        t.start();
    }
    
    public void run() {
        final long startTime = System.currentTimeMillis();
        boolean isWaiting; // 未超时
        synchronized (this) {
            long timeLeft = timeout - (System.currentTimeMillis() - startTime);
            isWaiting = timeLeft > 0;
            while (!stopped && isWaiting) { // stopped表示Process是否执行完毕,执行完毕时退出计时循环
                try {
                    wait(timeLeft); // 等待指定超时时间
                } catch (final InterruptedException e) {
                }
                timeLeft = timeout - (System.currentTimeMillis() - startTime);
                isWaiting = timeLeft > 0;
            }
        }
    
        // notify the listeners outside of the synchronized block (see EXEC-60)
        if (!isWaiting) { // 超时
            fireTimeoutOccured();
        }
    }
    
    // 观察者模式,通知观察者超时
    protected final void fireTimeoutOccured() {
        final Enumeration<TimeoutObserver> e = observers.elements();
        while (e.hasMoreElements()) {
            e.nextElement().timeoutOccured(this);
        }
    }
    

    ExecuteWatchdog被通知超时:

    public synchronized void timeoutOccured(final Watchdog w) {
        try {
            try {
                if (process != null) {
                    process.exitValue(); // 再检查一次是否执行完毕
                }
            } catch (final IllegalThreadStateException itse) {
                // 未执行完毕
                if (watch) {
                    killedProcess = true;
                    process.destroy(); // 终止进程
                }
            }
        } catch (final Exception e) {
            caught = e;
            DebugUtils.handleException("Getting the exit value of the process failed", e);
        } finally {
            cleanUp();
        }
    }
    

    6. 示例

    public static void main(String[] args) throws Exception {
        // 命令行
        CommandLine commandLine = CommandLine.parse("ping baidu.com");
    
        // 重定向stdout和stderr到文件
        FileOutputStream fileOutputStream = new FileOutputStream("D:\\Temp\\Process\\exec.log");
        PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(fileOutputStream);
    
        // 超时终止:1秒
        ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(1000);
    
        // 创建执行器
        DefaultExecutor executor = new DefaultExecutor();
        executor.setStreamHandler(pumpStreamHandler);
        executor.setWatchdog(executeWatchdog);
    
        // 执行,打印退出码
        int exitValue = executor.execute(commandLine);
        System.out.println(exitValue);
    
        // 关闭流
        fileOutputStream.close();
    }
    

    相关文章

      网友评论

        本文标题:使用commons-exec执行外部命令

        本文链接:https://www.haomeiwen.com/subject/zckunftx.html