美文网首页
kotlin Flow(一)

kotlin Flow(一)

作者: 中路杀神ai | 来源:发表于2021-07-14 21:10 被阅读0次

1.什么是Flow ?

一种异步数据流,它按顺序发出值并正常或异常完成。
Flow 是一种 "冷流"(Cold Stream)。
"冷流" 是一种数据源,该类数据源的生产者会在每个监听者开始消费事件的时候执行,
从而在每个订阅上创建新的数据流。一旦消费者停止监听或者生产者的阻塞结束,数据流将会被自动关闭。

2.LiveData 和 Flow 的区别?

livedata: 是一种可感知生命周期的组件,持有可被观察的数据。
不会发生内存泄漏,保持最新的数据,(onStart时获取最新数据,相反也会丢失数据)。
flow :异步流 无生命周期感知,不会丢失数据,配合协程使用

3.简单介绍flow 使用

数据源 flow{}
 每500ms emit一个数据
val flow = flow<Int> {
        repeat(100) {
            emit(it)
            delay(500)
        }
    }
消费:  
onEach :返回上一个流,并执行对应的 action
订阅 collect
collect :函数是一个 suspend 方法,所以它必须发生在协程或者带有 suspend 的方法里面
onCompletion : 返回上一个流,完成or取消执行action
catch:上部流 的异常捕获
flow.onEach {
        println("onEach $it")
    }.onCompletion {}
     .catch {  }
     .collect {
        println("collect $it")
    }
launchIn:
除了 collect  flow 还有 launchIn并发执行
launchIn 可以把flow 放在协程作用域中,
scope 作用域直接触发flow .跟随这scope的取消而取消
------------------------------------------
 1. collect | launchIn 主要区别:
  collect 是阻塞的 串行执行
  launchIn 是非阻塞的 并行执行
/**
 *  交替执行
  输出:
    it1 = 0
    it2 = 0
    it1 = 1
    it2 = 1
    it1 = 2
    it2 = 2
    it1 = 3
    it2 = 3
 */
suspend fun launchIn() {
    val scope = CoroutineScope(Job())
    val job = scope.launch {
        delay(2000)
    }
    val flow1 = flow {
        List(100) {
            emit("it1 = $it")
        }
    }.onEach {
        delay(500)
        println(it)
    }

    val flow2 = flow {
        List(100) {
            emit("it2 = $it")
        }
    }.onEach {
        delay(500)
        println(it)
    }
    flow1.launchIn(scope)
    flow2.launchIn(scope)
    job.cancelAndJoin()
    delay(2500)
}

4.channel

协程间通信 (java:Handler)
一个面向多协程之间数据传输的 BlockQueue
receive() 只能获取一次
获取多次 遍历 channel

suspend fun channel() {

    val channel = Channel<String>()

    val job = CoroutineScope(SupervisorJob())
        .launch {
            launch {
                repeat(100) {
                    channel.send("$it")
                    delay(100)
                }
            }
            launch {
                while (isActive) {
                    println("receive = ${channel.receive()}")
                }
            }
        }
    withTimeoutOrNull(11000) {
        job.join()
    }
}

4.flow 常用操作符

1. filter map take transform

/**
 *
    filter 过滤
    map 转换操作符,将 A 变成 B
    take 接收多少个 emit 出的值
    transform 对给定流的每个值应用[转换]函数

    输出:
    collect 4
    collect 5
 */

suspend fun operator2() {
    flowOf(1, 2, 3, 4)
        .transform {
            emit(it +2)
        }
        .filter {
            it > 1
        }
        .map {
            it + 1
        }.take(2)
        .onEach {
            println("collect $it")
        }.collect()
}
single:
/**
 * 发射单个值
 *  it = 0
 *  多值会发生 :Flow has more than one element
 */
suspend fun single() {
    val flow1 = flow {
        List(2) {
            emit("it = $it")
            delay(100)
        }
    }
    println(flow1.single())
}

2.zip merge combine

/**
 * zip 组合两个流,双方都有新数据才会发射处理
 *
 * 将[当前]流中的值与[其它]流压缩,使用所提供的[transform]函数应用于每一对值。
 *  一旦一个流完成,产生的流就完成了,并且在剩余的流上调用cancel
 
 */
   val flow1 = flow {
        List(4) { emit(it) }
    }
    val flow2 = flow {
        List(2) { emit(it) }
    }.map {
        "it = $it"
    }

    flow1.zip(flow2) { f1, f2 ->
        "f1: $f1 - f2: $f2"
    }.collect {
        println(it)
    }

输出:f1: 0 - f2: it = 0
      f1: 1 - f2: it = 1
/**
 * merge 将给定的流合并成单个流
 */
suspend fun merge() {
    val flow1 = flow {
        List(3) {
            emit(it)
            delay(100)
        }
    }
    val flow2 = flow {
        List(2) {
            emit(it)
            delay(200)
        }
    }.map {
        "it = $it"
    }
    listOf(flow2, flow1).merge().collect {
        delay(100)
        println(it)
    }
}
输出:
  it = 0
  0
  1
  it = 1
  2
/**
 * combine
 * 组合两个流,在经过第一次发射以后,任意方有新数据来的时候就可以发射,另一方有可能是已经发射过的数据
 *
 *  每个最近的流的值的组合
 *
 
 *
 */
suspend fun combine() {
    val flow1 = flow {
        List(1) {
            emit(it)
            delay(100)
        }
    }
    val flow2 = flow {
        List(4) {
            emit(it)
            delay(150)
        }
    }.map {
        "it = $it"
    }
    flow1.combine(flow2) { f1, f2 ->
        "f1: $f1 - f2: $f2"
    }.collect {
        delay(100)
        println(it)
    }
}
输出:
 f1: 0 - f2: it = 0
 f1: 1 - f2: it = 0
 f1: 1 - f2: it = 1
 f1: 1 - f2: it = 2
 f1: 1 - f2: it = 3    

3.flatten家族

 /**
 *
 *  flattenConcat 串行处理数据 是按顺序拼接的
 *  flattenMerge,并发 collect 数据  它会并发拼接,因此结果不会保证顺序
 *
 */
suspend fun flattenConcat() {
    val flow = flow {
        List(4) {
            emit(it)
            delay(110)
        }
    }.map {
        flow {
            List(it) { emit(it) }
            delay(150)
        }
    }
    val time = measureTimeMillis {
        flow.flattenConcat()
            .collect { println(it) }
    }
    println(time)
}
输出:
  flattenConcat -> 0 , 01 ,012, 0123  time:1066
  flattenMerge -> 0 , 01 ,012, 0123  time:549

/**
 *
    flatMapConcat:等待内部流完成,然后开始收集下一个流
    1: First -- 122
    1: Second -- 623
    
    2: First -- 725
    2: Second -- 1226
    
    3: First -- 1328
    3: Second -- 1830
    1830

    flatMapMerge:
    同时收集所有传入流并将其值合并到单个流中,以便尽快发出值
    先执行序列 map{requestFlow(it)},然后对返回值调用 flattenMerge
    1: First -- 215
    2: First -- 296
    3: First -- 399
    
    1: Second -- 714
    2: Second -- 797
    3: Second -- 902
    949
 *
 *
 */

suspend fun flatMapConcat() {
    val start = System.currentTimeMillis()
    val time = measureTimeMillis {
        (1..3).asFlow()
            .onEach { delay(100) }
            .flatMapMerge {
                requestFlow(it)
            }.collect {
                println("$it -- ${System.currentTimeMillis() - start}")
            }
    }

    println(time)
}

4. Flow 的背压

/**
 * Flow 的背压
 * buffer并发 数据发射并发,collect 不并发
 *
 *  未使用 buffer, 一个emit 一个collect
 *  collect 0
    emit 0
    ...
    collect 3
    emit 3
    总时间 2828
    
    buffer 后:
    
    emit 0
    emit 1
    collect 0
    emit 2
    emit 3
    collect 1
    collect 2
    collect 3
    2064

 *
 */

suspend fun buffer() {
    val flow = flow<Int> {
        repeat(4) {
            emit(it)
            delay(200)
            println("emit $it")
        }
    }

    val time = measureTimeMillis {
        flow.buffer()
            .collect {
                delay(500)
                println("collect $it")
            }
    }
    println(time)

}

/**
 * 背压?
    如果我们只是单纯地添加缓存,而不是从根本上解决问题就始终会造成数据积压
    问题产生的根本原因是生产和消费速率的不匹配,
    除直接优化消费者的性能以外,我们也可以采取一些取舍的手段

    1.conflate。与 Channel 的 Conflate 模式一致,
    新数据会覆盖老数据

    emit 0
    emit 1
    emit 2
    emit 3
    collect 0
    collect 3
    1073

 */

suspend fun conflate() {
    val flow = flow<Int> {
        repeat(4) {
            emit(it)
            println("emit $it")
        }
    }

    val time = measureTimeMillis {
        flow.conflate()
            .collect {
                delay(500)
                println("collect $it")
            }
    }
    println(time)
}
/**
 * 2.collectLatest
 *  emit 0
    emit 1
    emit 2
    emit 3
    collect 3
    569
 *
 */

suspend fun collectLatest() {
    val flow = flow<Int> {
        repeat(4) {
            emit(it)
            delay(100)
            println("emit $it")
        }
    }

    val time = measureTimeMillis {
        flow
            .collectLatest {
                delay(500)
                println("collect $it")
            }
    }
    println(time)
}


5.Flow 生产者和消费者的通信是同步非阻塞的,也就是生产和消费会顺序交替进行,channelFlow 可实现生产消费可以并行执行

/**
 *
 * channelFlow 生产消费可以并行执行
 *
    0
    1
    2
    3
    channelFlow = 2082

    0
    1
    2
    3
    flow emit = 4019
 *
 */

suspend fun channelFlow() {
    val time1 = measureTimeMillis {
        productChannelFlow().collect {
            delay(500)
            println(it)
        }
    }
    println("channelFlow = $time1")

    val time2 = measureTimeMillis {
        product2().collect {
            delay(500)
            println(it)
        }
    }
    println("flow emit = $time2")

    val time3 = measureTimeMillis {
        createChannelFlow()
    }
    println("createChannelFlow = $time3")
}

相关文章

网友评论

      本文标题:kotlin Flow(一)

      本文链接:https://www.haomeiwen.com/subject/rahzeltx.html