Android 早期的异步模型是AsyncTask,再到现在的RxJava和Kotlin协程,虽然AsyncTask有很多缺点,但AsyncTask的代码量不多,还是值得研读的。
如果对源码感兴趣,可以看这篇文章,AsyncTask源码分析
预览前置准备
-
AsyncTask的异步和安卓一样是使用Java线程池,唯一区别就是主线程API不同,但很类似。例如Android中使用Handler、Looper、Message来把异步线程中需要主线程要执行的操作封装成一个Message消息,通过Handler发送到主线程
-
鸿蒙中有对应的概念,例如EventHandler对应Handler,EventRunner对应Looper,InnerEvent对应Message
-
Android中,通过handler.sendMessgae()
发送消息到主线程,鸿蒙中,则通过eventHandler.sendEvent(),Android中消息Message对象有一个what属性用来标识消息的类型,而鸿蒙InnerEvent则是通过eventId属性来标识,同样Android的Message对象的obj属性用来存放消息的数据,而鸿蒙InnerEvent则是用object属性 -
Android中获取主线程Looper,是通过Looper.getMainLooper()来获取主线程的Looper,而鸿蒙也是通过EventRunner.getMainEventRunner()来获取主线程的EventRunner
AsyncTask修改
- 把Android中的AsyncTask源码拷贝到鸿蒙鸿蒙,替换Handler的对应部分即可
全部代码
public abstract class AsyncTask<Params, Progress, Result> {
private static final String LOG_TAG = "AsyncTask";
/**
* 获得当前CPU的核心数
*/
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/**
* 设置线程池的核心线程数2-4之间,但是取决于CPU核数
*/
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
/**
* 设置线程池的最大线程数为 CPU核数*2+1
*/
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
/**
* 设置线程池空闲线程存活时间30s
*/
private static final int KEEP_ALIVE_SECONDS = 30;
/**
* 线程工厂,统一创建线程并配置
*/
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
@Override
public Thread newThread(Runnable runnable) {
//给每个生成的线程命名
return new Thread(runnable, "AsyncTask #" + mCount.getAndIncrement());
}
};
/**
* 任务队列,最大容量为128,就是最多支持128个任务并发
*/
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/**
* 任务线程池,static修饰,所以是类共享,所以多个AsyncTask会共享一个线程池
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
//定义线程池,使用sPoolWorkQueue作为队列,如果超出任务执行数量,则抛出RejectedExecutionException
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
//允许线程池在没有任务时销毁线程
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
/**
* 串行线程池
*/
private static final Executor SERIAL_EXECUTOR = new SerialExecutor();
/**
* Handler的消息类型Code,发送结果的消息类型
*/
private static final int MESSAGE_POST_RESULT = 0x1;
/**
* Handler的消息类型Code,更新进度的消息类型
*/
private static final int MESSAGE_POST_PROGRESS = 0x2;
/**
* 默认线程池,默认为串行
*/
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
/**
* 主线程Handler
*/
private static InternalHandler sHandler;
/**
* 任务实现
*/
private final WorkerRunnable<Params, Result> mWorker;
/**
* 任务
*/
private final FutureTask<Result> mFuture;
/**
* 当前的任务状态
*/
private volatile AsyncTask.Status mStatus = AsyncTask.Status.PENDING;
/**
* 是否取消
*/
private final AtomicBoolean mCancelled = new AtomicBoolean();
/**
* 任务是否执行了
*/
private final AtomicBoolean mTaskInvoked = new AtomicBoolean();
/**
* 回调Handler
*/
private final EventHandler mHandler;
/**
* 顺序执行器,将线程池的执行包裹,所以即使线程池是有容量的线程池,经过SerialExecutor包裹,都是串行执行,相当于单线程执行
*/
private static class SerialExecutor implements Executor {
/**
* 任务队列,让任务串行
*/
final ArrayDeque<Runnable> mTasks = new ArrayDeque<>();
/**
* 当前执行的任务
*/
Runnable mActive;
@Override
public synchronized void execute(final Runnable runnable) {
//创建一个代理任务,包裹真正的任务,再将代理任务进队
mTasks.offer(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} finally {
//执行完一个任务后,自动执行下一个任务
scheduleNext();
}
}
});
//第一次执行,马上执行任务
if (mActive == null) {
scheduleNext();
}
}
/**
* 执行下一个任务
*/
protected synchronized void scheduleNext() {
//获取下一个任务,如果有则执行
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
/**
* 运行状态
*/
public enum Status {
/**
* 等待
*/
PENDING,
/**
* 运行
*/
RUNNING,
/**
* 结束
*/
FINISHED,
}
/**
* 获取主线程Handler
*/
private static EventHandler getMainHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler(EventRunner.getMainEventRunner());
}
return sHandler;
}
}
private EventHandler getHandler() {
return mHandler;
}
/**
* 外部配置任务执行器,可外部定制
*
* @param exec 执行器
*/
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
//-------------------- AsyncTask构造方法 start --------------------
/**
* 空参构造,取主线程的Handler回调
*/
public AsyncTask() {
this((EventRunner) null);
}
/**
* 指定回调的Handler
*/
public AsyncTask(EventHandler handler) {
this(handler != null ? handler.getEventRunner() : null);
}
/**
* 可指定消息轮训器
*
* @param callbackRunner 轮训器
*/
public AsyncTask(EventRunner callbackRunner) {
//没指定,或者指定为主线程的轮训器,则构造主线程的Handler,否则使用指定的轮训器创建
mHandler = callbackRunner == null || callbackRunner == EventRunner.getMainEventRunner()
? getMainHandler()
: new EventHandler(callbackRunner);
//任务的具体执行体,WorkerRunnable实现了Callable接口,就是说可以返回结果的任务
mWorker = new WorkerRunnable<Params, Result>() {
@Override
public Result call() throws Exception {
//标识任务为已经执行了
mTaskInvoked.set(true);
Result result = null;
try {
//执行任务,并获取执行结果
result = doInBackground(mParams);
} catch (Throwable throwable) {
//抛出了异常,设置任务已经被取消
mCancelled.set(true);
throw throwable;
} finally {
//发送结果
postResult(result);
}
//返回结果
return result;
}
};
//将WorkerRunnable作为FutureTask任务去执行
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
//任务结束,任务结束了,call还没有被调用
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
//被取消时会抛出异常
e.printStackTrace();
HiLog.error(new HiLogLabel(HiLog.LOG_APP, 0, LOG_TAG), e.getMessage());
} catch (ExecutionException e) {
//执行发生异常异常
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
//取消失败
postResultIfNotInvoked(null);
}
}
};
}
//-------------------- AsyncTask构造方法 end --------------------
/**
* 发送任务执行结果,如果没有发送过则发送
*/
private void postResultIfNotInvoked(Result result) {
//这里有点奇怪,这个方法调用在FutureTask的done方法,而mTaskInvoked标志在call()回调时就会设置为true
//所以后面的if判断一直都是进不去的,感觉这个方法是多余的,除非call方法执行还没到mTaskInvoked设置成功之前就抛出了异常
//这个if判断才成立
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
/**
* 发送任务执行结果到主线程进行回调
*/
private Result postResult(Result result) {
EventHandler handler = getHandler();
InnerEvent event = InnerEvent.get(MESSAGE_POST_RESULT, new AsyncTaskResult<>(this, result));
handler.sendEvent(event);
return result;
}
/**
* 获取任务运行状态
*/
public final AsyncTask.Status getStatus() {
return mStatus;
}
/**
* 执行前回调
*/
protected void onPreExecute() {
}
/**
* 任务执行回调(子线程执行)
*
* @param params 任务参数
* @return 任务结果
*/
protected abstract Result doInBackground(Params... params);
/**
* 执行后回调
*
* @param result 执行结果
*/
@SuppressWarnings({"UnusedDeclaration"})
protected void onPostExecute(Result result) {
}
/**
* 进度更新
*
* @param values 进度
*/
@SuppressWarnings({"UnusedDeclaration"})
protected void onProgressUpdate(Progress... values) {
}
/**
* 任务已经执行完毕了,却发现被取消,则回调,并带上结果
*
* @param result 结果
*/
@SuppressWarnings({"UnusedParameters"})
protected void onCancelled(Result result) {
onCancelled();
}
/**
* 任务未执行完,被取消,则回调
*/
protected void onCancelled() {
}
/**
* 是否被取消了
*/
public final boolean isCancelled() {
return mCancelled.get();
}
/**
* 取消任务
*
* @param mayInterruptIfRunning 是否强制取消,不等待执行完毕后再取消(马上打断)
*/
public final boolean cancel(boolean mayInterruptIfRunning) {
//设置任务取消的标志
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
/**
* 获取执行结果
*
* @return 执行结果
*/
public final Result get() throws InterruptedException, ExecutionException {
return mFuture.get();
}
/**
* 获取结果,并设定超时,如果超时还没获取到结果,则抛出异常
*
* @param timeout 超时时间
* @param unit 超时时间单位
*/
public final Result get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return mFuture.get(timeout, unit);
}
/**
* 执行任务
*
* @param params 执行参数
*/
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
/**
* 提供执行器,执行任务
*
* @param exec 执行器,可以设置为内部的THREAD_POOL_EXECUTOR来实现并行,否则默认使用串行的执行器
* @param params 任务参数
*/
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
//处理状态,RUNNING正在执行不允许再调用执行,FINISHED已经结束了也不允许执行,所以AsyncTask只允许执行一次
if (mStatus != AsyncTask.Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
default:
break;
}
}
//切换状态为运行中
mStatus = AsyncTask.Status.RUNNING;
//任务执行前,回调,onPreExecute是直接调用的,所以如果在子线程中调用AsyncTask的execute
//onPreExecute的回调也是在子线程!
onPreExecute();
//配置任务参数
mWorker.mParams = params;
//将任务交给线程池执行
exec.execute(mFuture);
return this;
}
/**
* 执行Runnable
*/
public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}
/**
* 更新进度,需要手动调用
*
* @param values 进度
*/
protected final void publishProgress(Progress... values) {
//没有取消才更新
if (!isCancelled()) {
EventHandler handler = getHandler();
InnerEvent event = InnerEvent.get(MESSAGE_POST_PROGRESS, new AsyncTaskResult<>(this, values));
handler.sendEvent(event);
}
}
/**
* 结束任务
*
* @param result 结果
*/
private void finish(Result result) {
//结束任务时,如果被标记取消,则回调取消
if (isCancelled()) {
onCancelled(result);
} else {
//获取到结果了,并且没有取消,则回调任务结束
onPostExecute(result);
}
//设置状态为结束
mStatus = AsyncTask.Status.FINISHED;
}
/**
* 内部Handler,负责从子线程中发送消息会主线程进行方法回调
*/
private static class InternalHandler extends EventHandler {
InternalHandler(EventRunner runner) {
super(runner);
}
@Override
protected void processEvent(InnerEvent event) {
super.processEvent(event);
AsyncTaskResult<?> result = (AsyncTaskResult<?>) event.object;
switch (event.eventId) {
//主线程通知,任务结果,获取到结果
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
//更新进度
result.mTask.onProgressUpdate(result.mData);
break;
default:
break;
}
}
}
/**
* 任务
*
* @param <Params> 参数类型
* @param <Result> 结果类型
*/
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
/**
* 任务参数
*/
Params[] mParams;
}
/**
* 任务执行结果包裹类,包装结果作为Handler发送的数据
*
* @param <Data> 任务结果类型
*/
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
/**
* @param task 任务对象
* @param data 执行结果
*/
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
}
简单使用
例子:通过AsyncTask加载一张网络图片,以及通过AsyncTask请求玩安卓的一个Get请求接口
public class MainAbilitySlice extends AbilitySlice {
private Image vImage;
private Button vLoadImg;
private Button vNativeNetRequest;
private Button vOkHttpNetRequest;
private OkHttpClient mOkHttpClient;
private ShapeLoadingDialog mLoadingDialog;
@Override
public void onStart(Intent intent) {
super.onStart(intent);
super.setUIContent(ResourceTable.Layout_ability_main);
mOkHttpClient = new OkHttpClient.Builder().build();
findView();
bindView();
}
private void findView() {
vImage = (Image) findComponentById(ResourceTable.Id_image);
vLoadImg = (Button) findComponentById(ResourceTable.Id_load_img);
vNativeNetRequest = (Button) findComponentById(ResourceTable.Id_native_net_request);
vOkHttpNetRequest = (Button) findComponentById(ResourceTable.Id_okhttp_net_request);
}
private void bindView() {
vLoadImg.setClickedListener(new Component.ClickedListener() {
@Override
public void onClick(Component v) {
String url = "https://ss0.bdstatic.com/94oJfD_bAAcT8t7mm9GUKT-xh_/timg?image&quality=100&size=b4000_4000&sec=1565684573&di=82491d3ea2a4d5195d6f8bd90eba1953&src=http://image.coolapk.com/picture/2016/1210/459462_1481302685_5118.png.m.jpg";
DownloadImageTask task = new DownloadImageTask(mOkHttpClient, new Callback<PixelMap>() {
@Override
public void onStart() {
vImage.setPixelMap(null);
showLoading();
}
@Override
public void onFinish(PixelMap pixelMap) {
hideLoading();
if (pixelMap != null) {
vImage.setPixelMap(pixelMap);
} else {
toast("下载失败");
}
}
});
task.execute(url);
}
});
vNativeNetRequest.setClickedListener(new Component.ClickedListener() {
@Override
public void onClick(Component component) {
HttpURLConnectionNetRequestTask task = new HttpURLConnectionNetRequestTask(new Callback<String>() {
@Override
public void onStart() {
showLoading();
}
@Override
public void onFinish(String result) {
hideLoading();
if (!TextUtils.isEmpty(result)) {
HiLogUtil.info(result);
toast(result);
} else {
toast("网络请求失败");
}
}
});
task.execute("https://wanandroid.com/article/listproject/0/json");
}
});
vOkHttpNetRequest.setClickedListener(new Component.ClickedListener() {
@Override
public void onClick(Component component) {
OkHttpNetRequestTask task = new OkHttpNetRequestTask(mOkHttpClient, new Callback<String>() {
@Override
public void onStart() {
showLoading();
}
@Override
public void onFinish(String result) {
hideLoading();
if (!TextUtils.isEmpty(result)) {
HiLogUtil.info(result);
toast(result);
} else {
toast("网络请求失败");
}
}
});
task.execute("https://wanandroid.com/article/listproject/0/json");
}
});
}
private static class DownloadImageTask extends AsyncTask<String, Void, PixelMap> {
private final OkHttpClient mClient;
private final Callback<PixelMap> mCallback;
public DownloadImageTask(OkHttpClient client, Callback<PixelMap> callback) {
mClient = client;
mCallback = callback;
}
@Override
protected void onPreExecute() {
super.onPreExecute();
mCallback.onStart();
}
@Override
protected PixelMap doInBackground(String... urls) {
try {
String url = urls[0];
Request request = new Request.Builder()
.url(url)
.build();
Call call = mClient.newCall(request);
Response response = call.execute();
ResponseBody body = response.body();
if (body == null) {
return null;
}
ImageSource imageSource = ImageSource.create(body.byteStream(), null);
return imageSource.createPixelmap(null);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
protected void onPostExecute(PixelMap pixelMap) {
super.onPostExecute(pixelMap);
mCallback.onFinish(pixelMap);
}
}
private static class HttpURLConnectionNetRequestTask extends AsyncTask<String, Void, String> {
private final Callback<String> mCallback;
public HttpURLConnectionNetRequestTask(Callback<String> callback) {
this.mCallback = callback;
}
@Override
protected void onPreExecute() {
super.onPreExecute();
mCallback.onStart();
}
@Override
protected String doInBackground(String... params) {
return HttpUtil.httpGet(params[0]);
}
@Override
protected void onPostExecute(String result) {
super.onPostExecute(result);
mCallback.onFinish(result);
}
}
private static class OkHttpNetRequestTask extends AsyncTask<String, Void, String> {
private final OkHttpClient mClient;
private final Callback<String> mCallback;
public OkHttpNetRequestTask(OkHttpClient client, Callback<String> callback) {
this.mClient = client;
this.mCallback = callback;
}
@Override
protected void onPreExecute() {
super.onPreExecute();
mCallback.onStart();
}
@Override
protected String doInBackground(String... params) {
try {
String url = params[0];
Request request = new Request.Builder()
.url(url)
.build();
Call call = mClient.newCall(request);
Response response = call.execute();
ResponseBody body = response.body();
if (body == null) {
return null;
}
return body.string();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
protected void onPostExecute(String result) {
super.onPostExecute(result);
mCallback.onFinish(result);
}
}
/**
* 回调接口
*/
public interface Callback<T> {
/**
* 执行前回调
*/
void onStart();
/**
* 执行后回调
*
* @param result 执行结果
*/
void onFinish(T result);
}
@Override
protected void onStop() {
super.onStop();
hideLoading();
mLoadingDialog = null;
}
private void showLoading() {
if (mLoadingDialog == null) {
mLoadingDialog = new ShapeLoadingDialog.Builder(this)
.loadText("加载中...")
//点击返回键时是否消失Dialog
.cancelable(true)
//点击Dialog外部时是否消失Dialog
.canceledOnTouchOutside(false)
.build();
}
if (!mLoadingDialog.isShowing()) {
mLoadingDialog.show();
}
}
private void hideLoading() {
if (mLoadingDialog != null && mLoadingDialog.isShowing()) {
mLoadingDialog.hide();
}
}
private void toast(String msg) {
new ToastDialog(getContext()
.getApplicationContext())
.setText(msg)
.setAlignment(LayoutAlignment.CENTER)
.show();
}
}
网友评论