Improve concurrency

This commit is contained in:
Him188 2019-10-27 17:27:57 +08:00
parent fa0f4e49df
commit 4e27ad3262
14 changed files with 150 additions and 69 deletions

View File

@ -127,7 +127,7 @@ class Bot(val account: BotAccount, val logger: MiraiLogger) {
suspend fun getGroup(id: GroupId): Group = id.value.let {
if (groups.containsKey(it)) groups[it]!!
else groupsLock.withLock {
groups.getOrPut(it) { Group(this@Bot, it) }
groups.getOrPut(it) { Group(this@Bot, id) }
}
}
}

View File

@ -66,7 +66,8 @@ inline class GroupInternalId(val value: UInt)
* - Group ID([Group.internalId]) 是与调用 API 时使用的 id.( QQ 客户端中不可见)
* @author Him188moe
*/
class Group internal constructor(bot: Bot, id: UInt) : Contact(bot, id) {
@Suppress("MemberVisibilityCanBePrivate", "CanBeParameter")
class Group internal constructor(bot: Bot, val groupId: GroupId) : Contact(bot, groupId.value) {
val internalId = GroupId(id).toInternalId()
val members: ContactList<QQ>
//todo members

View File

@ -8,6 +8,8 @@ import kotlinx.coroutines.withContext
import net.mamoe.mirai.contact.Contact
import net.mamoe.mirai.event.internal.broadcastInternal
import net.mamoe.mirai.network.BotNetworkHandler
import net.mamoe.mirai.utils.DefaultLogger
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.log
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@ -39,6 +41,18 @@ abstract class Event {
fun cancel() {
cancelled = true
}
init {
if (EventDebuggingFlag) {
EventLogger.logDebug(this::class.simpleName + " created")
}
}
}
internal object EventLogger : MiraiLogger by DefaultLogger("Event")
val EventDebuggingFlag: Boolean by lazy {
false
}
/**
@ -58,7 +72,16 @@ interface Cancellable {
@Suppress("UNCHECKED_CAST")
@JvmOverloads
suspend fun <E : Event> E.broadcast(context: CoroutineContext = EmptyCoroutineContext): E {
return withContext(EventScope.coroutineContext + context) { this@broadcast.broadcastInternal() }
if (EventDebuggingFlag) {
EventLogger.logDebug(this::class.simpleName + " pre broadcast")
}
try {
return withContext(EventScope.coroutineContext + context) { this@broadcast.broadcastInternal() }
} finally {
if (EventDebuggingFlag) {
EventLogger.logDebug(this::class.simpleName + " after broadcast")
}
}
}
/**

View File

@ -10,6 +10,9 @@ import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.event.events.FriendMessageEvent
import net.mamoe.mirai.event.events.GroupMessageEvent
import net.mamoe.mirai.message.*
import net.mamoe.mirai.utils.ExternalImage
import net.mamoe.mirai.utils.sendTo
import net.mamoe.mirai.utils.upload
import kotlin.jvm.JvmName
/**
@ -33,8 +36,19 @@ abstract class SenderAndMessage<S : Contact>(
*/
suspend fun reply(message: MessageChain) = subject.sendMessage(message)
suspend fun reply(plain: String) = reply(PlainText(plain))
suspend fun reply(message: Message) = reply(message.toChain())
suspend fun reply(plain: String) = reply(PlainText(plain))
// region Send to subject
suspend inline fun ExternalImage.send() = this.sendTo(subject)
suspend inline fun ExternalImage.upload(): Image = this.upload(subject)
suspend inline fun Image.send() = this.sendTo(subject)
suspend inline fun ImageId.send() = this.sendTo(subject)
suspend inline fun Message.send() = this.sendTo(subject)
// endregion
}
/**
@ -134,11 +148,16 @@ suspend inline fun Bot.subscribeFriendMessages(noinline listeners: suspend Messa
internal typealias MessageListener<T> = @MessageListenerDsl suspend T.(String) -> Unit
internal typealias MessageReplier<T> = @MessageListenerDsl suspend T.(String) -> String
internal typealias MessageReplier<T> = @MessageListenerDsl suspend T.(String) -> Message
internal suspend inline operator fun <T : SenderAndMessage<*>> MessageListener<T>.invoke(t: T) = this.invoke(t, t.message.toString())
internal typealias StringReplier<T> = @MessageListenerDsl suspend T.(String) -> String
internal suspend inline operator fun <T : SenderAndMessage<*>> MessageListener<T>.invoke(t: T) = this.invoke(t, t.message.stringValue)
@JvmName("invoke1") //Avoid Platform declaration clash
internal suspend inline operator fun <T : SenderAndMessage<*>> MessageReplier<T>.invoke(t: T) = this.invoke(t, t.message.toString())
internal suspend inline operator fun <T : SenderAndMessage<*>> StringReplier<T>.invoke(t: T): String = this.invoke(t, t.message.stringValue)
@JvmName("invoke2") //Avoid Platform declaration clash
internal suspend inline operator fun <T : SenderAndMessage<*>> MessageReplier<T>.invoke(t: T): Message = this.invoke(t, t.message.stringValue)
/**
* 消息订阅构造器
@ -148,27 +167,35 @@ internal suspend inline operator fun <T : SenderAndMessage<*>> MessageReplier<T>
*/
@Suppress("unused")
@MessageListenerDsl
inline class MessageSubscribersBuilder<T : SenderAndMessage<*>>(
//TODO 将这个类 inline 后会导致 kotlin 编译错误. 等待修复后再 inline
class MessageSubscribersBuilder<T : SenderAndMessage<*>>(
val handlerConsumer: suspend (MessageListener<T>) -> Unit
) {
suspend inline fun case(equals: String, trim: Boolean = true, noinline listener: MessageListener<T>) = content({ equals == if (trim) it.trim() else it }, listener)
suspend inline fun contains(value: String, noinline listener: MessageListener<T>) = content({ value in it }, listener)
suspend inline fun replyEndsWith(value: String, noinline replier: MessageReplier<T>) = content({ it.endsWith(value) }) { replier(this) }
suspend inline fun startsWith(start: String, noinline listener: MessageListener<T>) = content({ it.startsWith(start) }, listener)
suspend inline fun endsWith(start: String, noinline listener: MessageListener<T>) = content({ it.endsWith(start) }, listener)
suspend inline fun sentBy(id: UInt, noinline listener: MessageListener<T>) = content({ sender.id == id }, listener)
suspend inline fun sentBy(id: Long, noinline listener: MessageListener<T>) = sentBy(id.toUInt(), listener)
suspend inline fun <reified M : Message> has(noinline listener: MessageListener<T>) = handlerConsumer { if (message.any<M>()) listener(this) }
suspend inline fun content(noinline filter: T.(String) -> Boolean, noinline listener: MessageListener<T>) =
handlerConsumer { if (this.filter(message.toString())) listener(this) }
handlerConsumer { if (this.filter(message.stringValue)) listener(this) }
suspend infix fun String.caseReply(replier: StringReplier<T>) = case(this, true) { this@case.reply(replier(this)) }
suspend infix fun String.containsReply(replier: StringReplier<T>) = content({ this@containsReply in it }) { replier(this) }
suspend infix fun String.startsWithReply(replier: StringReplier<T>) = content({ it.startsWith(this@startsWithReply) }) { replier(this) }
suspend infix fun String.endswithReply(replier: StringReplier<T>) = content({ it.endsWith(this@endswithReply) }) { replier(this) }
suspend infix fun String.reply(reply: String) = case(this) { this@case.reply(reply) }
suspend infix fun String.reply(reply: StringReplier<T>) = case(this) { this@case.reply(reply(this)) }
/* 易产生迷惑感
suspend inline fun replyCase(equals: String, trim: Boolean = true, noinline replier: MessageReplier<T>) = case(equals, trim) { reply(replier(this)) }
suspend inline fun replyContains(value: String, noinline replier: MessageReplier<T>) = content({ value in it }) { replier(this) }
suspend inline fun replyStartsWith(value: String, noinline replier: MessageReplier<T>) = content({ it.startsWith(value) }) { replier(this) }
suspend infix fun String.reply(reply: String) = case(this) { this.reply(reply) }
suspend infix fun String.reply(reply: MessageReplier<T>) = case(this) { this.reply(reply(this)) }
suspend inline fun replyEndsWith(value: String, noinline replier: MessageReplier<T>) = content({ it.endsWith(value) }) { replier(this) }
*/
}

View File

@ -21,11 +21,11 @@ import kotlin.reflect.KClass
* @author Him188moe
*/
internal suspend fun <E : Event> KClass<E>.subscribeInternal(listener: Listener<E>): Unit = with(this.listeners()) {
if (mainMutex.tryLock()) {//能锁则代表这个事件目前没有正在广播.
if (mainMutex.tryLock(listener)) {//能锁则代表这个事件目前没有正在广播.
try {
add(listener)//直接修改主监听者列表
} finally {
mainMutex.unlock()
mainMutex.unlock(listener)
}
return
}
@ -38,7 +38,7 @@ internal suspend fun <E : Event> KClass<E>.subscribeInternal(listener: Listener<
EventScope.launch {
//启动协程并等待正在进行的广播结束, 然后将缓存转移到主监听者列表
//启动后的协程马上就会因为锁而被挂起
mainMutex.withLock {
mainMutex.withLock(listener) {
cacheMutex.withLock {
if (cache.size != 0) {
addAll(cache)
@ -55,6 +55,7 @@ internal suspend fun <E : Event> KClass<E>.subscribeInternal(listener: Listener<
* @author Him188moe
*/
sealed class Listener<in E : Event> {
internal val lock = Mutex()
abstract suspend fun onEvent(event: E): ListeningStatus
}
@ -127,8 +128,25 @@ internal object EventListenerManger {
@Suppress("UNCHECKED_CAST")
internal suspend fun <E : Event> E.broadcastInternal(): E {
suspend fun callListeners(listeners: EventListeners<in E>) = listeners.mainMutex.withLock {
listeners.inlinedRemoveIf { it.onEvent(this) == ListeningStatus.STOPPED }
suspend fun callListeners(listeners: EventListeners<in E>) {
suspend fun callAndRemoveIfRequired() {
listeners.inlinedRemoveIf {
if (it.lock.tryLock()) {
try {
it.onEvent(this) == ListeningStatus.STOPPED
} finally {
it.lock.unlock()
}
} else false
}
}
//自己持有, 则是在一个事件中
if (listeners.mainMutex.holdsLock(listeners)) {
callAndRemoveIfRequired()
} else listeners.mainMutex.withLock(listeners) {
callAndRemoveIfRequired()
}
}
callListeners(this::class.listeners() as EventListeners<in E>)

View File

@ -2,7 +2,6 @@ package net.mamoe.mirai.network
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelChildren
import net.mamoe.mirai.network.protocol.tim.TIMBotNetworkHandler.BotSocketAdapter
import net.mamoe.mirai.network.protocol.tim.TIMBotNetworkHandler.LoginHandler
@ -16,6 +15,7 @@ import net.mamoe.mirai.network.protocol.tim.packet.login.LoginResult
import net.mamoe.mirai.network.protocol.tim.packet.login.RequestSKeyPacket
import net.mamoe.mirai.utils.BotNetworkConfiguration
import net.mamoe.mirai.utils.io.PlatformDatagramChannel
import kotlin.coroutines.ContinuationInterceptor
/**
* Mirai 的网络处理器, 它承担所有数据包([Packet])的处理任务.
@ -87,6 +87,6 @@ interface BotNetworkHandler<Socket : DataPacketSocketAdapter> : CoroutineScope {
*/
suspend fun close(cause: Throwable? = null) {
//todo check??
coroutineContext[Job]!!.cancelChildren(CancellationException("handler closed", cause))
coroutineContext[ContinuationInterceptor]!!.cancelChildren(CancellationException("handler closed", cause))
}
}

View File

@ -79,8 +79,7 @@ class BotSession(
* @see Bot.withSession 转换接收器 (receiver, `this` 的指向) [BotSession]
*/
suspend inline fun <reified P : ServerPacket, R> OutgoingPacket.sendAndExpect(noinline handler: suspend (P) -> R): CompletableDeferred<R> {
val deferred: CompletableDeferred<R> =
coroutineContext[Job].takeIf { it != null }?.let { CompletableDeferred<R>(it) } ?: CompletableDeferred()
val deferred: CompletableDeferred<R> = CompletableDeferred(coroutineContext[Job])
bot.network.addHandler(TemporaryPacketHandler(P::class, deferred, this@BotSession).also {
it.toSend(this)
it.onExpect(handler)

View File

@ -24,7 +24,6 @@ import net.mamoe.mirai.network.protocol.tim.packet.login.*
import net.mamoe.mirai.network.session
import net.mamoe.mirai.qqAccount
import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.internal.inlinedRemoveIf
import net.mamoe.mirai.utils.io.*
import kotlin.coroutines.CoroutineContext
@ -129,10 +128,11 @@ internal class TIMBotNetworkHandler internal constructor(private val bot: Bot) :
try {
channel.read(buffer)// JVM: withContext(IO)
} catch (e: ReadPacketInternalException) {
// read failed, continue and reread
bot.logger.logError("Socket channel read failed: ${e.message}")
continue
} catch (e: Throwable) {
e.log()// other unexpected exceptions caught.
bot.logger.logError("Caught unexpected exceptions")
bot.logger.logError(e)
continue
} finally {
if (!buffer.canRead() || buffer.readRemaining == 0) {//size==0
@ -146,7 +146,11 @@ internal class TIMBotNetworkHandler internal constructor(private val bot: Bot) :
// `.use`: Ensure that the packet is consumed **totally**
// so that all the buffers are released
ByteReadPacket(buffer, IoBuffer.Pool).use {
distributePacket(it.parseServerPacket(buffer.readRemaining))
try {
distributePacket(it.parseServerPacket(buffer.readRemaining))
} catch (e: Exception) {
bot.logger.logError(e)
}
}
}
}
@ -200,14 +204,15 @@ internal class TIMBotNetworkHandler internal constructor(private val bot: Bot) :
bot.logger.logCyan("Packet received: $packet")
}
//Remove first to release the lock
handlersLock.withLock {
temporaryPacketHandlers.inlinedRemoveIf {
it.shouldRemove(this@TIMBotNetworkHandler[ActionPacketHandler].session, packet)
}
temporaryPacketHandlers.filter { it.filter(session, packet) }
}.forEach {
it.doReceive(packet)
}
if (packet is ServerEventPacket) {
//must ensure the response packet is sent
//ensure the response packet is sent
sendPacket(packet.ResponsePacket(bot.qqAccount, sessionKey))
}

View File

@ -2,7 +2,6 @@ package net.mamoe.mirai.network.protocol.tim.handler
import net.mamoe.mirai.network.BotSession
import net.mamoe.mirai.network.protocol.tim.packet.ServerPacket
import kotlin.reflect.KClass
/**
* 数据包(接受/发送)处理器
@ -20,14 +19,13 @@ abstract class PacketHandler(
}
internal class PacketHandlerNode<T : PacketHandler>(
val clazz: KClass<T>,
val instance: T,
val key: PacketHandler.Key<T>
val instance: T,
val key: PacketHandler.Key<T>
)
internal fun <T : PacketHandler> T.asNode(key: PacketHandler.Key<T>): PacketHandlerNode<T> {
@Suppress("UNCHECKED_CAST")
return PacketHandlerNode(this::class as KClass<T>, this, key)
return PacketHandlerNode(this, key)
}
internal open class PacketHandlerList : MutableList<PacketHandlerNode<*>> by mutableListOf() {

View File

@ -20,9 +20,9 @@ import kotlin.reflect.KClass
* @see BotSession.sendAndExpect
*/
class TemporaryPacketHandler<P : ServerPacket, R>(
private val expectationClass: KClass<P>,
private val deferred: CompletableDeferred<R>,
private val fromSession: BotSession
private val expectationClass: KClass<P>,
private val deferred: CompletableDeferred<R>,
private val fromSession: BotSession
) {
private lateinit var toSend: OutgoingPacket
@ -45,19 +45,17 @@ class TemporaryPacketHandler<P : ServerPacket, R>(
session.socket.sendPacket(toSend)
}
suspend fun shouldRemove(session: BotSession, packet: ServerPacket): Boolean {
if (expectationClass.isInstance(packet) && session === this.fromSession) {
suspend fun filter(session: BotSession, packet: ServerPacket): Boolean = expectationClass.isInstance(packet) && session === this.fromSession
@Suppress("UNCHECKED_CAST")
val ret = try {
handler(packet as P)
} catch (e: Exception) {
deferred.completeExceptionally(e)
return true
}
deferred.complete(ret)
return true
suspend fun doReceive(packet: ServerPacket) {
@Suppress("UNCHECKED_CAST")
val ret = try {
handler(packet as P)
} catch (e: Exception) {
deferred.completeExceptionally(e)
return
}
return false
deferred.complete(ret)
return
}
}

View File

@ -2,6 +2,7 @@
package net.mamoe.mirai.network.protocol.tim.packet.action
import kotlinx.coroutines.withContext
import kotlinx.io.core.*
import net.mamoe.mirai.contact.Group
import net.mamoe.mirai.contact.GroupId
@ -16,6 +17,7 @@ import net.mamoe.mirai.qqAccount
import net.mamoe.mirai.utils.ExternalImage
import net.mamoe.mirai.utils.httpPostGroupImage
import net.mamoe.mirai.utils.io.*
import kotlin.coroutines.coroutineContext
/**
@ -32,24 +34,27 @@ class OverFileSizeMaxException : IllegalStateException()
* @throws OverFileSizeMaxException 如果文件过大, 服务器拒绝接收时
*/
suspend fun Group.uploadImage(image: ExternalImage): ImageId = withSession {
val userContext = coroutineContext
GroupImageIdRequestPacket(bot.qqAccount, internalId, image, sessionKey)
.sendAndExpect<GroupImageIdRequestPacket.Response, Unit> {
when (it.state) {
GroupImageIdRequestPacket.Response.State.REQUIRE_UPLOAD -> {
httpPostGroupImage(
botAccount = bot.qqAccount,
groupId = GroupId(id),
imageInput = image.input,
inputSize = image.inputSize,
uKeyHex = it.uKey!!.toUHexString("")
)
withContext(userContext) {
when (it.state) {
GroupImageIdRequestPacket.Response.State.REQUIRE_UPLOAD -> {
httpPostGroupImage(
botAccount = bot.qqAccount,
groupId = GroupId(id),
imageInput = image.input,
inputSize = image.inputSize,
uKeyHex = it.uKey!!.toUHexString("")
)
}
GroupImageIdRequestPacket.Response.State.ALREADY_EXISTS -> {
}
GroupImageIdRequestPacket.Response.State.OVER_FILE_SIZE_MAX -> throw OverFileSizeMaxException()
}
GroupImageIdRequestPacket.Response.State.ALREADY_EXISTS -> {
}
GroupImageIdRequestPacket.Response.State.OVER_FILE_SIZE_MAX -> throw OverFileSizeMaxException()
}
}.join()
image.groupImageId

View File

@ -42,6 +42,8 @@ interface MiraiLogger {
fun logError(any: Any?)
fun logError(e: Throwable) = log(e)
fun logDebug(any: Any?)
fun logCyan(any: Any?)
@ -94,6 +96,8 @@ abstract class MiraiLoggerPlatformBase : MiraiLogger {
follower?.logError(any)
}
override fun logError(e: Throwable) = log(e)
final override fun logDebug(any: Any?) {
logDebug0(any)
follower?.logDebug(any)

View File

@ -47,7 +47,7 @@ private fun readTestAccount(): BotAccount? {
suspend fun Bot.messageDSL() {
//监听所有 bot 的来自所有群和好友的消息
subscribeMessages {
replyCase("你好") { "你好" }
"你好" reply "你好!"
has<Image> {
// this: SenderAndMessage
@ -65,9 +65,9 @@ suspend fun Bot.messageDSL() {
reply(message)
}
replyContains("123") { "你的消息里面包含 123" }
"123" reply "你的消息里面包含 123"
replyCase("我的qq") { sender.id.toString() }
"我的qq" reply { sender.id.toString() }
sentBy(1040400290) {
reply("是你!")

View File

@ -2,7 +2,10 @@ apply plugin: "kotlin"
apply plugin: "java"
dependencies {
compile project(":mirai-core")
implementation project(":mirai-core")
api group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: kotlin_version
api group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: coroutines_version
compile group: 'com.alibaba', name: 'fastjson', version: '1.2.62'
implementation 'org.jsoup:jsoup:1.12.1'
}