美文网首页
线程池-工作单元

线程池-工作单元

作者: 王侦 | 来源:发表于2019-07-18 10:49 被阅读0次

1.Runnable接口

The Runnable interface should be implemented by any class 
whose instances are intended to be executed by a thread. The 
class must define a method of no arguments called run.

This interface is designed to provide a common protocol for 
objects that wish to execute code while they are active. For 
example, Runnable is implemented by class Thread. Being active 
simply means that a thread has been started and has not yet 
been stopped.

In addition, Runnable provides the means for a class to be active 
while not subclassing Thread. A class that implements Runnable 
can run without subclassing Thread by instantiating a Thread 
instance and passing itself in as the target. In most cases, the 
Runnable interface should be used if you are only planning to 
override the run() method and no other Thread methods. This is 
important because classes should not be subclassed unless the 
programmer intends on modifying or enhancing the fundamental 
behavior of the class.

Runnable接口由如下类实现:该类需要被线程执行。该类需要定义一个无参方法run。

Runnable提供了一种不是继承Thread的方式。在大多数情况下,如果仅仅是覆写run方法而不是其他Thread方法,则应该使用Runnable接口。除非打断修改或增强类的基本行为,否则不应该对类进行子类化。

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

2.Callable接口

A task that returns a result and may throw an exception. 
Implementors define a single method with no arguments called 
call.

The Callable interface is similar to Runnable, in that both are 
designed for classes whose instances are potentially executed by 
another thread. A Runnable, however, does not return a result 
and cannot throw a checked exception.

The Executors class contains utility methods to convert from 
other common forms to Callable classes.

返回结果或抛出异常的任务,实现者定义一个无参call方法。

与Runnable接口类似,该类实例可能由另一个线程执行。但是Runnable不能返回结果或抛出受查异常。

Executors类包含将其他常用形式转换为Callable类的实用方法。

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

3.Future

A Future represents the result of an asynchronous computation.
 Methods are provided to check if the computation is complete, to 
wait for its completion, and to retrieve the result of the 
computation. The result can only be retrieved using method get 
when the computation has completed, blocking if necessary until 
it is ready. Cancellation is performed by the cancel method. 
Additional methods are provided to determine if the task 
completed normally or was cancelled. Once a computation has 
completed, the computation cannot be cancelled. If you would 
like to use a Future for the sake of cancellability but not provide a 
usable result, you can declare types of the form Future<?> and 
return null as a result of the underlying task.

Future表示异步计算的结果。提供检查计算是否完成、等待计算完成以及取出计算结果的方法。只有在计算完成时,get方法才能取到结果,必要时会阻塞直到其计算完成。cacel方法执行取消动作。提供了其他方法检测任务是正常完成还是被取消。当计算完成后,其不能被取消。如果为了取消而不是提供有用结果而使用Future,可以声明Future<?>形式的类型,并返回null。

示例用法(如下的类都是虚构的):

 interface ArchiveSearcher { String search(String target); }
 class App {
   ExecutorService executor = ...
   ArchiveSearcher searcher = ...
   void showSearch(final String target)
       throws InterruptedException {
     Future<String> future
       = executor.submit(new Callable<String>() {
         public String call() {
             return searcher.search(target);
         }});
     displayOtherThings(); // do other things while searching
     try {
       displayText(future.get()); // use future
     } catch (ExecutionException ex) { cleanup(); return; }
   }
 }

FutureTask类是Future的一个实现,并实现了Runnable,因此可以由Executor执行。上述带submit结构可由下面的形式代替:

 FutureTask<String> future =
   new FutureTask<String>(new Callable<String>() {
     public String call() {
       return searcher.search(target);
   }});
 executor.execute(future);

如下为Future的完整形式:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

关于cancle方法的说明:

  • 在任务已经完成、已经被取消或者由于其他原因不能被取消时,取消会失败。
  • 任务还未启动时调用cacel,会成功,并且任务永远都不能运行
  • 任务已经启动了,mayInterruptIfRunning 参数会决定执行该任务的线程是否会被中断,以中断该任务
  • 该方法返回后,后续的isDone()调用总是返回true。如果该方法返回true,后续的isCancelled()总是返回true。

4.RunnableFuture

A Future that is Runnable. Successful execution of the run 
method causes completion of the Future and allows access to its 
results.

是Runnable的Future,成功执行run方法将使Future完成,并允许访问其他结果。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

5.FutureTask

A cancellable asynchronous computation. This class provides a 
base implementation of Future, with methods to start and cancel 
a computation, query to see if the computation is complete, and 
retrieve the result of the computation. The result can only be 
retrieved when the computation has completed; the get methods 
will block if the computation has not yet completed. Once the 
computation has completed, the computation cannot be restarted 
or cancelled (unless the computation is invoked using 
runAndReset()).

A FutureTask can be used to wrap a Callable or Runnable object. 
Because FutureTask implements Runnable, a FutureTask can be 
submitted to an Executor for execution.

In addition to serving as a standalone class, this class provides 
protected functionality that may be useful when creating 
customized task classes.

可取消的异步计算。此类提供了Future的基本实现,包括启动和取消计算的方法,查询计算是否完成的查询,以及检索计算结果。 只有在计算完成后才能检索结果; 如果计算尚未完成,get方法将阻塞。 计算完成后,无法重新启动或取消计算(除非使用runAndReset()调用计算)。

FutureTask可用于包装Callable或Runnable对象。 因为FutureTask实现了Runnable,所以可以将FutureTask提交给Executor执行。

除了作为独立类之外,此类还提供了在创建自定义任务类时可能有用的保护方法。

Revision notes: This differs from previous versions of this
class that relied on AbstractQueuedSynchronizer, mainly to
avoid surprising users about retaining interrupt status during
cancellation races. Sync control in the current design relies
on a "state" field updated via CAS to track completion, along
with a simple Treiber stack to hold waiting threads.

Style note: As usual, we bypass overhead of using
AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.

修订说明:之前版本依赖于AQS,现版本主要是为了避免在取消争用期间保留中断状态。当前设计中的同步控制依赖于通过CAS更新state域来跟踪完成,以及用于保存等待线程简单的Treiber栈。

样式说明:像之前一样,绕过了使用AtomicXFieldUpdaters的开销,而是直接使用Unsafe内部方法。

5.1 状态的变化

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

任务的运行状态初始化为NEW。任务状态转为终止状态只会发生在方法set setException和cancel方法。在完成期间,state可能会一个短暂的COMPLETING或者INTERRUPTING。从这些中间状态到最终状态的转换使用更便宜的ordered/惰性写入,因为值是唯一的并且不能进一步修改。

可能的状态转换:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

5.2 构造器及域

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

如果不需要结果,考虑使用下面的形式:
Future<?> f = new FutureTask<Void>(runnable, null);

从构造器可以看出,任务的初始状态为NEW。

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
  • callable代表执行的任务,运行后置为null;
  • outcome代表返回结果或者要抛出的异常,从get()获取的
  • runner代表运行callable的线程,在run()中CAS赋值
  • waiters代表等待线程的Treiber栈

5.3 run

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
  • step1.线程状态不为NEW时返回;设置任务的runner为当前线程
  • step2.运行任务result = c.call()
    正常运行完成后,set(result)
    发生异常,setException(ex)
  • step3.运行完成后,设置runner = null
    重新读取state以防有漏掉的中断:s >= INTERRUPTING时,会调用handlePossibleCancellationInterrupt(s)

runner作为锁使用,防止其他线程并发调用run:

  • run之前首先通过CAS尝试获得锁,并将runner置为当前线程
  • run之后通过将runner = null释放锁
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

从上面可以看出,run有两条状态转移路线:

  • 1)正常运行set(result)
    NEW -> COMPLETING -> NORMAL
  • 2)发生异常
    NEW -> COMPLETING -> EXCEPTIONAL

关于finishCompletion:

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
  • step1.将waiters置为null
  • step2.逐个唤醒在栈中阻塞的线程LockSupport.unpark(t)
  • step3.调用保护方法done(),子类可以实现已完成想要的功能
  • step4.将callable置为null

5.4 get

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
  • step1.如果任务还没有完成,则调用awaitDone
  • step2.任务完成,则report(s)
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
  • step1.如果发生中断,则removeWaiter(q),并抛出异常
  • step2.如果已经完成,则返回状态
  • step3.状态为COMPLETING(表示已经执行完了,正在最后的设置期间),则调用Thread.yield(),因为当前线程是get,一般是另外的线程在执行任务,所以让执行任务的线程优先获得执行权限
  • step4.状态为< COMPLETING,此时任务还在执行,则创建q = WaitNode()
  • step5.如果创建的q没有入队,则入队,并置于队首,即入栈
  • step6.最当前线程进行进行阻塞
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

5.5 cancel

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
  • step1.不是NEW状态,直接返回false,表示此时任务已经执行完了
  • step2.为NEW状态,如果 mayInterruptIfRunning 为false
    NEW - > CANCELLED
  • step3.为NEW状态,如果 mayInterruptIfRunning 为true
    NEW -> INTERRUPTING -> INTERRUPTED
    并会对运行任务的runner进行中断

6.为什么FutureTask不再基于AQS

ThreadPoolExecutor executor = ...;  
executor.submit(task1).cancel(true);  
executor.submit(task2);  

虽然中断的是task1,但可能task2得到中断信号。

JDK1.6的 FutureTask.Sync.innerCancel的代码:

boolean innerCancel(boolean mayInterruptIfRunning) {  
 for (;;) {  
  int s = getState();  
  if (ranOrCancelled(s))  
      return false;  
  if (compareAndSetState(s, CANCELLED))  
      break;  
 }  
    if (mayInterruptIfRunning) {  
        Thread r = runner;  
        if (r != null)  //1
            r.interrupt(); //2
    }  
    releaseShared(0);  
    done();  
    return true;  
}  

按照如下的执行流程,task2得到中断信号:

  • step1.主线程调用cancel(true)想取消task1,执行完1处检查后,停住
  • step2.Thread1执行完task1,开始执行task2
  • step3.主线程此时继续执行2处的r.interrupt(),那么task2将会被中断

看看新版本怎么处理这个中断遗留问题:
cancel代码:

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

run最后的代码:

        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

可知run最后对此进行了特别处理:

  • 如果该任务已经被取消,且未被取消完成,则handlePossibleCancellationInterrupt(s)
  • handlePossibleCancellationInterrupt其实很简单,就是在主线cancel()完成之前,在这里自旋,Thread.yield()让执行取消的主线程更容易获得执行机会。

这里根本就是在主线程执行取消指定任务时,让执行该取消任务的线程自旋等待 主线程中断操作 完成。

参考

相关文章

  • 线程池-工作单元

    1.Runnable接口 Runnable接口由如下类实现:该类需要被线程执行。该类需要定义一个无参方法run。 ...

  • 线程池-工作单元ForkJoinTask

    1.官方文档 在ForkJoinPool中运行的任务的抽象基类。ForkJoinTask是一个类线程的实体,比普通...

  • pthread 创建的动态线程池

    组织结构: 1,缓存工作的任务池,任务节点 2,存储工作线程的池,线程节点 3,任务池投递到工作线程的线程 4,工...

  • 线程池概述

    为什么要使用线程池? 线程池核心参数 线程池的几种拒绝策略 execute()和submit()的区别 线程池工作...

  • 大白话聊聊线程池的工作原理和核心参数

    目录 1、为啥要使用线程池 2、线程池的工作原理 3、线程池都用哪些核心参数 4. 有界队列下的线程池的工作流程 ...

  • 面试题2019年7月

    线程池原理 参考:Java 线程池原理分析 线程池工作原理:1、线程数量小于 corePoolSize,直接创建新...

  • 线程池

    线程池组件 1、线程池管理器(ThreadPoolManager):用于创建并管理线程池 2、工作线程(WorkT...

  • 线程池设计

    线程池管理器(CustomThreadPool): 用于创建并管理线程池工作线程(CustomWorker):线程...

  • 万字长文:带你透彻理解“线程池”

    目标 【理解】线程池的基本概念 【理解】线程池工作原理 【掌握】自定义线程池 【应用】java内置线程池 【应用】...

  • C++11 ThreadPool的应用

    线程池的应用 代码结构 任务队列 线程池 工作线程 代码如下

网友评论

      本文标题:线程池-工作单元

      本文链接:https://www.haomeiwen.com/subject/qtzclctx.html