NextEventAndIntercept and NextMessageAndIntercept (#2177)

* rewrote pr, no tested

* test and change param position

* enable ci

* finish test

* modified as required

* typo
This commit is contained in:
Eritque arcus 2022-08-23 06:07:03 -04:00 committed by GitHub
parent 3ae6c2802e
commit b4066ac6d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 2 deletions

View File

@ -51,6 +51,44 @@ import kotlin.reflect.KClass
public suspend inline fun <reified E : Event> EventChannel<*>.nextEvent(
priority: EventPriority = EventPriority.NORMAL,
noinline filter: suspend (E) -> Boolean = { true }
): E = nextEvent(priority, false, filter)
/**
* 挂起当前协程, 直到监听到事件 [E] 的广播并通过 [filter], 返回这个事件实例.
*
* 本函数是 [EventChannel.subscribe] 的衍生工具函数, 内部会调用 [EventChannel.subscribe].
*
* ## 挂起可取消
*
* 本函数的挂起过程可以被[取消][CancellableContinuation.cancel]. 这意味着若在 [CoroutineScope.launch] 中使用本函数, [launch] 启动的 [Job] 可以通过 [Job.cancel] 取消 (停止), 届时本函数会抛出 [CancellationException].
*
* ## 异常处理
*
* [filter] 抛出的异常属于监听方异常, 将会由 [nextEvent] 原样重新抛出.
*
* ## 使用 [Flow] 的替代方法
*
* Kotlin 可使用 [EventChannel.asFlow] 配合 [Flow.filter] [Flow.first] 实现与 [nextEvent] 相似的功能 (注意, Flow 方法将会使用 [EventPriority.MONITOR] 优先级).
*
* 示例:
*
* ```
* val event: GroupMessageEvent = GlobalEventChannel.asFlow().filterIsInstance<GroupMessageEvent>().filter { it.sender.id == 123456 }.first()
* // 上下两行代码等价
* val event: GroupMessageEvent = GlobalEventChannel.filterIsInstance<GroupMessageEvent>().nextEvent(EventPriority.MONITOR) { it.sender.id == 123456 }
* ```
*
* 由于 [Flow] 拥有更多操作 ( [Flow.firstOrNull]), 在不需要指定[事件优先级][EventPriority]时使用 [Flow] 拥有更高自由度.
*
* @param intercept 是否拦截, 传入 `true` 时表示拦截此事件不让接下来的监听器处理, 传入 `false` 时表示让接下来的监听器处理
* @param filter 过滤器. 返回 `true` 时表示得到了需要的实例. 返回 `false` 时表示继续监听
*
* @since 2.13
*/
public suspend inline fun <reified E : Event> EventChannel<*>.nextEvent(
priority: EventPriority = EventPriority.NORMAL,
intercept: Boolean = false,
noinline filter: suspend (E) -> Boolean = { true }
): E = coroutineScope {
suspendCancellableCoroutine { cont ->
var listener: Listener<E>? = null
@ -61,7 +99,7 @@ public suspend inline fun <reified E : Event> EventChannel<*>.nextEvent(
}
try {
cont.resumeWith(result)
cont.resumeWith(result.apply { onSuccess { if (intercept) intercept() } })
} finally {
listener?.complete() // ensure completed on exceptions
}

View File

@ -52,6 +52,28 @@ public suspend inline fun <reified P : MessageEvent> P.nextMessage(
timeoutMillis: Long = -1,
priority: EventPriority = EventPriority.MONITOR,
noinline filter: suspend P.(P) -> Boolean = { true }
): MessageChain = nextMessage(timeoutMillis, priority, false, filter)
/**
* 挂起当前协程, 等待下一条 [MessageEvent.sender] [MessageEvent.subject] [this] 相同且通过 [筛选][filter] [MessageEvent] 并拦截该事件
*
* [filter] 抛出了一个异常, 本函数会立即抛出这个异常.
*
* @param timeoutMillis 超时. 单位为毫秒. `-1` 为不限制
* @param intercept 是否拦截, 传入 `true` 时表示拦截此事件不让接下来的监听器处理, 传入 `false` 时表示让接下来的监听器处理
* @param filter 过滤器. 返回非 null 则代表得到了需要的值. [syncFromEvent] 会返回这个值
*
* @see syncFromEvent 实现原理
* @see MessageEvent.intercept 拦截事件
*
* @since 2.13
*/
@JvmSynthetic
public suspend inline fun <reified P : MessageEvent> P.nextMessage(
timeoutMillis: Long = -1,
priority: EventPriority = EventPriority.HIGH,
intercept: Boolean = false,
noinline filter: suspend P.(P) -> Boolean = { true }
): MessageChain {
val mapper: suspend (P) -> P? = createMapper(filter)
@ -61,7 +83,7 @@ public suspend inline fun <reified P : MessageEvent> P.nextMessage(
withTimeout(timeoutMillis) {
GlobalEventChannel.syncFromEvent(priority, mapper)
}
}).message
}).apply { if (intercept) intercept() }.message
}
/**

View File

@ -303,4 +303,27 @@ internal class EventTests : AbstractEventTest() {
PriorityTestEvent().broadcast()
}
}
@Test
fun `test next event and intercept`() {
resetEventListeners()
GlobalEventChannel.subscribeOnce<TestEvent> {
GlobalEventChannel.nextEvent<TestEvent>(priority = EventPriority.HIGH, intercept = true)
}
GlobalEventChannel.subscribeAlways<TestEvent>(priority = EventPriority.LOW) {
this.triggered = true
}
val tmp = TestEvent()
val tmp2 = TestEvent()
runBlocking {
launch {
tmp.broadcast()
}
launch {
tmp2.broadcast()
}
}
assertTrue { (tmp.triggered || tmp2.triggered) && (tmp.triggered != tmp2.triggered) }
resetEventListeners()
}
}