AsyncTask 作为一个轻量级的异步操作类,他的封装还是非常具有代表性的,这篇文章我们通过源码来看一下他是如何实现的,
先分别介绍一下AsyncTask 中的角色
1.SerialExecutor 线程池 保存着需要被执行的任务队列,添加任务时,如果当前没有任务在执行,则调用任务线程池,执行任务,在任务执行结束后,也是判断任务队列中是否还有任务,有任务则调用线程池继续执行任务
2.THREAD_POOL_EXECUTOR 执行任务的任务队列,
3.InternalHandler 消息的传递着,用来发送消息到监听的线程, 这里不一定是主线程,
4.WorkerRunnable 真正工作的Runnable,
5.FutureTask 代表着需要被执行的任务,
再来说一下在AsyncTask 非常重要的方法
onPreExecute 当任务开始之前调用,用于准备数据,
doInBackground 耗时任务执行的方法,该方法工作在子线程,
onProgressUpdate 当进度发生变化后调用,作用于目标线程,通常为主线程
onPostExecute 当任务执行完毕后调用,
如何使用AsyncTask
new AsyncTask<String ,Integer,String>(){
@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);
}
@Override
protected void onPostExecute(String s) {
super.onPostExecute(s);
}
@Override
protected String doInBackground(String... strings) {
return null;
}
}.execute("");
AsyncTask<Params,Progress,Result> 我们可以看到构造AsyncTask的时候必须指定三个类型,Params就是在执行execute时的入参类型,这个根据自身来定义,Progress 就是进度类型,通常是Integer,第三个参数就是我们在执行完任务后返回的结果类型,
AsyncTask 构造方法
public AsyncTask(@Nullable Looper callbackLooper) {
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
我们可以看到在构造方法中需要一个looper , 这个looper 决定着我们需要将消息最终传递到哪个线程,我们前面介绍InternalHandler 的时候就说了,这个消息的去处不一定时主线程,原因就是这个loopter,
我们再来看看这个InternalHandler
InternalHandler
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
InternalHandler 是AsyncTask 的静态内部类,继承自handler , 他只能处理2种消息,一种是result 结果,另外一种是progress进度,这里我们看到一个上面介绍比价重要的方法onProgressUpdate,可以看到消息是通过InternalHandler来发送的,同样的AsyncTask 的finish 方法也是在这里处理的, 接收的类型也比较特别 AsyncTaskResult ,我们来看看他真面目
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
AsyncTaskResult的构造方法必须有AsyncTask,这个Data 是根据当前的消息类型来决定的(进度or结果),也就是我们在创建AsyncTask时约定的类型,
继续来分析WorkerRunnable
WorkerRunnable
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
WorkerRunnable 是一个抽象类, 实现了Callable, 并且非常的简单 只有一个mParams属性,
再回过头看看AsyncTask构造方法中WorkerRunnable 的创建
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
}
WorkerRunnable 中的 call 方法执行了一个我们在上面提到过的非常重要的方法 doInBackground , 我们看到在finally 方法中调用了postResult 方法
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
这里将result 和 当前的AsyncTask 封装成了一个类型是Result 的Msg, 并通过 InternalHandler 将它发送给目标线程
我们继续分析一下FutureTask
FutureTask
FutureTask 是一个接口,继承自 Runnable, Future,通过类的介绍我们可以将它定义为一个可以被取消的异步工作类,既然他可以被取消,那就代表着我们的耗时任务如果被添加到处理任务的队列中,我们也可以将它打断,我们先来看一下AsyncTask 中的取消方法,
public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
可以看到取消任务就是调用FutureTask 的cancel方法,具体怎么实现的取消的在这里就不多说了,大家有兴趣的可以去看一下源码,
继续来分析构造方法中初始化FutureTask 的代码
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
}
};
可以看到一个done方法,那么这个done 方法的执行时机是在什么时候呢, FutureTask的入参是我们上面实现的继承自Callable 的WorkRunnable,
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
这FutureTask 的run方法中会调用我们workRunnable 的call 方法 ,然后执行 workRunnable 的doInBackground 的方法执行耗时任务,执行结束后将ran标记为true, 那么就会走下面那个set(Result)的方法,
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
检查了一下状态,继续执行finishCompletion()方法
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
可以看到在这里执行了done 方法,
再来分析一下上面初始化FutureTask 的done方法
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
在done 方法中可以到在这里会执行 postResultIfNotInvoked方法根据一个标识符来判断是否需要发送结果
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
那么这个标识符是在 workRunnable 的call方法中设置为true的,也就是说,如果执行了workRunnable 的call 方法,这里是肯定不会再次发送的消息
既然是done方法执行在workRunnable方法之后,那么为什么我们已经在workRunnable 的call方法中调用了postResult 的方法,这里还要再次判断呢,这就是我们前面介绍FutureTask时说他是可以被打断的,如果我们执行了 AsyncTask 的cancel 从而将 FuturnTask 打断,同样会执行这个done,而WorkRunnable 的call方法不会执行,此时就会发送结果
构造方法到这里已经分析完了,我们继续分析execute 方法,看看整个任务的执行过程,
execute
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
execute 方法的入参类型即是我们在创建AsyncTask 时定义的的Params类型,在调用execute 方法后,就会调用executeOnExecutor 方法,入参就是 sDefaultExecutor,那么这个sDefaultExecutor 是干什么用的呢,他是何时被创建的呢,我们继续来看
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
@UnsupportedAppUsage
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
通过源码我们可以看到sDefaultExecutor 这个类就是 SerialExecutor 的对象,并且sDefaultExecutor 是,静态变量在类初始化过程中被创建的,并且共享给所有AsyncTask的实现类,继续看看一下SerialExecutor
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
SerialExecutor 中维护了一个队列,当有任务进来时就将这个任务添加到队列里,根据当前是否有任务来判断是否需要执行任务,这里我们可以看到所有添加到SerialExecutor 中的任务是顺序执行的,也就是说如果上一个任务没有执行完毕,下一个任务是不会开启的,如果上一个任务是一个非常耗时的任务,那么你即使执行了AsyncTask 的execute 只是将这个任务放到了任务队列里面,他的执行时机是在前面的任务全部执行完毕后
那么我们如何让当前这个任务立即执行呢, 只需要使用asyncTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR,params);这样我们就越过了sDefaultExecutor 他里面的队列,但是这样操作还是有弊端,具体是什么我们来说一下THREAD_POOL_EXECUTOR 这个线程池创建的参数
private static final int CORE_POOL_SIZE = 1;
private static final int MAXIMUM_POOL_SIZE = 20;
private static final int BACKUP_POOL_SIZE = 5;
private static final int KEEP_ALIVE_SECONDS = 3;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), sThreadFactory);
threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
源码中THREAD_POOL_EXECUTOR 的创建是一个静态方法,而他的最大线程数是20个,如果你的任务比较多,而且特别频繁,越过队列直接添加到THREAD_POOL_EXECUTOR中会丢失很多任务,此时我们就需要使用自定义的Executor 来实现具体的业务了,
到此整个过程我们就梳理完毕了,最后再来总结一下AsyncTask工作的具体流程,我们以对象来描述,
AsyncTask 在创建时约定了Params Progress Result 三个的类型,创建了一个Handler 负责发送消息(只发送进度和结果两种消息),这个Handler 可能不是在主线程(如果在创建AsyncTask 时给定的looper 不是MainLooper),同时创建了一个 实现Callable 的WorkRunnable ,他是任务的主体, 还创建了一个可以被打断的 FutureTask,承载着 WorkRunnable ,在我们执行AsyncTask 的execute方法时会将承载任务的FutureTask添加到sDefaultExecutor 他的任务队列中,让工作线程THREAD_POOL_EXECUTOR 一个一个的顺序处理消息,在消息处理过程中或完毕后通过事先创建Hanlder 来给目标线程发送结果
网友评论