Add concurrency control to message listeners, close #152

This commit is contained in:
Him188 2020-03-20 18:31:05 +08:00
parent 55ed5f9d70
commit 81f2388c44

View File

@ -19,6 +19,7 @@ import net.mamoe.mirai.Bot
import net.mamoe.mirai.contact.isAdministrator import net.mamoe.mirai.contact.isAdministrator
import net.mamoe.mirai.contact.isOperator import net.mamoe.mirai.contact.isOperator
import net.mamoe.mirai.contact.isOwner import net.mamoe.mirai.contact.isOwner
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.message.FriendMessage import net.mamoe.mirai.message.FriendMessage
import net.mamoe.mirai.message.GroupMessage import net.mamoe.mirai.message.GroupMessage
import net.mamoe.mirai.message.MessagePacket import net.mamoe.mirai.message.MessagePacket
@ -37,6 +38,7 @@ import kotlin.coroutines.EmptyCoroutineContext
@OptIn(ExperimentalContracts::class) @OptIn(ExperimentalContracts::class)
fun <R> CoroutineScope.subscribeMessages( fun <R> CoroutineScope.subscribeMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
listeners: MessageSubscribersBuilder<MessagePacket<*, *>>.() -> R listeners: MessageSubscribersBuilder<MessagePacket<*, *>>.() -> R
): R { ): R {
// contract 可帮助 IDE 进行类型推断. 无实际代码作用. // contract 可帮助 IDE 进行类型推断. 无实际代码作用.
@ -47,7 +49,7 @@ fun <R> CoroutineScope.subscribeMessages(
return MessageSubscribersBuilder { messageListener: MessageListener<MessagePacket<*, *>> -> return MessageSubscribersBuilder { messageListener: MessageListener<MessagePacket<*, *>> ->
// subscribeAlways 即注册一个监听器. 这个监听器收到消息后就传递给 [messageListener] // subscribeAlways 即注册一个监听器. 这个监听器收到消息后就传递给 [messageListener]
// messageListener 即为 DSL 里 `contains(...) { }`, `startsWith(...) { }` 的代码块. // messageListener 即为 DSL 里 `contains(...) { }`, `startsWith(...) { }` 的代码块.
subscribeAlways(coroutineContext) { subscribeAlways(coroutineContext, concurrencyKind) {
messageListener.invoke(this, this.message.toString()) messageListener.invoke(this, this.message.toString())
// this.message.toString() 即为 messageListener 中 it 接收到的值 // this.message.toString() 即为 messageListener 中 it 接收到的值
} }
@ -62,13 +64,14 @@ fun <R> CoroutineScope.subscribeMessages(
@OptIn(ExperimentalContracts::class) @OptIn(ExperimentalContracts::class)
fun <R> CoroutineScope.subscribeGroupMessages( fun <R> CoroutineScope.subscribeGroupMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
listeners: MessageSubscribersBuilder<GroupMessage>.() -> R listeners: MessageSubscribersBuilder<GroupMessage>.() -> R
): R { ): R {
contract { contract {
callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) callsInPlace(listeners, InvocationKind.EXACTLY_ONCE)
} }
return MessageSubscribersBuilder<GroupMessage> { listener -> return MessageSubscribersBuilder<GroupMessage> { listener ->
subscribeAlways(coroutineContext) { subscribeAlways(coroutineContext, concurrencyKind) {
listener(this, this.message.toString()) listener(this, this.message.toString())
} }
}.run(listeners) }.run(listeners)
@ -82,13 +85,14 @@ fun <R> CoroutineScope.subscribeGroupMessages(
@OptIn(ExperimentalContracts::class) @OptIn(ExperimentalContracts::class)
fun <R> CoroutineScope.subscribeFriendMessages( fun <R> CoroutineScope.subscribeFriendMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
listeners: MessageSubscribersBuilder<FriendMessage>.() -> R listeners: MessageSubscribersBuilder<FriendMessage>.() -> R
): R { ): R {
contract { contract {
callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) callsInPlace(listeners, InvocationKind.EXACTLY_ONCE)
} }
return MessageSubscribersBuilder<FriendMessage> { listener -> return MessageSubscribersBuilder<FriendMessage> { listener ->
subscribeAlways(coroutineContext) { subscribeAlways(coroutineContext, concurrencyKind) {
listener(this, this.message.toString()) listener(this, this.message.toString())
} }
}.run(listeners) }.run(listeners)
@ -102,13 +106,14 @@ fun <R> CoroutineScope.subscribeFriendMessages(
@OptIn(ExperimentalContracts::class) @OptIn(ExperimentalContracts::class)
fun <R> Bot.subscribeMessages( fun <R> Bot.subscribeMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
listeners: MessageSubscribersBuilder<MessagePacket<*, *>>.() -> R listeners: MessageSubscribersBuilder<MessagePacket<*, *>>.() -> R
): R { ): R {
contract { contract {
callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) callsInPlace(listeners, InvocationKind.EXACTLY_ONCE)
} }
return MessageSubscribersBuilder<MessagePacket<*, *>> { listener -> return MessageSubscribersBuilder<MessagePacket<*, *>> { listener ->
this.subscribeAlways(coroutineContext) { this.subscribeAlways(coroutineContext, concurrencyKind) {
listener(this, this.message.toString()) listener(this, this.message.toString())
} }
}.run(listeners) }.run(listeners)
@ -124,13 +129,14 @@ fun <R> Bot.subscribeMessages(
@OptIn(ExperimentalContracts::class) @OptIn(ExperimentalContracts::class)
fun <R> Bot.subscribeGroupMessages( fun <R> Bot.subscribeGroupMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
listeners: MessageSubscribersBuilder<GroupMessage>.() -> R listeners: MessageSubscribersBuilder<GroupMessage>.() -> R
): R { ): R {
contract { contract {
callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) callsInPlace(listeners, InvocationKind.EXACTLY_ONCE)
} }
return MessageSubscribersBuilder<GroupMessage> { listener -> return MessageSubscribersBuilder<GroupMessage> { listener ->
this.subscribeAlways(coroutineContext) { this.subscribeAlways(coroutineContext, concurrencyKind) {
listener(this, this.message.toString()) listener(this, this.message.toString())
} }
}.run(listeners) }.run(listeners)
@ -144,13 +150,14 @@ fun <R> Bot.subscribeGroupMessages(
@OptIn(ExperimentalContracts::class) @OptIn(ExperimentalContracts::class)
fun <R> Bot.subscribeFriendMessages( fun <R> Bot.subscribeFriendMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
listeners: MessageSubscribersBuilder<FriendMessage>.() -> R listeners: MessageSubscribersBuilder<FriendMessage>.() -> R
): R { ): R {
contract { contract {
callsInPlace(listeners, InvocationKind.EXACTLY_ONCE) callsInPlace(listeners, InvocationKind.EXACTLY_ONCE)
} }
return MessageSubscribersBuilder<FriendMessage> { listener -> return MessageSubscribersBuilder<FriendMessage> { listener ->
this.subscribeAlways(coroutineContext) { this.subscribeAlways(coroutineContext, concurrencyKind) {
listener(this, this.message.toString()) listener(this, this.message.toString())
} }
}.run(listeners) }.run(listeners)
@ -170,10 +177,11 @@ fun <R> Bot.subscribeFriendMessages(
*/ */
inline fun <reified E : Event> CoroutineScope.incoming( inline fun <reified E : Event> CoroutineScope.incoming(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
capacity: Int = Channel.UNLIMITED capacity: Int = Channel.UNLIMITED
): ReceiveChannel<E> { ): ReceiveChannel<E> {
return Channel<E>(capacity).apply { return Channel<E>(capacity).apply {
val listener = this@incoming.subscribeAlways<E>(coroutineContext) { val listener = this@incoming.subscribeAlways<E>(coroutineContext, concurrencyKind) {
send(this) send(this)
} }
this.invokeOnClose { this.invokeOnClose {
@ -183,6 +191,33 @@ inline fun <reified E : Event> CoroutineScope.incoming(
} }
/**
* 打开一个来自指定 [Bot] 的指定事件的接收通道
*
* @param capacity [Channel] 的参数, 参见 [Channel.Factory] 中的常量.
*
* @see capacity 默认无限大小. 详见 [Channel.Factory] 中的常量 [Channel.UNLIMITED], [Channel.CONFLATED], [Channel.RENDEZVOUS].
* 请谨慎使用 [Channel.RENDEZVOUS]: [Channel] 未被 [接收][Channel.receive] 时他将会阻塞事件处理
*
* @see subscribeFriendMessages
* @see subscribeMessages
* @see subscribeGroupMessages
*/
inline fun <reified E : BotEvent> Bot.incoming(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
capacity: Int = Channel.UNLIMITED
): ReceiveChannel<E> {
return Channel<E>(capacity).apply {
val listener = this@incoming.subscribeAlways<E>(coroutineContext, concurrencyKind) {
send(this)
}
this.invokeOnClose {
listener.cancel(CancellationException("ReceiveChannel closed", it))
}
}
}
/** /**
* 消息事件的处理器. * 消息事件的处理器.
* *