美文网首页
Kotlin Flow操作符大全

Kotlin Flow操作符大全

作者: hao_developer | 来源:发表于2023-08-06 16:35 被阅读0次

冷流

冷流是 无消费者时,则不会生产数据

热流

热流是 无观察者时,也会生产数据

Flow分类

一般Flow

一般的Flow,仅有观察者这。 冷流

//构建

val testFlow = flow<String>{
    emit("hello")
    emit("flow")
}

//接收
coroutineScope.launch{
  testFlow.collect{ value->
     println(value)
  }
}

//打印
hello
flow

StateFlow

有状态的Flow,可以有多个观察者,热流
构造时需要传入初始值:initialState常用作与UI相关的数据观察,类比LiveData

//创建
val uiState=MutableStateFlow(Result.Loading)

//监听
coroutineScope.launch{
    uiState.collect{ value->
         println(value)
    }    
}

//赋值
uiState.value=Result.Sucess 

//打印结果
Result.Loading
Result.Sucess

SharedFlow

可定制化的StateFlow,可以有多个观察者,热流.无需初始值,有三个可选参数:
replay - 重播给新订阅者的值的数量(不能为负,默认为零)
extraBufferCapacity - 出了replay之外缓冲的值的数量,当有剩余缓冲区空间时,emit不会挂起(可选,不能为负,默认为零)
onBufferOverflow - 配置缓冲区溢出的操作(可以,默认为暂停尝试发出值)
使用ShareFlow你可以写个FlowEventBus

//创建
val signEvent=MutableSharedFlow <String> ()

//监听
coroutineScope.launch{
    signEvent.collect{ value->
         println(value)
    }    
}
//赋值
signEvent.tryEmit("hello")
signEvent.tryEmit("shared flow")

//打印结果
hello
shared flow

2操作符

  • 中间操作符
    一般来说是用来执行一些操作,不会立即执行,返回值还是个Flow
  • 末端操作符
    会触发流的执行,返回值不是Flow


    image.png

创建Flow

  • flow

创建flow的基本方法
使用emit发射单个值
使用emitAll发射一个流,类似list.addAll(anotherList)

flow<Int>{
    emit(1)
    emit(2)
    emit(flowOf(1,2,3))
}
  • flowof

快速创建flow,类比listOf()

val testFLow = flowOf(1,2,3)
launch{
    testFLow.collect{ value->
        print(value)
    }
}

//打印结果
1
2
3
  • asFlow

将其它数据转换成flow,一般是集合想Flow的转换

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            send(value)
                .onFailure { throwable ->
                
                }

        }

        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }

        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    awaitClose { api.unregister(callback) }
}
  • callbackFlow

将回调方法改造成flow,类似suspendCoroutine

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            send(value)
                .onFailure { throwable ->
                
                }

        }

        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }

        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    awaitClose { api.unregister(callback) }
}
  • emptyFlow

返回一个空流

emptyFlow<Int>()
  • channelFlow

在一般的flow在构造代码中不允许切换线程,ChannelFlow则允许内部切换线程

//构建

val channelFlow = channelFlow<String> {
 send("hello")
    withContext(Dispatchers.IO) {
     send("channel flow")
    }
 }

//监听

coroutineScope.launch{
    signEvent.collect{ value->
         println(value)
    }
}

末端操作符

collect

触发flow的运行,通常的监听方式

launch{
    flowOf(1,2,3).collect{ value->
        print(value)
    }
}

// 1 2 3
  • collectIndexed

带下标的收集操作

launch{
    flowOf(1,2,3).collectIndexed{ value->
        print(value)
    }
}
// 1 2 3
  • collectLatest

与collect的区别是,有新值发出时,如果此时上个收集尚未完成,则会取消上个值的手机操作

flow {
 emit(1)
    delay(50)
    emit(2)
 } .collectLatest { value ->

 println("Collecting $value")
    delay(100) // Emulate work
    println("$value collected")

 }

//输出
Collecting 1
Collecting 2
2 collected

只想要最新的数据,中间值可以丢弃时可以使用此方式

  • toCollection

将结果添加到集合

val array = arrayListOf(0)
launch {
  flow {
     emit(1)
     emit(2)
    } .toCollection(array)
 }

array.forEach { value->
  print(value)
 }

//打印结果 

0 1 2
  • toList

将结果转换为List

flow {
   emit(1)
   emit(2)
} .toList().forEach{value->
    print(value)
}
// 1
// 2
  • toSet

将结果转换为Set

flow {
   emit(1)
   emit(2)
} .toSet().forEach{value->
    print(value)
}
// 1
// 2
  • launchIn

直接触发流的执行,不设置action,入参为coroutineScope,一般不会直接调用,会搭配别的操作符一起使用,如果onEach, onCompletion,返回值是Job

flow {
                emit(1)
                emit(2)
            }.onEach {
                println(it)
            }.onCompletion {
                println("完成")
            }.launchIn(lifecycleScope )
  • last

返回流 发出 的最后一个值,如果为空会抛异常

val myFlow= flow {
   emit(1)
   emit(2)
 }

launch{
    print(myFlow.last())
}

// 2
  • lastOrNull

返回流 发出 的最后一个值,可以为空

val myFlow= emptyFlow<Int>()
launch{
    print(myFlow.lastOrNull())
}

// null
  • first

返回流 发出 的第一个值,如果为空会抛异常

val myFlow= flow {
   emit(1)
   emit(2)
 }
launch{
    print(myFlow.first())
}

// 1
  • firstOrNull

返回流 发出 的第一个值,可以为空

val myFlow= emptyFlow<Int>()
launch{
    print(myFlow.firstOrNull())
}
// null
  • single

接受流发送的第一个值,区别于first(),如果为空或者发了不止一个,则都会报错

val myFlow= flow {
     emit(1)
}

launch {
     print(myFlow.single()) // 1
}

val myFlow1= flow {
   emit(1)
   emit(2)
 }

launch {
   print(myFlow 1 . single ()) // error
}
  • singleOrNull

接收流发送的第一个值,可以为空,发出多值的话除第一个,后面均被置为null

val myFlow= flow {
 emit(1)
}

launch  {
 print(myFlow. singleOrNull ()) // 1
}
  • count

返回流发送值的个数,类似list.size(),住:sharedFlow无效(无意义)

val myFlow= flow {
   emit(1)
   emit(2)
}
launch{
    print(myFlow.count())
}
//2
  • fold

从初始值开始执行遍历,并将结果作为下个执行的参数

val sum= flowOf(2, 3, 4)
            .fold(1, { result, value ->
                 result + value
            })
// sum = 10, 相当于 1 + 2 + 3 + 4
  • reduce

和fold差不多,无初始值

val result= flowOf(1, 2, 3)
                .reduce { acc, value ->
                     acc + value
                }
 //result = 6   1 + 2  +3 

回调操作符

  • onStart

在上游流开始之前被调用,可以发出额外元素,也可以处理其它事情,比如发埋点

flow<Result>{
   emit(Result.Success)
}.onStart{
   emit(Result.Loading)
}
  • onCompletion

在取消或者结束时调用,可以执行发送元素,发埋点等操作

flow<Result>{
   emit(Result.Success)
}.onCompletion{
   emit(Result.End)
}
  • onEach

在上游向下游发出元素之前调用

flow<Int>{
   emit(1)
   emit(2)
   emit(3)
}.onEach{ value->
   println(value)
}.launchIn(lifecycleScope)

// 打印结果
1
2
3
  • onEmpty

当流完成却没有发出任何元素时回调,可以用来兜底

emptyFlow<String>().onEmpty {
   emit("兜底数据")
 } .launchIn(lifecycleScope)
  • onSubscription

ShareFlow专属操作符(StateFlow是SharedFlow的一种特殊实现)
在建立订阅之后回调,和onStaret有些区别,SharedFlow是热流,因此如何在onStart里发送值,则下游可能接收不到

val state = MutableSharedFlow<String>().onSubscription {
     emit("onSubscription")
 }

launch{
    state.collect { value->
        println(value)
    }
}


//打印结果

onSubscription

变换操作符

  • map

将发出的值 进行变换,lambda的返回值为最终发送的值

flow {
    emit(1)
    emit(2)
 } .map { value ->
    value * 2
 } .collect {
    println(value)
}

//打印结果
2
4
  • mapLatest

类比collectLatest,当有新值发送时如果上个变换还没结束,会先取消掉

flow {
    emit("a")
    delay(100)
    emit("b")
}.mapLatest { value ->
    println("Started computing $value")
    delay(200)
    "Computed $value"
}.collect {value->
 print(value)
}

// 打印结果
Started computing a
Started computing b
Computed b
  • mapNotNull

仅发送map后不为空的值

flow {
    emit("a")
    delay(100)
    emit("b")
}.mapLatest { value ->
    println("Started computing $value")
    delay(200)
    "Computed $value"
}.collect {value->
 print(value)
}

// 打印结果
Started computing a
Started computing b
Computed b
  • transform

对发送的值 进行变换,区别于map,transform的接受者是FlowCollector,因此它非常灵活,可以变换,跳过它或多次发送

flow {
    emit(1)
    emit(2)
 } .transform { value ->
  if (value == 1) {
        emit("value :$value*2")
    }
    emit("transform :$value")
 }.collect { value->
  println(value)
}

// 打印结果

value : 1*2
transform :1
transform :2
flow {
    emit(1)
    emit(2)
 } .transform { value ->
  if (value == 1) {
        emit("value :$value*2")
    }
    emit("transform :$value")
 }.collect { value->
  println(value)
}

// 打印结果

value : 1*2
transform :1
transform :2
  • transformLatest

类比mapLatest,当有新值发送时如果上个变换还没结束,会先取消掉

flow {
    emit(1)
    emit(2)
 } .transform { value ->
  if (value == 1) {
        emit("value :$value*2")
    }
    emit("transform :$value")
 }.collect { value->
  println(value)
}

// 打印结果

value : 1*2
transform :1
transform :2
flow {
    emit(1)
    emit(2)
 } .transform { value ->
  if (value == 1) {
        emit("value :$value*2")
    }
    emit("transform :$value")
 }.collect { value->
  println(value)
}

// 打印结果

value : 1*2
transform :1
transform :2
  • transformWhile

这个变化的lambda的返回值是boolean,如果为false,则不再进行后续变换,为true,则继续执行

flow {
    emit("a")
    emit("b")
 } .transformWhile { value ->
    emit(value)
    true
 } .collect { value->
  println(value)
}


//结果
a
b

--------------------

flow {
    emit("a")
    emit("b")
 }.transformWhile { value ->
    emit(value)
    false
 }.collect { value->
    println(value)
}

//结果
a
  • asStateFlow

将mutablestateflow转换为stateflow,就是变成不可变的,常用在对外暴露属性时使用

private val _uiState = MutableStateFlow<UIState>(Loading)

val uiState = _uiState.asStateFlow()
  • asSharedFlow

将mutablesharedflow转换为sharedflow,就是变成不可变的,常用在对外暴露属性时使用

private val _uiState = MutableStateFlow<UIState>(Loading)

val uiState = _uiState.asStateFlow()
  • receiverAsFlow

将channel转换为flow,可以有多个观察者,但是不多播,可能会轮流收到值

private val _event = Channel<Event>()

val event= _event.receiveAsFlow() 
  • consumeAsFlow

将channel转化为flow,但不能有多个观察者(会crash)

private val _event = Channel<Event>()

val event= _event.consumeAsFlow () 
  • withIndex

将结果包装成IndexedValue类型

flow {
    emit("a")
    emit("b")
 } .withIndex().collect {
  print(it.index + ": " + it.value)
}


//结果
0 : a
1 : b
  • scan

和fold相似,区别是fold返回的是最终结果,scan返回的是个flow,会把初始值和每一步的操作结果发送出去

flowOf(1, 2, 3).scan(0) { acc, value ->
    acc + value 
 }.collect {
  print(it)
}

// 0 1 3 6
acc 是上一步操作的结果, value 是发射的值

0 是 初始值 
1 是 0 + 1 = 1
3 是 1 + 2 = 3
6 是 3 + 3 = 6
  • produceIn

转换为ReceiverChannel,不常用

注:Channel内部有ReciverChannel和SendChannel之分,看名字就是一个发送,一个接收

flowOf(1, 2, 3).produceIn(this)
               .consumeEach { value->
                    print(value)
               }
               
//1 2 3
  • runningFold

区别于fold,就是返回一个新流,将每步结果发射出去

flowOf(1, 2, 3).runningFold(1){ acc, value ->
     acc + value
 } .collect { value->
     print(value)
 }
 // 1 2 4 7
  • runningReduce

区别于reduce,就是返回一个新流,将每步的记过发送出去

flowOf(1, 2, 3).runningReduce { acc, value ->
     acc + value
 } .collect { value->
     print(value)
 }
 // 1 3 6
  • shareIn

将普通flow转化为ShareFlow,其有三个参数:
scope: CoroutineScope开始共享的携程范围
startted:SharingStarted控制何时开始和停止共享的策略
replay: Int = 0 发给 新的订阅者 的旧值数量

其中started有一些可选项:
Eagerly:共享立即开始,用不停止
Lazily:当第一个订阅者出现时,永不停止
WhileSubscribed:在第一个订阅者出现时开始共享,在最后一个订阅者消失时立即停止(默认情况下),永久保留重播缓存(默认情况下)
WhileSubscribed具有以下可选参数:
stopTimeoutMillis --- 配置最后一个订阅者消失到携程停止共享之间的延迟(以毫秒为单位)。默认为零(立即停止)
replayExpirationMillis --- 共享的协程从停止到重新激活,这期间缓存的时效

val share = flowOf(1,2,3).shareIn(this,SharingStarted.Eagerly)

//可以有多个观察者
state.collect{value->
  print(value)
}
  • stateIn

将普通flow转化为StateFlow。其有三个参数:
scope:开始共享的协程范围
started:控制何时开始和停止共享的策略
initialValue:状态流的初始值

val  state = flowOf(Success).stateIn(lifecycleScope,SharingStarted.Eagerly,Loading)


state.collect{value->
  print(value)
}
// Loading  Success

stateIn和sharedIn通常用在其它来源的flow的改造监听

过滤操作符

  • filter

筛选出符合条件的值

flow {
    emit("a")
    emit("b")
}.filter { value ->
    value == "a"
}.collect { value->
    print(value)
}


//结果
a
  • filterInstance

筛选对应类型的值

flow {
    emit("a")
    emit("b")
    emit(1)
 }.filterIsInstance<String>().collect { value->
    print(value)
 }

//结果

a
b
  • filterNot

筛选不符合条件相反的值,相当于filter取反

flow {
    emit("a")
    emit("b")
 }.filterNot { it == "a" } .collect { value ->
   print(value)
}



//结果
b
  • filterNotNull

筛选不为空的值

flow {
    emit("a")
    emit(null)
    emit("b")
 }.filterNotNull().collect { value->
  print(value)
}


//结果
a
b
  • drop

入参count为int类型,作用是 丢弃掉前n个的值

flow {
    emit(1)
    emit(2)
    emit(3)
 }.drop(2).collect { value ->
  print(value)
}


//结果
3
  • dropWhile

这个操作符有点特别,和filter不同,它是找到一个不满足条件的,返回其和其之后的值。如果首项就不满足条件,则是全部返回

flow {
 emit(3)
 emit(1) //从此项开始不满足条件
 emit(2)
 emit(4)
}. dropWhile { it == 3  } .collect { value ->
  print(value)
}


//结果 
1 2 4


flow {
 emit(1) //从首项开始就不满足条件
 emit(2)
 emit(3)
 emit(4)
}. dropWhile { it == 3  } .collect { value ->
 print(value)
}

//结果

1 2 3 4
  • take

返回前n个元素

flow {
    emit(1)
    emit(2)
    emit(3)
 } .take(2) .collect { value ->
    print(value)
}

//结果
1
2
  • takeWhile

也是找第一个不满足条件的项,但是取其之前的值,和dropWhile相反
如果第一项就不满足,则为空流

flow {
    emit(1)
    emit(2)
    emit(3) //从此项开始不满足条件
    emit(4)
 } .takeWhile { it <3  } .collect { value ->
    print(value)
}

//结果
1 2 





flow {
    emit(3)  //从此项开始不满足条件
    emit(1)
    emit(2)
    emit(4)
 } .takeWhile { it <3  } .onEmpty {
  print( "empty")
 }.collect { value ->
  print(value)
}

//结果
empty
  • debounce

防抖节流,指定时间内的值只接收最新的一个,其它的过滤掉。搜索联想场景使用

flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.debounce(1000)

 // 3 4 5
  • sample

采样。给定一个时间周期,仅获取周期内最新发出的值

flow {
    repeat(10) {
        emit(it)
        delay(110)
    }
}.sample(200)


// 1 3 5 7 9

//图示

       【1】

|-----------|

1          200  

               2    【3】

           |------------|

          200           400
  • distinctUntilChangedBy

去重操作符,判断连续的两个值是否重复,可以选择是否丢弃重复值

keySelector:(T)->Any? 指定用来判断是否需要比较的key

有点儿类似Recyclerview的DiffUtil机制

flowOf(
    Funny(name = "Tom", age = 8),
    Funny(name = "Tom", age = 12),
    Funny(name = "Tom", age = 12)
).distinctUntilChangedBy { it.name } .collect { value ->
     print(value.toString())
}

// Funny(name=Tom, age=8)
  • distinctUntilChanged

过滤用。distinctUnitChangedBy的简化调用。连续两个值一样,则跳过发送

flowOf(1, 1, 3,1).distinctUntilChanged()
                .collect { value ->
                   print(value)
                }

// 1 3 1

组合操作符

  • combine

组合每个流最新发出的值

val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s } .collect {
  println(it) // Will print "1a 2a 2b 2c"
}
  • combineTransform

顾名思义combine + transform

val numberFlow = flowOf(1, 2).onEach { delay(10) }
val stringFlow = flowOf("a", "b", "c").onEach { delay(15) }

numberFlow.combineTransform(stringFlow) { number, string ->
     emit("$number :$string")
 }.collect { value ->
     println( value )
 }



//结果
1 :a
2 :a
2 :b
2 :c
  • merge

合并多个流成一个流。可以用在 多级缓存加载上

val numberFlow = flowOf(1, 2).onEach { delay(10) }
val stringFlow = flowOf("a", "b", "c").onEach { delay(15) }

listOf(numberFlow,stringFlow).merge()
                             .collect { value ->
                                 print(value)
                             }


// 1 a 2 b c
  • flattenConcat

以顺序方式将给定的流展开为单个流,是Flow<Flow>的扩展函数

flow {
    emit(flowOf(1, 2, 3))
    emit(flowOf(4, 5, 6))
 } .flattenConcat().collect { value->
     print(value)
 }

// 1 2 3 4 5 6
  • flattenMerge

作用和flattenConcat一样,但是可以设置并发收集流的数量

有个入参:concurrency:Int,当其 == 1时,效果和flattenConcat一样,大于1时,则是并发收集

flow {
    emit(flowOf(1, 2, 3).flowOn(Dispatchers.IO))
    emit(flowOf(4, 5, 6).flowOn(Dispatchers.IO))
    emit(flowOf(7, 8, 9).flowOn(Dispatchers.IO))
 }.flattenMerge(3).collect { value->
     print(value)
 }


//1 2 3 7 8 9 4 5 6 (顺序并不固定)
  • flatMapContact

这是一个组合操作符,相当于map + flattenConcat,通过map转成一个流,再通过flattenConcat

展开合并成一个流

flowOf(1, 2, 3).flatMapConcat {
     flowOf(it.toString() + " map")
 } .collect { value ->
     print ln (value)
 }

// 1 map 
// 2 map
// 3 map
  • flatMapLatest

和其它带Lates的操作符一样,如果下个值来了,上个变换还没结束,就取消掉

相当于transformLatest + emitAll

flow {
     emit("a")
     delay(100)
     emit("b")
 }.flatMapLatest { value ->
     flow {
         emit(value)
         delay(200)
         emit(value + "_last")
     }
 }.collect { value ->
     print(value)
 }

 // a b b_last
  • flatMapMerge

组合操作符,简化使用。map+flattenMerge。因此也是有concurrency:Int这样一个参数,来限制并发数

 val flow1 = flowOf(1, 2, 3).onEach { delay(10) }
            val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }

            flow1.flatMapMerge(3) {
                flow2
            }.collect {
                println(it)
            }

//打印
a
a
b
a
b
c
b
c
d
c
d
d

Process finished with exit code 0
  • zip

对两个流进行组合,分别从二者取值,一旦一个流结束了,那整个过程就结束了

val flow = flowOf(1, 2, 3).onEach { delay(10) }

val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }

flow.zip(flow2) { i, s -> i.toString() + s }.collect {
    println(it) 
}

 // Will print "1a 2b 3c"

功能性操作符

  • cancellable

接收的时候判断 携程是否被取消,如果已取消,则抛出异常

val job= flowOf(1,3,5,7).cancellable().onEach { value->
     print(value)
 } .launchIn(lifecycleScope)
 
 //取消
 job.cancel()
  • catch

对上游异常进行捕获,对下游无影响
上游 指的是 此操作符之前的流
下游 指的是 此操作符之后的流

flow<Int> {
  throw IOException("")     
 } .catch { e ->
  if(e is IOException){
        //...
 }
}
  • retryWhen

有条件的进行重试,lambda中有两个参数,一个是异常原因,一个是当前重试的index(从0开始)

flow<Int> {
    print("doing")
    throw IOException("")
 } .retryWhen { cause,attempt->
     if(attempt > 4){
        return@retryWhen false
     }
    cause is IOException
 }
  • retry

重试机制,当流发生异常时可以重新执行,retryWhen的简化版
retries:Long = Long.MAX_VALUE指定重试次数,以及控制是否继续重试(默认为true)

flow<Int> {
  throw IOException("")     
 }. retry (3){ e->
  if(e is IOException){
      true
  }else {
      false
  }
}



flow<Int> {
  throw IOException("")     
 }.retry(3)
  • buffer

如果操作符的代码需要相当 长时间来执行,可使用buffer操作符在执行期间为其创建一个单独携程
capacity: Int = BUFFERED缓冲区的容量
onBufferOverflow: BufferOverflow = BufferOverflow SUSPEND溢出的话执行的操作
有三个选择:SUSPEND挂起,DROP_OLDEST丢掉旧的,DROP_LATEST丢掉新的

flowOf("A", "B", "C")
    .onEach  { println("1$it") }
    .collect { println("2$it") }

    
Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--


flowOf("A", "B", "C")
 .onEach  { println("1$it") }
 .buffer()  // <--------------- buffer between onEach and collect
 .collect { println("2$it") }


P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }

                      |
                      | channel               // buffer()
                      V

Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect
  • conflate

仅保留最新值,内部就是buffer(CONFLATED)

flow {
      repeat(30) {
      delay(100)
      emit(it)
    }
 }.conflate().onEach { delay(1000) } .collect { value ->
     print(value)
 }

// 0 7 15 22 29  (结果不固定)
  • flowOn

指定上游操作的执行 线程。想要切换执行线程 就用它

flow.map { ... } // Will be executed in IO
 . flowOn (Dispatchers.IO) // This one takes precedence
 . collect{ ... }

总结

在实际场景中按需要使用:
eg: 搜索场景使用debounce防抖
eg: 网络请求使用retry
eg: 组件通信使用shareFlow
eg: 数据合并使用combine
etc

相关文章

网友评论

      本文标题:Kotlin Flow操作符大全

      本文链接:https://www.haomeiwen.com/subject/ogoipdtx.html