From afb093ba44f5fce0725eeed6b83f18048aebda1a Mon Sep 17 00:00:00 2001 From: Him188 Date: Wed, 8 Apr 2020 13:50:48 +0800 Subject: [PATCH] Fix massive concurrent loading --- .../mirai/qqandroid/QQAndroidBot.common.kt | 3 ++- .../network/QQAndroidBotNetworkHandler.kt | 27 ++++++++----------- .../network/protocol/packet/PacketFactory.kt | 11 +++++--- .../packet/chat/receive/OnlinePush.kt | 8 ++++-- .../mirai/qqandroid/utils/ByteArrayPool.kt | 18 ++++++++++++- .../mamoe/mirai/qqandroid/utils/io/input.kt | 11 +++----- 6 files changed, 47 insertions(+), 31 deletions(-) diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/QQAndroidBot.common.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/QQAndroidBot.common.kt index ebbc20bcc..0c437b91c 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/QQAndroidBot.common.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/QQAndroidBot.common.kt @@ -186,7 +186,8 @@ internal abstract class QQAndroidBotBase constructor( // internally visible only fun getGroupByUin(uin: Long): Group { - return getGroupByUinOrNull(uin) ?: throw NoSuchElementException("Group $uin not found") + return getGroupByUinOrNull(uin) + ?: throw NoSuchElementException("Group ${Group.calculateGroupCodeByGroupUin(uin)} not found") } fun getGroupByUinOrNull(uin: Long): Group? { diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/QQAndroidBotNetworkHandler.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/QQAndroidBotNetworkHandler.kt index f3287dbf0..a2be7fc0f 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/QQAndroidBotNetworkHandler.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/QQAndroidBotNetworkHandler.kt @@ -15,7 +15,6 @@ import kotlinx.coroutines.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.io.core.ByteReadPacket -import kotlinx.io.core.Input import kotlinx.io.core.buildPacket import kotlinx.io.core.use import net.mamoe.mirai.event.* @@ -236,15 +235,8 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler data.friendList.forEach { // atomic add bot.friends.delegate.addLast( - QQImpl( - bot, - bot.coroutineContext, - it.friendUin, - FriendInfoImpl(it) - ) - ).also { - currentFriendCount++ - } + QQImpl(bot, bot.coroutineContext, it.friendUin, FriendInfoImpl(it)) + ).also { currentFriendCount++ } } logger.verbose { "正在加载好友列表 ${currentFriendCount}/${totalFriendCount}" } if (currentFriendCount >= totalFriendCount) { @@ -264,8 +256,8 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler val troopListData = FriendList.GetTroopListSimplify(bot.client) .sendAndExpect(retry = 3) - troopListData.groups.chunked(100).forEach { chunk -> - coroutineScope { + troopListData.groups.chunked(50).forEach { chunk -> + supervisorScope { chunk.forEach { troopNum -> // 别用 fun, 别 val, 编译失败警告 lateinit var loadGroup: suspend () -> Unit @@ -326,7 +318,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler } } - supervisorScope { + runCatching { withTimeoutOrNull(30000) { lateinit var listener: Listener listener = this.subscribeAlways { @@ -337,6 +329,9 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler MessageSvc.PbGetMsg(bot.client, MsgSvc.SyncFlag.START, currentTimeSeconds).sendAndExpect() } ?: error("timeout syncing friend message history") + }.exceptionOrNull()?.let { + logger.error("exception while loading syncing friend message history: ${it.message}") + logger.error(it) } bot.firstLoginSucceed = true @@ -404,7 +399,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler * @param input 一个完整的包的内容, 去掉开头的 int 包长度 */ @OptIn(ExperimentalCoroutinesApi::class) - fun parsePacketAsync(input: Input): Job { + fun parsePacketAsync(input: ByteReadPacket): Job { return this.launch( start = CoroutineStart.ATOMIC ) { @@ -423,12 +418,12 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler * * @param input 一个完整的包的内容, 去掉开头的 int 包长度 */ - suspend fun parsePacket(input: Input) { + suspend fun parsePacket(input: ByteReadPacket) { generifiedParsePacket(input) } // with generic type, less mistakes - private suspend fun

generifiedParsePacket(input: Input) { + private suspend fun

generifiedParsePacket(input: ByteReadPacket) { KnownPacketFactories.parseIncomingPacket( bot, input diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/PacketFactory.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/PacketFactory.kt index 8bc0f5301..d8e34df2b 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/PacketFactory.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/PacketFactory.kt @@ -10,7 +10,6 @@ package net.mamoe.mirai.qqandroid.network.protocol.packet import kotlinx.io.core.* -import kotlinx.io.pool.useInstance import net.mamoe.mirai.event.Event import net.mamoe.mirai.qqandroid.QQAndroidBot import net.mamoe.mirai.qqandroid.network.Packet @@ -174,7 +173,11 @@ internal object KnownPacketFactories { // do not inline. Exceptions thrown will not be reported correctly @OptIn(MiraiInternalAPI::class) @Suppress("UNCHECKED_CAST") - suspend fun parseIncomingPacket(bot: QQAndroidBot, rawInput: Input, consumer: PacketConsumer) = + suspend fun parseIncomingPacket( + bot: QQAndroidBot, + rawInput: ByteReadPacket, + consumer: PacketConsumer + ) = with(rawInput) { // login val flag1 = readInt() @@ -190,7 +193,7 @@ internal object KnownPacketFactories { readString(readInt() - 4)// uinAccount - ByteArrayPool.useInstance { data -> + ByteArrayPool.useInstance(this.remaining.toInt()) { data -> val size = this.readAvailable(data) kotlin.runCatching { @@ -377,7 +380,7 @@ internal object KnownPacketFactories { } 0 -> { val data = if (bot.client.loginState == 0) { - ByteArrayPool.useInstance { byteArrayBuffer -> + ByteArrayPool.useInstance(this.remaining.toInt()) { byteArrayBuffer -> val size = (this.remaining - 1).toInt() this.readFully(byteArrayBuffer, 0, size) diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/chat/receive/OnlinePush.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/chat/receive/OnlinePush.kt index f2091dac8..07c92da3e 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/chat/receive/OnlinePush.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/chat/receive/OnlinePush.kt @@ -35,6 +35,7 @@ import net.mamoe.mirai.qqandroid.network.protocol.data.proto.TroopTips0x857 import net.mamoe.mirai.qqandroid.network.protocol.packet.IncomingPacketFactory import net.mamoe.mirai.qqandroid.network.protocol.packet.OutgoingPacket import net.mamoe.mirai.qqandroid.network.protocol.packet.buildResponseUniPacket +import net.mamoe.mirai.qqandroid.utils._miraiContentToString import net.mamoe.mirai.qqandroid.utils.io.JceStruct import net.mamoe.mirai.qqandroid.utils.io.readString import net.mamoe.mirai.qqandroid.utils.io.serialization.decodeUniPacket @@ -90,7 +91,7 @@ internal class OnlinePush { flags and 8 != 0 -> MemberPermission.OWNER flags == 0 -> MemberPermission.MEMBER else -> { - bot.logger.warning("判断群员权限失败") + bot.logger.warning("判断群员权限失败: ${pbPushMsg.msg.msgHead._miraiContentToString()}. 请完整截图或复制此日志发送给 mirai 维护者以帮助解决问题.") MemberPermission.MEMBER } } @@ -161,7 +162,7 @@ internal class OnlinePush { val groupUin = content.fromUin when (type) { - 0x82 -> { + 0x82 -> { // 2020/4/8: 在这里拿到了一个 Group xxx not found bot.getGroupByUin(groupUin).let { group -> val member = group.getOrNull(target) as? MemberImpl ?: return NoPacket return MemberLeaveEvent.Quit(member.also { @@ -309,6 +310,9 @@ internal class OnlinePush { val dataBytes = this.readBytes(26) val size = this.readByte().toInt() // orthodox, don't `readUByte` if (size < 0) { + // java.lang.IllegalStateException: negative array size: -100, remaining bytes=B0 E6 99 90 D8 E8 02 98 06 01 + // java.lang.IllegalStateException: negative array size: -121, remaining bytes=03 10 D9 F7 A2 93 0D 18 E0 DB E8 CA 0B 32 22 61 34 64 31 34 64 61 64 65 65 38 32 32 34 62 64 32 35 34 65 63 37 62 62 30 33 30 66 61 36 66 61 6D 6A 38 0E 48 00 58 01 70 C8 E8 9B 07 7A AD 02 3C 7B 22 69 63 6F 6E 22 3A 22 71 71 77 61 6C 6C 65 74 5F 63 75 73 74 6F 6D 5F 74 69 70 73 5F 69 64 69 6F 6D 5F 69 63 6F 6E 2E 70 6E 67 22 2C 22 61 6C 74 22 3A 22 22 7D 3E 3C 7B 22 63 6D 64 22 3A 31 2C 22 64 61 74 61 22 3A 22 6C 69 73 74 69 64 3D 31 30 30 30 30 34 35 32 30 31 32 30 30 34 30 38 31 32 30 30 31 30 39 36 31 32 33 31 34 35 30 30 26 67 72 6F 75 70 74 79 70 65 3D 31 22 2C 22 74 65 78 74 43 6F 6C 6F 72 22 3A 22 30 78 38 37 38 42 39 39 22 2C 22 74 65 78 74 22 3A 22 E6 8E A5 E9 BE 99 E7 BA A2 E5 8C 85 E4 B8 8B E4 B8 80 E4 B8 AA E6 8B BC E9 9F B3 EF BC 9A 22 7D 3E 3C 7B 22 63 6D 64 22 3A 31 2C 22 64 61 74 61 22 3A 22 6C 69 73 74 69 64 3D 31 30 30 30 30 34 35 32 30 31 32 30 30 34 30 38 31 32 30 30 31 30 39 36 31 32 33 31 34 35 30 30 26 67 72 6F 75 70 74 79 70 65 3D 31 22 2C 22 74 65 78 74 43 6F 6C 6F 72 22 3A 22 30 78 45 36 32 35 35 35 22 2C 22 74 65 78 74 22 3A 22 64 69 6E 67 22 7D 3E 82 01 0C E8 80 81 E5 83 A7 E5 85 A5 E5 AE 9A 88 01 03 92 01 04 64 69 6E 67 A0 01 00 + // negative array size: -40, remaining bytes=D6 94 C3 8C D8 E8 02 98 06 01 error( "negative array size: $size, remaining bytes=${this.readBytes() .toUHexString()}" diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/ByteArrayPool.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/ByteArrayPool.kt index 82287aa17..e957dec82 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/ByteArrayPool.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/ByteArrayPool.kt @@ -1,6 +1,7 @@ package net.mamoe.mirai.qqandroid.utils import kotlinx.io.pool.DefaultPool +import kotlinx.io.pool.ObjectPool /** * 缓存 [ByteArray] 实例的 [ObjectPool] @@ -9,7 +10,7 @@ internal object ByteArrayPool : DefaultPool(256) { /** * 每一个 [ByteArray] 的大小 */ - const val BUFFER_SIZE: Int = 8192000 + const val BUFFER_SIZE: Int = 81920 / 2 override fun produceInstance(): ByteArray = ByteArray(BUFFER_SIZE) @@ -22,4 +23,19 @@ internal object ByteArrayPool : DefaultPool(256) { fun checkBufferSize(size: Long) { require(size <= BUFFER_SIZE) { "sizePerPacket is too large. Maximum buffer size=$BUFFER_SIZE" } } + + /** + * 请求一个大小至少为 [requestedSize] 的 [ByteArray] 实例. + */ // 不要写为扩展函数. 它需要优先于 kotlinx.io 的扩展函数 resolve + inline fun useInstance(requestedSize: Int = 0, block: (ByteArray) -> R): R { + if (requestedSize > BUFFER_SIZE) { + return ByteArray(requestedSize).run(block) + } + val instance = borrow() + try { + return block(instance) + } finally { + recycle(instance) + } + } } \ No newline at end of file diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/input.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/input.kt index 57f7265c6..41154986d 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/input.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/input.kt @@ -13,11 +13,12 @@ package net.mamoe.mirai.qqandroid.utils.io -import kotlinx.io.OutputStream import kotlinx.io.charsets.Charset import kotlinx.io.charsets.Charsets import kotlinx.io.core.* -import kotlinx.io.pool.useInstance +import net.mamoe.mirai.qqandroid.utils.ByteArrayPool +import net.mamoe.mirai.qqandroid.utils.toReadPacket +import net.mamoe.mirai.qqandroid.utils.toUHexString import net.mamoe.mirai.utils.MiraiInternalAPI import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind @@ -25,16 +26,12 @@ import kotlin.contracts.contract import kotlin.jvm.JvmMultifileClass import kotlin.jvm.JvmName import kotlin.jvm.JvmSynthetic -import kotlinx.serialization.InternalSerializationApi -import net.mamoe.mirai.qqandroid.utils.ByteArrayPool -import net.mamoe.mirai.qqandroid.utils.toReadPacket -import net.mamoe.mirai.qqandroid.utils.toUHexString @MiraiInternalAPI internal inline fun ByteReadPacket.useBytes( n: Int = remaining.toInt(),//not that safe but adequate block: (data: ByteArray, length: Int) -> R -): R = ByteArrayPool.useInstance { +): R = ByteArrayPool.useInstance(n) { this.readFully(it, 0, n) block(it, n) }