mirror of
https://github.com/mamoe/mirai.git
synced 2025-03-28 08:40:09 +08:00
Fix massive concurrent loading
This commit is contained in:
parent
6a7ece1cb8
commit
afb093ba44
@ -186,7 +186,8 @@ internal abstract class QQAndroidBotBase constructor(
|
|||||||
|
|
||||||
// internally visible only
|
// internally visible only
|
||||||
fun getGroupByUin(uin: Long): Group {
|
fun getGroupByUin(uin: Long): Group {
|
||||||
return getGroupByUinOrNull(uin) ?: throw NoSuchElementException("Group $uin not found")
|
return getGroupByUinOrNull(uin)
|
||||||
|
?: throw NoSuchElementException("Group ${Group.calculateGroupCodeByGroupUin(uin)} not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
fun getGroupByUinOrNull(uin: Long): Group? {
|
fun getGroupByUinOrNull(uin: Long): Group? {
|
||||||
|
@ -15,7 +15,6 @@ import kotlinx.coroutines.*
|
|||||||
import kotlinx.coroutines.sync.Mutex
|
import kotlinx.coroutines.sync.Mutex
|
||||||
import kotlinx.coroutines.sync.withLock
|
import kotlinx.coroutines.sync.withLock
|
||||||
import kotlinx.io.core.ByteReadPacket
|
import kotlinx.io.core.ByteReadPacket
|
||||||
import kotlinx.io.core.Input
|
|
||||||
import kotlinx.io.core.buildPacket
|
import kotlinx.io.core.buildPacket
|
||||||
import kotlinx.io.core.use
|
import kotlinx.io.core.use
|
||||||
import net.mamoe.mirai.event.*
|
import net.mamoe.mirai.event.*
|
||||||
@ -236,15 +235,8 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
|
|||||||
data.friendList.forEach {
|
data.friendList.forEach {
|
||||||
// atomic add
|
// atomic add
|
||||||
bot.friends.delegate.addLast(
|
bot.friends.delegate.addLast(
|
||||||
QQImpl(
|
QQImpl(bot, bot.coroutineContext, it.friendUin, FriendInfoImpl(it))
|
||||||
bot,
|
).also { currentFriendCount++ }
|
||||||
bot.coroutineContext,
|
|
||||||
it.friendUin,
|
|
||||||
FriendInfoImpl(it)
|
|
||||||
)
|
|
||||||
).also {
|
|
||||||
currentFriendCount++
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
logger.verbose { "正在加载好友列表 ${currentFriendCount}/${totalFriendCount}" }
|
logger.verbose { "正在加载好友列表 ${currentFriendCount}/${totalFriendCount}" }
|
||||||
if (currentFriendCount >= totalFriendCount) {
|
if (currentFriendCount >= totalFriendCount) {
|
||||||
@ -264,8 +256,8 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
|
|||||||
val troopListData = FriendList.GetTroopListSimplify(bot.client)
|
val troopListData = FriendList.GetTroopListSimplify(bot.client)
|
||||||
.sendAndExpect<FriendList.GetTroopListSimplify.Response>(retry = 3)
|
.sendAndExpect<FriendList.GetTroopListSimplify.Response>(retry = 3)
|
||||||
|
|
||||||
troopListData.groups.chunked(100).forEach { chunk ->
|
troopListData.groups.chunked(50).forEach { chunk ->
|
||||||
coroutineScope {
|
supervisorScope {
|
||||||
chunk.forEach { troopNum ->
|
chunk.forEach { troopNum ->
|
||||||
// 别用 fun, 别 val, 编译失败警告
|
// 别用 fun, 别 val, 编译失败警告
|
||||||
lateinit var loadGroup: suspend () -> Unit
|
lateinit var loadGroup: suspend () -> Unit
|
||||||
@ -326,7 +318,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
supervisorScope {
|
runCatching {
|
||||||
withTimeoutOrNull(30000) {
|
withTimeoutOrNull(30000) {
|
||||||
lateinit var listener: Listener<PacketReceivedEvent>
|
lateinit var listener: Listener<PacketReceivedEvent>
|
||||||
listener = this.subscribeAlways {
|
listener = this.subscribeAlways {
|
||||||
@ -337,6 +329,9 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
|
|||||||
|
|
||||||
MessageSvc.PbGetMsg(bot.client, MsgSvc.SyncFlag.START, currentTimeSeconds).sendAndExpect<Packet>()
|
MessageSvc.PbGetMsg(bot.client, MsgSvc.SyncFlag.START, currentTimeSeconds).sendAndExpect<Packet>()
|
||||||
} ?: error("timeout syncing friend message history")
|
} ?: error("timeout syncing friend message history")
|
||||||
|
}.exceptionOrNull()?.let {
|
||||||
|
logger.error("exception while loading syncing friend message history: ${it.message}")
|
||||||
|
logger.error(it)
|
||||||
}
|
}
|
||||||
bot.firstLoginSucceed = true
|
bot.firstLoginSucceed = true
|
||||||
|
|
||||||
@ -404,7 +399,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
|
|||||||
* @param input 一个完整的包的内容, 去掉开头的 int 包长度
|
* @param input 一个完整的包的内容, 去掉开头的 int 包长度
|
||||||
*/
|
*/
|
||||||
@OptIn(ExperimentalCoroutinesApi::class)
|
@OptIn(ExperimentalCoroutinesApi::class)
|
||||||
fun parsePacketAsync(input: Input): Job {
|
fun parsePacketAsync(input: ByteReadPacket): Job {
|
||||||
return this.launch(
|
return this.launch(
|
||||||
start = CoroutineStart.ATOMIC
|
start = CoroutineStart.ATOMIC
|
||||||
) {
|
) {
|
||||||
@ -423,12 +418,12 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
|
|||||||
*
|
*
|
||||||
* @param input 一个完整的包的内容, 去掉开头的 int 包长度
|
* @param input 一个完整的包的内容, 去掉开头的 int 包长度
|
||||||
*/
|
*/
|
||||||
suspend fun parsePacket(input: Input) {
|
suspend fun parsePacket(input: ByteReadPacket) {
|
||||||
generifiedParsePacket<Packet>(input)
|
generifiedParsePacket<Packet>(input)
|
||||||
}
|
}
|
||||||
|
|
||||||
// with generic type, less mistakes
|
// with generic type, less mistakes
|
||||||
private suspend fun <P : Packet?> generifiedParsePacket(input: Input) {
|
private suspend fun <P : Packet?> generifiedParsePacket(input: ByteReadPacket) {
|
||||||
KnownPacketFactories.parseIncomingPacket(
|
KnownPacketFactories.parseIncomingPacket(
|
||||||
bot,
|
bot,
|
||||||
input
|
input
|
||||||
|
@ -10,7 +10,6 @@
|
|||||||
package net.mamoe.mirai.qqandroid.network.protocol.packet
|
package net.mamoe.mirai.qqandroid.network.protocol.packet
|
||||||
|
|
||||||
import kotlinx.io.core.*
|
import kotlinx.io.core.*
|
||||||
import kotlinx.io.pool.useInstance
|
|
||||||
import net.mamoe.mirai.event.Event
|
import net.mamoe.mirai.event.Event
|
||||||
import net.mamoe.mirai.qqandroid.QQAndroidBot
|
import net.mamoe.mirai.qqandroid.QQAndroidBot
|
||||||
import net.mamoe.mirai.qqandroid.network.Packet
|
import net.mamoe.mirai.qqandroid.network.Packet
|
||||||
@ -174,7 +173,11 @@ internal object KnownPacketFactories {
|
|||||||
// do not inline. Exceptions thrown will not be reported correctly
|
// do not inline. Exceptions thrown will not be reported correctly
|
||||||
@OptIn(MiraiInternalAPI::class)
|
@OptIn(MiraiInternalAPI::class)
|
||||||
@Suppress("UNCHECKED_CAST")
|
@Suppress("UNCHECKED_CAST")
|
||||||
suspend fun <T : Packet?> parseIncomingPacket(bot: QQAndroidBot, rawInput: Input, consumer: PacketConsumer<T>) =
|
suspend fun <T : Packet?> parseIncomingPacket(
|
||||||
|
bot: QQAndroidBot,
|
||||||
|
rawInput: ByteReadPacket,
|
||||||
|
consumer: PacketConsumer<T>
|
||||||
|
) =
|
||||||
with(rawInput) {
|
with(rawInput) {
|
||||||
// login
|
// login
|
||||||
val flag1 = readInt()
|
val flag1 = readInt()
|
||||||
@ -190,7 +193,7 @@ internal object KnownPacketFactories {
|
|||||||
|
|
||||||
readString(readInt() - 4)// uinAccount
|
readString(readInt() - 4)// uinAccount
|
||||||
|
|
||||||
ByteArrayPool.useInstance { data ->
|
ByteArrayPool.useInstance(this.remaining.toInt()) { data ->
|
||||||
val size = this.readAvailable(data)
|
val size = this.readAvailable(data)
|
||||||
|
|
||||||
kotlin.runCatching {
|
kotlin.runCatching {
|
||||||
@ -377,7 +380,7 @@ internal object KnownPacketFactories {
|
|||||||
}
|
}
|
||||||
0 -> {
|
0 -> {
|
||||||
val data = if (bot.client.loginState == 0) {
|
val data = if (bot.client.loginState == 0) {
|
||||||
ByteArrayPool.useInstance { byteArrayBuffer ->
|
ByteArrayPool.useInstance(this.remaining.toInt()) { byteArrayBuffer ->
|
||||||
val size = (this.remaining - 1).toInt()
|
val size = (this.remaining - 1).toInt()
|
||||||
this.readFully(byteArrayBuffer, 0, size)
|
this.readFully(byteArrayBuffer, 0, size)
|
||||||
|
|
||||||
|
@ -35,6 +35,7 @@ import net.mamoe.mirai.qqandroid.network.protocol.data.proto.TroopTips0x857
|
|||||||
import net.mamoe.mirai.qqandroid.network.protocol.packet.IncomingPacketFactory
|
import net.mamoe.mirai.qqandroid.network.protocol.packet.IncomingPacketFactory
|
||||||
import net.mamoe.mirai.qqandroid.network.protocol.packet.OutgoingPacket
|
import net.mamoe.mirai.qqandroid.network.protocol.packet.OutgoingPacket
|
||||||
import net.mamoe.mirai.qqandroid.network.protocol.packet.buildResponseUniPacket
|
import net.mamoe.mirai.qqandroid.network.protocol.packet.buildResponseUniPacket
|
||||||
|
import net.mamoe.mirai.qqandroid.utils._miraiContentToString
|
||||||
import net.mamoe.mirai.qqandroid.utils.io.JceStruct
|
import net.mamoe.mirai.qqandroid.utils.io.JceStruct
|
||||||
import net.mamoe.mirai.qqandroid.utils.io.readString
|
import net.mamoe.mirai.qqandroid.utils.io.readString
|
||||||
import net.mamoe.mirai.qqandroid.utils.io.serialization.decodeUniPacket
|
import net.mamoe.mirai.qqandroid.utils.io.serialization.decodeUniPacket
|
||||||
@ -90,7 +91,7 @@ internal class OnlinePush {
|
|||||||
flags and 8 != 0 -> MemberPermission.OWNER
|
flags and 8 != 0 -> MemberPermission.OWNER
|
||||||
flags == 0 -> MemberPermission.MEMBER
|
flags == 0 -> MemberPermission.MEMBER
|
||||||
else -> {
|
else -> {
|
||||||
bot.logger.warning("判断群员权限失败")
|
bot.logger.warning("判断群员权限失败: ${pbPushMsg.msg.msgHead._miraiContentToString()}. 请完整截图或复制此日志发送给 mirai 维护者以帮助解决问题.")
|
||||||
MemberPermission.MEMBER
|
MemberPermission.MEMBER
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -161,7 +162,7 @@ internal class OnlinePush {
|
|||||||
val groupUin = content.fromUin
|
val groupUin = content.fromUin
|
||||||
|
|
||||||
when (type) {
|
when (type) {
|
||||||
0x82 -> {
|
0x82 -> { // 2020/4/8: 在这里拿到了一个 Group xxx not found
|
||||||
bot.getGroupByUin(groupUin).let { group ->
|
bot.getGroupByUin(groupUin).let { group ->
|
||||||
val member = group.getOrNull(target) as? MemberImpl ?: return NoPacket
|
val member = group.getOrNull(target) as? MemberImpl ?: return NoPacket
|
||||||
return MemberLeaveEvent.Quit(member.also {
|
return MemberLeaveEvent.Quit(member.also {
|
||||||
@ -309,6 +310,9 @@ internal class OnlinePush {
|
|||||||
val dataBytes = this.readBytes(26)
|
val dataBytes = this.readBytes(26)
|
||||||
val size = this.readByte().toInt() // orthodox, don't `readUByte`
|
val size = this.readByte().toInt() // orthodox, don't `readUByte`
|
||||||
if (size < 0) {
|
if (size < 0) {
|
||||||
|
// java.lang.IllegalStateException: negative array size: -100, remaining bytes=B0 E6 99 90 D8 E8 02 98 06 01
|
||||||
|
// java.lang.IllegalStateException: negative array size: -121, remaining bytes=03 10 D9 F7 A2 93 0D 18 E0 DB E8 CA 0B 32 22 61 34 64 31 34 64 61 64 65 65 38 32 32 34 62 64 32 35 34 65 63 37 62 62 30 33 30 66 61 36 66 61 6D 6A 38 0E 48 00 58 01 70 C8 E8 9B 07 7A AD 02 3C 7B 22 69 63 6F 6E 22 3A 22 71 71 77 61 6C 6C 65 74 5F 63 75 73 74 6F 6D 5F 74 69 70 73 5F 69 64 69 6F 6D 5F 69 63 6F 6E 2E 70 6E 67 22 2C 22 61 6C 74 22 3A 22 22 7D 3E 3C 7B 22 63 6D 64 22 3A 31 2C 22 64 61 74 61 22 3A 22 6C 69 73 74 69 64 3D 31 30 30 30 30 34 35 32 30 31 32 30 30 34 30 38 31 32 30 30 31 30 39 36 31 32 33 31 34 35 30 30 26 67 72 6F 75 70 74 79 70 65 3D 31 22 2C 22 74 65 78 74 43 6F 6C 6F 72 22 3A 22 30 78 38 37 38 42 39 39 22 2C 22 74 65 78 74 22 3A 22 E6 8E A5 E9 BE 99 E7 BA A2 E5 8C 85 E4 B8 8B E4 B8 80 E4 B8 AA E6 8B BC E9 9F B3 EF BC 9A 22 7D 3E 3C 7B 22 63 6D 64 22 3A 31 2C 22 64 61 74 61 22 3A 22 6C 69 73 74 69 64 3D 31 30 30 30 30 34 35 32 30 31 32 30 30 34 30 38 31 32 30 30 31 30 39 36 31 32 33 31 34 35 30 30 26 67 72 6F 75 70 74 79 70 65 3D 31 22 2C 22 74 65 78 74 43 6F 6C 6F 72 22 3A 22 30 78 45 36 32 35 35 35 22 2C 22 74 65 78 74 22 3A 22 64 69 6E 67 22 7D 3E 82 01 0C E8 80 81 E5 83 A7 E5 85 A5 E5 AE 9A 88 01 03 92 01 04 64 69 6E 67 A0 01 00
|
||||||
|
// negative array size: -40, remaining bytes=D6 94 C3 8C D8 E8 02 98 06 01
|
||||||
error(
|
error(
|
||||||
"negative array size: $size, remaining bytes=${this.readBytes()
|
"negative array size: $size, remaining bytes=${this.readBytes()
|
||||||
.toUHexString()}"
|
.toUHexString()}"
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package net.mamoe.mirai.qqandroid.utils
|
package net.mamoe.mirai.qqandroid.utils
|
||||||
|
|
||||||
import kotlinx.io.pool.DefaultPool
|
import kotlinx.io.pool.DefaultPool
|
||||||
|
import kotlinx.io.pool.ObjectPool
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 缓存 [ByteArray] 实例的 [ObjectPool]
|
* 缓存 [ByteArray] 实例的 [ObjectPool]
|
||||||
@ -9,7 +10,7 @@ internal object ByteArrayPool : DefaultPool<ByteArray>(256) {
|
|||||||
/**
|
/**
|
||||||
* 每一个 [ByteArray] 的大小
|
* 每一个 [ByteArray] 的大小
|
||||||
*/
|
*/
|
||||||
const val BUFFER_SIZE: Int = 8192000
|
const val BUFFER_SIZE: Int = 81920 / 2
|
||||||
|
|
||||||
override fun produceInstance(): ByteArray = ByteArray(BUFFER_SIZE)
|
override fun produceInstance(): ByteArray = ByteArray(BUFFER_SIZE)
|
||||||
|
|
||||||
@ -22,4 +23,19 @@ internal object ByteArrayPool : DefaultPool<ByteArray>(256) {
|
|||||||
fun checkBufferSize(size: Long) {
|
fun checkBufferSize(size: Long) {
|
||||||
require(size <= BUFFER_SIZE) { "sizePerPacket is too large. Maximum buffer size=$BUFFER_SIZE" }
|
require(size <= BUFFER_SIZE) { "sizePerPacket is too large. Maximum buffer size=$BUFFER_SIZE" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 请求一个大小至少为 [requestedSize] 的 [ByteArray] 实例.
|
||||||
|
*/ // 不要写为扩展函数. 它需要优先于 kotlinx.io 的扩展函数 resolve
|
||||||
|
inline fun <R> useInstance(requestedSize: Int = 0, block: (ByteArray) -> R): R {
|
||||||
|
if (requestedSize > BUFFER_SIZE) {
|
||||||
|
return ByteArray(requestedSize).run(block)
|
||||||
|
}
|
||||||
|
val instance = borrow()
|
||||||
|
try {
|
||||||
|
return block(instance)
|
||||||
|
} finally {
|
||||||
|
recycle(instance)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -13,11 +13,12 @@
|
|||||||
|
|
||||||
package net.mamoe.mirai.qqandroid.utils.io
|
package net.mamoe.mirai.qqandroid.utils.io
|
||||||
|
|
||||||
import kotlinx.io.OutputStream
|
|
||||||
import kotlinx.io.charsets.Charset
|
import kotlinx.io.charsets.Charset
|
||||||
import kotlinx.io.charsets.Charsets
|
import kotlinx.io.charsets.Charsets
|
||||||
import kotlinx.io.core.*
|
import kotlinx.io.core.*
|
||||||
import kotlinx.io.pool.useInstance
|
import net.mamoe.mirai.qqandroid.utils.ByteArrayPool
|
||||||
|
import net.mamoe.mirai.qqandroid.utils.toReadPacket
|
||||||
|
import net.mamoe.mirai.qqandroid.utils.toUHexString
|
||||||
import net.mamoe.mirai.utils.MiraiInternalAPI
|
import net.mamoe.mirai.utils.MiraiInternalAPI
|
||||||
import kotlin.contracts.ExperimentalContracts
|
import kotlin.contracts.ExperimentalContracts
|
||||||
import kotlin.contracts.InvocationKind
|
import kotlin.contracts.InvocationKind
|
||||||
@ -25,16 +26,12 @@ import kotlin.contracts.contract
|
|||||||
import kotlin.jvm.JvmMultifileClass
|
import kotlin.jvm.JvmMultifileClass
|
||||||
import kotlin.jvm.JvmName
|
import kotlin.jvm.JvmName
|
||||||
import kotlin.jvm.JvmSynthetic
|
import kotlin.jvm.JvmSynthetic
|
||||||
import kotlinx.serialization.InternalSerializationApi
|
|
||||||
import net.mamoe.mirai.qqandroid.utils.ByteArrayPool
|
|
||||||
import net.mamoe.mirai.qqandroid.utils.toReadPacket
|
|
||||||
import net.mamoe.mirai.qqandroid.utils.toUHexString
|
|
||||||
|
|
||||||
@MiraiInternalAPI
|
@MiraiInternalAPI
|
||||||
internal inline fun <R> ByteReadPacket.useBytes(
|
internal inline fun <R> ByteReadPacket.useBytes(
|
||||||
n: Int = remaining.toInt(),//not that safe but adequate
|
n: Int = remaining.toInt(),//not that safe but adequate
|
||||||
block: (data: ByteArray, length: Int) -> R
|
block: (data: ByteArray, length: Int) -> R
|
||||||
): R = ByteArrayPool.useInstance {
|
): R = ByteArrayPool.useInstance(n) {
|
||||||
this.readFully(it, 0, n)
|
this.readFully(it, 0, n)
|
||||||
block(it, n)
|
block(it, n)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user