美文网首页
Java设计模式之异步方法

Java设计模式之异步方法

作者: Hector_ | 来源:发表于2016-01-03 19:04 被阅读798次

    AsyncCallback.java

    /**
    
     * AsyncCallback interface
    
     */
    
    public interface AsyncCallback<T> {
        void onComplete(T value, Optional<Exception> ex);
    }
    

    AsyncExecutor.java

    /**
     * 
     * AsyncExecutor interface
     *
     */
    public interface AsyncExecutor {
      <T> AsyncResult<T> startProcess(Callable<T> task);
    
      <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);
    
      <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
      
    }
    

    AsyncResult.java

    /**
     * 
     * AsyncResult interface
     */
    public interface AsyncResult<T> {
    
      boolean isCompleted();
    
      T getValue() throws ExecutionException;
    
      void await() throws InterruptedException;
    }
    

    实现类:ThreadAsyncExecutor.java

    /**
     * 
     * Implementation of async executor that creates a new thread for every task.
     * 
     */
    public class ThreadAsyncExecutor implements AsyncExecutor {
    
      /** Index for thread naming */
      private final AtomicInteger idx = new AtomicInteger(0);
    
      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task) {
        return startProcess(task, null);
      }
    
      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
        CompletableResult<T> result = new CompletableResult<>(callback);
        new Thread(() -> {
          try {
            result.setValue(task.call());
          } catch (Exception ex) {
            result.setException(ex);
          }
        } , "executor-" + idx.incrementAndGet()).start();
        return result;
      }
    
      @Override
      public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
        if (asyncResult.isCompleted()) {
          return asyncResult.getValue();
        } else {
          asyncResult.await();
          return asyncResult.getValue();
        }
      }
    
      /**
       * Simple implementation of async result that allows completing it successfully with a value or exceptionally with an
       * exception. A really simplified version from its real life cousins FutureTask and CompletableFuture.
       *
       * @see java.util.concurrent.FutureTask
       * @see java.util.concurrent.CompletableFuture
       */
      private static class CompletableResult<T> implements AsyncResult<T> {
    
        static final int RUNNING = 1;
        static final int FAILED = 2;
        static final int COMPLETED = 3;
    
        final Object lock;
        final Optional<AsyncCallback<T>> callback;
    
        volatile int state = RUNNING;
        T value;
        Exception exception;
    
        CompletableResult(AsyncCallback<T> callback) {
          this.lock = new Object();
          this.callback = Optional.ofNullable(callback);
        }
    
        /**
         * Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
         * completion.
         *
         * @param value
         *          value of the evaluated task
         */
        void setValue(T value) {
          this.value = value;
          this.state = COMPLETED;
          this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
          synchronized (lock) {
            lock.notifyAll();
          }
        }
    
        /**
         * Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
         * completion.
         *
         * @param exception
         *          exception of the failed task
         */
        void setException(Exception exception) {
          this.exception = exception;
          this.state = FAILED;
          this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
          synchronized (lock) {
            lock.notifyAll();
          }
        }
    
        @Override
        public boolean isCompleted() {
          return state > RUNNING;
        }
    
        @Override
        public T getValue() throws ExecutionException {
          if (state == COMPLETED) {
            return value;
          } else if (state == FAILED) {
            throw new ExecutionException(exception);
          } else {
            throw new IllegalStateException("Execution not completed yet");
          }
        }
    
        @Override
        public void await() throws InterruptedException {
          synchronized (lock) {
            if (!isCompleted()) {
              lock.wait();
            }
          }
        }
      }
    }
    

    App.java

    public class App {
    
      /**
       * Program entry point
       */
      public static void main(String[] args) throws Exception {
        // construct a new executor that will run async tasks
        AsyncExecutor executor = new ThreadAsyncExecutor();
    
        // start few async tasks with varying processing times, two last with callback handlers
        AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500));
        AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300));
        AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700));
        AsyncResult<Integer> asyncResult4 = executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
        AsyncResult<String> asyncResult5 = executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));
    
        // emulate processing in the current thread while async tasks are running in their own threads
        Thread.sleep(350); // Oh boy I'm working hard here
        log("Some hard work done");
    
        // wait for completion of the tasks
        Integer result1 = executor.endProcess(asyncResult1);
        String result2 = executor.endProcess(asyncResult2);
        Long result3 = executor.endProcess(asyncResult3);
        asyncResult4.await();
        asyncResult5.await();
    
        // log the results of the tasks, callbacks log immediately when complete
        log("Result 1: " + result1);
        log("Result 2: " + result2);
        log("Result 3: " + result3);
      }
    
     
      private static <T> Callable<T> lazyval(T value, long delayMillis) {
        return () -> {
          Thread.sleep(delayMillis);
          log("Task completed with: " + value);
          return value;
        };
      }
    
      private static <T> AsyncCallback<T> callback(String name) {
        return (value, ex) -> {
          if (ex.isPresent()) {
            log(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
          } else {
            log(name + ": " + value);
          }
        };
      }
    
      private static void log(String msg) {
        System.out.println(String.format("[%1$-10s] - %2$s", Thread.currentThread().getName(), msg));
      }
    }
    

    相关文章

      网友评论

          本文标题:Java设计模式之异步方法

          本文链接:https://www.haomeiwen.com/subject/pslohttx.html