Apache commons-exec对Process进行了封装,提供了如下功能:
- 为Process的stdin, stdout, stderr重定向流,而不是File
- 并发向Process stdin写入数据、读取Process stdout和stderr的数据,避免进程阻塞
- 超时终止Process
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
1. DefaultExecutor
DefaultExecutor是启动子进程的执行类。
// 设置工作目录
public void setWorkingDirectory(final File dir);
// 设置流处理器
public void setStreamHandler(final ExecuteStreamHandler streamHandler);
// 执行命令
public int execute(final CommandLine command);
public int execute(final CommandLine command, final Map<String, String> environment);
public void execute(final CommandLine command, final ExecuteResultHandler handler);
public void execute(final CommandLine command, final Map<String, String> environment,
final ExecuteResultHandler handler);
- 实际的执行方法
private int executeInternal(final CommandLine command, final Map<String, String> environment,
final File dir, final ExecuteStreamHandler streams) throws IOException {
setExceptionCaught(null);
// launch():使用Runtime.exec创建Process
// Runtime.getRuntime().exec(cmd.toStrings(), EnvironmentUtils.toStrings(env), workingDir);
final Process process = this.launch(command, environment, dir);
// 将用户提供的流与进程的流连接起来
try {
streams.setProcessInputStream(process.getOutputStream());
streams.setProcessOutputStream(process.getInputStream());
streams.setProcessErrorStream(process.getErrorStream());
} catch (final IOException e) {
process.destroy();
throw e;
}
// 启动线程,从输入流复制数据写入输出流
streams.start();
try {
// add the process to the list of those to destroy if the VM exits
if (this.getProcessDestroyer() != null) {
this.getProcessDestroyer().add(process);
}
// associate the watchdog with the newly created process
if (watchdog != null) {
watchdog.start(process);
}
int exitValue = Executor.INVALID_EXITVALUE;
try {
exitValue = process.waitFor();
} catch (final InterruptedException e) {
process.destroy();
}
finally {
// see http://bugs.sun.com/view_bug.do?bug_id=6420270
// see https://issues.apache.org/jira/browse/EXEC-46
// Process.waitFor should clear interrupt status when throwing InterruptedException
// but we have to do that manually
Thread.interrupted();
}
if (watchdog != null) {
watchdog.stop();
}
try {
streams.stop();
}
catch (final IOException e) {
setExceptionCaught(e);
}
closeProcessStreams(process);
if (getExceptionCaught() != null) {
throw getExceptionCaught();
}
if (watchdog != null) {
try {
watchdog.checkException();
} catch (final IOException e) {
throw e;
} catch (final Exception e) {
throw new IOException(e.getMessage());
}
}
if (this.isFailure(exitValue)) {
throw new ExecuteException("Process exited with an error: " + exitValue, exitValue);
}
return exitValue;
} finally {
// remove the process to the list of those to destroy if the VM exits
if (this.getProcessDestroyer() != null) {
this.getProcessDestroyer().remove(process);
}
}
}
2. CommandLine
封装命令和参数。
// 设置命令,无参数
public CommandLine(final String executable);
// 设置命令文件,无参数
public CommandLine(final File executable);
// 设置参数
public CommandLine addArguments(final String[] addArguments);
// 设置未分拆的参数
public CommandLine addArguments(final String addArguments);
// 设置替换值,替换参数中${}的占位变量
public void setSubstitutionMap(final Map<String, ?> substitutionMap);
// 直接解析命令行:第一个元素是命令,其余是参数
public static CommandLine parse(final String line);
public static CommandLine parse(final String line, final Map<String, ?> substitutionMap);
3. ExecuteStreamHandler
处理进程的输入流、输出流、错误流。
// os为process.getOutputStream(),连接着进程stdin
void setProcessInputStream(OutputStream os) throws IOException;
// is为process.getInputStream(),连接着进程stdout
void setProcessOutputStream(InputStream is) throws IOException;
// is为process.getErrorStream(),连接着进程stderr
void setProcessErrorStream(InputStream is) throws IOException;
// 启动处理
void start() throws IOException;
// 停止处理
void stop() throws IOException;
4. PumpStreamHandler
ExecuteStreamHandler的实现类
启动三个线程:
从用户提供的输入流input复制数据,到与进程stdin连接的输出流
从与进程stdout连接的输入流,复制数据到用户提供的输出流out
从与进程stderr连接的输入流,复制数据到用户提供的输出流err用户提供的输入流需要手动关闭,提供的输出流若是PipedOutputStream则会被自动关闭。
// 设置进程的stdin、stdout、stderr
public PumpStreamHandler(final OutputStream out, final OutputStream err, final InputStream input);
// 设置进程的stdout、stderr
public PumpStreamHandler(final OutputStream out, final OutputStream err);
// 设置进程的stdout、stderr
public PumpStreamHandler(final OutputStream outAndErr);
// 设置为System.out, System.err
public PumpStreamHandler();
4.1 处理进程stdin
用户提供一个输入流input,进程提供一个与stdin连接的输出流os
启动一个线程,不断的从input复制数据到os,这样input就成为了进程的stdin
input关闭或出错后,复制结束,关闭os
// input是用户提供的输入流
// os是进程提供的与stdin连接的输出流
public void setProcessInputStream(final OutputStream os) {
if (input != null) {
if (input == System.in) {
inputThread = createSystemInPump(input, os);
} else {
inputThread = createPump(input, os, true);
}
} else { // 无需输入流
try {
os.close();
} catch (final IOException e) {
final String msg = "Got exception while closing output stream";
DebugUtils.handleException(msg, e);
}
}
}
创建线程和数据泵,连接输入流is和输出流os,线程启动后将不断从输入流复制数据到输出流。
protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper");
result.setDaemon(true);
return result;
}
输入流与输出流之间的数据泵StreamPumper
public class StreamPumper implements Runnable {
// is,输入流;os,输出流;
// closeWhenExhausted,输入流无数据是是否关闭输出流
public StreamPumper(final InputStream is, final OutputStream os,
final boolean closeWhenExhausted);
public void run() {
synchronized (this) {
// Just in case this object is reused in the future
finished = false;
}
final byte[] buf = new byte[this.size];
int length;
try {
// 从输入流复制数据写入输出流
while ((length = is.read(buf)) > 0) {
os.write(buf, 0, length);
}
} catch (final Exception e) {
// nothing to do - happens quite often with watchdog
} finally {
if (closeWhenExhausted) {
try {
os.close();
} catch (final IOException e) {
final String msg = "Got exception while closing exhausted output stream";
DebugUtils.handleException(msg ,e);
}
}
synchronized (this) {
finished = true;
notifyAll(); // 通知等待泵结束的线程
}
}
}
// 阻塞等待泵结束
public synchronized void waitFor() throws InterruptedException;
}
4.2 处理进程stdout
用户提供一个输出流out,进程提供一个与stdout连接的输入流is
启动一个线程,不断的从is复制数据到out,这样out就成为了进程的stdout
is关闭或出错后,复制结束,关闭out
// out是用户提供的输出流
// is是进程提供的与stdout连接的输出流
public void setProcessOutputStream(final InputStream is) {
if (out != null) {
createProcessOutputPump(is, out);
}
}
protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
outputThread = createPump(is, os);
}
protected Thread createPump(final InputStream is, final OutputStream os) {
final boolean closeWhenExhausted = os instanceof PipedOutputStream ? true : false;
return createPump(is, os, closeWhenExhausted);
}
4.3 处理进程stderr
用户提供一个输出流err,进程提供一个与stderr连接的输入流is
启动一个线程,不断的从is复制数据到err,这样err就成为了进程的stderr
is关闭或出错后,复制结束,关闭err
public void setProcessErrorStream(final InputStream is) {
if (err != null) {
createProcessErrorPump(is, err);
}
}
protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
errorThread = createPump(is, os);
}
4.4 启动与停止
- 启动线程,开始复制数据
public void start() {
if (outputThread != null) {
outputThread.start();
}
if (errorThread != null) {
errorThread.start();
}
if (inputThread != null) {
inputThread.start();
}
}
- 等待线程结束,flush残留数据
public void stop() throws IOException {
// inputStreamPumper是处理System.in的泵
if (inputStreamPumper != null) {
inputStreamPumper.stopProcessing();
}
// 超时等待线程结束
stopThread(outputThread, stopTimeout);
stopThread(errorThread, stopTimeout);
stopThread(inputThread, stopTimeout);
// flush输出流
if (err != null && err != out) {
try {
err.flush();
} catch (final IOException e) {
final String msg = "Got exception while flushing the error stream : " + e.getMessage();
DebugUtils.handleException(msg, e);
}
}
if (out != null) {
try {
out.flush();
} catch (final IOException e) {
final String msg = "Got exception while flushing the output stream";
DebugUtils.handleException(msg, e);
}
}
if (caught != null) {
throw caught;
}
}
5. 超时终止ExecuteWatchdog
执行器启动时,另起一个线程进行异步计时;
进程提前执行完毕,则通知计时器终止;
若计时器先计时结束,则通知执行器超时,执行器终止进程;
创建时指定超时时间:
public ExecuteWatchdog(final long timeout) {
this.killedProcess = false;
this.watch = false;
this.hasWatchdog = timeout != INFINITE_TIMEOUT; // -1表示不超时
this.processStarted = false;
if (this.hasWatchdog) {
this.watchdog = new Watchdog(timeout); // 新建Watchdog进行计时
this.watchdog.addTimeoutObserver(this); // ExecuteWatchdog作为WatchDog的观察者,计时超时后得到通知
}
else {
this.watchdog = null;
}
}
提交给DefaultExecutor:
public void setWatchdog(final ExecuteWatchdog watchDog);
DefaultExecutor执行时启动ExecuteWatchdog,执行完毕停止:
private int executeInternal(...) {
...
if (watchdog != null) {
watchdog.start(process);
}
...
if (watchdog != null) {
watchdog.stop();
}
}
ExecuteWatchdog启动:
public synchronized void start(final Process processToMonitor) {
if (processToMonitor == null) {
throw new NullPointerException("process is null.");
}
if (this.process != null) {
throw new IllegalStateException("Already running.");
}
this.caught = null;
this.killedProcess = false;
this.watch = true;
this.process = processToMonitor;
this.processStarted = true;
this.notifyAll();
if (this.hasWatchdog) {
watchdog.start(); // 启动WatchDog计时
}
}
WatchDog启动:
public synchronized void start() {
stopped = false;
final Thread t = new Thread(this, "WATCHDOG"); // 启动线程进行异步计时
t.setDaemon(true);
t.start();
}
public void run() {
final long startTime = System.currentTimeMillis();
boolean isWaiting; // 未超时
synchronized (this) {
long timeLeft = timeout - (System.currentTimeMillis() - startTime);
isWaiting = timeLeft > 0;
while (!stopped && isWaiting) { // stopped表示Process是否执行完毕,执行完毕时退出计时循环
try {
wait(timeLeft); // 等待指定超时时间
} catch (final InterruptedException e) {
}
timeLeft = timeout - (System.currentTimeMillis() - startTime);
isWaiting = timeLeft > 0;
}
}
// notify the listeners outside of the synchronized block (see EXEC-60)
if (!isWaiting) { // 超时
fireTimeoutOccured();
}
}
// 观察者模式,通知观察者超时
protected final void fireTimeoutOccured() {
final Enumeration<TimeoutObserver> e = observers.elements();
while (e.hasMoreElements()) {
e.nextElement().timeoutOccured(this);
}
}
ExecuteWatchdog被通知超时:
public synchronized void timeoutOccured(final Watchdog w) {
try {
try {
if (process != null) {
process.exitValue(); // 再检查一次是否执行完毕
}
} catch (final IllegalThreadStateException itse) {
// 未执行完毕
if (watch) {
killedProcess = true;
process.destroy(); // 终止进程
}
}
} catch (final Exception e) {
caught = e;
DebugUtils.handleException("Getting the exit value of the process failed", e);
} finally {
cleanUp();
}
}
6. 示例
public static void main(String[] args) throws Exception {
// 命令行
CommandLine commandLine = CommandLine.parse("ping baidu.com");
// 重定向stdout和stderr到文件
FileOutputStream fileOutputStream = new FileOutputStream("D:\\Temp\\Process\\exec.log");
PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(fileOutputStream);
// 超时终止:1秒
ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(1000);
// 创建执行器
DefaultExecutor executor = new DefaultExecutor();
executor.setStreamHandler(pumpStreamHandler);
executor.setWatchdog(executeWatchdog);
// 执行,打印退出码
int exitValue = executor.execute(commandLine);
System.out.println(exitValue);
// 关闭流
fileOutputStream.close();
}
网友评论