1. launchWhenX:处于非活跃状态时,通过 pause() 将 paused 置为 true,使 block 暂停执行(协程并未取消,因此会造成上游资源浪费);回到活跃状态后,通过 resume() 将 paused 置为 false,并调用 drainQueue() 继续执行协程体代码块。
2. repeatOnLifecycle / flowWithLifecycle:进入非活跃状态时,通过 job.cancel() 取消(销毁)协程;回到活跃状态时,**重新开启新协程**执行 block()。
DispatchQueue.kt
// Pending runnables; drainQueue() polls them from the head one at a time.
private val queue: Queue<Runnable> = ArrayDeque()
/**
 * Marks this queue as paused.
 *
 * Only the flag is flipped here. While it is set (and the queue is not finished),
 * [canRun] reports `false`, so [drainQueue] leaves the queued runnables untouched.
 */
@MainThread
fun pause() {
    paused = true
}
/**
 * Leaves the paused state and runs whatever was queued in the meantime.
 *
 * Does nothing when the queue is not currently paused.
 *
 * @throws IllegalStateException if the queue is paused but has already finished.
 */
@MainThread
fun resume() {
    if (paused) {
        check(!finished) { "Cannot resume a finished dispatcher" }
        paused = false
        drainQueue()
    }
}
/**
 * Runs queued runnables in order for as long as [canRun] allows it.
 *
 * After `pause()` the `paused` flag is set, so [canRun] returns `false` (unless the queue is
 * finished) and the loop stops without touching the remaining entries. After `resume()` the
 * flag is cleared, [canRun] returns `true`, and the queued runnables are executed.
 */
@MainThread
fun drainQueue() {
    // Ignore re-entrant calls so the call stack does not grow deep.
    if (isDraining) return
    try {
        isDraining = true
        // Stop as soon as the queue is empty or running is no longer allowed.
        while (queue.isNotEmpty() && canRun()) {
            queue.poll()?.run()
        }
    } finally {
        isDraining = false
    }
}
/**
 * Tells whether queued runnables may be executed right now.
 *
 * @return `true` when the queue is finished or is not paused.
 */
@MainThread
fun canRun(): Boolean {
    return finished || !paused
}
flowWithLifecycle
/**
 * Returns a flow that re-emits the values of this flow, collecting it inside
 * [repeatOnLifecycle] so that collection only runs as that function allows for
 * [minActiveState].
 *
 * The returned flow is closed once [repeatOnLifecycle] returns.
 *
 * @param lifecycle the lifecycle that gates the upstream collection.
 * @param minActiveState the state passed to [repeatOnLifecycle]; defaults to STARTED.
 */
@OptIn(ExperimentalCoroutinesApi::class)
public fun <T> Flow<T>.flowWithLifecycle(
    lifecycle: Lifecycle,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED
): Flow<T> {
    val upstream = this
    return callbackFlow {
        lifecycle.repeatOnLifecycle(minActiveState) {
            // Forward every upstream value into the callbackFlow channel.
            upstream.collect { value ->
                send(value)
            }
        }
        close()
    }
}
repeatOnLifecycle
/**
 * Runs [block] in a new coroutine every time this lifecycle reaches [state], and cancels that
 * coroutine when the lifecycle falls back below [state].
 *
 * The call suspends until the lifecycle receives ON_DESTROY or the calling coroutine is
 * cancelled. It returns immediately when the lifecycle is already DESTROYED.
 *
 * @param state the state at which [block] is (re)started; must not be INITIALIZED.
 * @param block the work to run; it is launched from the enclosing coroutineScope, and a
 * [Mutex] keeps successive launches from overlapping.
 * @throws IllegalArgumentException if [state] is [Lifecycle.State.INITIALIZED].
 */
public suspend fun Lifecycle.repeatOnLifecycle(
state: Lifecycle.State,
block: suspend CoroutineScope.() -> Unit
) {
require(state !== Lifecycle.State.INITIALIZED) {
"repeatOnLifecycle cannot start work with the INITIALIZED lifecycle state."
}
// Nothing to repeat once the lifecycle is already destroyed.
if (currentState === Lifecycle.State.DESTROYED) {
return
}
// This scope is required to preserve context before we move to Dispatchers.Main
coroutineScope {
withContext(Dispatchers.Main.immediate) {
// Check the current state of the lifecycle as the previous check is not guaranteed
// to be done on the main thread.
if (currentState === Lifecycle.State.DESTROYED) return@withContext
// Instance of the running repeating coroutine
var launchedJob: Job? = null
// Registered observer
var observer: LifecycleEventObserver? = null
try {
// Suspend the coroutine until the lifecycle is destroyed or
// the coroutine is cancelled
suspendCancellableCoroutine<Unit> { cont ->
// Lifecycle observers that executes `block` when the lifecycle reaches certain state, and
// cancels when it falls below that state.
val startWorkEvent = Lifecycle.Event.upTo(state)
val cancelWorkEvent = Lifecycle.Event.downFrom(state)
val mutex = Mutex()
observer = LifecycleEventObserver { _, event ->
if (event == startWorkEvent) {
// Launch the repeating work preserving the calling context
launchedJob = this@coroutineScope.launch {
// Mutex makes invocations run serially,
// coroutineScope ensures all child coroutines finish
mutex.withLock {
coroutineScope {
block()
}
}
}
return@LifecycleEventObserver
}
// Not an else-if chain: the cancel check and the ON_DESTROY check below are both
// evaluated for the same event.
if (event == cancelWorkEvent) {
launchedJob?.cancel()
launchedJob = null
}
// ON_DESTROY resumes the suspended caller, which lets repeatOnLifecycle return.
if (event == Lifecycle.Event.ON_DESTROY) {
cont.resume(Unit)
}
}
this@repeatOnLifecycle.addObserver(observer as LifecycleEventObserver)
}
} finally {
// Runs on normal completion and on cancellation alike: cancel any job that is still
// referenced and unregister the observer if one was created.
launchedJob?.cancel()
observer?.let {
this@repeatOnLifecycle.removeObserver(it)
}
}
}
}
}
/**
 * Demo screen that collects the same counter flow in two ways so their logs can be compared:
 * once inside `launchWhenResumed` and once through `flowWithLifecycle`.
 *
 * The layout is supplied through the `AppCompatActivity` constructor, so `onCreate` does not
 * set the content view again.
 */
@FlowPreview
@AndroidEntryPoint
class HIltActivity : AppCompatActivity(R.layout.activity_hilt) {

    // The original snippet used `vm` without declaring it. It is resolved lazily through the
    // activity's ViewModelProvider; the class name is fully qualified so no import is needed.
    private val vm: HiltVm by lazy {
        androidx.lifecycle.ViewModelProvider(this).get(HiltVm::class.java)
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Variant 1: launchWhenResumed, kept on purpose as one half of the comparison.
        lifecycleScope.launchWhenResumed {
            vm.getFlowTest().collect {
                printlnLog("collect---> $it")
            }
        }

        // Variant 2: the collection is gated by flowWithLifecycle (default minActiveState).
        lifecycleScope.launch {
            vm.getFlowTest()
                .flowWithLifecycle(lifecycle)
                .collect {
                    printlnLog("--------collect---> $it")
                }
        }
    }

    override fun onResume() {
        super.onResume()
        printlnLog("onResume")
    }

    override fun onPause() {
        super.onPause()
        printlnLog("onPPPause")
    }
}
/**
 * ViewModel for the lifecycle-collection demo; it exposes an endless counter flow.
 *
 * The two repositories are constructor-injected but are not used by the code shown here.
 */
@FlowPreview
@HiltViewModel
class HiltVm @Inject constructor(
    private val localHiltRepository: LocalHiltRepository,
    private val remoteHiltRepository: RemoteHiltRepository
) : ViewModel() {

    // Shared by every flow returned from getFlowTest(): all collectors advance the same
    // counter, and the increment is not synchronized.
    private var i = 0

    /**
     * Builds a flow that never completes: every iteration waits 2000 ms, logs the current
     * counter value, emits it, and then increments the counter.
     *
     * The producing loop runs on [Dispatchers.IO] because of `flowOn`.
     *
     * @return a new flow of counter values on each call.
     */
    fun getFlowTest(): Flow<Int> {
        return flow {
            while (true) {
                delay(2000)
                printlnLog("-----------------$i")
                emit(i++)
            }
        }.flowOn(Dispatchers.IO)
    }
}
launchWhenResumed 打印如下:

repeatOnLifecycle 打印如下:

网友评论