前言
使用 RxJava 大多是为了执行异步任务,所以这里按照个人的使用习惯做了一层简单的封装。
使用方法
- 单个任务的异步
val task = Task.singleTask {
// the work to run asynchronously
}.success {
// success callback (delivered on the UI thread)
}.error {
// failure callback (delivered on the UI thread)
}.interrupted {
// cancellation callback (delivered on the UI thread)
}.start()
task.cancel()
- 带有进度条的异步
// Progress values are of type String; the task itself produces Unit.
val task = Task.progressTask<String, Unit> {
    // runs on the background thread; report progress through the task passed in as `it`
    it.publishProgress("当前进度")
}.progress {
    // receives each value passed to publishProgress
}.success {
    // success callback
}.error {
    // failure callback
}.interrupted {
    // cancellation callback
}.start()
task.cancel()
- 定时器的异步
val task = Task.timerTask { timer, tick ->
// the work to repeat on every timer tick
}.start()
代码
/** Common contract of the task types in this file; exposes a uniform start/stop handle. */
interface ITask {
/** Returns the [ITaskProxy] that starts and stops this task. */
fun getProxy(): ITaskProxy
}
/** Type-agnostic start/stop handle for a task. */
interface ITaskProxy {
/** Starts the underlying task. */
fun start()
/** Stops (cancels) the underlying task. */
fun stop()
}
SingleTask
/**
 * Asynchronous task that produces a single result.
 *
 * The work runs on a new background thread (`Schedulers.newThread()`); the success, error and
 * interrupted callbacks are delivered on the Android main thread when [Task.ANDROID_PLATFORM]
 * is true, otherwise on `Schedulers.computation()`.
 *
 * The work is supplied either by overriding [onTaskRun] or by passing a lambda to [start].
 */
open class SingleTask<RESULT> : ITask {
    private var disposable: Disposable? = null

    /**
     * True once [cancel] has been called for the current run; reset by [start].
     * Volatile because it is written by the caller of [cancel] and read on the worker thread.
     */
    @Volatile
    var isCancelled: Boolean = false
        private set

    private var successAction: ((RESULT) -> Unit)? = null
    private var errorAction: ((Throwable) -> Unit)? = null
    private var beforeAction: (() -> Unit)? = null
    private var interruptedAction: (() -> Unit)? = null
    private var isSuccess: Boolean = false
    private var running = false
    private var hasError = false
    private var runnable: (SingleTask<*>) -> RESULT = { onTaskRun() }

    /** True while a started run has neither finished, failed nor been cancelled. */
    val isRunning: Boolean
        get() = disposable?.isDisposed == false && running && !isCancelled

    /** Wrapper that lets a possibly-null [RESULT] travel through the Rx stream as a non-null item. */
    class NullableResult<RESULT>(val result: RESULT)

    /**
     * Starts the task.
     *
     * Runs the before-hooks synchronously on the calling thread, then subscribes the work.
     * If a before-hook cancels the task, the work is never scheduled.
     *
     * @return this task, for chaining
     * @throws IllegalStateException if the task is already running
     */
    fun start(): SingleTask<RESULT> {
        check(!isRunning) { "already started" }
        hasError = false
        isCancelled = false
        isSuccess = false
        running = true

        // BEFORE
        onTaskBefore()
        beforeAction?.invoke()
        if (isCancelled) {
            running = false
            return this
        }

        val callbacks = callbackScheduler()
        disposable = Maybe
            .create<NullableResult<RESULT>> { emitter ->
                // Cancelled before the worker thread got to run: emit nothing.
                if (isCancelled) return@create
                try {
                    emitter.onSuccess(NullableResult(runnable.invoke(this)))
                } catch (e: Exception) {
                    // Failures after cancellation/disposal are dropped on purpose; emitting on a
                    // disposed emitter would only route the error to the global Rx error handler.
                    if (!isCancelled && !emitter.isDisposed) {
                        emitter.onError(e)
                    }
                }
            }
            .subscribeOn(Schedulers.newThread())
            .observeOn(callbacks)
            .unsubscribeOn(callbacks)
            // CANCEL
            .doOnDispose { notifyInterrupted() }
            .subscribe(
                // SUCCESS
                { holder ->
                    if (!isCancelled) {
                        running = false
                        isSuccess = true
                        onTaskSuccess(holder.result)
                        successAction?.invoke(holder.result)
                    }
                },
                // ERROR
                { throwable ->
                    if (!isCancelled) {
                        running = false
                        isSuccess = false
                        hasError = true
                        onTaskError(throwable)
                        errorAction?.invoke(throwable)
                    }
                })
        return this
    }

    /**
     * Replaces the work to execute with [runnable] and starts the task.
     *
     * @throws IllegalStateException if the task is already running
     */
    fun start(runnable: (SingleTask<*>) -> RESULT): SingleTask<RESULT> {
        this.runnable = runnable
        return start()
    }

    /**
     * Cancels the task. Results or errors that arrive afterwards are ignored; if the run was
     * still pending, the interrupted callbacks are invoked on the callback scheduler.
     */
    fun cancel() {
        isCancelled = true
        running = false
        disposable?.dispose()
    }

    /** Registers a callback invoked synchronously in [start], before the work is scheduled. */
    fun before(action: () -> Unit): SingleTask<RESULT> {
        beforeAction = action
        return this
    }

    /** Registers a callback invoked with the result when the work completes. */
    fun success(action: (RESULT) -> Unit): SingleTask<RESULT> {
        successAction = action
        return this
    }

    /** Registers a callback invoked with the exception when the work fails. */
    fun error(action: (Throwable) -> Unit): SingleTask<RESULT> {
        errorAction = action
        return this
    }

    /** Registers a callback invoked when a pending run is cancelled through [cancel]. */
    fun interrupted(action: () -> Unit): SingleTask<RESULT> {
        interruptedAction = action
        return this
    }

    /** Hook invoked in [start] before the registered before-callback. */
    protected open fun onTaskBefore() {}

    /**
     * The work itself. Subclasses override this; the default implementation throws
     * [NotImplementedError].
     */
    @Throws(Exception::class)
    protected open fun onTaskRun(): RESULT {
        throw NotImplementedError()
    }

    /** Hook invoked with the result before the registered success-callback. */
    protected open fun onTaskSuccess(result: RESULT) {}

    /** Hook invoked with the failure before the registered error-callback. */
    protected open fun onTaskError(error: Throwable) {}

    /** Hook invoked on cancellation before the registered interrupted-callback. */
    protected open fun onTaskInterrupted() {}

    /** Scheduler on which success, error and interrupted callbacks are delivered. */
    private fun callbackScheduler() =
        if (Task.ANDROID_PLATFORM) AndroidSchedulers.mainThread() else Schedulers.computation()

    /**
     * Delivers the interrupted callbacks on the callback scheduler, but only when the stream
     * was disposed by an explicit [cancel] of a run that had neither succeeded nor failed.
     */
    private fun notifyInterrupted() {
        if (isSuccess || hasError) return
        // Capture the flag now; the callback itself runs later on the callback scheduler.
        Single.just(isCancelled)
            .observeOn(callbackScheduler())
            .subscribe { wasCancelled ->
                if (wasCancelled) {
                    onTaskInterrupted()
                    interruptedAction?.invoke()
                }
            }
    }

    private val proxy = object : ITaskProxy {
        override fun start() {
            this@SingleTask.start()
        }

        override fun stop() {
            this@SingleTask.cancel()
        }
    }

    override fun getProxy(): ITaskProxy = proxy
}
ProgressTask
/**
 * Asynchronous task that reports intermediate progress values and produces a single result.
 *
 * The work ([onTaskRun]) runs on a new background thread (`Schedulers.newThread()`). Progress,
 * success, error and interrupted callbacks are delivered on the Android main thread when
 * [Task.ANDROID_PLATFORM] is true, otherwise on `Schedulers.single()`.
 */
open class ProgressTask<PROGRESS, RESULT> : ITask {
    /** Wrapper that lets a possibly-null [RESULT] travel through the Rx stream as a non-null item. */
    private class ResultHolder<T>(val value: T)

    private var progressPublishSubject: PublishSubject<PROGRESS>? = null
    private var progressDisposable: Disposable? = null
    private var disposable: Disposable? = null
    private var isSuccess: Boolean = false

    // Volatile: written by the caller of cancel(), read on the worker thread.
    @Volatile
    private var isCancelled: Boolean = false
    private var running = false
    private var hasError = false
    private var progressAction: ((PROGRESS) -> Unit)? = null
    private var successAction: ((RESULT) -> Unit)? = null
    private var errorAction: ((Throwable) -> Unit)? = null
    private var beforeAction: (() -> Unit)? = null
    private var interruptedAction: (() -> Unit)? = null

    /** True while a started run has neither finished, failed nor been cancelled. */
    val isRunning: Boolean
        get() = disposable?.isDisposed == false && running && !isCancelled

    /**
     * Starts the task.
     *
     * Sets up the progress channel, runs the before-hooks synchronously on the calling thread,
     * then subscribes the work. If a before-hook cancels the task, the work is never scheduled.
     *
     * @return this task, for chaining
     * @throws IllegalStateException if the task is already running
     */
    open fun start(): ProgressTask<PROGRESS, RESULT> {
        check(!isRunning) { "already started" }
        isSuccess = false
        isCancelled = false
        hasError = false
        running = true

        val callbacks = callbackScheduler()

        // Drop the progress subscription of a previous run before replacing the subject.
        progressDisposable?.dispose()
        val subject = PublishSubject.create<PROGRESS>()
        progressPublishSubject = subject
        progressDisposable = subject
            .observeOn(callbacks)
            .subscribe(
                // PROGRESS
                { progress ->
                    onTaskProgress(progress)
                    progressAction?.invoke(progress)
                },
                { error -> error.printStackTrace() })

        // BEFORE
        onTaskBefore()
        beforeAction?.invoke()
        if (isCancelled) {
            running = false
            return this
        }

        disposable = Maybe
            .create<ResultHolder<RESULT>> { emitter ->
                try {
                    emitter.onSuccess(ResultHolder(onTaskRun()))
                } catch (e: Exception) {
                    // Failures after cancellation/disposal are dropped on purpose.
                    if (!isCancelled && !emitter.isDisposed) {
                        emitter.onError(e)
                    }
                }
            }
            .subscribeOn(Schedulers.newThread())
            .observeOn(callbacks)
            .unsubscribeOn(callbacks)
            // CANCEL
            .doOnDispose { notifyInterrupted() }
            .subscribe(
                // SUCCESS
                { holder ->
                    if (!isCancelled) {
                        running = false
                        isSuccess = true
                        onTaskSuccess(holder.value)
                        successAction?.invoke(holder.value)
                    }
                },
                // ERROR
                { throwable ->
                    if (!isCancelled) {
                        running = false
                        isSuccess = false
                        hasError = true
                        onTaskError(throwable)
                        errorAction?.invoke(throwable)
                    }
                })
        return this
    }

    /**
     * Cancels the task. Progress, results and errors that arrive afterwards are ignored; if the
     * run was still pending, the interrupted callbacks are invoked on the callback scheduler.
     */
    open fun cancel() {
        isCancelled = true
        running = false
        progressDisposable?.dispose()
        disposable?.dispose()
    }

    /** Publishes a progress value to the progress callbacks; a no-op before the first [start]. */
    fun publishProgress(progress: PROGRESS) {
        progressPublishSubject?.onNext(progress)
    }

    /** Registers a callback invoked synchronously in [start], before the work is scheduled. */
    fun before(action: () -> Unit): ProgressTask<PROGRESS, RESULT> {
        beforeAction = action
        return this
    }

    /** Registers a callback invoked for every value passed to [publishProgress]. */
    fun progress(action: (PROGRESS) -> Unit): ProgressTask<PROGRESS, RESULT> {
        progressAction = action
        return this
    }

    /** Registers a callback invoked with the result when the work completes. */
    fun success(action: (RESULT) -> Unit): ProgressTask<PROGRESS, RESULT> {
        successAction = action
        return this
    }

    /** Registers a callback invoked with the exception when the work fails. */
    fun error(action: (Throwable) -> Unit): ProgressTask<PROGRESS, RESULT> {
        errorAction = action
        return this
    }

    /** Registers a callback invoked when a pending run is cancelled through [cancel]. */
    fun interrupted(action: () -> Unit): ProgressTask<PROGRESS, RESULT> {
        interruptedAction = action
        return this
    }

    /** Hook invoked in [start] before the registered before-callback. */
    open fun onTaskBefore() {}

    /**
     * The work itself. Subclasses override this; the default implementation throws
     * [NotImplementedError].
     */
    @Throws(Exception::class)
    protected open fun onTaskRun(): RESULT {
        throw NotImplementedError()
    }

    /** Hook invoked with each progress value before the registered progress-callback. */
    open fun onTaskProgress(progress: PROGRESS) {}

    /** Hook invoked with the result before the registered success-callback. */
    open fun onTaskSuccess(result: RESULT) {}

    /** Hook invoked with the failure before the registered error-callback. */
    open fun onTaskError(error: Throwable) {}

    /** Hook invoked on cancellation before the registered interrupted-callback. */
    open fun onTaskInterrupted() {}

    /** Scheduler on which progress, success, error and interrupted callbacks are delivered. */
    private fun callbackScheduler() =
        if (Task.ANDROID_PLATFORM) AndroidSchedulers.mainThread() else Schedulers.single()

    /**
     * Delivers the interrupted callbacks on the callback scheduler, but only when the stream
     * was disposed by an explicit [cancel] of a run that had neither succeeded nor failed.
     */
    private fun notifyInterrupted() {
        if (isSuccess || hasError) return
        // Capture the flag now; the callback itself runs later on the callback scheduler.
        Single.just(isCancelled)
            .observeOn(callbackScheduler())
            .subscribe { wasCancelled ->
                if (wasCancelled) {
                    onTaskInterrupted()
                    interruptedAction?.invoke()
                }
            }
    }

    private val proxy = object : ITaskProxy {
        override fun start() {
            this@ProgressTask.start()
        }

        override fun stop() {
            this@ProgressTask.cancel()
        }
    }

    override fun getProxy(): ITaskProxy = proxy
}
TimerTask
/**
 * Repeating timer task.
 *
 * By default ticks are delivered on the Android main thread when [Task.ANDROID_PLATFORM] is
 * true, and on `Schedulers.computation()` otherwise; call [runOnThread] to deliver them on a
 * new background thread instead.
 *
 * @param initialDelay delay before the first tick, in milliseconds
 * @param interval period between ticks, in milliseconds
 * @param action invoked on every tick with this timer and the [Tick] describing the tick
 */
open class TimerTask(
    val initialDelay: Long = 10,
    val interval: Long = 1000,
    var action: (timer: TimerTask, tick: Tick) -> Unit
) : ITask {
    private var disposable: Disposable? = null
    private var mainThread: Boolean = true
    private var running: Boolean = false
    private var startTime: Long = 0L

    // A single Tick instance is reused and updated in place for every tick.
    private val tick: Tick = Tick()

    /** True between [start] and [stop] while the underlying subscription is alive. */
    val isRunning: Boolean
        get() = disposable?.isDisposed == false && running

    /** Makes subsequent [start] calls deliver ticks on a new background thread. */
    fun runOnThread(): TimerTask {
        mainThread = false
        return this
    }

    /**
     * Starts the timer, stopping any previous run first.
     *
     * @return this timer, for chaining
     */
    fun start(): TimerTask {
        stop()
        startTime = System.currentTimeMillis()
        running = true
        val scheduler = when {
            !mainThread -> Schedulers.newThread()
            Task.ANDROID_PLATFORM -> AndroidSchedulers.mainThread()
            // No Android main looper off-platform; fall back like SingleTask does.
            else -> Schedulers.computation()
        }
        disposable = Observable
            .interval(initialDelay, interval, TimeUnit.MILLISECONDS, scheduler)
            .subscribe { time ->
                tick.startTime = startTime
                tick.currentTime = System.currentTimeMillis()
                // interval() counts from 0; expose a 1-based tick counter.
                tick.counter = time + 1
                action.invoke(this, tick)
            }
        return this
    }

    /** Stops the timer; a no-op if it was never started. */
    fun stop() {
        running = false
        disposable?.dispose()
    }

    /**
     * Snapshot passed to the timer action.
     *
     * [startTime] and [currentTime] are `System.currentTimeMillis()` values taken when the timer
     * was started and when the tick fired; [counter] is the 1-based number of the tick.
     */
    class Tick {
        var startTime = 0L
        var currentTime = 0L
        var counter = 0L

        override fun toString(): String {
            return "startTime=$startTime, currentTime=$currentTime, counter=$counter"
        }
    }

    private val proxy = object : ITaskProxy {
        override fun start() {
            this@TimerTask.start()
        }

        override fun stop() {
            this@TimerTask.stop()
        }
    }

    override fun getProxy(): ITaskProxy = proxy
}
Task
/**
 * Entry point with factory functions for the task types.
 */
class Task {
    companion object {
        /** Platform switch read by the task classes when they choose a callback scheduler. */
        var ANDROID_PLATFORM = true

        /**
         * Builds a [SingleTask] whose work is [runnable]; the task itself is passed to the lambda.
         */
        fun <T> singleTask(runnable: (SingleTask<*>) -> T): SingleTask<T> =
            object : SingleTask<T>() {
                @Throws(Exception::class)
                override fun onTaskRun(): T = runnable.invoke(this)
            }

        /**
         * Builds a [ProgressTask] whose work is [runnable]; the task itself is passed to the
         * lambda so the work can call `publishProgress` on it.
         */
        fun <P, R> progressTask(runnable: (ProgressTask<P, R>) -> R): ProgressTask<P, R> =
            object : ProgressTask<P, R>() {
                @Throws(Exception::class)
                override fun onTaskRun(): R = runnable.invoke(this)
            }

        /**
         * Builds a [TimerTask] with the given initial delay and period (both in milliseconds)
         * that invokes [action] on every tick.
         */
        fun timerTask(
            initialDelay: Long = 10,
            interval: Long = 1000,
            action: (timer: TimerTask, tick: TimerTask.Tick) -> Unit
        ): TimerTask = TimerTask(initialDelay, interval, action)
    }
}
结语
基于RxJava的简单封装完成!有问题可以留言哟
网友评论