美文网首页
协程Flow之FlowCallAdapterFactory

协程Flow之FlowCallAdapterFactory

作者: youxiaochen | 来源:发表于2023-01-14 14:35 被阅读0次

    Flow 是 Kotlin 协程提供的一套类似 RxJava 的流式 API,可以用来替代 RxJava,因此 Retrofit 中基于 RxJava 的 CallAdapterFactory 也可以换成基于 Flow 的实现。

    代码非常少,只有一个类,直接上源码:
    /**
     * Retrofit [CallAdapter.Factory] that lets service methods declare `Flow<T>` as their return type.
     *
     * Instances are obtained through [createAsync] or [createSynchronous]. Every produced flow is
     * cold: the HTTP request is issued when the flow is collected, and exactly one value (the
     * response body) is emitted per collection. Because each collection runs on its own clone of
     * the underlying call, the same flow can be collected again or retried.
     *
     * @param dispatcher optional dispatcher the synchronous adapter moves the blocking request onto
     * via `flowOn`; ignored by the asynchronous adapter
     * @param isAsync `true` to run requests with `Call.enqueue`, `false` to run them with `Call.execute`
     */
    class FlowCallAdapterFactory private constructor(
        private val dispatcher: CoroutineDispatcher?,
        private val isAsync: Boolean
    ) : CallAdapter.Factory() {

        /**
         * Returns a Flow adapter when [returnType] is a parameterized `Flow`, or `null` when the
         * raw return type is not `Flow`.
         *
         * @throws IllegalStateException if the return type is a raw, non-parameterized `Flow`
         */
        override fun get(returnType: Type, annotations: Array<out Annotation>, retrofit: Retrofit): CallAdapter<*, *>? {
            if (getRawType(returnType) != Flow::class.java) return null
            if (returnType !is ParameterizedType) {
                throw IllegalStateException("Flow return type must be parameterized as Flow<Foo> or Flow<out Foo>")
            }
            // The Flow's type argument is the type the response body is converted to.
            val bodyType = getParameterUpperBound(0, returnType)
            return if (isAsync) AsyncFlowCallAdapter<Any>(bodyType) else FlowCallAdapter<Any>(bodyType, dispatcher)
        }

        /**
         * Adapter that runs the request with the blocking `Call.execute`.
         *
         * @param responseType type the response body is converted to
         * @param dispatcher when non-null, the request is moved onto it with `flowOn`; when `null`
         * the blocking request runs in the collector's context
         */
        class FlowCallAdapter<T>(
            private val responseType: Type,
            private val dispatcher: CoroutineDispatcher?
        ) : CallAdapter<T, Flow<T>> {

            override fun responseType(): Type = responseType

            /**
             * Wraps [call] in a cold flow that emits the response body once per collection.
             *
             * The flow fails with [HttpException] when the response is unsuccessful or its body is
             * `null`, and with whatever exception `execute()` throws otherwise.
             */
            override fun adapt(call: Call<T>): Flow<T> {
                val adaptFlow = flow {
                    // A Retrofit Call can only be executed once; cloning per collection keeps the
                    // flow re-collectable (e.g. with retry) instead of failing on the second run.
                    val request = call.clone()
                    val result = suspendCancellableCoroutine<T> { continuation ->
                        // Cancelling the collecting coroutine aborts the in-flight request.
                        continuation.invokeOnCancellation { request.cancel() }
                        try {
                            val response = request.execute()
                            val body = response.body()
                            if (response.isSuccessful && body != null) {
                                continuation.resume(body)
                            } else {
                                // A successful response without a body is reported the same way
                                // as an HTTP error.
                                continuation.resumeWithException(HttpException(response))
                            }
                        } catch (e: Exception) {
                            continuation.resumeWithException(e)
                        }
                    }
                    emit(result)
                }
                return dispatcher?.let { adaptFlow.flowOn(it) } ?: adaptFlow
            }
        }

        /**
         * Adapter that runs the request with the callback-based `Call.enqueue`.
         *
         * @param responseType type the response body is converted to
         */
        class AsyncFlowCallAdapter<T>(private val responseType: Type) : CallAdapter<T, Flow<T>> {

            override fun responseType(): Type = responseType

            /**
             * Wraps [call] in a cold flow that emits the response body once per collection.
             *
             * The flow fails with [HttpException] when the response is unsuccessful or its body is
             * `null`, and with the throwable passed to `onFailure` otherwise.
             */
            override fun adapt(call: Call<T>): Flow<T> = flow {
                // A Retrofit Call can only be enqueued once; cloning per collection keeps the
                // flow re-collectable (e.g. with retry) instead of failing on the second run.
                val request = call.clone()
                val result = suspendCancellableCoroutine<T> { continuation ->
                    // Cancelling the collecting coroutine aborts the in-flight request.
                    continuation.invokeOnCancellation { request.cancel() }
                    request.enqueue(object : Callback<T> {
                        override fun onResponse(call: Call<T>, response: Response<T>) {
                            val body = response.body()
                            if (response.isSuccessful && body != null) {
                                continuation.resume(body)
                            } else {
                                // A successful response without a body is reported the same way
                                // as an HTTP error.
                                continuation.resumeWithException(HttpException(response))
                            }
                        }

                        override fun onFailure(call: Call<T>, t: Throwable) {
                            continuation.resumeWithException(t)
                        }
                    })
                }
                emit(result)
            }
        }

        companion object {
            /** Creates a factory whose flows run requests with `Call.enqueue`. */
            @JvmStatic
            fun createAsync() = FlowCallAdapterFactory(null, true)

            /**
             * Creates a factory whose flows run requests with the blocking `Call.execute`.
             *
             * @param dispatcher dispatcher to run the blocking request on; when `null` the request
             * runs in the collector's context
             */
            @JvmStatic
            fun createSynchronous(dispatcher: CoroutineDispatcher? = null) = FlowCallAdapterFactory(dispatcher, false)
        }
    }
    
    使用示例
    /**
     * Base [ViewModel] that hands out API service instances and remembers them per service class.
     *
     * A service is requested from [repository] the first time it is asked for and served from an
     * in-memory map afterwards; the map is emptied in [onCleared].
     */
    open class BaseViewModel(private val repository: DataRepository = DataRepository) : ViewModel(), DefaultLifecycleObserver {

        // Service instances already obtained, keyed by the class they were requested with.
        private val services = ArrayMap<Class<*>, Any>()

        /**
         * Returns the service registered for [serviceClass], asking the repository for it only
         * when no instance has been stored yet.
         */
        protected fun <T> getService(serviceClass: Class<T>): T {
            val cached = services[serviceClass]
            if (cached == null) {
                val created = repository.getService(serviceClass)
                services[serviceClass] = created
                return created
            }
            return cached as T
        }

        /** Kotlin-friendly overload that accepts a [KClass] instead of a Java class. */
        protected fun <T : Any> getService(serviceClass: KClass<T>): T {
            return getService(serviceClass.java)
        }

        /** Drops every stored service instance. */
        override fun onCleared() {
            services.clear()
        }
    }
    
    /**
     * Sample view model that fires two independent HTTP requests through Flow-returning
     * service methods and logs every stage of each request.
     *
     * author: you : 2021/12/7
     */
    class MainViewModel : BaseViewModel() {

        // Job of the most recent testHttpRequest0 launch, kept so a new launch can cancel it.
        private var httpJob0: Job? = null

        // Job of the most recent testHttpRequest1 launch, kept so a new launch can cancel it.
        private var httpJob1: Job? = null

        /** Cancels the previous request (if any) and requests [TestApi.URL], logging each stage. */
        fun testHttpRequest0() {
            httpJob0?.cancel()
            httpJob0 = viewModelScope.launch {
                val request = getService(TestApi::class).getUserBean(TestApi.URL)
                request
                    .onStart { Log.d(TAG, "testHttpRequest0 loading start...") }
                    .catch { error -> Log.d(TAG, "testHttpRequest0 loading error ...$error") }
                    .onCompletion { cause -> Log.d(TAG, "testHttpRequest0 loading complete...$cause") }
                    .collect { result -> Log.d(TAG, "testHttpRequest0 result = $result") }
            }
        }

        /** Cancels the previous request (if any) and requests [TestApi.URL2], logging each stage. */
        fun testHttpRequest1() {
            httpJob1?.cancel()
            httpJob1 = viewModelScope.launch {
                val request = getService(TestApi::class).getUserBean(TestApi.URL2)
                request
                    .onStart { Log.d(TAG, "testHttpRequest1 loading start...") }
                    // Upstream errors are handled (logged) here.
                    .catch { error -> Log.d(TAG, "testHttpRequest1 loading error ...$error") }
                    // On cancellation the cause is non-null; on an error the cause is null
                    // because the catch operator above has already handled it.
                    .onCompletion { cause -> Log.d(TAG, "testHttpRequest1 loading complete...$cause") }
                    .collect { result -> Log.d(TAG, "testHttpRequest1 result = $result") }
            }
        }

        private companion object {
            // Log tag shared by every message of this view model.
            const val TAG = "youxiaochen"
        }
    }
    
    源码地址

    更多文章请关注:http://www.jianshu.com/u/b1cff340957c

    相关文章

      网友评论

          本文标题:协程Flow之FlowCallAdapterFactory

          本文链接:https://www.haomeiwen.com/subject/amqwdrtx.html