美文网首页
Java设计模式之异步方法

Java设计模式之异步方法

作者: 唐ikigai | 来源:发表于2016-01-03 19:04 被阅读798次

AsyncCallback.java

/**
 * Callback invoked once when an asynchronous task finishes, whether it succeeded or failed.
 *
 * <p>The interface has a single abstract method, so it can be supplied as a lambda or method
 * reference.
 *
 * @param <T> type of the value produced by the task
 */
@FunctionalInterface
public interface AsyncCallback<T> {

    /**
     * Handles completion of the asynchronous task.
     *
     * @param value the value produced by the task; {@code null} when the task failed
     * @param ex    empty when the task succeeded, otherwise the exception that made it fail
     */
    void onComplete(T value, Optional<Exception> ex);
}

AsyncExecutor.java

/**
 * Executor of asynchronous tasks.
 *
 * <p>A task is handed over with one of the {@code startProcess} methods, which return an
 * {@link AsyncResult} handle; the outcome is later collected with {@link #endProcess}.
 */
public interface AsyncExecutor {
  /**
   * Starts processing the given task and returns a handle to its eventual result.
   *
   * @param task task to be executed asynchronously
   * @param <T>  type of the value produced by the task
   * @return handle for querying, awaiting and reading the outcome of the task
   */
  <T> AsyncResult<T> startProcess(Callable<T> task);

  /**
   * Starts processing the given task and returns a handle to its eventual result. The callback
   * is invoked when the task completes.
   *
   * @param task     task to be executed asynchronously
   * @param callback callback notified on completion; the implementation in this file accepts
   *                 {@code null} to mean "no callback"
   * @param <T>      type of the value produced by the task
   * @return handle for querying, awaiting and reading the outcome of the task
   */
  <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);

  /**
   * Ends processing of a task, blocking the calling thread until it completes if necessary.
   *
   * @param asyncResult handle returned by one of the {@code startProcess} methods
   * @param <T>         type of the value produced by the task
   * @return the value produced by the task
   * @throws ExecutionException   if the task failed with an exception
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
  
}

AsyncResult.java

/**
 * Handle to the outcome of an asynchronously executed task.
 *
 * @param <T> type of the value produced by the task
 */
public interface AsyncResult<T> {

  /**
   * Reports whether the task has finished, either successfully or with a failure.
   *
   * @return {@code true} once the task is no longer running
   */
  boolean isCompleted();

  /**
   * Returns the value produced by the task.
   *
   * <p>NOTE: the implementation in this file throws {@link IllegalStateException} when called
   * before the task has completed; call {@link #await()} first if completion is not certain.
   *
   * @return the value produced by the task
   * @throws ExecutionException if the task failed; the cause is the task's exception
   */
  T getValue() throws ExecutionException;

  /**
   * Blocks the calling thread until the task has completed.
   *
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  void await() throws InterruptedException;
}

实现类:ThreadAsyncExecutor.java

/**
 * Implementation of async executor that creates a new thread for every task.
 *
 * <p>Worker threads are named {@code executor-N}, where {@code N} is a counter starting at 1.
 */
public class ThreadAsyncExecutor implements AsyncExecutor {

  /** Index for thread naming. */
  private final AtomicInteger idx = new AtomicInteger(0);

  /**
   * Starts the task on a new thread without a completion callback.
   *
   * @param task task to run
   * @return handle to the eventual result
   */
  @Override
  public <T> AsyncResult<T> startProcess(Callable<T> task) {
    return startProcess(task, null);
  }

  /**
   * Starts the task on a new thread and invokes the callback, if any, when it finishes.
   *
   * @param task     task to run
   * @param callback callback to notify on completion, or {@code null} for none
   * @return handle to the eventual result
   */
  @Override
  public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
    CompletableResult<T> result = new CompletableResult<>(callback);
    new Thread(() -> {
      T value;
      try {
        value = task.call();
      } catch (Exception ex) {
        result.setException(ex);
        return;
      }
      // Publish the value outside the try block: an exception thrown by the user callback must
      // not be mistaken for a task failure, which would flip an already COMPLETED result to
      // FAILED and invoke the callback a second time.
      result.setValue(value);
    }, "executor-" + idx.incrementAndGet()).start();
    return result;
  }

  /**
   * Waits for the task to complete if it has not already, then returns its value.
   *
   * @param asyncResult handle returned by {@code startProcess}
   * @return the value produced by the task
   * @throws ExecutionException   if the task failed with an exception
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  @Override
  public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
    if (!asyncResult.isCompleted()) {
      asyncResult.await();
    }
    return asyncResult.getValue();
  }

  /**
   * Simple implementation of async result that allows completing it successfully with a value or exceptionally with an
   * exception. A really simplified version from its real life cousins FutureTask and CompletableFuture.
   *
   * @see java.util.concurrent.FutureTask
   * @see java.util.concurrent.CompletableFuture
   */
  private static class CompletableResult<T> implements AsyncResult<T> {

    static final int RUNNING = 1;
    static final int FAILED = 2;
    static final int COMPLETED = 3;

    /** Monitor on which threads calling {@link #await()} wait. */
    final Object lock;
    final Optional<AsyncCallback<T>> callback;

    // value/exception are written before the volatile write to state and read only after a
    // volatile read of state, so they need not be volatile themselves.
    volatile int state = RUNNING;
    T value;
    Exception exception;

    CompletableResult(AsyncCallback<T> callback) {
      this.lock = new Object();
      this.callback = Optional.ofNullable(callback);
    }

    /**
     * Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
     * completion, even when the callback itself throws.
     *
     * @param value
     *          value of the evaluated task
     */
    void setValue(T value) {
      this.value = value;
      this.state = COMPLETED;
      try {
        this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
      } finally {
        notifyWaiters();
      }
    }

    /**
     * Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
     * completion, even when the callback itself throws.
     *
     * @param exception
     *          exception of the failed task
     */
    void setException(Exception exception) {
      this.exception = exception;
      this.state = FAILED;
      try {
        this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
      } finally {
        notifyWaiters();
      }
    }

    /** Wakes up every thread currently blocked in {@link #await()}. */
    private void notifyWaiters() {
      synchronized (lock) {
        lock.notifyAll();
      }
    }

    @Override
    public boolean isCompleted() {
      return state > RUNNING;
    }

    /**
     * Returns the task's value.
     *
     * @throws ExecutionException    if the task failed; wraps the task's exception
     * @throws IllegalStateException if the task has not completed yet
     */
    @Override
    public T getValue() throws ExecutionException {
      if (state == COMPLETED) {
        return value;
      } else if (state == FAILED) {
        throw new ExecutionException(exception);
      } else {
        throw new IllegalStateException("Execution not completed yet");
      }
    }

    /**
     * Blocks until the task has completed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void await() throws InterruptedException {
      synchronized (lock) {
        // Loop rather than test once: Object.wait() may return spuriously, so the condition
        // must be re-checked after every wakeup.
        while (!isCompleted()) {
          lock.wait();
        }
      }
    }
  }
}

App.java

public class App {

  /**
   * Program entry point. Launches five asynchronous tasks, does some work on the calling thread,
   * then collects and prints the results.
   *
   * @param args command line arguments (unused)
   * @throws Exception if a task fails or the calling thread is interrupted
   */
  public static void main(String[] args) throws Exception {
    // the executor runs every submitted task asynchronously
    AsyncExecutor executor = new ThreadAsyncExecutor();

    // five tasks with different delays; the last two also report through a callback
    AsyncResult<Integer> intTask = executor.startProcess(lazyval(10, 500));
    AsyncResult<String> textTask = executor.startProcess(lazyval("test", 300));
    AsyncResult<Long> longTask = executor.startProcess(lazyval(50L, 700));
    AsyncResult<Integer> intTaskWithCallback =
        executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
    AsyncResult<String> textTaskWithCallback =
        executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));

    // pretend the calling thread is busy while the tasks run in the background
    Thread.sleep(350);
    log("Some hard work done");

    // block until each task is done; the callback tasks are only awaited, not read
    Integer intValue = executor.endProcess(intTask);
    String textValue = executor.endProcess(textTask);
    Long longValue = executor.endProcess(longTask);
    intTaskWithCallback.await();
    textTaskWithCallback.await();

    // print the collected values (the callbacks have already logged theirs)
    log("Result 1: " + intValue);
    log("Result 2: " + textValue);
    log("Result 3: " + longValue);
  }

  /**
   * Builds a task that sleeps for the given time, logs its completion and returns the value.
   *
   * @param value       value the task returns
   * @param delayMillis how long the task sleeps before returning, in milliseconds
   * @return the delayed task
   */
  private static <T> Callable<T> lazyval(T value, long delayMillis) {
    return () -> {
      Thread.sleep(delayMillis);
      log("Task completed with: " + value);
      return value;
    };
  }

  /**
   * Builds a callback that logs the outcome of a task under the given name.
   *
   * @param name label prefixed to the logged message
   * @return the logging callback
   */
  private static <T> AsyncCallback<T> callback(String name) {
    return (value, ex) -> {
      if (!ex.isPresent()) {
        log(name + ": " + value);
        return;
      }
      String reason = ex.map(Exception::getMessage).orElse("");
      log(name + " failed: " + reason);
    };
  }

  /**
   * Prints the message to standard output, prefixed with the current thread's name.
   *
   * @param msg message to print
   */
  private static void log(String msg) {
    String threadName = Thread.currentThread().getName();
    String line = String.format("[%1$-10s] - %2$s", threadName, msg);
    System.out.println(line);
  }
}

相关文章

网友评论

      本文标题:Java设计模式之异步方法

      本文链接:https://www.haomeiwen.com/subject/pslohttx.html