美文网首页
Android Kotlin 基于RxJava的简单封装

Android Kotlin 基于RxJava的简单封装

作者: 水天滑稽天照八野滑稽石 | 来源:发表于2020-08-24 23:11 被阅读0次

    前言

    使用 RxJava 主要是为了执行异步任务,所以这里按照个人使用习惯做了一层简单的封装。

    使用方法

    • 单个任务的异步
      // Build a single-result task, register its callbacks, then start it.
      val task = Task.singleTask { 
                // Work to run asynchronously
            }.success { 
                // Success callback (runs on the UI thread)
            }.error { 
                // Failure callback (runs on the UI thread)
            }.interrupted { 
                // Interruption callback (runs on the UI thread)
            }.start()
            // Actively cancel the task
            task.cancel()
    
    
    • 带进度回调的异步任务
     // Progress-reporting task: progress values are String, the result type is Unit.
     val task = Task.progressTask<String,Unit> {
                // Asynchronous work; publishProgress forwards a value to the progress callback.
                it.publishProgress("当前进度")
            }.progress {
                // Receives each published progress value
            }.success {
                // Success callback
            }.error {
                // Failure callback
            }.interrupted {
                // Interruption callback
            }.start()
            // Actively cancel the task
            task.cancel()
    
    • 定时器的异步
     // Timer task created with the factory's default initial delay and interval.
     val task = Task.timerTask { timer, tick ->
                // Work to repeat on every timer tick
            }.start()
    

    代码

    /**
     * Common contract of the task types in this file: each one can hand out
     * an [ITaskProxy] through which it is started and stopped.
     */
    interface ITask {
    
        /** Returns the proxy that controls this task. */
        fun getProxy(): ITaskProxy
    }
    
    /**
     * Uniform start/stop handle for a task, independent of the concrete task type.
     */
    interface ITaskProxy {
    
        /** Starts the underlying task. */
        fun start()
    
        /** Stops (or cancels) the underlying task. */
        fun stop()
    
    }
    
    SingleTask
    /**
     * Asynchronous task that produces a single result.
     *
     * The work runs on a new thread. The success, error and interrupted
     * callbacks are delivered on the Android main thread when
     * [Task.ANDROID_PLATFORM] is true, otherwise on the computation scheduler.
     * The before-hooks run synchronously on the thread that calls [start].
     *
     * Either pass the work to [start] as a lambda, or subclass and override
     * [onTaskRun].
     */
    open class SingleTask<RESULT> : ITask {

        private var disposable: Disposable? = null

        /**
         * True once [cancel] has been called for the current run; reset by [start].
         * Volatile because [cancel] writes it on the caller's thread while the
         * worker thread reads it inside the emitter.
         */
        @Volatile
        var isCancelled: Boolean = false
            private set

        private var successAction: ((RESULT) -> Unit)? = null
        private var errorAction: ((Throwable) -> Unit)? = null
        private var beforeAction: (() -> Unit)? = null
        private var interruptedAction: (() -> Unit)? = null

        private var isSuccess: Boolean = false
        private var running = false
        private var hasError = false

        // Defaults to the overridable onTaskRun(); replaced by start(runnable).
        private var runnable: (SingleTask<*>) -> RESULT = { onTaskRun() }

        /** True while a started run has neither finished, failed, nor been cancelled. */
        val isRunning: Boolean
            get() = disposable?.isDisposed == false && running && !isCancelled

        /**
         * Wrapper that carries the result through the Maybe, so that a null
         * RESULT is still a non-null emission (RxJava emitters reject null values).
         */
        class NullableResult<RESULT>(val result: RESULT)

        /**
         * Starts the task.
         *
         * @return this task, for chaining
         * @throws IllegalStateException if the task is already running
         */
        fun start(): SingleTask<RESULT> {
            check(!isRunning) { "already started" }

            hasError = false
            isCancelled = false
            isSuccess = false
            running = true

            // BEFORE
            onTaskBefore()
            beforeAction?.invoke()

            // A before-hook may have called cancel(); nothing is subscribed yet, so just stop here.
            if (isCancelled)
                return this

            val obs = Maybe
                .create<NullableResult<RESULT>> { emitter ->
                    try {
                        if (isCancelled)
                            throw RuntimeException("task is already cancelled")

                        val result = runnable.invoke(this)
                        emitter.onSuccess(NullableResult(result))

                    } catch (e: Exception) {
                        // Failures of a cancelled task are dropped on purpose.
                        if (!isCancelled) {
                            try {
                                emitter.onError(e)
                            } catch (deliveryError: Exception) {
                                deliveryError.printStackTrace()
                            }
                        }
                    }
                }
                .subscribeOn(Schedulers.newThread())
                .observeOn(callbackScheduler())
                .unsubscribeOn(callbackScheduler())
                .doOnDispose {

                    // CANCEL: only report an interruption if the run did not already finish or fail.
                    if (!isSuccess && !hasError) {
                        Single.just(isCancelled)
                            .subscribeOn(Schedulers.newThread())
                            .observeOn(callbackScheduler())
                            .subscribe { cancelled ->
                                if (cancelled) {
                                    onTaskInterrupted()
                                    interruptedAction?.invoke()
                                }
                            }
                    }
                }

            if (isCancelled) {
                println("start cancelled!!!!!!!!!!!!!!2222")
                return this
            }

            disposable = obs.subscribe(
                // SUCCESS
                { result ->
                    if (!isCancelled) {
                        running = false
                        isSuccess = true
                        onTaskSuccess(result.result)
                        successAction?.invoke(result.result)
                    }
                },
                // ERROR
                { throwable ->
                    if (!isCancelled) {
                        running = false
                        isSuccess = false
                        hasError = true
                        onTaskError(throwable)
                        errorAction?.invoke(throwable)
                    }
                })
            return this
        }

        /**
         * Replaces the work to execute with [runnable] and starts the task.
         *
         * @throws IllegalStateException if the task is already running
         */
        fun start(runnable: (SingleTask<*>) -> RESULT): SingleTask<RESULT> {
            this.runnable = runnable
            return start()
        }

        /**
         * Actively interrupts the task: marks it cancelled and disposes the subscription, if any.
         */
        fun cancel() {
            isCancelled = true

            if (disposable == null) {
                println("disposable is null")
            }
            disposable?.dispose()
        }

        /**
         * Event: before execution. Invoked synchronously from [start].
         */
        fun before(action: () -> Unit): SingleTask<RESULT> {
            beforeAction = action
            return this
        }

        /**
         * Event: the task succeeded.
         */
        fun success(action: (RESULT) -> Unit): SingleTask<RESULT> {
            successAction = action
            return this
        }

        /**
         * Event: the task failed.
         */
        fun error(action: (Throwable) -> Unit): SingleTask<RESULT> {
            errorAction = action
            return this
        }

        /**
         * Event: the task was actively interrupted via [cancel].
         */
        fun interrupted(action: () -> Unit): SingleTask<RESULT> {
            interruptedAction = action
            return this
        }

        /** Hook called before the task runs; override in a subclass if needed. */
        protected open fun onTaskBefore() {}

        /**
         * The work of the task. Override this method when subclassing SingleTask.
         */
        @Throws(Exception::class)
        protected open fun onTaskRun(): RESULT {
            throw NotImplementedError()
        }

        /** Hook called with the result, before the success callback. */
        protected open fun onTaskSuccess(result: RESULT) {}

        /** Hook called with the failure, before the error callback. */
        protected open fun onTaskError(error: Throwable) {}

        /** Hook called after a cancellation, before the interrupted callback. */
        protected open fun onTaskInterrupted() {}

        // Scheduler on which callbacks are delivered; return type is inferred from the two branches.
        private fun callbackScheduler() =
            if (Task.ANDROID_PLATFORM) AndroidSchedulers.mainThread() else Schedulers.computation()

        private val proxy = object : ITaskProxy {
            override fun start() {
                this@SingleTask.start()
            }

            override fun stop() {
                this@SingleTask.cancel()
            }
        }

        override fun getProxy(): ITaskProxy {
            return proxy
        }
    }
    
    ProgressTask
    /**
     * Asynchronous task that produces a single result and can report
     * intermediate progress values while it runs.
     *
     * The work runs on a new thread. Progress, success, error and interrupted
     * callbacks are delivered on the Android main thread when
     * [Task.ANDROID_PLATFORM] is true, otherwise on the single scheduler.
     * The before-hooks run synchronously on the thread that calls [start].
     */
    open class ProgressTask<PROGRESS, RESULT> : ITask {
        private var progressPublishSubject: PublishSubject<PROGRESS>? = null
        private var progressDisposable: Disposable? = null
        private var disposable: Disposable? = null

        private var isSuccess: Boolean = false

        // Volatile: written by cancel() on the caller's thread, read on the worker thread.
        @Volatile
        private var isCancelled: Boolean = false
        private var running = false
        private var hasError = false

        private var progressAction: ((PROGRESS) -> Unit)? = null
        private var successAction: ((RESULT) -> Unit)? = null
        private var errorAction: ((Throwable) -> Unit)? = null
        private var beforeAction: (() -> Unit)? = null
        private var interruptedAction: (() -> Unit)? = null

        /** True while a started run has neither finished, failed, nor been cancelled. */
        val isRunning: Boolean
            get() = disposable?.isDisposed == false && running && !isCancelled

        /**
         * Starts the task.
         *
         * @return this task, for chaining
         * @throws IllegalStateException if the task is already running
         */
        open fun start(): ProgressTask<PROGRESS, RESULT> {
            check(!isRunning) { "already started" }

            isSuccess = false
            isCancelled = false
            hasError = false
            running = true

            // A fresh subject per run carries progress values to the callback scheduler.
            val subject = PublishSubject.create<PROGRESS>()
            progressPublishSubject = subject
            progressDisposable = subject
                    .observeOn(callbackScheduler())
                    .subscribe(
                            // PROGRESS
                            { progress ->
                                onTaskProgress(progress)
                                progressAction?.invoke(progress)
                            },
                            { error -> error.printStackTrace() })

            // BEFORE
            onTaskBefore()
            beforeAction?.invoke()

            // A before-hook may have called cancel(); do not launch work for a cancelled task.
            if (isCancelled)
                return this

            disposable = Maybe.create<RESULT> { emitter ->
                try {
                    val result = onTaskRun()
                    emitter.onSuccess(result)
                } catch (e: Exception) {
                    // Failures of a cancelled task are dropped on purpose.
                    if (!isCancelled)
                        emitter.onError(e)
                }
            }.subscribeOn(Schedulers.newThread())
                    .observeOn(callbackScheduler())
                    .unsubscribeOn(callbackScheduler())
                    .doOnDispose {

                        // CANCEL: only report an interruption if the run did not already finish or fail.
                        if (!isSuccess && !hasError) {
                            Single.just(isCancelled)
                                    .subscribeOn(Schedulers.newThread())
                                    // Same platform-dependent scheduler as the other callbacks,
                                    // so the task also works when ANDROID_PLATFORM is false.
                                    .observeOn(callbackScheduler())
                                    .subscribe { cancelled ->
                                        if (cancelled) {
                                            onTaskInterrupted()
                                            interruptedAction?.invoke()
                                        }
                                    }
                        }
                    }
                    .subscribe(
                            // SUCCESS
                            { result ->
                                if (!isCancelled) {
                                    running = false
                                    isSuccess = true
                                    onTaskSuccess(result)
                                    successAction?.invoke(result)
                                }
                            },
                            // ERROR
                            { throwable ->
                                if (!isCancelled) {
                                    running = false
                                    isSuccess = false
                                    hasError = true
                                    onTaskError(throwable)
                                    errorAction?.invoke(throwable)
                                }
                            })

            return this
        }

        /**
         * Actively interrupts the task: marks it cancelled and disposes both the
         * progress subscription and the task subscription, if any.
         */
        open fun cancel() {
            isCancelled = true
            progressDisposable?.dispose()
            disposable?.dispose()
        }

        /**
         * Publishes a progress value; it is forwarded to [onTaskProgress] and the
         * progress callback. Does nothing if the task has never been started.
         */
        fun publishProgress(progress: PROGRESS) {
            progressPublishSubject?.onNext(progress)
        }

        /**
         * Event: before execution. Invoked synchronously from [start].
         */
        fun before(action: () -> Unit): ProgressTask<PROGRESS, RESULT> {
            beforeAction = action
            return this
        }

        /**
         * Event: a progress value was published.
         */
        fun progress(action: (PROGRESS) -> Unit): ProgressTask<PROGRESS, RESULT> {
            progressAction = action
            return this
        }

        /**
         * Event: the task succeeded.
         */
        fun success(action: (RESULT) -> Unit): ProgressTask<PROGRESS, RESULT> {
            successAction = action
            return this
        }

        /**
         * Event: the task failed.
         */
        fun error(action: (Throwable) -> Unit): ProgressTask<PROGRESS, RESULT> {
            errorAction = action
            return this
        }

        /**
         * Event: the task was actively interrupted via [cancel].
         */
        fun interrupted(action: () -> Unit): ProgressTask<PROGRESS, RESULT> {
            interruptedAction = action
            return this
        }

        /** Hook called before the task runs. */
        open fun onTaskBefore() {}

        /**
         * The work of the task. Override this method when subclassing ProgressTask.
         */
        @Throws(Exception::class)
        protected open fun onTaskRun(): RESULT {
            throw NotImplementedError()
        }

        /** Hook called for each published progress value, before the progress callback. */
        open fun onTaskProgress(progress: PROGRESS) {}

        /** Hook called with the result, before the success callback. */
        open fun onTaskSuccess(result: RESULT) {}

        /** Hook called with the failure, before the error callback. */
        open fun onTaskError(error: Throwable) {}

        /** Hook called after a cancellation, before the interrupted callback. */
        open fun onTaskInterrupted() {}

        // Scheduler on which callbacks are delivered; return type is inferred from the two branches.
        private fun callbackScheduler() =
            if (Task.ANDROID_PLATFORM) AndroidSchedulers.mainThread() else Schedulers.single()

        private val proxy = object : ITaskProxy {
            override fun start() {
                this@ProgressTask.start()
            }

            override fun stop() {
                this@ProgressTask.cancel()
            }
        }

        override fun getProxy(): ITaskProxy {
            return proxy
        }

    }
    
    TimerTask
    /**
     * Repeating timer task.
     *
     * @param initialDelay delay before the first tick, in milliseconds
     * @param interval     period between ticks, in milliseconds
     * @param action       callback run on every tick; it receives this timer and
     *                     the [Tick] describing the current tick
     */
    open class TimerTask(val initialDelay: Long = 10,
                         val interval: Long = 1000,
                         var action: (timer: TimerTask, tick: Tick) -> Unit) : ITask {

        // One Tick instance is reused and refreshed on every tick.
        private val tick: Tick = Tick()
        private var startTime: Long = 0L
        private var running: Boolean = false
        private var mainThread: Boolean = true
        private var disposable: Disposable? = null

        private var proxy = object : ITaskProxy {
            override fun start() {
                this@TimerTask.start()
            }

            override fun stop() = this@TimerTask.stop()
        }

        /** True while the timer has been started and its subscription is still active. */
        val isRunning: Boolean
            get() = running && disposable?.isDisposed == false

        /**
         * Makes the timer tick on a new background thread instead of the Android main thread.
         */
        fun runOnThread(): TimerTask {
            mainThread = false
            return this
        }

        /**
         * Starts the timer; a timer that is already running is stopped first.
         */
        fun start(): TimerTask {
            stop()

            startTime = System.currentTimeMillis()
            running = true

            val scheduler = when {
                mainThread -> AndroidSchedulers.mainThread()
                else -> Schedulers.newThread()
            }
            val ticks = Observable.interval(initialDelay, interval, TimeUnit.MILLISECONDS, scheduler)
            disposable = ticks.subscribe { elapsed ->
                // interval() counts from 0, the exposed counter from 1.
                tick.startTime = startTime
                tick.currentTime = System.currentTimeMillis()
                tick.counter = elapsed + 1

                action(this, tick)
            }

            return this
        }

        /**
         * Stops the timer and disposes its subscription, if any.
         */
        fun stop() {
            running = false
            disposable?.dispose()
        }

        override fun getProxy(): ITaskProxy = proxy

        /**
         * Snapshot handed to the timer action: when the timer was started, the
         * time of the current tick, and the 1-based tick count.
         */
        class Tick {
            var startTime = 0L
            var currentTime = 0L
            var counter = 0L

            override fun toString(): String =
                "startTime=$startTime, currentTime=$currentTime, counter=$counter"
        }

    }
    
    Task
    /**
     * Factory entry points for the task types.
     */
    class Task {

        companion object {

            /**
             * Platform switch read by SingleTask and ProgressTask when they pick
             * the scheduler for their callbacks: true selects the Android main thread.
             */
            var ANDROID_PLATFORM: Boolean = true

            /**
             * Creates a single-result task whose work is [runnable].
             */
            fun <T> singleTask(runnable: (SingleTask<*>) -> T): SingleTask<T> =
                object : SingleTask<T>() {

                    @Throws(Exception::class)
                    override fun onTaskRun(): T = runnable(this)
                }

            /**
             * Creates a progress-reporting task whose work is [runnable].
             */
            fun <P, R> progressTask(runnable: (ProgressTask<P, R>) -> R): ProgressTask<P, R> =
                object : ProgressTask<P, R>() {

                    @Throws(Exception::class)
                    override fun onTaskRun(): R = runnable(this)
                }

            /**
             * Creates a timer that runs [action] every [interval] milliseconds
             * after an initial delay of [initialDelay] milliseconds.
             */
            fun timerTask(initialDelay: Long = 10,
                          interval: Long = 1000,
                          action: (timer: TimerTask, tick: TimerTask.Tick) -> Unit): TimerTask =
                TimerTask(initialDelay = initialDelay, interval = interval, action = action)
        }
    }
    

    结语

    基于RxJava的简单封装完成!有问题可以留言哟

    相关文章

      网友评论

          本文标题:Android Kotlin 基于RxJava的简单封装

          本文链接:https://www.haomeiwen.com/subject/slpojktx.html