联系我们
简单又实用的WordPress网站制作教学
当前位置:网站首页 > 程序开发学习 > 正文

Kotlin 协程源码阅读笔记 —— Channel

作者:访客发布时间:2024-01-03分类:程序开发学习浏览:123


导读:Kotlin协程源码阅读笔记——Channel在前面的文章中我介绍了Flow,如果对Flow感兴趣的同学可以看看:Kotlin协程源码阅读笔记——Flow,本篇...

Kotlin 协程源码阅读笔记 —— Channel

在前面的文章中我介绍了 Flow,如果对 Flow 感兴趣的同学可以看看:Kotlin 协程源码阅读笔记 —— Flow,本篇文章主要介绍 Channel,相对于 Flow 是一种冷流,而 Channel 的表现更加像一种热流,准备好了就开始今天的内容吧。

源码阅读基于 Kotlin 协程 1.8.0-RC2

Channel 的简单使用

我们通常是使用 Channel() 方法来创建一个 Channel 实例,其中有以下几个参数,我来简单描述一下它们:

  • capacityChannel 的容量,当调用 send() 方法发送数据时,但是没有地方通过 receive() 方法来接收消费这个数据,那么就会占用这个容量。

  • onBufferOverflow:当 capacity 占用满了后,再调用 send() 方法的处理逻辑,主要包括以下几种:

    • SUSPEND: 将 send() 方法挂起,直到 capacity 容量可以用为止。
    • DROP_OLDEST: 丢弃最久的一条数据。
    • DROP_LATEST: 丢弃最新的一条数据。
  • onUndeliveredElement: 当 onBufferOverflowDROP_OLDEST 或者 DROP_LATEST 时,丢弃数据时会回该对象;在 Channel 关闭后继续发送消息,也会回调该对象。

ChannelFlow 不同的是,它每次的 send() 方法只能有一个对应的 receive() 方法能够接收到这个数据,我这里写一个 demo

        runBlocking(Dispatchers.Default) {
            val myChannel = Channel<Int>()
            launch {
                repeat(10) {
                    delay(100)
                    myChannel.send(it)
                    println("Send: $it")
                }
                myChannel.close()
                println("Send Finish")
            }

            launch {
                for (e in myChannel) {
                    delay(200)
                    println("Receive1: $e")
                }
                println("Receive1 Finish")
            }

            launch {
                for (e in myChannel) {
                    delay(200)
                    println("Receive2: $e")
                }
                println("Receive2 Finish")
            }
        }

最终的输出结果是:

Send: 0
Send: 1
Receive1: 0
Send: 2
Receive2: 1
Send: 3
Receive1: 2
Send: 4
Receive2: 3
Send: 5
Receive1: 4
Send: 6
Receive2: 5
Send: 7
Receive1: 6
Send: 8
Receive2: 7
Send: 9
Send Finish
Receive1: 8
Receive1 Finish
Receive2: 9
Receive2 Finish

我们发现由协程1和协程2它们交替来获取发送的数据。

我们再写一个 demo 来看看 Channel 丢弃数据的逻辑:

runBlocking(Dispatchers.Default) {
    val myChannel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("Send Drop: $it")
    }
    launch {
        repeat(10) {
            delay(100)
            myChannel.send(it)
            println("Send: $it")
        }
        myChannel.close()
        println("Send Finish")
    }

    launch {
        for (e in myChannel) {
            delay(300)
            println("Receive1: $e")
        }
        println("Receive1 Finish")
    }
}

首先发送方发送的速度比处理方的处理的速度要快,这里就会触发 BufferOverFlow 的逻辑,我设置的 Channel 容量是 2,超过容量后会丢弃最久的一条数据。

Send: 0
Send: 1
Send: 2
Receive1: 0
Send: 3
Send Drop: 2
Send: 4
Send Drop: 3
Send: 5
Receive1: 1
Send: 6
Send Drop: 5
Send: 7
Send Drop: 6
Send: 8
Receive1: 4
Send: 9
Send Finish
Receive1: 7
Receive1: 8
Receive1: 9
Receive1 Finish

相信到这里你也知道如何使用 Channel 了,它的使用本来也非常简单,那么后续就开始分析 Channel 的工作原理了。

Channel 工作原理

直接看看 Channel() 方法的实现:

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel
            else
                ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
        }
        CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
        }
        UNLIMITED -> BufferedChannel(UNLIMITED, onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
        BUFFERED -> { // uses default capacity with SUSPEND
            if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement)
            else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
        }
        else -> {
            if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement)
            else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
        }
    }

总的来说就是如果 BufferOverflowSUSPEND 那么 Channel 的实现类就是 BufferedChannel;其他的情况就是 ConflatedBufferedChannel。所以我们的分析方向也是针对 BufferedChannelConflatedBufferedChannel 他们对 Channel 的实现。

BufferedChannel

首先看看它的构造函数:

init {
    @Suppress("LeakingThis")
    val firstSegment = ChannelSegment(id = 0, prev = null, channel = this, pointers = 3)
    sendSegment = atomic(firstSegment)
    receiveSegment = atomic(firstSegment)
    // If this channel is rendezvous or has unlimited capacity, the algorithm never
    // invokes the buffer expansion procedure, and the corresponding segment reference
    // points to a special `NULL_SEGMENT` one and never updates.
    @Suppress("UNCHECKED_CAST")
    bufferEndSegment = atomic(if (isRendezvousOrUnlimited) (NULL_SEGMENT as ChannelSegment<E>) else firstSegment)
}

首先构建一个 ChannelSegment,然后 sendSegmentreceiveSegment 都是指向它,如果不是无限大的缓存 bufferEndSegment 也会指向这个 ChannelSegment

我在讲 Mutex 的时候也有说到 Segment,他们的工作方式也都是类似的,如果感兴趣可以看看:Kotlin 协程源码阅读笔记 —— Mutex。为了方便后续更好的理解源码我在这里先大体介绍一下 Segment 的工作方式,在 Segment 中可以存放 32 个(Mutex 中是 16 个)发送的数据和其对应的状态,当 Segment 已经满了后就会创建新的 Segment,而旧的 Segment 就会指向新的 SegmentSegmentSegment 之间是以链表的数据结构的方式建立连接。上面源码中的 sendSegment 指向下一次发送数据时的 SegmentreceiveSegment 指向下一次接收数据对应的 SegmentbufferEndSegment 始终指向 Semgent 链表的最后一个数据。所以 sendSegmentreceiveSegmentbufferEndSegment 他们都是指向同一个链表的不同的节点。

发送数据

override suspend fun send(element: E): Unit =
    sendImpl( // <-- this is an inline function
        element = element,
        // Do not create a continuation until it is required;
        // it is created later via [onNoWaiterSuspend], if needed.
        waiter = null,
        // Finish immediately if a rendezvous happens
        // or the element has been buffered.
        onRendezvousOrBuffered = {},
        // As no waiter is provided, suspension is impossible.
        onSuspend = { _, _ -> assert { false } },
        // According to the `send(e)` contract, we need to call
        // `onUndeliveredElement(..)` handler and throw an exception
        // if the channel is already closed.
        onClosed = { onClosedSend(element) },
        // When `send(e)` decides to suspend, the corresponding
        // `onNoWaiterSuspend` function that creates a continuation
        // is called. The tail-call optimization is applied here.
        onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
    )

这里发送数据调用的是 sendImpl() 方法,这里要注意它的参数 waiter 是传入的空,这是后续分析的一个关键逻辑;onRendezvousOrBuffered 参数表示当前调用不需要挂起时会回调,不过是一个空的实现;onNoWaiterSuspend 参数表示需要挂起时的回调;onClosed 参数表示 Channel 被关闭后的回调。其他的回调在 send() 方法中都不会触发。

我们继续看看 sendImpl() 方法的实现:

protected inline fun <R> sendImpl(
    element: E,
    waiter: Any?,
    onRendezvousOrBuffered: () -> R,
    onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
    onClosed: () -> R,
    onNoWaiterSuspend: (
        segm: ChannelSegment<E>,
        i: Int,
        element: E,
        s: Long
    ) -> R = { _, _, _, _ -> error("unexpected") }
): R {
    var segment = sendSegment.value
    while (true) {
        // 获取 Sender 的状态和序号
        val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
        // Sender 的序号,它是每次调用 sendImpl() 方法都会加 1
        val s = sendersAndCloseStatusCur.sendersCounter
        // 获取当前 Sender 的状态 是否已经关闭
        val closed = sendersAndCloseStatusCur.isClosedForSend0
        
        // 获取对应 Segment 的 ID
        val id = s / SEGMENT_SIZE
        // 获取对应在 Segment 中的 Index
        val i = (s % SEGMENT_SIZE).toInt()
        if (segment.id != id) {
            // 如果 senderSegment 中的 id 和计算出来的 id 不一致,表示需要去查找或者创建正确的 Segment
            segment = findSegmentSend(id, segment) ?:
                if (closed) {
                    return onClosed()
                } else {
                    continue
                }
        }
        // 更新对应的 Segment 第 i 位置的元素和对应的状态。
        when (updateCellSend(segment, i, element, s, waiter, closed)) {
            // 表示有地方调用 receive() 方法,不需要挂起
            RESULT_RENDEZVOUS -> {
                segment.cleanPrev()
                return onRendezvousOrBuffered()
            }
            // 表示已经存储到 Buffer 中,不需要挂起
            RESULT_BUFFERED -> {
  
                return onRendezvousOrBuffered()
            }
            RESULT_SUSPEND -> {
                // ...
            }
            // Channel 已经关闭
            RESULT_CLOSED -> {
                
                if (s < receiversCounter) segment.cleanPrev()
                return onClosed()
            }
            // 失败
            RESULT_FAILED -> {
                
                segment.cleanPrev()
                continue
            }
            // 挂起
            RESULT_SUSPEND_NO_WAITER -> {
                
                return onNoWaiterSuspend(segment, i, element, s)
            }
        }
    }
}

我注释掉了一些当前流程不会执行的逻辑,我解释一下上面的代码:

  1. 首先获取 sender 的序号和其对应的状态,sendersAndCloseStatusCur 中保存了 sender 的序号和状态,它每次调用 sendImpl() 方法都会加 1,sendersAndCloseStatusCur 是用 Long 类型来保存序号和状态的,低 60 位用来保存序号,高 4 位用来保存状态。
  2. 通过 sender 的序号来计算 Segmentid 和对应当前的元素在 Segment 中的 index
  3. 如果当前 senderSegmentid 和计算出来的不一致,通过 findSegmentSend() 方法去查找或者创建一个新的 Segment
  4. 通过 updateCellSend() 方法来更新对应的 Segment 第 i 位置的元素和对应的状态。在我们的逻辑中如果返回 RESULT_BUFFERED / RESULT_RENDEZVOUS 表示不需要挂起;如果返回 RESULT_SUSPEND_NO_WAITER 就表示需要挂起。

我们再来看看 updateCellSend() 方法是如何更新 Segment 中的元素和对应的状态的:

private fun updateCellSend(
    segment: ChannelSegment<E>,
    index: Int,
    element: E,
    s: Long,
    waiter: Any?,
    closed: Boolean
): Int {
    // 存储发送的元素到 Segment 中
    segment.storeElement(index, element)
    if (closed) return updateCellSendSlow(segment, index, element, s, waiter, closed)
    // 获取对应的状态
    val state = segment.getState(index)
    when {
        // 没有对应的 receive() 方法在等待
        state === null -> {
            // 判断是否达到 capacity 上限
            if (bufferOrRendezvousSend(s)) {
                // 没有达到 capacity 上限,直接更新状态为 BUFFERED
                if (segment.casState(index, null, BUFFERED)) {
                    return RESULT_BUFFERED
                }
            } else {
                // 达到 capacity 上限,需要挂起
                if (waiter == null) {
                    // 我们的当前逻辑执行这里
                    return RESULT_SUSPEND_NO_WAITER
                } else {
                   // 如果 waiter 不为空将其设置到 state 中
                    if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
                }
            }
        }
        
        // 表示有 receive() 方法正在等待发送的数据
        state is Waiter -> {
            // 这个 waiter 其实就是等待的 receive() 方法对应的 Continuation,这里也相当于调用它的 resumeWith() 方法来恢复挂起的 receive() 方法。
            return if (state.tryResumeReceiver(element)) {
                // 更新状态表示完成
                segment.setState(index, DONE_RCV)
                onReceiveDequeued()
                RESULT_RENDEZVOUS
            } else {
                if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
                    segment.onCancelledRequest(index, true)
                }
                RESULT_FAILED
            }
        }
    }
    return updateCellSendSlow(segment, index, element, s, waiter, closed)
}

首先将元素储存到 Segment 中,然后从 Segment 中获取对应的状态,这个时候分为两种情况:

  1. state 为空
    这表示当前没有地方调用 receive() 在等待数据。这里还分为两种情况,缓存是否可用,是通过 bufferOrRendezvousSend() 方法判断,如果可用直接返回 RESULT_BUFFERED,这种情况不需要挂起;如果不可用需要判断是否有传递 waiter 参数,如果有传递就返回 RESULT_SUSPEND,同时将这个 waiter 添加到 Segment 中,这个 waiter 其实就是协程中的 Continuation 对象,当有地方调用 receive() 方法时就会获取到这个 waiter,然后获取到这个元素,然后调用 Continuation#resumeWith() 方法将其恢复。我们当前的情况是 waiter 为空,直接返回 RESULT_SUSPEND_NO_WAITER,无论是 RESULT_SUSPEND 还是 RESULT_SUSPEND_NO_WAITER 最终 send() 方法都会被挂起。
  2. stateWaiter
    这种情况表示有 receive() 方法正在等待数据,上面也说到这个 Waiter 其实是 Continuation 对象,所以我们就需要恢复正在等待的 receive() 方法,这里使用的是 tryResumeReceiver() 方法,它最终会调用 Continaution#resumeWith() 方法,当 receive() 正常恢复后,会将 Segment 对应的状态设置为 DONE_RCV,表示该元素已经接收。

我们再回过头来看看 send() 方法:

override suspend fun send(element: E): Unit =
    sendImpl( // <-- this is an inline function
        element = element,
        // Do not create a continuation until it is required;
        // it is created later via [onNoWaiterSuspend], if needed.
        waiter = null,
        // Finish immediately if a rendezvous happens
        // or the element has been buffered.
        onRendezvousOrBuffered = {},
        // As no waiter is provided, suspension is impossible.
        onSuspend = { _, _ -> assert { false } },
        // According to the `send(e)` contract, we need to call
        // `onUndeliveredElement(..)` handler and throw an exception
        // if the channel is already closed.
        onClosed = { onClosedSend(element) },
        // When `send(e)` decides to suspend, the corresponding
        // `onNoWaiterSuspend` function that creates a continuation
        // is called. The tail-call optimization is applied here.
        onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
    )

其中 onRendezvousOrBuffered 回调就表示已经缓存或者已经发送给 receive() 方法,这时不用做任何操作;而 onNoWaiterSuspend 回调表示没有 receive() 方法在等待数据同时 Buffer 也已经满了,所以需要挂起当前的 send() 方法,这里是调用 sendOnNoWaiterSuspend() 方法来处理挂起。我们继续看看 sendOnNoWaiterSuspend() 方法的实现:

private suspend fun sendOnNoWaiterSuspend(
    /* The working cell is specified by
    the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /** The element to be inserted. */
    element: E,
    /** The global index of the cell. */
    s: Long
) = suspendCancellableCoroutineReusable sc@{ cont ->
    sendImplOnNoWaiter(
        segment = segment, index = index, element = element, s = s,
        waiter = cont,
        onRendezvousOrBuffered = { cont.resume(Unit) },
        onClosed = { onClosedSendOnNoWaiterSuspend(element, cont) },
    )
}

这里会调用 sendImplOnNoWaiter() 方法,注意这里它会用它的 Continuation 对象作为 waiter,我们继续看看 sendImplOnNoWaiter() 方法的实现:

private inline fun sendImplOnNoWaiter(
    segment: ChannelSegment<E>,
    index: Int,
    element: E,
    s: Long,
    waiter: Waiter,
    onRendezvousOrBuffered: () -> Unit,
    onClosed: () -> Unit,
) {
    when (updateCellSend(segment, index, element, s, waiter, false)) {
        RESULT_RENDEZVOUS -> {
            segment.cleanPrev()
            onRendezvousOrBuffered()
        }
        RESULT_BUFFERED -> {
            onRendezvousOrBuffered()
        }
        RESULT_SUSPEND -> {
            waiter.prepareSenderForSuspension(segment, index)
        }
        RESULT_CLOSED -> {
            if (s < receiversCounter) segment.cleanPrev()
            onClosed()
        }
        RESULT_FAILED -> {
            segment.cleanPrev()
            sendImpl(
                element = element,
                waiter = waiter,
                onRendezvousOrBuffered = onRendezvousOrBuffered,
                onSuspend = { _, _ -> },
                onClosed = onClosed,
            )
        }
        else -> error("unexpected")
    }
}

这里会继续调用 updateCellSend() 方法,上面已经分析过这个方法了,不同的是这次传递的 waiter 不为空,按照正常的情况会返回 RESULT_SUSPEND,并把当前的 waiter 添加到 Segment 中等待 receive() 方法将当前的 waiter 唤醒,使 send() 方法恢复执行。由于多线程的原因,这个时候状态也可能变为 RESULT_RENDEZVOUS 或者 RESULT_BUFFERED,这时候就不需要挂起 send() 方法,直接调用 resumeWith() 方法恢复。

接收数据

override suspend fun receive(): E =
    receiveImpl(
        waiter = null,
        onElementRetrieved = { element ->
            return element
        },
        onSuspend = { _, _, _ -> error("unexpected") },
        onClosed = { throw recoverStackTrace(receiveException) },
        onNoWaiterSuspend = { segm, i, r -> receiveOnNoWaiterSuspend(segm, i, r) }
    )

这里调用了 receiveImpl() 方法,它的参数和 sendImpl() 中的参数是类似的,这里的 waiter 依然为空,如果不需要挂起,通过 onElementRetrieved 参数回调返回对应的元素,如果需要挂起就会回调 onNoWaiterSuspend 参数,然后调用 receiveOnNoWaiterSuspend() 方法挂起 send() 方法。

private inline fun <R> receiveImpl(
    waiter: Any?,
    onElementRetrieved: (element: E) -> R,
    onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
    onClosed: () -> R,
    onNoWaiterSuspend: (
        segm: ChannelSegment<E>,
        i: Int,
        r: Long
    ) -> R = { _, _, _ -> error("unexpected") }
): R {
    var segment = receiveSegment.value
    while (true) {
        if (isClosedForReceive) return onClosed()
        // 获取 receiver 的序号和将其序号加 1
        val r = this.receivers.getAndIncrement()
        // 通过序号计算 Segment 的 id 和对应的 index
        val id = r / SEGMENT_SIZE
        val i = (r % SEGMENT_SIZE).toInt()
        // 如果当前的 Segment 的 id 和计算的 id 不一致,需要查找正确的 Segment 或者创建新的 Segment.
        if (segment.id != id) {
            segment = findSegmentReceive(id, segment) ?:
                continue
        }
        // 更新 Segment 状态
        val updCellResult = updateCellReceive(segment, i, r, waiter)
        return when {
            // 挂起,但是有 waiter
            updCellResult === SUSPEND -> {
                (waiter as? Waiter)?.prepareReceiverForSuspension(segment, i)
                onSuspend(segment, i, r)
            }
            updCellResult === FAILED -> {
                if (r < sendersCounter) segment.cleanPrev()
                continue
            }
            // 挂起,但是没有 waiter
            updCellResult === SUSPEND_NO_WAITER -> {
                onNoWaiterSuspend(segment, i, r)
            }
            // 不需要挂起,并且已经获取到元素
            else -> {
                segment.cleanPrev()
                @Suppress("UNCHECKED_CAST")
                onElementRetrieved(updCellResult as E)
            }
        }
    }
}

简单说明一下上面的代码,很多地方的逻辑和 sendImpl() 方法类似:

  1. 获取 receiver 的序号和将其序号加 1,通过序号计算 Segmentid 和 对应的元素在 Segment 中的 index
  2. 如果当前的 Segmentid 和计算出来的 id 不一致,需要通过 findSegmentReceive() 方法去查找正确的 Segment,或者创建一个新的 Segment
  3. 调用 updateCellReceive() 方法更新 Segment 的状态,如果返回 SUSPEND 表示需要挂起,而且有 waiter;如果返回 SUSPEND_NO_WAITER 表示需要挂起,但是没有 waiter;返回 FAILED 表示出错,需要重试;其他情况就表示不需要挂起,返回值就是对应的元素。
private fun updateCellReceive(
    segment: ChannelSegment<E>,
    index: Int,
    r: Long,
    waiter: Any?,
): Any? {
    val state = segment.getState(index)
    when {
        // 表示当前没有数据,需要挂起
        state === null -> {
            val senders = sendersAndCloseStatus.value.sendersCounter
            if (r >= senders) {
                // waiter 为空直接返回 SUSPEND_NO_WAITER
                if (waiter === null) {
                    return SUSPEND_NO_WAITER
                }
                // waiter 不会空,将其 waiter 添加到 Segment 状态中去
                if (segment.casState(index, state, waiter)) {
                    expandBuffer()
                    return SUSPEND
                }
            }
        }
        // 表示可以直接取数据,同时不需要恢复 send() 方法,这里会把状态修改成 DONE_RCV,然后直接从 Segment 中获取对应的元素然后返回
        state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
            // Retrieve the element and expand the buffer.
            expandBuffer()
            return segment.retrieveElement(index)
        }
    }
    // 通过 updateCellReceiveSlow() 方法来处理其他情况, 包括需要恢复 send() 方法的逻辑
    return updateCellReceiveSlow(segment, index, r, waiter)
}

上面代码主要是针对 Segment 中的不同状态作出不同的处理:

  1. state 为空
    表示当前没有数据可用,需要挂起当前的 receive() 方法,这里分为两种情况:waiter 是否为空,如果为空直接返回 SUSPEND_NO_WAITER;如果不为空需要将状态修改为 waiter,然后返回 SUSPEND,这里就需要在调用 send() 方法时恢复它,前面讲发送数据的时候已经讲过这个逻辑。
  2. stateBUFFERED
    表示可以直接取数据,同时不需要恢复 send() 方法,这里会把状态修改成 DONE_RCV,然后直接从 Segment 中获取对应的元素然后返回。
  3. 其他情况
    通过 updateCellReceiveSlow() 方法来处理其他情况, 包括需要恢复 send() 方法的逻辑。

我们再看看 updateCellReceiveSlow() 方法:

private fun updateCellReceiveSlow(
    segment: ChannelSegment<E>,
    index: Int,
    r: Long,
    waiter: Any?,
): Any? {
    while (true) {
        val state = segment.getState(index)
        when {
            state === null || state === IN_BUFFER -> {
               // ...
            }
            // The cell stores a buffered element.
            state === BUFFERED -> {
              // ...
            }
            state === RESUMING_BY_EB -> continue
            else -> {
                if (segment.casState(index, state, RESUMING_BY_RCV)) {
                    
                    val helpExpandBuffer = state is WaiterEB
                    val sender = if (state is WaiterEB) state.waiter else state
                    // 恢复被挂起的 send() 方法
                    return if (sender.tryResumeSender(segment, index)) {
                        // 将状态修改为 DONE_RCV
                        segment.setState(index, DONE_RCV)
                        expandBuffer()
                        // 获取对应的元素,并返回
                        segment.retrieveElement(index)
                    } else {
                        segment.setState(index, INTERRUPTED_SEND)
                        segment.onCancelledRequest(index, false)
                        if (helpExpandBuffer) expandBuffer()
                        FAILED
                    }
                }
            }
        }
    }
}

上面的代码我注释掉了一些其他的逻辑的代码,只保留了恢复 send() 方法的逻辑,代码也非常简单,通过 tryResumeSender() 方法来恢复被挂起的 send() 方法,它最终也是调用的 Continuation#resumeWith() 方法,然后修改状态为 DONE_RCV,然后获取对应的元素并返回。

我们再继续看看 receive() 方法被挂起的逻辑,上面提到如果被挂起 receive() 方法会继续调用 receiveOnNoWaiterSuspend() 方法,我们看看它的实现:

private suspend fun receiveOnNoWaiterSuspend(
    segment: ChannelSegment<E>,
    index: Int,
    r: Long
) = suspendCancellableCoroutineReusable { cont ->
    receiveImplOnNoWaiter(
        segment = segment, index = index, r = r,
        waiter = cont,
        onElementRetrieved = { element ->
            // 表示 元素可以直接使用,就不需要挂起
            val onCancellation = onUndeliveredElement?.bindCancellationFun(element, cont.context)
            cont.resume(element, onCancellation)
        },
        onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
    )
}

这里会调用 receiveImplOnNoWaiter() 方法,注意这里会直接将 Continuation 对象作为 waiter,我们再看看 receiveImplOnNoWaiter() 方法的实现:

private inline fun receiveImplOnNoWaiter(
    segment: ChannelSegment<E>,
    index: Int,
    r: Long,
    waiter: Waiter,
    onElementRetrieved: (element: E) -> Unit,
    onClosed: () -> Unit
) {
    val updCellResult = updateCellReceive(segment, index, r, waiter)
    when {
        updCellResult === SUSPEND -> {
            waiter.prepareReceiverForSuspension(segment, index)
        }
        updCellResult === FAILED -> {
            if (r < sendersCounter) segment.cleanPrev()
            receiveImpl(
                waiter = waiter,
                onElementRetrieved = onElementRetrieved,
                onSuspend = { _, _, _ -> },
                onClosed = onClosed
            )
        }
        else -> {
            segment.cleanPrev()
            @Suppress("UNCHECKED_CAST")
            onElementRetrieved(updCellResult as E)
        }
    }
}

同样是通过 updateCellReceive() 方法来更新状态,上面已经分析过了,不过不同的是这次的 waiter 不为空,如果需要挂起,这个 waiter 就会被添加到 Segment 的状态中,等待下次调用 send() 方法时才会恢复。

BufferedChannel 的小总结

到这里 BufferedChannel 就分析完毕了,你看懂了吗?如果没有看懂,我先总结一下,然后你再回去看看源码,一定能够看懂的。

  • 发送数据
    首先通过发送序列号(依次递增的)来计算 Segmentidindex,通过 id 去查找 Segment,如果没有就创建一个新的 Segment,将需要发送的数据添加到 Segment 中,然后获取 Segment 的状态:

    • 如果状态为空就表示没有调用 receive() 接收数据的地方,然后需要判断缓存是否可用,如果可用就直接将 Segment 状态修改为 BUFFERED;如果缓存不可用就将 suspend 函数中的 Continuation 作为状态添加到 Segment 中,当前 send() 函数将挂起,等待 receive() 函数调用将它恢复。
    • 如果状态是 waiter 对象,就表示当前有地方调用 receive() 方法正在等待元素,这个时候就会调用 waiter (也就是 receive() 函数中的 Continuation) 的 resumeWith() 函数恢复挂起的 receive() 函数,receive() 函数就能够获取到对应的元素。
  • 接收数据
    通过接收序号(依此递增的)来计算 Segmentidindex,通过 id 去查找 Segment,如果没有就创建一个新的 Segment,然后获取 Segment 的状态:

    • 如果状态为空就表示没有可用的元素,需要将当前 suspend 函数中的 Continuation 添加到 Segment 状态中去,当前的 receive() 函数将被挂起,等待下次调用 send() 方法时将它恢复。
    • 如果状态是 BUFFERED,表示数据可用,且不用恢复 send() 函数,直接从 Segment 中获取元素然后返回。
    • 如果状态是 waiter 类型,表示数据可用,但是需要恢复被挂起的 send() 函数,通过调用 waiterresumeWith() 函数恢复 send() 函数后,读取 Segment 中的元素然后返回。

ConflatedBufferedChannel

ConflatedBufferedChannel 它是继承于 BufferedChannel,当你理解了 BufferedChannel 后,它的代码看上去就非常简单。

发送数据

override suspend fun send(element: E) {
    // Should never suspend, implement via `trySend(..)`.
    trySendImpl(element, isSendOp = true).onClosed { // fails only when this channel is closed.
        onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
            it.addSuppressed(sendException)
            throw it
        }
        throw sendException
    }
}

这个 send() 方法永远不会被挂起,它调用了 trySendImpl() 函数,它是一个非 suspend 函数,它只会返回结果表示是否发送数据成功。我们继续看看 trySendImpl() 方法的实现:

private fun trySendImpl(element: E, isSendOp: Boolean) =
    if (onBufferOverflow === DROP_LATEST) trySendDropLatest(element, isSendOp)
    else trySendDropOldest(element)

这里会判断 onBufferOverflow 的类型,也就是判断 Buffer 满了后如何丢弃数据,如果是 DROP_LATEST 就调用 trySendDropLatest() 方法,满了后就丢弃最新的数据,反之就调用 trySendDropOldest() 方法,满了之后就丢弃最旧的数据。我们分别来看这两个方法。

trySendDropLatest()
private fun trySendDropLatest(element: E, isSendOp: Boolean): ChannelResult<Unit> {
    // Try to send the element without suspension.
    val result = super.trySend(element)
    // Complete on success or if this channel is closed.
    if (result.isSuccess || result.isClosed) return result
    // This channel is full. Drop the sending element.
    // Call the `onUndeliveredElement` lambda ONLY for 'send()' invocations,
    // for 'trySend()' it is responsibility of the caller
    if (isSendOp) {
        onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
            throw it
        }
    }
    return success(Unit)
}

这里会调用 BufferedChannel#trySend() 方法来发送数据,如果发送失败会调用 onUndeliveredElement 来通知被丢弃的数据。

我们再看看 BufferedChannel#trySend() 的实现:

override fun trySend(element: E): ChannelResult<Unit> {
    if (shouldSendSuspend(sendersAndCloseStatus.value)) return failure()
    return sendImpl( // <-- this is an inline function
        element = element,
        waiter = INTERRUPTED_SEND,
        onRendezvousOrBuffered = { success(Unit) },
        onSuspend = { segm, _ ->
            segm.onSlotCleaned()
            failure()
        },
        onClosed = { closed(sendException) }
    )
}

它的实现也是朴实无华,它也调用了我们上面分析过的 sendImpl() 方法,不过不同的是它的 waiter 不是空,而是 INTERRUPTED_SEND,当 sendImpl() 需要挂起时Segment 的状态会被设置成 INTERRUPTED_SEND,然后 onSuspend 中直接返回失败;如果成功会回调 onRendezvousOrBuffered 然后直接返回成功。当 receive() 方法遇到状态是 INTERRUPTED_SEND 时,会直接跳过。

trySendDropOldest()
private fun trySendDropOldest(element: E): ChannelResult<Unit> =
    sendImpl(
        element = element,
        waiter = BUFFERED,
        onRendezvousOrBuffered = { return success(Unit) },
        onSuspend = { segm, i ->
            dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i)
            return success(Unit)
        },
        onClosed = { return closed(sendException) }
    )

相对于上面分析的 trySend() 方法,这里的 waiter 修改成了 BUFFERED,然后在挂起的回调中还调用了 dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(),也就是被挂起的的时候状态会被修改成 BUFFERED,和使用缓存时候的状态一样。

然后我们再看看 BufferedChannel#dropFirstElementUntilTheSpecifiedCellIsInTheBuffer() 方法的源码实现:

protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
    assert { isConflatedDropOldest }
    var segment = receiveSegment.value
    while (true) {
        val r = this.receivers.value
        if (globalCellIndex < max(r + capacity, bufferEndCounter)) return
        if (!this.receivers.compareAndSet(r, r + 1)) continue
        val id = r / SEGMENT_SIZE
        val i = (r % SEGMENT_SIZE).toInt()
        if (segment.id != id) {
            segment = findSegmentReceive(id, segment) ?:
                continue
        }
        val updCellResult = updateCellReceive(segment, i, r, null)
        when {
            updCellResult === FAILED -> {
                if (r < sendersCounter) segment.cleanPrev()
            }
            else -> { 
                @Suppress("UNCHECKED_CAST")
                onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
            }
        }
    }
}

上面的代码就相当于调用了一次 receive() 方法,也就是获取最久的一个元素,不同的是获取到的元素直接通知给 onUndeliveredElement

接收数据

接收数据和就是使用的 BufferedChannel#receive() 实现,前面已经分析过了。

最后

到这里 Channel 的介绍就已经结束了,如果还没有看懂那就多看几次吧。


标签:笔记Kotlin程源Channel


程序开发学习排行
最近发表
网站分类
标签列表