协程Flow的冷热流
作者:访客发布时间:2024-01-04分类:程序开发学习浏览:103
冷流
特点
- 只有在
flow{}
括号内部才会产生数据如:emit(xxx)
必须有订阅才会生产数据
,也就是需要末端操作符如:collect()
- 生产者和消费者是一一对应的
private suspend fun demo1() {
flow {
(1..5).forEach {
delay(1000)
//必须在{}内部
emit(it)
}
}.collect {
//不调用collect,上面的emit不会执行
XLogUtils.d("collect value= $it")
}
}
同步执行
在同一个协程中多个flow
是同步执行的,第二个collect
需要等带上一个collect
执行完毕
/**
* 这里suspend方法就代表两个flow在同一个协程中同步执行
*/
private suspend fun demo2() {
flowOf(1, 2, 3, 4, 5).onEach {
delay(1000)
}.collect {
XLogUtils.d("collect value= $it")
}
//上面flow会阻塞下面的,是同步执行的
flowOf(7, 8, 9, 10, 11).onEach {
delay(1000)
}.collect {
XLogUtils.d("collect value= $it")
}
}
日志如下:
D collect value= 1
D collect value= 2
D collect value= 3
D collect value= 4
D collect value= 5
D collect value= 7
D collect value= 8
D collect value= 9
D collect value= 10
D collect value= 11
异步执行
想异步怎么办?每个flow
单独一个协程不就可以吗?
private fun demo33() {
lifecycleScope.launch {
flowOf(1, 2, 3, 4, 5).onEach {
delay(1000)
}.collect {
XLogUtils.d("collect value= $it")
}
}
//上面flow会阻塞下面的,是同步执行的
lifecycleScope.launch {
flowOf(7, 8, 9, 10, 11).onEach {
delay(1000)
}.collect {
XLogUtils.d("collect value= $it")
}
}
}
日志如下:
D collect value= 1
D collect value= 7
D collect value= 2
D collect value= 8
D collect value= 3
D collect value= 9
D collect value= 4
D collect value= 10
D collect value= 5
D collect value= 11
或者其中一个协程切换线程也是ok的,不懂的建议学习下协程的原理
切换线程
不要在flow
中使用withContext()
来切线程
不要在flow
中使用withContext()
来切线程
不要在flow
中使用withContext()
来切线程
重要的事情说三遍,切线程使用flowOn()
/**
* 值得注意的地方,不要使用 withContext() 来切换 flow 的线程。
*/
private suspend fun demo4() {
flow { emit(1) }
.onEach {
delay(1000)
}
.map {
XLogUtils.d("map1->${Thread.currentThread().name}")
}
.flowOn(Dispatchers.IO)//对map进行线程切换,在线程池中切换
.map {
XLogUtils.e("map2->${Thread.currentThread().name}")
}
.flowOn(Dispatchers.Main)
.collect {
//collect 执行的线程取决于 整个方法所在的线程
XLogUtils.d("${Thread.currentThread().name}: $it")
}
}
日志如下:
D map1->DefaultDispatcher-worker-1
E map2->main
D main: kotlin.Unit
从日志看flowOn()
作用于它上面的代码,可以使用多个flowOn()
切换不同逻辑代码执行的线程
取消 or 中断
private var cancelJob: Job? = null
/**
* flow在挂起函数内是可以被中断的
*/
private suspend fun demo5() {
//第二次点击取消协程
if (cancelJob != null) {
cancelJob?.cancel()
return
}
cancelJob = lifecycleScope.launch {
(0..100).asFlow().onEach {
delay(1000)
}.collect {
XLogUtils.d("collect value= $it")
}
}
}
collect()
是suspend
函数,取消其所在的协程就可以中断flow生产数据
flow 执行完成
onCompletion
操作符意思是flow
执行完成,最终会走到.onCompletion {}
不管释放发生异常都会走,内部是try/catch
实现,可以在此方法中做一些释放的操作。
/**
* 看onCompletion源码可知 内部是通过 try finally实现的,
* 不过正常还是异常结束都会走onCompletion
*/
private suspend fun demo6() {
//onCompletion不管是否有异常都会走
//可以借助扩展函数来实现只有成功才会走,也就是onCompleted方法
flow {
emit(1)
delay(1000)
emit(2)
throw RuntimeException("发生异常")
}
// .onCompleted { XLogUtils.i("执行完毕") }
.onCompletion { XLogUtils.i("执行完毕") }
.catch { XLogUtils.e("异常= ${it.printStackTrace()}") }
.collect {
XLogUtils.d("collect value= $it")
}
}
/**
* 只有在成功的时候才会执行onCompleted
*/
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
collect { value -> emit(value) }
action()
}
重试机制
private var retryCount = 0
/**
* 重试机制
*/
private suspend fun demo7() {
(1..5).asFlow().onEach {
if (it == 3 && retryCount == 0) throw RuntimeException("出错啦")
}.retry(2) {//重试两次都失败的情况 会抛出异常
retryCount++;
if (it is RuntimeException) {
return@retry true
}
return@retry false
}
.onEach { XLogUtils.d("数据 $it") }
.catch { it.printStackTrace() }
.collect()
}
发生异常会生产者重新执行一遍,日志如下:
D 数据 1
D 数据 2
D 数据 1
D 数据 2
D 数据 3
D 数据 4
D 数据 5
热流
特点
- 不需要在
flow{}
生产数据,可以在其他任意地方 - 不管是否有消费者订阅,生产者都会产生数据
热流有两种StateFlow
和SharedFlow
SharedFlow
看下官方例子
class EventBus {
private val _events = MutableSharedFlow<Event>() // private mutable shared flow
val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
suspend fun produceEvent(event: Event) {
_events.emit(event) // suspends until all subscribers receive it
}
}
是不是有点像LiveData
,不同点看MutableSharedFlow
的构造方法
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
//............
}
参数一:replay
Int
类型,可以理解为粘性事件,当不同的消费者订阅后后重新接收replay
个之前事件生产的数据,看例子:
private fun dome1() {
val flow = MutableSharedFlow<String>(replay = 1)
lifecycleScope.launch {
//第一个订阅者
launch {
flow.collect {
XLogUtils.d("$TAG collect1 $it")
}
}
//生产数据
launch {
(1..10).forEach {
flow.emit("第${it}个")
}
}
}
//模拟在生产者产生数据后才订阅
lifecycleScope.launch {
delay(3000)
//第二个订阅者
flow.collect {
XLogUtils.d("$TAG collect2 $it")
}
}
}
日志如下:
D FlowHotClodActivityTAG collect1 第1个
D FlowHotClodActivityTAG collect1 第2个
D FlowHotClodActivityTAG collect1 第3个
D FlowHotClodActivityTAG collect1 第4个
D FlowHotClodActivityTAG collect1 第5个
D FlowHotClodActivityTAG collect1 第6个
D FlowHotClodActivityTAG collect1 第7个
D FlowHotClodActivityTAG collect1 第8个
D FlowHotClodActivityTAG collect1 第9个
D FlowHotClodActivityTAG collect1 第10个
D FlowHotClodActivityTAG collect2 第10个
从日志来看replay = n
时,多次订阅会将生产者的最后n次事件重新发送一遍。
参数二:extraBufferCapacity
Int
类型,译文:额外的缓冲池,为什么是额外的呢,第一个参数其实也是缓存的数量,例如replay = n extraBufferCapacity=m
,那么总共的size
就是 m+n
个,通常用于生产速率>消费的速率的情况。
来看例子,模拟了生产>消费的情况
private fun dome2() {
val flow = MutableSharedFlow<String>(replay = 2, extraBufferCapacity = 5)
lifecycleScope.launch {
//第一个订阅者
launch {
flow.collect {
delay(1000)
XLogUtils.d("$TAG 消费 $it")
}
}
//生产数据
launch {
(1..10).forEach {
val v = "第${it}个"
XLogUtils.e("$TAG 生产 $v")
flow.emit(v)
delay(500)
}
}
}
//模拟在生产者产生数据后才订阅
lifecycleScope.launch {
delay(15000)
//第二个订阅者
flow.collect {
XLogUtils.d("$TAG collect2 $it")
}
}
}
从日志可以看出来生产者很快就发送了10条数据,而消费的速度慢,后续才慢慢的处理完所有的数据;
看到这里是不是有疑问,extraBufferCapacity=5
在这里好像没有作用啊? 将上述代码改造一下
private fun dome2() {
val flow = MutableSharedFlow<String>(replay = 2, extraBufferCapacity =1)
lifecycleScope.launch {
//第一个订阅者
launch {
flow.collect {
delay(2000)
XLogUtils.d("$TAG 消费 $it")
}
}
//生产数据
launch {
(1..10).forEach {
val v = "第${it}个"
XLogUtils.e("$TAG 生产 $v")
flow.emit(v)
delay(100)
}
}
}
}
改小了生产的时间,减小的缓存池数量,再来看日志
可以看到当缓存池满了后,是消费掉一个才会生产一个(第6个开始)
,也就是将生产者挂起了,这就牵扯到了第三个参数。
参数三:onBufferOverflow
缓冲区策略,看在这里仔细想想有没有点像线程池的构造方法呢?核心线程,非核心线程,拒绝策略??
反正我是觉得这里的设计思想跟Java线程池
类似,而第三个参数就是线程池的拒绝策略,flow提供了三种策略
public enum class BufferOverflow {
//默认的,当生产速率大于消费速率并且缓冲池已满的情况,会挂起生产者,等待消费者
SUSPEND,
//丢弃老的数据
DROP_OLDEST,
//丢弃新的数据
DROP_LATEST
}
这里就不做多介绍了,可以说跟线程池拒绝策略非常像,不懂的可以学习下Java线程池
;
上述所说的挂起操作,感兴趣可以阅读下SharedFlowImpl
的tryEmit
方法,不是本文重点。
override suspend fun emit(value: T) {
if (tryEmit(value)) return // fast-path
emitSuspend(value)
}
StatedFlow
StatedFlow
是一种的特殊的SharedFlow
,看源码StatedFlow
实现了SharedFlow<T>
接口replayCache
就是listof(构造)
,也就是说StatedFlow
是size=1
的SharedFlow
,有点绕就接着往下看
与SharedFlow
区别
replay==1缓存数量为1
private class StateFlowImpl<T>(
initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialState) // T | NULL
public override var value: T
get() = NULL.unbox(_state.value)
set(value) { updateState(null, value ?: NULL) }
//关键
override val replayCache: List<T>
get() = listOf(value)
}
上面只贴了关键代码,可以看出来replay==1
必须要有初始值且不能为空
也就是上面代码中的 initialState: Any // T | NULL
,重复的代码就不贴了
两次数据一致的情况不会发送第二次
看源码中的updateState
方法,如下
private fun updaupdateteState(expectedState: Any?, newState: Any): Boolean {
var curSequence = 0
var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
synchronized(this) {
val oldState = _state.value
if (expectedState != null && oldState != expectedState) return false // CAS support
//这里这里这里这里这里
if (oldState == newState) return true // Don't do anything if value is not changing,
//。。。。。。。。。。。。。。。
}
重写了value
的set()
方法,调用的updaupdateteState()
对比LiveData
相同点
- 允许多个消费者订阅
- 粘性事件,事件数量==1
- 当生产的数据太快时都会丢失数据
1,2点没什么说的,前面叙述都有体现,第3点可以看StateFlowImpl
源码
默认是BufferOverflow.DROP_OLDEST
而SharedFlow是BufferOverflow.SUSPEND
通过StateFlow
顶部的官方注释也能看得出来
不同点
- 必须要默认值
StateFlow
默认是防抖的,LiveData
不防抖StateFlow
没有跟生命周期绑定
第二点,LiveData
想要防抖可以使用distinctUntilChanged
viewModel.liveData.distinctUntilChanged().observe(this, Observer {
})
第三点,没有跟生命周期绑定的话就需要我们利用LifeCycle,参考google官方写法
就是在协程作用域内使用repeatOnLifecycle
本文涉及的源码1
本文涉及的源码2
总结
以下代表个人观点,不对的地方请指出
-
StateFlow
是特殊的SharedFlow
,replay=1 onBufferOverflow=BufferOverflow.SUSPEND
; -
StateFlow
可以替代LiveData使用,但是我认为还是LiveData
好用; -
StateFlow
必须有默认值,默认防抖,有粘性事件,没有生命周期绑定; -
SharedFlow
没有默认值,粘性事件,可以自定义事件的数量,有缓冲池; -
StateFlow
常用来表示状态,如View
的显示和隐藏,按钮selected
,都是需要默认值; -
SharedFlow
我理解是储存数据,如列表的list<T>
数据,在屏幕发生旋转,或者在Fragmetn
的View
销毁重建后的数据恢复;第5,6点用新闻列表来打个比方
列表的整个数据用
SharedFlow<List<Data>>
,而点赞数,收藏这些数据变化监听就可以用StateFlow
; 列表数据是一次性的,拉取下来后基本不会变动,而点赞收藏需要默认值,而且会在我们操作后发生变化,所有我认为使用StateFlow
更合适。
- 程序开发学习排行
-
- 1鸿蒙HarmonyOS:Web组件网页白屏检测
- 2HTTPS协议是安全传输,为啥还要再加密?
- 3HarmonyOS鸿蒙应用开发——数据持久化Preferences
- 4记解决MaterialButton背景颜色与设置值不同
- 5鸿蒙HarmonyOS实战-ArkUI组件(RelativeContainer)
- 6鸿蒙HarmonyOS实战-ArkUI组件(Stack)
- 7[Android][NDK][Cmake]一文搞懂Android项目中的Cmake
- 8鸿蒙HarmonyOS实战-ArkUI组件(mediaquery)
- 9鸿蒙HarmonyOS实战-ArkUI组件(GridRow/GridCol)
- 最近发表
-
- WooCommerce最好的WordPress常用插件下载博客插件模块的相关产品
- 羊驼机器人最好的WordPress常用插件下载博客插件模块
- IP信息记录器最好的WordPress常用插件下载博客插件模块
- Linkly for WooCommerce最好的WordPress常用插件下载博客插件模块
- 元素聚合器Forms最好的WordPress常用插件下载博客插件模块
- Promaker Chat 最好的WordPress通用插件下载 博客插件模块
- 自动更新发布日期最好的WordPress常用插件下载博客插件模块
- WordPress官方最好的获取回复WordPress常用插件下载博客插件模块
- Img to rss最好的wordpress常用插件下载博客插件模块
- WPMozo为Elementor最好的WordPress常用插件下载博客插件模块添加精简版