联系我们
简单又实用的WordPress网站制作教学
当前位置:网站首页 > 程序开发学习 > 正文

协程Flow的冷热流

作者:访客发布时间:2024-01-04分类:程序开发学习浏览:74


导读:冷流特点只有在flow{}括号内部才会产生数据如:emit(xxx)必须有订阅才会生产数据,也就是需要末端操作符如:collect()生产者和消费者是一一对应的priva...

冷流

特点

  • 只有在flow{}括号内部才会产生数据如:emit(xxx)
  • 必须有订阅才会生产数据,也就是需要末端操作符如:collect()
  • 生产者和消费者是一一对应的
private suspend fun demo1() {
    flow {
        (1..5).forEach {
            delay(1000)
            //必须在{}内部
            emit(it)
        }
    }.collect {
        //不调用collect,上面的emit不会执行
        XLogUtils.d("collect value= $it")
    }
}

同步执行

在同一个协程中多个flow是同步执行的,第二个collect需要等带上一个collect执行完毕

/**
 * 这里suspend方法就代表两个flow在同一个协程中同步执行
 */
private suspend fun demo2() {
    flowOf(1, 2, 3, 4, 5).onEach {
        delay(1000)
    }.collect {
        XLogUtils.d("collect value= $it")
    }
    //上面flow会阻塞下面的,是同步执行的
    flowOf(7, 8, 9, 10, 11).onEach {
        delay(1000)
    }.collect {
        XLogUtils.d("collect value= $it")
    }
}

日志如下:

   D  collect value= 1
   D  collect value= 2
   D  collect value= 3
   D  collect value= 4
   D  collect value= 5
   D  collect value= 7
   D  collect value= 8
   D  collect value= 9
   D  collect value= 10
   D  collect value= 11

异步执行

想异步怎么办?每个flow单独一个协程不就可以吗?

private fun demo33() {
    lifecycleScope.launch {
        flowOf(1, 2, 3, 4, 5).onEach {
            delay(1000)
        }.collect {
            XLogUtils.d("collect value= $it")
        }
    }

    //上面flow会阻塞下面的,是同步执行的
    lifecycleScope.launch {
        flowOf(7, 8, 9, 10, 11).onEach {
            delay(1000)
        }.collect {
            XLogUtils.d("collect value= $it")
        }
    }
}

日志如下:

  D  collect value= 1
  D  collect value= 7
  D  collect value= 2
  D  collect value= 8
  D  collect value= 3
  D  collect value= 9
  D  collect value= 4
  D  collect value= 10
  D  collect value= 5
  D  collect value= 11

或者其中一个协程切换线程也是ok的,不懂的建议学习下协程的原理

切换线程

不要在flow中使用withContext()来切线程

不要在flow中使用withContext()来切线程

不要在flow中使用withContext()来切线程

重要的事情说三遍,切线程使用flowOn()

/**
 * 值得注意的地方,不要使用 withContext() 来切换 flow 的线程。
 */
private suspend fun demo4() {
    flow { emit(1) }
        .onEach {
            delay(1000)
        }
        .map {
            XLogUtils.d("map1->${Thread.currentThread().name}")
        }
        .flowOn(Dispatchers.IO)//对map进行线程切换,在线程池中切换
        .map {
            XLogUtils.e("map2->${Thread.currentThread().name}")
        }
        .flowOn(Dispatchers.Main)
        .collect {
            //collect 执行的线程取决于 整个方法所在的线程
            XLogUtils.d("${Thread.currentThread().name}: $it")
        }
}

日志如下:

 D  map1->DefaultDispatcher-worker-1
 E  map2->main
 D  main: kotlin.Unit

从日志看flowOn()作用于它上面的代码,可以使用多个flowOn()切换不同逻辑代码执行的线程

取消 or 中断

private var cancelJob: Job? = null

/**
 * flow在挂起函数内是可以被中断的
 */
private suspend fun demo5() {
    //第二次点击取消协程
    if (cancelJob != null) {
        cancelJob?.cancel()
        return
    }
    cancelJob = lifecycleScope.launch {
        (0..100).asFlow().onEach {
            delay(1000)
        }.collect {
            XLogUtils.d("collect value= $it")
        }
    }
}

collect()suspend函数,取消其所在的协程就可以中断flow生产数据

flow 执行完成

onCompletion操作符意思是flow执行完成,最终会走到.onCompletion {}不管释放发生异常都会走,内部是try/catch实现,可以在此方法中做一些释放的操作。

    /**
     * 看onCompletion源码可知 内部是通过 try finally实现的,
     * 不过正常还是异常结束都会走onCompletion
     */
    private suspend fun demo6() {
        //onCompletion不管是否有异常都会走
        //可以借助扩展函数来实现只有成功才会走,也就是onCompleted方法
        flow {
            emit(1)
            delay(1000)
            emit(2)
            throw RuntimeException("发生异常")
        }
//            .onCompleted { XLogUtils.i("执行完毕") }
                .onCompletion { XLogUtils.i("执行完毕") }
            .catch { XLogUtils.e("异常= ${it.printStackTrace()}") }
            .collect {
                XLogUtils.d("collect value= $it")
            }
    }
    
   
    /**
     * 只有在成功的时候才会执行onCompleted
     */
    fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
        collect { value -> emit(value) }
        action()
    }

重试机制


private var retryCount = 0

/**
 * 重试机制
 */
private suspend fun demo7() {
    (1..5).asFlow().onEach {
        if (it == 3 && retryCount == 0) throw RuntimeException("出错啦")
    }.retry(2) {//重试两次都失败的情况 会抛出异常
        retryCount++;
        if (it is RuntimeException) {
            return@retry true
        }
        return@retry false
    }
        .onEach { XLogUtils.d("数据 $it") }
        .catch { it.printStackTrace() }
        .collect()
}

发生异常会生产者重新执行一遍,日志如下:

  D  数据 1
  D  数据 2
  D  数据 1
  D  数据 2
  D  数据 3
  D  数据 4
  D  数据 5

热流

特点

  • 不需要在flow{}生产数据,可以在其他任意地方
  • 不管是否有消费者订阅,生产者都会产生数据

热流有两种StateFlowSharedFlow

SharedFlow

看下官方例子

class EventBus {
    private val _events = MutableSharedFlow<Event>() // private mutable shared flow
    val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}

是不是有点像LiveData,不同点看MutableSharedFlow的构造方法

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
   //............
}

参数一:replay

Int类型,可以理解为粘性事件,当不同的消费者订阅后后重新接收replay个之前事件生产的数据,看例子:


private fun dome1() {
    val flow = MutableSharedFlow<String>(replay = 1)
    lifecycleScope.launch {
        //第一个订阅者
        launch {
            flow.collect {
                XLogUtils.d("$TAG collect1 $it")
            }
        }
        //生产数据
        launch {
            (1..10).forEach {
                flow.emit("第${it}个")
            }
        }
    }
    //模拟在生产者产生数据后才订阅
    lifecycleScope.launch {
        delay(3000)
        //第二个订阅者
        flow.collect {
            XLogUtils.d("$TAG collect2 $it")
        }
    }
}

日志如下:

D  FlowHotClodActivityTAG collect1 第1个
D  FlowHotClodActivityTAG collect1 第2个
D  FlowHotClodActivityTAG collect1 第3个
D  FlowHotClodActivityTAG collect1 第4个
D  FlowHotClodActivityTAG collect1 第5个
D  FlowHotClodActivityTAG collect1 第6个
D  FlowHotClodActivityTAG collect1 第7个
D  FlowHotClodActivityTAG collect1 第8个
D  FlowHotClodActivityTAG collect1 第9个
D  FlowHotClodActivityTAG collect1 第10个
D  FlowHotClodActivityTAG collect2 第10个

从日志来看replay = n时,多次订阅会将生产者的最后n次事件重新发送一遍。

参数二:extraBufferCapacity

Int类型,译文:额外的缓冲池,为什么是额外的呢,第一个参数其实也是缓存的数量,例如replay = n extraBufferCapacity=m,那么总共的size就是 m+n个,通常用于生产速率>消费的速率的情况。

来看例子,模拟了生产>消费的情况

private fun dome2() {
    val flow = MutableSharedFlow<String>(replay = 2, extraBufferCapacity = 5)
    lifecycleScope.launch {
        //第一个订阅者
        launch {
            flow.collect {
                delay(1000)
                XLogUtils.d("$TAG 消费 $it")
            }
        }
        //生产数据
        launch {
            (1..10).forEach {
                val v = "第${it}个"
                XLogUtils.e("$TAG 生产 $v")
                flow.emit(v)
                delay(500)
            }
        }
    }
    //模拟在生产者产生数据后才订阅
    lifecycleScope.launch {
        delay(15000)
        //第二个订阅者
        flow.collect {
            XLogUtils.d("$TAG collect2 $it")
        }
    }
}

微信截图_20240103151106.png

从日志可以看出来生产者很快就发送了10条数据,而消费的速度慢,后续才慢慢的处理完所有的数据;

看到这里是不是有疑问,extraBufferCapacity=5在这里好像没有作用啊? 将上述代码改造一下

private fun dome2() {
    val flow = MutableSharedFlow<String>(replay = 2, extraBufferCapacity =1)
    lifecycleScope.launch {
        //第一个订阅者
        launch {
            flow.collect {
                delay(2000)
                XLogUtils.d("$TAG 消费 $it")
            }
        }
        //生产数据
        launch {
            (1..10).forEach {
                val v = "第${it}个"
                XLogUtils.e("$TAG 生产 $v")
                flow.emit(v)
                delay(100)
            }
        }
    }
}

改小了生产的时间,减小的缓存池数量,再来看日志

image.png

可以看到当缓存池满了后,是消费掉一个才会生产一个(第6个开始),也就是将生产者挂起了,这就牵扯到了第三个参数。

参数三:onBufferOverflow

缓冲区策略,看在这里仔细想想有没有点像线程池的构造方法呢?核心线程,非核心线程,拒绝策略??

反正我是觉得这里的设计思想跟Java线程池类似,而第三个参数就是线程池的拒绝策略,flow提供了三种策略

public enum class BufferOverflow {
    //默认的,当生产速率大于消费速率并且缓冲池已满的情况,会挂起生产者,等待消费者
    SUSPEND,
    //丢弃老的数据
    DROP_OLDEST,
    //丢弃新的数据
    DROP_LATEST
}

这里就不做多介绍了,可以说跟线程池拒绝策略非常像,不懂的可以学习下Java线程池

上述所说的挂起操作,感兴趣可以阅读下SharedFlowImpltryEmit方法,不是本文重点。

override suspend fun emit(value: T) {
    if (tryEmit(value)) return // fast-path
    emitSuspend(value)
}

StatedFlow

StatedFlow是一种的特殊的SharedFlow,看源码StatedFlow实现了SharedFlow<T>接口replayCache就是listof(构造),也就是说StatedFlowsize=1SharedFlow,有点绕就接着往下看

SharedFlow区别

replay==1缓存数量为1
private class StateFlowImpl<T>(
    initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
 private val _state = atomic(initialState) // T | NULL
  public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) { updateState(null, value ?: NULL) }

    //关键
    override val replayCache: List<T>
        get() = listOf(value)
}

上面只贴了关键代码,可以看出来replay==1

必须要有初始值且不能为空

也就是上面代码中的 initialState: Any // T | NULL,重复的代码就不贴了

两次数据一致的情况不会发送第二次

看源码中的updateState方法,如下


private fun updaupdateteState(expectedState: Any?, newState: Any): Boolean {
    var curSequence = 0
    var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
    synchronized(this) {
        val oldState = _state.value
        if (expectedState != null && oldState != expectedState) return false // CAS support
        //这里这里这里这里这里
        if (oldState == newState) return true // Don't do anything if value is not changing, 
        //。。。。。。。。。。。。。。。
}

重写了valueset()方法,调用的updaupdateteState()

对比LiveData

相同点
  1. 允许多个消费者订阅
  2. 粘性事件,事件数量==1
  3. 当生产的数据太快时都会丢失数据

1,2点没什么说的,前面叙述都有体现,第3点可以看StateFlowImpl源码

image.png

默认是BufferOverflow.DROP_OLDEST 而SharedFlow是BufferOverflow.SUSPEND

通过StateFlow顶部的官方注释也能看得出来

微信截图_20240103171929.png

不同点
  1. 必须要默认值
  2. StateFlow 默认是防抖的,LiveData 不防抖
  3. StateFlow没有跟生命周期绑定

第二点,LiveData想要防抖可以使用distinctUntilChanged

viewModel.liveData.distinctUntilChanged().observe(this, Observer {
   
})

第三点,没有跟生命周期绑定的话就需要我们利用LifeCycle,参考google官方写法

微信截图_20240103170927.png

就是在协程作用域内使用repeatOnLifecycle

本文涉及的源码1

本文涉及的源码2

总结

以下代表个人观点,不对的地方请指出

  1. StateFlow是特殊的SharedFlowreplay=1 onBufferOverflow=BufferOverflow.SUSPEND

  2. StateFlow可以替代LiveData使用,但是我认为还是LiveData好用;

  3. StateFlow必须有默认值,默认防抖,有粘性事件,没有生命周期绑定;

  4. SharedFlow没有默认值,粘性事件,可以自定义事件的数量,有缓冲池;

  5. StateFlow常用来表示状态,如View的显示和隐藏,按钮selected,都是需要默认值;

  6. SharedFlow我理解是储存数据,如列表的list<T> 数据,在屏幕发生旋转,或者在FragmetnView销毁重建后的数据恢复;

    第5,6点用新闻列表来打个比方

    列表的整个数据用SharedFlow<List<Data>>,而点赞数,收藏这些数据变化监听就可以用StateFlow; 列表数据是一次性的,拉取下来后基本不会变动,而点赞收藏需要默认值,而且会在我们操作后发生变化,所有我认为使用StateFlow更合适。


标签:热流协程Flow


程序开发学习排行
最近发表
网站分类
标签列表