美文网首页
协程Flow之FlowCallAdapterFactory

协程Flow之FlowCallAdapterFactory

作者: youxiaochen | 来源:发表于2023-01-14 14:35 被阅读0次

Flow是kotlin协程的一个类似RxJava的流式API,它的出现可以替代RxJava, 所以Retrofit的CallAdapterFactory也可以替换了

代码非常少就一个类,直接上源码吧
class FlowCallAdapterFactory private constructor(
    private val dispatcher: CoroutineDispatcher?,
    private val isAsync: Boolean
) : CallAdapter.Factory() {

    companion object {
        @JvmStatic
        fun createAsync() = FlowCallAdapterFactory(null, true)

        @JvmStatic
        fun createSynchronous(dispatcher: CoroutineDispatcher? = null) = FlowCallAdapterFactory(dispatcher, false)
    }

    override fun get(returnType: Type, annotations: Array<out Annotation>, retrofit: Retrofit): CallAdapter<*, *>? {
        val rawType = getRawType(returnType)
        if (rawType != Flow::class.java) return null
        if (returnType !is ParameterizedType) {
            throw IllegalStateException("Flow return type must be parameterized as Flow<Foo> or Flow<out Foo>")
        }
        val observableType = getParameterUpperBound(0, returnType)
        //Log.d("FlowCallAdapterFactory", "rawType = $rawType, returnType = $returnType, observableType = $observableType")
        return if (isAsync) AsyncFlowCallAdapter<Any>(observableType) else FlowCallAdapter<Any>(observableType, dispatcher)
    }

    class FlowCallAdapter<T>(
        private val responseType: Type,
        private val dispatcher: CoroutineDispatcher?
    ) : CallAdapter<T, Flow<T>> {

        override fun responseType(): Type = responseType

        override fun adapt(call: Call<T>): Flow<T> {
            val adaptFlow = flow {
                suspendCancellableCoroutine<T> { continuation ->
                    continuation.invokeOnCancellation {
                        call.cancel()
                    }
                    try {
                        val response = call.execute()
                        var body: T? = null
                        if (response.isSuccessful && response.body().let { body = it; it != null }) {
                            continuation.resume(body!!)
                        } else {
                            continuation.resumeWithException(HttpException(response))
                        }
                    } catch (e: Exception) {
                        continuation.resumeWithException(e)
                    }
                }.also { emit(it) }
            }
            return dispatcher?.let { adaptFlow.flowOn(it) } ?: adaptFlow
        }
    }

    class AsyncFlowCallAdapter<T>(private val responseType: Type) : CallAdapter<T, Flow<T>> {

        override fun responseType(): Type = responseType

        override fun adapt(call: Call<T>): Flow<T> = flow {
            suspendCancellableCoroutine<T> { continuation ->
                continuation.invokeOnCancellation {
                    call.cancel()
                }
                call.enqueue(object : Callback<T> {
                    override fun onResponse(call: Call<T>, response: Response<T>) {
                        var body: T? = null
                        if (response.isSuccessful && response.body().let { body = it; it != null }) {
                            continuation.resume(body!!)
                        } else {
                            continuation.resumeWithException(HttpException(response))
                        }
                    }

                    override fun onFailure(call: Call<T>, t: Throwable) {
                        continuation.resumeWithException(t)
                    }
                })
            }.also { emit(it) }
        }
    }
}
使用示例
open class BaseViewModel(private val repository: DataRepository = DataRepository) : ViewModel(), DefaultLifecycleObserver {

    private val services = ArrayMap<Class<*>, Any>()

    protected fun <T> getService(serviceClass: Class<T>): T {
        val service = services[serviceClass]
        if (service != null) return service as T
        return repository.getService(serviceClass).also { services[serviceClass] = it }
    }

    protected fun <T : Any> getService(serviceClass: KClass<T>): T = getService(serviceClass.java)

    override fun onCleared() {
        services.clear()
    }
}
/**
 *  author: you : 2021/12/7
 */
class MainViewModel : BaseViewModel() {

    private var httpJob0: Job? = null

    private var httpJob1: Job? = null

    fun testHttpRequest0() {
        httpJob0?.cancel()
        httpJob0 = viewModelScope.launch {
            getService(TestApi::class).getUserBean(TestApi.URL)
                .onStart { Log.d("youxiaochen", "testHttpRequest0 loading start...") }
                .catch { Log.d("youxiaochen", "testHttpRequest0 loading error ...$it") }
                .onCompletion { Log.d("youxiaochen", "testHttpRequest0 loading complete...$it") }
                .collect { Log.d("youxiaochen", "testHttpRequest0 result = $it") }
        }
    }

    fun testHttpRequest1() {
        httpJob1?.cancel()
        httpJob1 = viewModelScope.launch {
            getService(TestApi::class).getUserBean(TestApi.URL2)
                .onStart { Log.d("youxiaochen", "testHttpRequest1 loading start...") }
                .catch { Log.d("youxiaochen", "testHttpRequest1 loading error ...$it") } //erro
                .onCompletion { Log.d("youxiaochen", "testHttpRequest1 loading complete...$it") } //cancel时 it不为空, error时it为空并触发catch...
                .collect { Log.d("youxiaochen", "testHttpRequest1 result = $it") }
        }
    }
}
源码地址

更多文章请关注:http://www.jianshu.com/u/b1cff340957c

相关文章

  • 协程Flow之FlowCallAdapterFactory

    Flow是kotlin协程的一个类似RxJava的流式API,它的出现可以替代RxJava, 所以Retrofit...

  • Kotlin Flow啊,你将流向何方?

    前言 前边一系列的协程文章铺垫了很久,终于要分析Flow了。如果说协程是Kotlin的精华,那么Flow就是协程的...

  • KSP - 元编程编译提速的小助手

    前言 前边一系列的协程文章铺垫了很久,终于要分析Flow了。如果说协程是Kotlin的精华,那么Flow就是协程的...

  • 协程Flow

    一、介绍 Flow,在Kotlin协程当中是自成体系的知识点。简单的异步场景我们可以直接使用挂起函数、launch...

  • kotlin协程,Flow,DataStore学习总结

    有人会问协程和Flow可以替换RxJava答案肯定是:可以的 这里总结了下kotlin协程以及Flow的学习记录,...

  • Kotlin协程:Flow基础原理

    本文分析示例代码如下: 一.Flow的创建 在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代...

  • 协程Flow简单使用

    前言 本文是阅读协程Flow的总结笔记。 什么是Flow Kotlin中的Flow API是可以更好的异步处理按顺...

  • 协程Flow之FlowPermission权限请求

    Flow是kotlin协程的一个类似RxJava的流式API,它的出现可以替代RxJava,这里就介绍RxJava...

  • Kotlin学习路线

    1:kotlin的教程,主要学习协程 和 flow 参考链接:https://www.jianshu.com/p/...

  • kotlin flow (二)

    Flow操作符 buffer(int) 该操作符会新起一个协程来收集buffer之前的代码运行结果,新协程通过ch...

网友评论

      本文标题:协程Flow之FlowCallAdapterFactory

      本文链接:https://www.haomeiwen.com/subject/amqwdrtx.html