From 81f2388c4472aeaadab5a26bd8e875969f6cf9bc Mon Sep 17 00:00:00 2001 From: Him188 Date: Fri, 20 Mar 2020 18:31:05 +0800 Subject: [PATCH] Add concurrency control to message listeners, close #152 --- .../event/subscribeMessages.kt | 49 ++++++++++++++++--- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscribeMessages.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscribeMessages.kt index 7426807f5..30e6e2381 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscribeMessages.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscribeMessages.kt @@ -19,6 +19,7 @@ import net.mamoe.mirai.Bot import net.mamoe.mirai.contact.isAdministrator import net.mamoe.mirai.contact.isOperator import net.mamoe.mirai.contact.isOwner +import net.mamoe.mirai.event.events.BotEvent import net.mamoe.mirai.message.FriendMessage import net.mamoe.mirai.message.GroupMessage import net.mamoe.mirai.message.MessagePacket @@ -37,6 +38,7 @@ import kotlin.coroutines.EmptyCoroutineContext @OptIn(ExperimentalContracts::class) fun CoroutineScope.subscribeMessages( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, listeners: MessageSubscribersBuilder>.() -> R ): R { // contract 可帮助 IDE 进行类型推断. 无实际代码作用. @@ -47,7 +49,7 @@ fun CoroutineScope.subscribeMessages( return MessageSubscribersBuilder { messageListener: MessageListener> -> // subscribeAlways 即注册一个监听器. 这个监听器收到消息后就传递给 [messageListener] // messageListener 即为 DSL 里 `contains(...) { }`, `startsWith(...) { }` 的代码块. - subscribeAlways(coroutineContext) { + subscribeAlways(coroutineContext, concurrencyKind) { messageListener.invoke(this, this.message.toString()) // this.message.toString() 即为 messageListener 中 it 接收到的值 } @@ -62,13 +64,14 @@ fun CoroutineScope.subscribeMessages( @OptIn(ExperimentalContracts::class) fun CoroutineScope.subscribeGroupMessages( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, listeners: MessageSubscribersBuilder.() -> R ): R { contract { callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) } return MessageSubscribersBuilder { listener -> - subscribeAlways(coroutineContext) { + subscribeAlways(coroutineContext, concurrencyKind) { listener(this, this.message.toString()) } }.run(listeners) @@ -82,13 +85,14 @@ fun CoroutineScope.subscribeGroupMessages( @OptIn(ExperimentalContracts::class) fun CoroutineScope.subscribeFriendMessages( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, listeners: MessageSubscribersBuilder.() -> R ): R { contract { callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) } return MessageSubscribersBuilder { listener -> - subscribeAlways(coroutineContext) { + subscribeAlways(coroutineContext, concurrencyKind) { listener(this, this.message.toString()) } }.run(listeners) @@ -102,13 +106,14 @@ fun CoroutineScope.subscribeFriendMessages( @OptIn(ExperimentalContracts::class) fun Bot.subscribeMessages( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, listeners: MessageSubscribersBuilder>.() -> R ): R { contract { callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) } return MessageSubscribersBuilder> { listener -> - this.subscribeAlways(coroutineContext) { + this.subscribeAlways(coroutineContext, concurrencyKind) { listener(this, this.message.toString()) } }.run(listeners) @@ -124,13 +129,14 @@ fun Bot.subscribeMessages( @OptIn(ExperimentalContracts::class) fun Bot.subscribeGroupMessages( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, listeners: MessageSubscribersBuilder.() -> R ): R { contract { callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) } return MessageSubscribersBuilder { listener -> - this.subscribeAlways(coroutineContext) { + this.subscribeAlways(coroutineContext, concurrencyKind) { listener(this, this.message.toString()) } }.run(listeners) @@ -144,13 +150,14 @@ fun Bot.subscribeGroupMessages( @OptIn(ExperimentalContracts::class) fun Bot.subscribeFriendMessages( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, listeners: MessageSubscribersBuilder.() -> R ): R { contract { callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) } return MessageSubscribersBuilder { listener -> - this.subscribeAlways(coroutineContext) { + this.subscribeAlways(coroutineContext, concurrencyKind) { listener(this, this.message.toString()) } }.run(listeners) @@ -170,10 +177,11 @@ fun Bot.subscribeFriendMessages( */ inline fun CoroutineScope.incoming( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, capacity: Int = Channel.UNLIMITED ): ReceiveChannel { return Channel(capacity).apply { - val listener = this@incoming.subscribeAlways(coroutineContext) { + val listener = this@incoming.subscribeAlways(coroutineContext, concurrencyKind) { send(this) } this.invokeOnClose { @@ -183,6 +191,33 @@ inline fun CoroutineScope.incoming( } +/** + * 打开一个来自指定 [Bot] 的指定事件的接收通道 + * + * @param capacity 同 [Channel] 的参数, 参见 [Channel.Factory] 中的常量. + * + * @see capacity 默认无限大小. 详见 [Channel.Factory] 中的常量 [Channel.UNLIMITED], [Channel.CONFLATED], [Channel.RENDEZVOUS]. + * 请谨慎使用 [Channel.RENDEZVOUS]: 在 [Channel] 未被 [接收][Channel.receive] 时他将会阻塞事件处理 + * + * @see subscribeFriendMessages + * @see subscribeMessages + * @see subscribeGroupMessages + */ +inline fun Bot.incoming( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, + capacity: Int = Channel.UNLIMITED +): ReceiveChannel { + return Channel(capacity).apply { + val listener = this@incoming.subscribeAlways(coroutineContext, concurrencyKind) { + send(this) + } + this.invokeOnClose { + listener.cancel(CancellationException("ReceiveChannel closed", it)) + } + } +} + /** * 消息事件的处理器. *