diff --git a/mirai-core-api/src/commonMain/kotlin/event/Extensions.kt b/mirai-core-api/src/commonMain/kotlin/event/Extensions.kt index 28f049e27..053eb52f3 100644 --- a/mirai-core-api/src/commonMain/kotlin/event/Extensions.kt +++ b/mirai-core-api/src/commonMain/kotlin/event/Extensions.kt @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 Mamoe Technologies and contributors. + * Copyright 2019-2022 Mamoe Technologies and contributors. * * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. @@ -10,36 +10,109 @@ package net.mamoe.mirai.event import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.firstOrNull import kotlin.coroutines.resume import kotlin.reflect.KClass - /** * 挂起当前协程, 直到监听到事件 [E] 的广播并通过 [filter], 返回这个事件实例. * - * @param filter 过滤器. 返回 `true` 时表示得到了需要的实例. 返回 `false` 时表示继续监听 + * 本函数是 [EventChannel.subscribe] 的衍生工具函数, 内部会调用 [EventChannel.subscribe]. * - * @see EventChannel.subscribe 普通地监听一个事件 - * @see EventChannel.syncFromEvent 挂起当前协程, 并尝试从事件中同步一个值 + * ## 挂起可取消 + * + * 本函数的挂起过程可以被[取消][CancellableContinuation.cancel]. 这意味着若在 [CoroutineScope.launch] 中使用本函数, 则 [launch] 启动的 [Job] 可以通过 [Job.cancel] 取消 (停止), 届时本函数会抛出 [CancellationException]. + * + * ## 异常处理 + * + * [filter] 抛出的异常属于监听方异常, 将会由 [nextEvent] 原样重新抛出. + * + * ## 使用 [Flow] 的替代方法 + * + * 在 Kotlin 可使用 [EventChannel.asFlow] 配合 [Flow.filter] 和 [Flow.first] 实现与 [nextEvent] 相似的功能 (注意, Flow 方法将会使用 [EventPriority.MONITOR] 优先级). + * + * 示例: + * + * ``` + * val event: GroupMessageEvent = GlobalEventChannel.asFlow().filterIsInstance().filter { it.sender.id == 123456 }.first() + * // 上下两行代码等价 + * val event: GroupMessageEvent = GlobalEventChannel.filterIsInstance().nextEvent(EventPriority.MONITOR) { it.sender.id == 123456 } + * ``` + * + * 由于 [Flow] 拥有更多操作 (如 [Flow.firstOrNull]), 在不需要指定[事件优先级][EventPriority]时使用 [Flow] 拥有更高自由度. + * + * @param filter 过滤器. 返回 `true` 时表示得到了需要的实例. 返回 `false` 时表示继续监听 * * @since 2.10 */ public suspend inline fun EventChannel<*>.nextEvent( priority: EventPriority = EventPriority.NORMAL, noinline filter: suspend (E) -> Boolean = { true } -): E = coroutineScope { this@nextEvent.nextEventImpl(E::class, this@coroutineScope, priority, filter) } +): E = coroutineScope { + suspendCancellableCoroutine { cont -> + var listener: Listener? = null + listener = this@nextEvent.parentScope(this@coroutineScope).subscribe(E::class, priority = priority) { event -> + val result = kotlin.runCatching { + if (!filter(event)) return@subscribe ListeningStatus.LISTENING + event + } + + try { + cont.resumeWith(result) + } finally { + listener?.complete() // ensure completed on exceptions + } + return@subscribe ListeningStatus.STOPPED + } + + cont.invokeOnCancellation { + kotlin.runCatching { listener.cancel("nextEvent outer scope cancelled", it) } + } + } +} /** * 挂起当前协程, 监听事件 [E], 并尝试从这个事件中**获取**一个值, 在超时时抛出 [TimeoutCancellationException] * + * 本函数是 [EventChannel.subscribe] 的衍生工具函数, 内部会调用 [EventChannel.subscribe]. + * + * ## 挂起可取消 + * + * 本函数的挂起过程可以被[取消][CancellableContinuation.cancel]. 这意味着若在 [CoroutineScope.launch] 中使用本函数, 则 [launch] 启动的 [Job] 可以通过 [Job.cancel] 取消 (停止), 届时本函数会抛出 [CancellationException]. + * + * ## 异常处理 + * + * [filter] 抛出的异常属于监听方异常, 将会由 [nextEvent] 原样重新抛出. + * + * ## 使用 [Flow] 的替代方法 + * + * 在 Kotlin 可使用 [EventChannel.asFlow] 配合 [Flow.filter] 和 [Flow.first] 实现与 [nextEvent] 相似的功能 (注意, Flow 方法将会使用 [EventPriority.MONITOR] 优先级). + * + * 示例: + * + * ``` + * val senderId: Long = GlobalEventChannel.asFlow() + * .filterIsInstance() + * .filter { it.sender.id == 123456L } + * .map { it.sender.id }.first() + * + * // 上下代码等价 + * + * val senderId: Long = GlobalEventChannel + * .filterIsInstance() + * .syncFromEvent(EventPriority.MONITOR) { if (it.sender.id = 123456) it.sender.name else null } + * ``` + * + * 由于 [Flow] 拥有更多操作且逻辑更清晰, 在不需要指定[事件优先级][EventPriority]时更推荐使用 [Flow]. + * * @param mapper 过滤转换器. 返回非 null 则代表得到了需要的值. [syncFromEvent] 会返回这个值 * - * @see asyncFromEvent 本函数的异步版本 * @see EventChannel.subscribe 普通地监听一个事件 * @see nextEvent 挂起当前协程, 并获取下一个事件实例 * - * @see syncFromEventOrNull 本函数的在超时后返回 `null` 的版本 - * * @throws Throwable 当 [mapper] 抛出任何异常时, 本函数会抛出该异常 * * @since 2.10 @@ -47,13 +120,34 @@ public suspend inline fun EventChannel<*>.nextEvent( public suspend inline fun EventChannel<*>.syncFromEvent( priority: EventPriority = EventPriority.NORMAL, noinline mapper: suspend (E) -> R? -): R = coroutineScope { this@syncFromEvent.syncFromEventImpl(E::class, this, priority, mapper) } +): R = coroutineScope { + suspendCancellableCoroutine { cont -> + var listener: Listener? = null + listener = this@syncFromEvent.parentScope(this).subscribe(E::class, priority = priority) { event -> + val result = kotlin.runCatching { + mapper(event) ?: return@subscribe ListeningStatus.LISTENING + } + + try { + cont.resumeWith(result) + } finally { + listener?.complete() // ensure completed on exceptions + } + return@subscribe ListeningStatus.STOPPED + } + + cont.invokeOnCancellation { + kotlin.runCatching { listener.cancel("syncFromEvent outer scope cancelled", it) } + } + } +} /** * @since 2.10 */ @PublishedApi +@Deprecated("For binary compatibility") internal suspend fun EventChannel.nextEventImpl( eventClass: KClass, coroutineScope: CoroutineScope, @@ -82,6 +176,7 @@ internal suspend fun EventChannel.nextEventImpl( * @since 2.10 */ @PublishedApi +@Deprecated("For binary compatibility") internal suspend fun EventChannel<*>.syncFromEventImpl( eventClass: KClass, coroutineScope: CoroutineScope, diff --git a/mirai-core-api/src/commonMain/kotlin/event/deprecated.syncFromEvent.kt b/mirai-core-api/src/commonMain/kotlin/event/deprecated.syncFromEvent.kt index d6cab562b..5b2abe61b 100644 --- a/mirai-core-api/src/commonMain/kotlin/event/deprecated.syncFromEvent.kt +++ b/mirai-core-api/src/commonMain/kotlin/event/deprecated.syncFromEvent.kt @@ -1,10 +1,10 @@ /* - * Copyright 2019-2021 Mamoe Technologies and contributors. + * Copyright 2019-2022 Mamoe Technologies and contributors. * - * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. - * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. * - * https://github.com/mamoe/mirai/blob/master/LICENSE + * https://github.com/mamoe/mirai/blob/dev/LICENSE */ @file:Suppress("unused") @@ -52,6 +52,7 @@ public suspend inline fun syncFromEvent( ): R { require(timeoutMillis == -1L || timeoutMillis > 0) { "timeoutMillis must be -1 or > 0" } + @Suppress("DEPRECATION") return if (timeoutMillis == -1L) { coroutineScope { GlobalEventChannel.syncFromEventImpl(E::class, this, priority) { mapper.invoke(it, it) } @@ -92,6 +93,7 @@ public suspend inline fun syncFromEventOrNull( require(timeoutMillis > 0) { "timeoutMillis must be > 0" } return withTimeoutOrNull(timeoutMillis) { + @Suppress("DEPRECATION") GlobalEventChannel.syncFromEventImpl(E::class, this, priority) { mapper.invoke(it, it) } } } diff --git a/mirai-core/src/commonTest/kotlin/event/NextEventTest.kt b/mirai-core/src/commonTest/kotlin/event/NextEventTest.kt index c90d2e426..d79ca444a 100644 --- a/mirai-core/src/commonTest/kotlin/event/NextEventTest.kt +++ b/mirai-core/src/commonTest/kotlin/event/NextEventTest.kt @@ -20,6 +20,7 @@ import org.junit.jupiter.api.assertThrows import java.util.concurrent.Executors import kotlin.test.assertEquals import kotlin.test.assertFalse +import kotlin.test.assertIs import kotlin.test.assertTrue @JvmBlockingBridge @@ -111,6 +112,25 @@ internal class NextEventTest : AbstractEventTest() { } } + @Test + suspend fun `nextEvent can cancel`() { + val channel = GlobalEventChannel + + withContext(dispatcher) { + coroutineScope { + val job = launch { + val result = kotlin.runCatching { channel.nextEvent(EventPriority.MONITOR) } + assertTrue { result.isFailure } + assertIs(result.exceptionOrNull()) + throw result.exceptionOrNull()!! + } + assertTrue { job.isActive } + job.cancelAndJoin() + assertTrue { job.isCancelled } + } + } + } + /////////////////////////////////////////////////////////////////////////// // nextEventOrNull ///////////////////////////////////////////////////////////////////////////