mirror of
synced 2025-02-22 18:00:17 +08:00
Refine nextEvent and syncFromEvent: handle exceptions correctly
This commit is contained in:
@ -1,5 +1,5 @@
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
@ -10,36 +10,109 @@
package net.mamoe.mirai.event
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlin.coroutines.resume
import kotlin.reflect.KClass
* 挂起当前协程, 直到监听到事件 [E] 的广播并通过 [filter], 返回这个事件实例.
* @param filter 过滤器. 返回 `true` 时表示得到了需要的实例. 返回 `false` 时表示继续监听
* 本函数是 [EventChannel.subscribe] 的衍生工具函数, 内部会调用 [EventChannel.subscribe].
* @see EventChannel.subscribe 普通地监听一个事件
* @see EventChannel.syncFromEvent 挂起当前协程, 并尝试从事件中同步一个值
* ## 挂起可取消
* 本函数的挂起过程可以被[取消][CancellableContinuation.cancel]. 这意味着若在 [CoroutineScope.launch] 中使用本函数, 则 [launch] 启动的 [Job] 可以通过 [Job.cancel] 取消 (停止), 届时本函数会抛出 [CancellationException].
* ## 异常处理
* [filter] 抛出的异常属于监听方异常, 将会由 [nextEvent] 原样重新抛出.
* ## 使用 [Flow] 的替代方法
* 在 Kotlin 可使用 [EventChannel.asFlow] 配合 [Flow.filter] 和 [Flow.first] 实现与 [nextEvent] 相似的功能 (注意, Flow 方法将会使用 [EventPriority.MONITOR] 优先级).
* 示例:
* ```
* val event: GroupMessageEvent = GlobalEventChannel.asFlow().filterIsInstance<GroupMessageEvent>().filter { it.sender.id == 123456 }.first()
* // 上下两行代码等价
* val event: GroupMessageEvent = GlobalEventChannel.filterIsInstance<GroupMessageEvent>().nextEvent(EventPriority.MONITOR) { it.sender.id == 123456 }
* ```
* 由于 [Flow] 拥有更多操作 (如 [Flow.firstOrNull]), 在不需要指定[事件优先级][EventPriority]时使用 [Flow] 拥有更高自由度.
* @param filter 过滤器. 返回 `true` 时表示得到了需要的实例. 返回 `false` 时表示继续监听
* @since 2.10
public suspend inline fun <reified E : Event> EventChannel<*>.nextEvent(
priority: EventPriority = EventPriority.NORMAL,
noinline filter: suspend (E) -> Boolean = { true }
): E = coroutineScope { this@nextEvent.nextEventImpl(E::class, this@coroutineScope, priority, filter) }
): E = coroutineScope {
suspendCancellableCoroutine { cont ->
var listener: Listener<E>? = null
listener = this@nextEvent.parentScope(this@coroutineScope).subscribe(E::class, priority = priority) { event ->
val result = kotlin.runCatching {
if (!filter(event)) return@subscribe ListeningStatus.LISTENING
try {
} finally {
listener?.complete() // ensure completed on exceptions
return@subscribe ListeningStatus.STOPPED
cont.invokeOnCancellation {
kotlin.runCatching { listener.cancel("nextEvent outer scope cancelled", it) }
* 挂起当前协程, 监听事件 [E], 并尝试从这个事件中**获取**一个值, 在超时时抛出 [TimeoutCancellationException]
* 本函数是 [EventChannel.subscribe] 的衍生工具函数, 内部会调用 [EventChannel.subscribe].
* ## 挂起可取消
* 本函数的挂起过程可以被[取消][CancellableContinuation.cancel]. 这意味着若在 [CoroutineScope.launch] 中使用本函数, 则 [launch] 启动的 [Job] 可以通过 [Job.cancel] 取消 (停止), 届时本函数会抛出 [CancellationException].
* ## 异常处理
* [filter] 抛出的异常属于监听方异常, 将会由 [nextEvent] 原样重新抛出.
* ## 使用 [Flow] 的替代方法
* 在 Kotlin 可使用 [EventChannel.asFlow] 配合 [Flow.filter] 和 [Flow.first] 实现与 [nextEvent] 相似的功能 (注意, Flow 方法将会使用 [EventPriority.MONITOR] 优先级).
* 示例:
* ```
* val senderId: Long = GlobalEventChannel.asFlow()
* .filterIsInstance<GroupMessageEvent>()
* .filter { it.sender.id == 123456L }
* .map { it.sender.id }.first()
* // 上下代码等价
* val senderId: Long = GlobalEventChannel
* .filterIsInstance<GroupMessageEvent>()
* .syncFromEvent(EventPriority.MONITOR) { if (it.sender.id = 123456) it.sender.name else null }
* ```
* 由于 [Flow] 拥有更多操作且逻辑更清晰, 在不需要指定[事件优先级][EventPriority]时更推荐使用 [Flow].
* @param mapper 过滤转换器. 返回非 null 则代表得到了需要的值. [syncFromEvent] 会返回这个值
* @see asyncFromEvent 本函数的异步版本
* @see EventChannel.subscribe 普通地监听一个事件
* @see nextEvent 挂起当前协程, 并获取下一个事件实例
* @see syncFromEventOrNull 本函数的在超时后返回 `null` 的版本
* @throws Throwable 当 [mapper] 抛出任何异常时, 本函数会抛出该异常
* @since 2.10
@ -47,13 +120,34 @@ public suspend inline fun <reified E : Event> EventChannel<*>.nextEvent(
public suspend inline fun <reified E : Event, R : Any> EventChannel<*>.syncFromEvent(
priority: EventPriority = EventPriority.NORMAL,
noinline mapper: suspend (E) -> R?
): R = coroutineScope { this@syncFromEvent.syncFromEventImpl(E::class, this, priority, mapper) }
): R = coroutineScope {
suspendCancellableCoroutine { cont ->
var listener: Listener<E>? = null
listener = this@syncFromEvent.parentScope(this).subscribe(E::class, priority = priority) { event ->
val result = kotlin.runCatching {
mapper(event) ?: return@subscribe ListeningStatus.LISTENING
try {
} finally {
listener?.complete() // ensure completed on exceptions
return@subscribe ListeningStatus.STOPPED
cont.invokeOnCancellation {
kotlin.runCatching { listener.cancel("syncFromEvent outer scope cancelled", it) }
* @since 2.10
@Deprecated("For binary compatibility")
internal suspend fun <E : Event> EventChannel<Event>.nextEventImpl(
eventClass: KClass<E>,
coroutineScope: CoroutineScope,
@ -82,6 +176,7 @@ internal suspend fun <E : Event> EventChannel<Event>.nextEventImpl(
* @since 2.10
@Deprecated("For binary compatibility")
internal suspend fun <E : Event, R> EventChannel<*>.syncFromEventImpl(
eventClass: KClass<E>,
coroutineScope: CoroutineScope,
@ -1,10 +1,10 @@
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
@ -52,6 +52,7 @@ public suspend inline fun <reified E : Event, R : Any> syncFromEvent(
): R {
require(timeoutMillis == -1L || timeoutMillis > 0) { "timeoutMillis must be -1 or > 0" }
return if (timeoutMillis == -1L) {
coroutineScope {
GlobalEventChannel.syncFromEventImpl(E::class, this, priority) { mapper.invoke(it, it) }
@ -92,6 +93,7 @@ public suspend inline fun <reified E : Event, R : Any> syncFromEventOrNull(
require(timeoutMillis > 0) { "timeoutMillis must be > 0" }
return withTimeoutOrNull(timeoutMillis) {
GlobalEventChannel.syncFromEventImpl(E::class, this, priority) { mapper.invoke(it, it) }
@ -20,6 +20,7 @@ import org.junit.jupiter.api.assertThrows
import java.util.concurrent.Executors
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertIs
import kotlin.test.assertTrue
@ -111,6 +112,25 @@ internal class NextEventTest : AbstractEventTest() {
suspend fun `nextEvent can cancel`() {
val channel = GlobalEventChannel
withContext(dispatcher) {
coroutineScope {
val job = launch {
val result = kotlin.runCatching { channel.nextEvent<TE>(EventPriority.MONITOR) }
assertTrue { result.isFailure }
throw result.exceptionOrNull()!!
assertTrue { job.isActive }
assertTrue { job.isCancelled }
// nextEventOrNull
Reference in New Issue
Block a user