Fix events

This commit is contained in:
Him188 2019-11-01 22:17:24 +08:00
parent 8178a8c81a
commit e3c3c71290
11 changed files with 133 additions and 47 deletions

View File

@ -2,10 +2,7 @@
package net.mamoe.mirai.event
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.newCoroutineContext
import kotlinx.coroutines.withContext
import kotlinx.coroutines.*
import net.mamoe.mirai.contact.Contact
import net.mamoe.mirai.event.internal.broadcastInternal
import net.mamoe.mirai.network.BotNetworkHandler
@ -66,8 +63,9 @@ interface Cancellable {
/**
* 广播一个事件的唯一途径.
* [context] 不包含 [CoroutineExceptionHandler], 将会使用默认的异常捕获, [error]
* 也就是说, 这个方法不会抛出异常, 只会把异常交由 [context] 捕获
* 这个方法将会把处理挂起在 [context] 下运行. 默认为使用 [EventDispatcher] 调度事件协程.
*
* @param context 事件处理协程运行的 [CoroutineContext].
*/
@Suppress("UNCHECKED_CAST")
@JvmOverloads
@ -84,13 +82,23 @@ suspend fun <E : Event> E.broadcast(context: CoroutineContext = EmptyCoroutineCo
}
}
/**
* 事件协程调度器.
*
* JVM: 共享 [Dispatchers.Default]
*/
internal expect val EventDispatcher: CoroutineDispatcher
/**
* 事件协程作用域.
* 所有的事件 [broadcast] 过程均在此作用域下运行.
*
* 然而, 若在事件处理过程中使用到 [Contact.sendMessage] 等会 [发送数据包][BotNetworkHandler.sendPacket] 的方法,
* 发送过程将会通过 [withContext] 将协程切换到 [BotNetworkHandler.NetworkScope]
* 发送过程将会通过 [withContext] 将协程切换到 [BotNetworkHandler] 作用域下执行.
*/
object EventScope : CoroutineScope {
override val coroutineContext: CoroutineContext = EmptyCoroutineContext
override val coroutineContext: CoroutineContext =
EventDispatcher + CoroutineExceptionHandler { _, e ->
MiraiLogger.error("An exception is thrown in EventScope", e)
}
}

View File

@ -173,7 +173,12 @@ class MessageSubscribersBuilder<T : SenderAndMessage<*>>(
* @param trim `true` 则删除首尾空格后比较
* @param ignoreCase `true` 则不区分大小写
*/
suspend fun case(equals: String, trim: Boolean = true, ignoreCase: Boolean = false, onEvent: @MessageDsl suspend T.(String) -> Unit) =
suspend fun case(
equals: String,
trim: Boolean = true,
ignoreCase: Boolean = false,
onEvent: @MessageDsl suspend T.(String) -> Unit
) =
content({ equals.equals(if (trim) it.trim() else it, ignoreCase = ignoreCase) }, onEvent)
/**
@ -184,7 +189,11 @@ class MessageSubscribersBuilder<T : SenderAndMessage<*>>(
/**
* 如果消息的前缀是 [prefix], 就执行 [onEvent]
*/
suspend fun startsWith(prefix: String, removePrefix: Boolean = false, onEvent: @MessageDsl suspend T.(String) -> Unit) =
suspend fun startsWith(
prefix: String,
removePrefix: Boolean = false,
onEvent: @MessageDsl suspend T.(String) -> Unit
) =
content({ it.startsWith(prefix) }) {
if (removePrefix) this.onEvent(this.message.stringValue.substringAfter(prefix))
else onEvent(this)
@ -199,7 +208,8 @@ class MessageSubscribersBuilder<T : SenderAndMessage<*>>(
/**
* 如果是这个人发的消息, 就执行 [onEvent]. 消息可以是好友消息也可以是群消息
*/
suspend fun sentBy(qqId: UInt, onEvent: @MessageDsl suspend T.(String) -> Unit) = content({ sender.id == qqId }, onEvent)
suspend fun sentBy(qqId: UInt, onEvent: @MessageDsl suspend T.(String) -> Unit) =
content({ sender.id == qqId }, onEvent)
/**
* 如果是这个人发的消息, 就执行 [onEvent]. 消息可以是好友消息也可以是群消息
@ -231,15 +241,22 @@ class MessageSubscribersBuilder<T : SenderAndMessage<*>>(
suspend infix fun String.caseReply(replier: String) = case(this, true) { this@case.reply(replier) }
suspend infix fun String.caseReply(replier: StringReplier<T>) = case(this, true) { this@case.reply(replier(this)) }
suspend infix fun String.containsReply(replier: String) = content({ this@containsReply in it }) { this@content.reply(replier) }
suspend infix fun String.containsReply(replier: String) =
content({ this@containsReply in it }) { this@content.reply(replier) }
suspend infix fun String.containsReply(replier: StringReplier<T>) = content({ this@containsReply in it }) { replier(this) }
suspend infix fun String.containsReply(replier: StringReplier<T>) =
content({ this@containsReply in it }) { replier(this) }
suspend infix fun String.startsWithReply(replier: StringReplier<T>) = content({ it.startsWith(this@startsWithReply) }) { replier(this) }
suspend infix fun String.startsWithReply(replier: StringReplier<T>) =
content({ it.startsWith(this@startsWithReply) }) { replier(this) }
suspend infix fun String.endswithReply(replier: StringReplier<T>) = content({ it.endsWith(this@endswithReply) }) { replier(this) }
suspend infix fun String.endswithReply(replier: StringReplier<T>) =
content({ it.endsWith(this@endswithReply) }) { replier(this) }
suspend infix fun String.reply(reply: String) = case(this) {
this@case.reply(reply)
}
suspend infix fun String.reply(reply: String) = case(this) { this@case.reply(reply) }
suspend infix fun String.reply(reply: StringReplier<T>) = case(this) { this@case.reply(reply(this)) }

View File

@ -22,6 +22,10 @@ enum class ListeningStatus {
// region 顶层方法
/**
* 订阅所有 [E] 及其子类事件.
*
*/
suspend inline fun <reified E : Event> subscribe(noinline handler: suspend (E) -> ListeningStatus) = E::class.subscribe(handler)
suspend inline fun <reified E : Event> subscribeAlways(noinline listener: suspend (E) -> Unit) = E::class.subscribeAlways(listener)

View File

@ -6,12 +6,14 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.Bot
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.EventLogger
import net.mamoe.mirai.event.EventScope
import net.mamoe.mirai.event.ListeningStatus
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.utils.internal.inlinedRemoveIf
import kotlin.jvm.JvmField
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
/**
* 监听和广播实现.
@ -25,6 +27,7 @@ internal suspend fun <E : Event> KClass<E>.subscribeInternal(listener: Listener<
if (mainMutex.tryLock(listener)) {//能锁则代表这个事件目前没有正在广播.
try {
add(listener)//直接修改主监听者列表
EventLogger.debug("Added a listener to ${this@subscribeInternal.simpleName}")
} finally {
mainMutex.unlock(listener)
}
@ -34,6 +37,7 @@ internal suspend fun <E : Event> KClass<E>.subscribeInternal(listener: Listener<
//不能锁住, 则这个事件正在广播, 那么要将新的监听者放入缓存
cacheMutex.withLock {
cache.add(listener)
EventLogger.debug("Added a listener to cache of ${this@subscribeInternal.simpleName}")
}
EventScope.launch {
@ -44,6 +48,7 @@ internal suspend fun <E : Event> KClass<E>.subscribeInternal(listener: Listener<
if (cache.size != 0) {
addAll(cache)
cache.clear()
EventLogger.debug("Cache of ${this@subscribeInternal.simpleName} is now transferred to main")
}
}
}
@ -61,7 +66,7 @@ sealed class Listener<in E : Event> {
}
@PublishedApi
internal class Handler<E : Event>(@JvmField val handler: suspend (E) -> ListeningStatus) : Listener<E>() {
internal class Handler<in E : Event>(@JvmField val handler: suspend (E) -> ListeningStatus) : Listener<E>() {
override suspend fun onEvent(event: E): ListeningStatus = handler.invoke(event)
}
@ -89,7 +94,7 @@ internal class HandlerWithBot<E : Event>(val bot: Bot, @JvmField val handler: su
/**
* 这个事件类的监听器 list
*/
internal suspend fun <E : Event> KClass<E>.listeners(): EventListeners<E> = EventListenerManger.get(this)
internal suspend fun <E : Event> KClass<out E>.listeners(): EventListeners<E> = EventListenerManger.get(this)
internal class EventListeners<E : Event> : MutableList<Listener<E>> by mutableListOf() {
/**
@ -105,6 +110,14 @@ internal class EventListeners<E : Event> : MutableList<Listener<E>> by mutableLi
* 等待加入到主 list 的监听者. 务必使用 [cacheMutex]
*/
val cache: MutableList<Listener<E>> = mutableListOf()
init {
this::class.members.filterIsInstance<KFunction<*>>().forEach {
if (it.name == "add") {
it.isExternal
}
}
}
}
/**
@ -112,11 +125,11 @@ internal class EventListeners<E : Event> : MutableList<Listener<E>> by mutableLi
* [EventListeners] lazy : 它们只会在被需要的时候才创建和存储.
*/
internal object EventListenerManger {
private val registries: MutableMap<KClass<out Event>, EventListeners<out Event>> = mutableMapOf()
private val registries: MutableMap<KClass<out Event>, EventListeners<*>> = mutableMapOf()
private val registriesMutex = Mutex()
@Suppress("UNCHECKED_CAST")
internal suspend fun <E : Event> get(clazz: KClass<E>): EventListeners<E> = registriesMutex.withLock {
internal suspend fun <E : Event> get(clazz: KClass<out E>): EventListeners<E> = registriesMutex.withLock {
if (registries.containsKey(clazz)) {
return registries[clazz] as EventListeners<E>
} else {
@ -128,46 +141,49 @@ internal object EventListenerManger {
}
}
@Suppress("UNCHECKED_CAST")
internal suspend fun <E : Event> E.broadcastInternal(): E {
suspend fun callListeners(listeners: EventListeners<in E>) {
suspend fun callAndRemoveIfRequired() {
listeners.inlinedRemoveIf {
if (it.lock.tryLock()) {
try {
it.onEvent(this) == ListeningStatus.STOPPED
} finally {
it.lock.unlock()
}
} else false
}
suspend fun callAndRemoveIfRequired() = listeners.inlinedRemoveIf {
if (it.lock.tryLock()) {
try {
it.onEvent(this) == ListeningStatus.STOPPED
} finally {
it.lock.unlock()
}
} else false
}
//自己持有, 则是在一个事件中
if (listeners.mainMutex.holdsLock(listeners)) {
callAndRemoveIfRequired()
} else {
while (!listeners.mainMutex.tryLock(this)) {
while (!listeners.mainMutex.tryLock(listeners)) {
delay(10)
}
try {
callAndRemoveIfRequired()
} finally {
listeners.mainMutex.unlock(this)
listeners.mainMutex.unlock(listeners)
}
}
}
callListeners(this::class.listeners() as EventListeners<in E>)
callListeners(this::class.listeners())
applyAllListeners(this::class) { callListeners(it as EventListeners<in E>) }
applySuperListeners(this::class) { callListeners(it) }
return this
}
private suspend inline fun <E : Event> applyAllListeners(
clazz: KClass<E>,
block: (EventListeners<in E>) -> Unit
) = clazz.supertypes.map { it.classifier }.filterIsInstance<KClass<out Event>>().forEach {
/**
* apply [block] to all the [EventListeners] in [clazz]'s superclasses
*/
private tailrec suspend fun <E : Event> applySuperListeners(
clazz: KClass<out E>,
block: suspend (EventListeners<in E>) -> Unit
) {
val superEventClass =
clazz.supertypes.map { it.classifier }.filterIsInstance<KClass<out Event>>().firstOrNull() ?: return
@Suppress("UNCHECKED_CAST")
block(it.listeners() as EventListeners<in E>)
}
block(superEventClass.listeners() as EventListeners<in E>)
applySuperListeners(superEventClass, block)
}

View File

@ -29,6 +29,13 @@ import net.mamoe.mirai.utils.io.*
import net.mamoe.mirai.utils.solveCaptcha
import kotlin.coroutines.CoroutineContext
/**
* 包处理协程调度器.
*
* JVM: 独立的 4 thread 调度器
*/
expect val NetworkDispatcher: CoroutineDispatcher
/**
* [BotNetworkHandler] TIM PC 协议实现
*
@ -38,7 +45,7 @@ internal class TIMBotNetworkHandler internal constructor(private val bot: Bot) :
BotNetworkHandler<TIMBotNetworkHandler.BotSocketAdapter>, PacketHandlerList() {
override val coroutineContext: CoroutineContext =
Dispatchers.Default + CoroutineExceptionHandler { _, e ->
NetworkDispatcher + CoroutineExceptionHandler { _, e ->
bot.logger.error("An exception was thrown in a coroutine under TIMBotNetworkHandler", e)
} + SupervisorJob()

View File

@ -19,6 +19,7 @@ import net.mamoe.mirai.network.protocol.tim.packet.event.IgnoredServerEventPacke
import net.mamoe.mirai.network.protocol.tim.packet.event.ServerFriendMessageEventPacket
import net.mamoe.mirai.network.protocol.tim.packet.event.ServerGroupMessageEventPacket
import net.mamoe.mirai.network.protocol.tim.packet.event.ServerGroupUploadFileEventPacket
import net.mamoe.mirai.network.qqAccount
/**
* 处理消息事件, 承担消息发送任务.
@ -69,10 +70,17 @@ class EventPacketHandler(session: BotSession) : PacketHandler(session) {
}
suspend fun sendFriendMessage(qq: QQ, message: MessageChain) {
session.socket.sendPacket(SendFriendMessagePacket(session.bot.account.id, qq.id, session.sessionKey, message))
session.socket.sendPacket(SendFriendMessagePacket(session.qqAccount, qq.id, session.sessionKey, message))
}
suspend fun sendGroupMessage(group: Group, message: MessageChain) {
session.socket.sendPacket(SendGroupMessagePacket(session.bot.account.id, group.internalId, session.sessionKey, message))
session.socket.sendPacket(
SendGroupMessagePacket(
session.qqAccount,
group.internalId,
session.sessionKey,
message
)
)
}
}

View File

@ -18,7 +18,7 @@ class OutgoingPacket(
name: String?,
override val packetId: PacketId,
override val sequenceId: UShort,
val delegate: ByteReadPacket
internal val delegate: ByteReadPacket
) : Packet {
private val name: String by lazy {
name ?: packetId.toString()
@ -49,7 +49,7 @@ interface OutgoingPacketBuilder {
private val sequenceIdInternal = atomic(1)
@PublishedApi
internal fun atomicNextSequenceId() = sequenceIdInternal.getAndIncrement().toUShort()
internal fun atomicNextSequenceId(): UShort = sequenceIdInternal.getAndIncrement().toUShort()
}
}

View File

@ -160,7 +160,7 @@ internal fun Packet.packetToString(name: String = this::class.simpleName.toStrin
PacketNameFormatter.adjustName(name + "(${this.idHexString})") +
this::class.members
.filterIsInstance<KProperty<*>>()
.filterNot { it.isConst || it.isSuspend || it.visibility == KVisibility.PRIVATE }
.filterNot { it.isConst || it.isSuspend || it.visibility != KVisibility.PUBLIC }
.filterNot { prop -> prop.name in IgnoreIdListEquals || IgnoreIdListInclude.any { it in prop.name } }
.joinToString(", ", "{", "}") { it.briefDescription(this@packetToString) }

View File

@ -0,0 +1,6 @@
package net.mamoe.mirai.event
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
internal actual val EventDispatcher: CoroutineDispatcher get() = Dispatchers.Default

View File

@ -0,0 +1,11 @@
package net.mamoe.mirai.network.protocol.tim
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import java.util.concurrent.Executors
/**
* 独立的 4 thread 调度器
*/
actual val NetworkDispatcher: CoroutineDispatcher
get() = Executors.newFixedThreadPool(4).asCoroutineDispatcher()

View File

@ -7,6 +7,8 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import net.mamoe.mirai.Bot
import net.mamoe.mirai.BotAccount
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.subscribeAlways
import net.mamoe.mirai.event.subscribeMessages
import net.mamoe.mirai.login
import net.mamoe.mirai.network.protocol.tim.packet.login.requireSuccess
@ -35,6 +37,13 @@ suspend fun main() {
)
).apply { login().requireSuccess() }
/**
* 监听所有事件
*/
subscribeAlways<Event> {
//bot.logger.verbose("收到了一个事件: ${it::class.simpleName}")
}
bot.subscribeMessages {
"你好" reply "你好!"