联系我们
简单又实用的WordPress网站制作教学
当前位置:网站首页 > 程序开发学习 > 正文

Kotlin 协程源码阅读笔记 —— Flow

作者:访客发布时间:2023-12-31分类:程序开发学习浏览:134


导读:Kotlin协程源码阅读笔记——FlowFlow就是Kotlin协程中的流,我们可以通过它在Kotlin中进行写出流式代码,大名鼎鼎的RxJava就是流式编程...

Kotlin 协程源码阅读笔记 —— Flow

Flow 就是 Kotlin 协程中的流,我们可以通过它在 Kotlin 中进行写出流式代码,大名鼎鼎的 RxJava 就是流式编程的库(不过我遇到很多的人他们只会用 RxJava 切线程😂),在 Kotlin 的早期 Flow 的各种操作符相对于 RxJava 比较少,不过现在 Flow 的操作符也是非常的丰富,RxJava 中常用的操作符在 Flow 中几乎都能够找到功能类似的操作符。所以如果项目中的 RxJava 现在也可以无缝迁移到 Flow
Flow 属于冷流(如果不知道冷流,热流的同学建议去网上找找相关的概念),对应到 RxJava 中的就是 ObservableFlowableSingleMaybeCompletable 等等。Kotlin 协程中的热流实现是 ChannelMutableSharedFlowMutableStateFlow 等等,对应到 RxJava 中的热流是 PublisherSubjectBehaviorSubject
本篇文章主要是介绍 Kotlin 协程中的冷流 Flow,其他的热流相关的类考虑后面的文章再介绍。

源码阅读基于 Kotlin 协程 1.8.0-RC2

Flow 的简单使用

我在以下的代码就构建了一个简单的 Flow

val myFlow = flow {
    repeat(10) {
        emit(it)
    }
}

我的上面我创建的 Flow 会发送 10 个元素,依次从 0 到 9。

runBlocking(Dispatchers.Default) {
    val myFlow = flow {
        repeat(10) {
            emit(it)
        }
    }
    launch { 
        myFlow.collect {
            println("Coroutine1: $it")
        }
    }
    launch {
        myFlow.collect {
            println("Coroutine2: $it")
        }
    }
}

协程1和协程2都通过 Flow#collect() 来订阅 Flow,协程1和协程2之间的订阅他们之间是相互不影响的,他们的订阅都会触发一次 flow() 方法中的 Lambda 调用,而对应的 flow() 中的 Lambda 中每调用一次 emit() 方法都会触发一次 collect() 方法订阅时传递的 Lambda

Flow 也有一些限制,flow()Lambda 中调用 emit() 方法时不能够修改原来的 CoroutineContext,比如切换 ContinuationContext,假如我把上面的代码改成下面这样就会抛出异常(如果想要绕过这个限制可以使用 ChannelFlow):

runBlocking(Dispatchers.Default) {
    val myFlow = flow {
        repeat(10) {
            withContext(Dispatchers.IO) {
                emit(it)
            }
        }
    }
    launch {
        myFlow.collect {
            println("Coroutine1: $it")
        }
    }
}

我将上面的代码修改成使用 ChannelFlow 就可以正常工作了:

runBlocking(Dispatchers.Default) {
    val myFlow = channelFlow {
        repeat(10) {
            withContext(Dispatchers.IO) {
                channel.send(it)
            }
        }
    }
    launch {
        myFlow.collect {
            println("Coroutine1: $it")
        }
    }
}

Flow 还有一个限制就是如果 collect()Lambda 方法中出现异常后,就算在 emit() 方法调用时将它 catch 掉,后续的使用还是会继续报错,简单来说就是 collect()Lambda 方法中报错了后那么这次 collect() 订阅就结束了,不能够再使用,我继续把上面的代码改成下面这样,也是不能够使用的:

runBlocking(Dispatchers.Default) {
    val myFlow = flow {
        repeat(10) {
            try {
                emit(it)
            } catch (e: Throwable) {
                emit(233)
            }
        }
    }
    launch {
        myFlow.collect {
            if (it == 2) {
                error("Test Error")
            }
            println("Coroutine1: $it")
        }
    }
}

Flow 工作原理

我们先看看 flow() 方法的源码:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

直接创建一个 Flow 的实现类 SafeFlow,它继承于 AbstractFlow,订阅时调用的是 collect() 方法,我们看看 AbstractFlow#collect() 方法的实现:

public final override suspend fun collect(collector: FlowCollector<T>) {
    val safeCollector = SafeCollector(collector, coroutineContext)
    try {
        collectSafely(safeCollector)
    } finally {
        safeCollector.releaseIntercepted()
    }
}

collect() 方法中将外部传入的 FlowCollector 使用 SafeCollector 代理了,然后传递给 SafeFlow#collectSafely() 方法,我们看看它的实现:

override suspend fun collectSafely(collector: FlowCollector<T>) {
    collector.block()
}

上面的代码也是朴实无华,直接通过 SafeCollector 调用 flow() 方法中传入的方法,我们想要发送元素时也是通过 emit() 方法来完成,我们看看 SafeCollector#emit() 方法的实现(如果不被 SafeCollector 对象代理,这个时候我们直接调用 collect() 方法中传入的 FlowCollector 对象的 emit() 方法就能够完成 Flow 向订阅者的通信了,有一个 unsafeFlow 就是这么实现的):

actual override suspend fun emit(value: T) {
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            emit(uCont, value)
        } catch (e: Throwable) {
            lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
            throw e
        }
    }
}

首先拿到对应的协程的 Continuation 对象,然后再调用 emit() 方法,注意这里如果调用 emit() 方法报错时会将当前的 CoroutineContext 和错误保存在 DownstreamExceptionContext 对象中,然后赋值给 lastEmissionContextlastEmissionContext 是用来保存上次 emit()CoroutineContext 的,如果上次的 lastEmissionContextDownstreamExceptionContext 对象就表示上次的 emit() 调用出了异常,后面我们会看到处理 DownstreamExceptionContext 的代码,我们再来看看这个非 suspendemit() 方法:


private fun emit(uCont: Continuation<Unit>, value: T): Any? {
    val currentContext = uCont.context
    currentContext.ensureActive()
    // This check is triggered once per flow on a happy path.
    val previousContext = lastEmissionContext
    // 当前的 Context 和 上次的 Context 不一样,需要检查 Context
    if (previousContext !== currentContext) {
        checkContext(currentContext, previousContext, value)
        lastEmissionContext = currentContext
    }
    completion_ = uCont
    // 这里就是直接调用被代理的 FlowCollector 的 emit() 方法。
    val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    /*
     * If the callee hasn't suspended, that means that it won't (it's forbidden) call 'resumeWith` (-> `invokeSuspend`)
     * and we don't have to retain a strong reference to it to avoid memory leaks.
     */
    if (result != COROUTINE_SUSPENDED) {
        completion_ = null
    }
    return result
}

如果前一次调用 emit()CoroutineContext 和当前的 CoroutineContext 不是同一个实例,就需通过 checkContext() 方法检查 CoroutineContext 是否有错(也就是我们上面测试代码中的两个限制), 然后就直接调用被代理的 FlowCollectoremit() 方法,也就是我们订阅时传入的 Lambda 方法。
我们来看看 checkContext() 方法是如何检查 CoroutineContext 的:

private fun checkContext(
    currentContext: CoroutineContext,
    previousContext: CoroutineContext?,
    value: T
) {
    // 如果上次 emit 由错误,这次的 emit 也是直接抛出异常
    if (previousContext is DownstreamExceptionContext) {
        exceptionTransparencyViolated(previousContext, value)
    }
    // 检查当前的 CoroutineContext 和 collect() 方法中的是否一致
    checkContext(currentContext)
}

private fun exceptionTransparencyViolated(exception: DownstreamExceptionContext, value: Any?) {
    
    error("""
        Flow exception transparency is violated:
            Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
            Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
            For a more detailed explanation, please refer to Flow documentation.
        """.trimIndent())
}


internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
    val result = currentContext.fold(0) fold@{ count, element ->
        val key = element.key
        val collectElement = collectContext[key]
        if (key !== Job) {
            return@fold if (element !== collectElement) Int.MIN_VALUE
            else count + 1
        }

        val collectJob = collectElement as Job?
        val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
        if (emissionParentJob !== collectJob) {
            error(
                "Flow invariant is violated:\n" +
                        "\t\tEmission from another coroutine is detected.\n" +
                        "\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
                        "\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
                        "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
            )
        }
        if (collectJob == null) count else count + 1
    }
    if (result != collectContextSize) {
        error(
            "Flow invariant is violated:\n" +
                    "\t\tFlow was collected in $collectContext,\n" +
                    "\t\tbut emission happened in $currentContext.\n" +
                    "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
        )
    }
}

上面的代码比较简单,如果上次的 emit() 有异常,直接抛出异常;然后检查当前的 CoroutineContext 和调用 collect() 方法传入的是否一致,如果不一致就抛出异常,具体如何判断 CoroutineContext 是否一致,大家自己看看上面的代码,我就不解释了。

总的来说 Flow 本身的实现是非常简单的,如果不是被 SafeCollector 代理去检查异常和 CoroutineContext 它的代码会更加简单。在调用 collect() 方法订阅时,会创建一个 FlowCollector 对象(Lambda),collect() 方法同时会触发构建 Flowflow() 方法中的 Lambda 的调用,在 flow() 方法中的 Lambda 就通过 collect() 时传递的 FlowCollectoremit() 方法来向其发送消息。

Flow 中还有非常丰富的操作符方法,我们继续分析一些有代表性的操作符。

map() 操作符

map 操作符可以说是非常的简单了,我就不介绍它的具体使用了,我们直接看对应的方法:

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}

我们看到这里调用了 transform() 方法,然后在 transform() 方法中传入了一个 Lambda,在这个方法中又会调用 map() 方法传入的参数来对原来的元素进行转换,最后将转换后的元素通过 emit() 方法再发送给下游。这里要再说明一下,很多的操作符都是依赖于这个 transform() 方法,比如 filter()filterNot()filterIsInstance()filterNotNull()mapNotNull()withIndex()onEatch 等等。所以我们看懂了 transform() 方法后上面的基于它实现的操作符也都很容易理解了。

public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
    collect { value ->
        // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
        return@collect transform(value)
    }
}

在看 Flow 流的操作符时,我们先定一个概念:转换前的流通常称为上游(Upstream),通常操作符处理后需要构建一个新的流,而新的流在对原来的流的元素处理后再发送给下游(Downstream)。
我们继续看 transform() 方法,它首先通过 flow() 方法构建一个新的流,构建新的流的 Lambda 中会订阅上游的流,收到上游的数据后通过 transform 参数再处理,在 map 操作符中就是对原来的数据做简单的转换,然后将转换后的数据通过新的 Flow 中的 FlowCollector#emit() 方法再发送出去。

flatMap() 类操作符

Flow 中原来的 flatMap() 操作符已经被弃用了,原来的 flatMap() 操作符用 flatMapConcat() 代替,根据不同的功能还衍生出了 flatMapMerge() 操作符和 flatMapLatest() 操作符,我相信很多人对这三个 flatMap 都搞得不是很清楚,所以在分析源码前要先介绍一下他们的区别。

flatMapConcat()flatMapMerge()flatMapLatest() 之间的区别

flatMapConcat()

它相当于 RxJava 中的 concatMap() 操作符,flatMapConcat() 会将原来流的元素构建成一个新的流,而新的流中的元素数据一定会按照原来的流的元素的数据,可能说得有点抽象,我这里举一个例子。

fun main(args: Array<String>) {
    runBlocking(Dispatchers.Default) {
        val myFlow = flow {
            repeat(3) {
                emit(it)
            }
        }

        launch {
            myFlow
                .flatMapConcat { upstreamValue ->
                    flow {
                        delay(1000L - upstreamValue * 100)
                        repeat(2) {
                            emit(upstreamValue * 10 + it)
                        }
                    }
                }
                .collect {
                    println(it)
                }
        }
    }
}

最后得输出结果是:

0
1
10
11
20
21

我的测试代码中原来得 Flow 会发送 0 1 2 三个元素,然后通过 flatMapConcat() 将原来的一个元素转换成一个包含两个元素的 Flow。越是先发送的元素延迟的时间越长,虽然是这样但是还是按照原来的 Flow 的元素 0 1 2 构成的顺序处理转换后的 Flow

flatMapMerge()

它相当于 RxJava 中的 flatMap() 操作符,和 flatMapConcat() 相比,它就不会保证原来的顺序,而是哪个流先处理完成就先发送数据。还是上面的例子,修改成 flatMapMerge() 操作符。

fun main(args: Array<String>) {
    runBlocking(Dispatchers.Default) {
        val myFlow = flow {
            repeat(3) {
                emit(it)
            }
        }

        launch {
            myFlow
                .flatMapMerge { upstreamValue ->
                    flow {
                        delay(1000L - upstreamValue * 100)
                        repeat(2) {
                            emit(upstreamValue * 10 + it)
                        }
                    }
                }
                .collect {
                    println(it)
                }
        }
    }
}

最后的输出结果就是:

20
21
10
11
0
1

flatMapMerge() 中还有一个非常重要的参数 concurrency,它的默认值是 16,他表示可以并行执行的 Flow 的数量,如果达到最大的并行值,后续的 Flow 就需要等待前面的并行的 Flow 执行完毕后才能执行。当 concurrency 为 1 时,处理的逻辑就是和 flatMapConcat() 一样,也就是每个 Flow 必须排队执行。我们将上面的代码的 concurrency 修改成 2 来试试:

fun main(args: Array<String>) {
    runBlocking(Dispatchers.Default) {
        val myFlow = flow {
            repeat(3) {
                emit(it)
            }
        }

        launch {
            myFlow
                .flatMapMerge(2) { upstreamValue ->
                    flow {
                        delay(1000L - upstreamValue * 100)
                        repeat(2) {
                            emit(upstreamValue * 10 + it)
                        }
                    }
                }
                .collect {
                    println(it)
                }
        }
    }
}

然后输出的结果就是:

10
11
0
1
20
21

flatMapLatest()

它相当于 RxJava 中的 swtichMap() 操作符,前面没有执行完成的 Flow 会被取消,然后被后续的 Flow 替换,还是上面的例子修改成 flatMapLatest() 操作符:

fun main(args: Array<String>) {
    runBlocking(Dispatchers.Default) {
        val myFlow = flow {
            repeat(3) {
                emit(it)
            }
        }

        launch {
            myFlow
                .flatMapLatest { upstreamValue ->
                    flow {
                        delay(1000L - upstreamValue * 100)
                        repeat(2) {
                            emit(upstreamValue * 10 + it)
                        }
                    }
                }
                .collect {
                    println(it)
                }
        }
    }
}

最后的输出结果是:

20
21

flatMapConcat() 操作符

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

通过 map() 操作符将原来的流的元素转换成 Flow,然后通过 flattenConcat() 处理 Flow<Flow<R>>,最后返回的是 Flow<R>,我们再看看 flattenConcat() 是如何处理的:

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}

处理非常简单,构建一个新的 Flow,这个 Flow 在被订阅的时候它会再去订阅他的上游,上游的类型是 Flow<Flow<R>>,所以上游发送来的数据类型是 Flow<R>,新的 Flow 将上游发送来的数据再通过他的 emitAll() 方法直接将这个 Flow<R> 中的数据发送给下游,我们再看看 FlowCollector#emitAll() 方法的实现:

public suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
    ensureActive()
    flow.collect(this)
}

处理也是非常的简单,直接使用新构建的 Flow 中的 FlowCollector 来收集参数中的 flow。通过上面的方法来收集每个上游中的 Flow<R> 元素时,需要每个 Flow<R> 处理完成后才会去处理下一个,等待中的 Flow<R> 元素是挂起状态。虽然上面的代码很少,但是逻辑是有点绕的,需要你去多多理解一下。

flatMapMerge() 操作符

flatMapMerge() 操作符的实现可要比 flatMapConcat() 操作符要复杂多了,整理好心情我们继续。

public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)

这个 concurrency 参数表示可以并行执行的 Flow 的数量,默认是 16,前面我讲过,这个值如果是 1,就和 flatMapConcat() 的处理方式一样。我们看看 flattenMerge() 方法的实现:

public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

concurrency 必须大于 0;如果 concurrency 为 1,那么用 flattenConcat() 处理(上面已经分析过这个方法了);其他情况使用 ChannelFlowMerge 来处理。
ChannelFlowMerge 它是一个 ChannelFlow,上面我们讲 Flow 使用的时候有讲到 ChannelFlow,在很多的地方都有使用到 ChannelFlow,我们也借助分析 ChannelFlowMerge 时来理解 ChannelFlow 的工作方式。

首先 ChannelFlow 也是一个 Flow,我们从上面也知道,触发订阅 Flow,是通过它的 collect() 方法,那么我们直接看看 ChannelFlow#collect() 方法的实现:

override suspend fun collect(collector: FlowCollector<T>): Unit =
    coroutineScope {
        collector.emitAll(produceImpl(this))
    }

首先通过 coroutineScope() 方法构建一个新的 CoroutineScope(通过 CoroutineScope 也就可以启动新的协程),然后通过 FlowCollector#emitAll() 方法将 produceImpl() 返回的结果全部发送过去,produceImpl() 的返回结果是一个 Channel(),我们看看 FlowCollector#emitAll() 方法是如何处理 Channel 的。

public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
    emitAllImpl(channel, consume = true)

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    ensureActive()
    var cause: Throwable? = null
    try {
        for (element in channel) {
            emit(element)
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        if (consume) channel.cancelConsumed(cause)
    }
}

上面代码非常简单,遍历 channel 中的元素,然后通过 emit() 方法发送给下游,最后将 channel 关闭。

OK,我们继续看看 ChannelFlow#produceImpl() 方法是怎么生成 Channel 的:

public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
    scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

继续调用 CoroutineScope#produce() 方法,这里要注意一下,上面的 collectToFuncollectTo() 函数,它是一个抽象函数,是由 ChannelFlowMerge#collectTo() 实现。
继续看看 produce() 方法:

internal fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    val channel = Channel<E>(capacity, onBufferOverflow)
    val newContext = newCoroutineContext(context)
    val coroutine = ProducerCoroutine(newContext, channel)
    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
    coroutine.start(start, coroutine, block)
    return coroutine
}

首先构建一个 Channel 实例,然后构建一个 ProducerCoroutine 协程,然后启动一个协程,我在前面的文章中有介绍过协程启动的基本流程,不熟悉这个流程的同学可以看看我的这篇文章:Kotlin 协程源码阅读笔记 —— 协程工作原理,协程开始执行时的入口方法就是那个 block,也就是我们上面说到的 ChannelFlowMerge#collectTo(),注意我们的协程 ProducerCoroutine,他也是一个 Channel,而真正的实现是方法开始时构建的 Channel。我们继续看看 ChannelFlowMerge#collectTo() 方法的实现:

override suspend fun collectTo(scope: ProducerScope<T>) {
    val semaphore = Semaphore(concurrency)
    val collector = SendingCollector(scope)
    val job: Job? = coroutineContext[Job]
    flow.collect { inner ->
        /*
         * We launch a coroutine on each emitted element and the only potential
         * suspension point in this collector is `semaphore.acquire` that rarely suspends,
         * so we manually check for cancellation to propagate it to the upstream in time.
         */
        job?.ensureActive()
        semaphore.acquire()
        scope.launch {
            try {
                inner.collect(collector)
            } finally {
                semaphore.release() // Release concurrency permit
            }
        }
    }
}

上面的参数 ProduceScope 其实就是上面的 ProducerCoroutine,他是一个 Channel;然后会构建一个 Semaphore 用来控制并行的 Channel 的数量,这个不是 Java 中的 Semaphore,我在讲 Mutex 时有讲这个对象,感兴趣的同学可以参考我前面的文章: Kotlin 协程源码阅读笔记 —— Mutex;然后订阅上游的流,类型是 Flow<Flow<R>>,收到的元素类型是 Flow<R>,收到元素后通过 Semaphore 来控制并行数量,如果可以执行,通过 launch() 方法来创建一个新的协程来订阅这个 Flow<R> 中的数据,对应的 FlowCollector 实现是 SendingCollector,我们来看看它的实现:

public class SendingCollector<T>(
    private val channel: SendChannel<T>
) : FlowCollector<T> {
    override suspend fun emit(value: T): Unit = channel.send(value)
}

简单高效,直接把上游发送来的数据,通过 Channel 发送出去,根据上面的代码我们知道,这个 Channel 中的数据最终会被发送给下游。

到这里 flatMapMerge() 操作符就分析完毕了,你看懂了吗?没有看懂可以再看一遍。

flatMapLatest() 操作符

public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
   transformLatest { emitAll(transform(it)) }

flatMapLatest() 借助了 transformLatest() 方法,在他的 Lambda 中直接将返回的 Flow<R> 中的元素发送给下游。

public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
    ChannelFlowTransformLatest(transform, this)

这里实现是通过 ChannelFlowTransformLatest 对象,它也是一个 ChannelFlow,它是继承于 ChannelFlowOperatorChannelFlowOperator 继承于 ChannelFlow。由上面分析 ChannleFlow 我们直到最终的订阅时会构建一个协程,然后协程执行开始时的函数时 collectTo() 方法,所以我们也从 ChannelFlowOperator#collectTo() 方法开始分析:

protected override suspend fun collectTo(scope: ProducerScope<T>) =
    flowCollect(SendingCollector(scope))

先构建了一个 SendingCollector 对象(前面有分析了),然后调用 flowCollect() 方法,它是一个抽象方法,我们看看 ChannelFlowTransformLatest#flowCollect() 的实现:

override suspend fun flowCollect(collector: FlowCollector<R>) {
    assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
    coroutineScope {
        var previousFlow: Job? = null
        flow.collect { value ->
            previousFlow?.apply {
                cancel(ChildCancelledException())
                join()
            }
            // Do not pay for dispatch here, it's never necessary
            previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
                collector.transform(value)
            }
        }
    }
}

它处理每一个 Flow<R> 也是创建一个新的协程,当有新的 Flow<R> 元素来的时候,就会把上次的处理 Flow<R> 的任务取消。

flowOn() 操作符

我们通常使用 flowOn() 操作符来切换 ContinuationInterceptor,通过这样来控制下游执行的线程,准确地说 flowOn() 是用来控制下游 CoroutineContext 的。 我们看看它的实现:

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

以上代码分为 3 种情况:

  1. CoroutineContext 为空,直接不处理。
  2. 如果是 FusibleFlow 就使用它的 fuse() 方法来返回一个新的 Flow,巧了不是 ChannelFlow 就是继承于 FusibleFlow
  3. 其他情况通过 ChannelFlowOperatorImpl 来处理,他也是一个 ChannelFlow,和 flatMapLatest() 一样它的实现也是通过继承 ChannelFlowOperator

我们以 ChannelFlowMerge#fuse() 的实现来看看 fuse() 方法:

public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
    assert { capacity != Channel.CONFLATED } // CONFLATED must be desugared to (0, DROP_OLDEST) by callers
    // note: previous upstream context (specified before) takes precedence
    val newContext = context + this.context
    val newCapacity: Int
    val newOverflow: BufferOverflow
    if (onBufferOverflow != BufferOverflow.SUSPEND) {
        // this additional buffer never suspends => overwrite preceding buffering configuration
        newCapacity = capacity
        newOverflow = onBufferOverflow
    } else {
        // combine capacities, keep previous overflow strategy
        newCapacity = when {
            this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
            capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
            this.capacity == Channel.BUFFERED -> capacity
            capacity == Channel.BUFFERED -> this.capacity
            else -> {
                // sanity checks
                assert { this.capacity >= 0 }
                assert { capacity >= 0 }
                // combine capacities clamping to UNLIMITED on overflow
                val sum = this.capacity + capacity
                if (sum >= 0) sum else Channel.UNLIMITED // unlimited on int overflow
            }
        }
        newOverflow = this.onBufferOverflow
    }
    if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)
        return this
    return create(newContext, newCapacity, newOverflow)
}

根据传入的参数构建一个新的 CoroutineContext,剩下的一大堆代码其实就是为了计算出一个新的 capacity,具体的计算逻辑大家自己看看,其实就是 Channel 中的容量,然后调用 create() 方法。我们看看 ChannelFlowMerge#create() 方法的实现:

override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
    ChannelFlowMerge(flow, concurrency, context, capacity, onBufferOverflow)

然后我们再看看 ChannelFlowOperatorImpl#flowCollect() 方法的实现:

override suspend fun flowCollect(collector: FlowCollector<T>) =
    flow.collect(collector)

因为 flowCollect() 会工作在新的 CoroutineContext 的协程中,所以就不用特殊处理了,直接将上游的数据发送给下游就好了,下游就会工作在新的 CoroutineContext 中了。

最后

本篇文章介绍了 Flow 的基本工作原理和一些常用的操作符,如果你想知道某个操作符具体是用来干什么的,看源码比看注释可能更容易理解,操作符的源码本来也就不难,和我分析的操作符的方式也都是大同小异,本篇文章也是 2023 年的最后一篇文章,由它来作为 2023 年的句号,希望 2024 年能够更加好,祝大家新年快乐!!!!


标签:笔记Kotlin程源Flow


程序开发学习排行
最近发表
网站分类
标签列表