Fix friend message sequence sync with multiple bots

This commit is contained in:
Him188 2021-01-29 11:05:01 +08:00
parent 245d08051b
commit 22ac89207a
2 changed files with 17 additions and 9 deletions

View File

@ -538,7 +538,7 @@ internal class QQAndroidBotNetworkHandler(coroutineContext: CoroutineContext, bo
private suspend fun syncMessageSvc() {
logger.info { "Syncing friend message history..." }
withTimeoutOrNull(30000) {
launch(CoroutineName("Syncing friend message history")) { syncFromEvent<MessageSvcPbGetMsg.GetMsgSuccess, Unit> { } }
launch(CoroutineName("Syncing friend message history")) { nextEvent<MessageSvcPbGetMsg.GetMsgSuccess> { it.bot == this@QQAndroidBotNetworkHandler.bot } }
MessageSvcPbGetMsg(bot.client, MsgSvc.SyncFlag.START, null).sendAndExpect<Packet>()
} ?: error("timeout syncing friend message history.")
@ -791,9 +791,15 @@ internal class QQAndroidBotNetworkHandler(coroutineContext: CoroutineContext, bo
require(timeoutMillis > 100) { "timeoutMillis must > 100" }
require(retry in 0..10) { "retry must in 0..10" }
check(bot.isActive) { "bot is dead therefore can't send ${this.commandName}" }
check(this@QQAndroidBotNetworkHandler.isActive) { "network is dead therefore can't send any packet" }
check(channel.isOpen) { "network channel is closed" }
if (!bot.isActive) {
throw CancellationException("bot is dead therefore can't send ${this.commandName}")
}
if (!this@QQAndroidBotNetworkHandler.isActive) {
throw CancellationException("network is dead therefore can't send any packet")
}
if (!channel.isOpen) {
throw CancellationException("network channel is closed")
}
val data = this.delegate.withUse { readBytes() }

View File

@ -91,7 +91,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
)
}
open class GetMsgSuccess(delegate: List<Packet>, syncCookie: ByteArray?) : Response(
open class GetMsgSuccess(delegate: List<Packet>, syncCookie: ByteArray?, val bot: QQAndroidBot) : Response(
MsgSvc.SyncFlag.STOP, delegate,
syncCookie
), Event,
@ -116,7 +116,9 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
"MessageSvcPbGetMsg.Response(syncFlagFromServer=$syncFlagFromServer, messages=<Iterable>))"
}
object EmptyResponse : GetMsgSuccess(emptyList(), null)
class EmptyResponse(
bot: QQAndroidBot
) : GetMsgSuccess(emptyList(), null, bot)
@OptIn(FlowPreview::class)
override suspend fun ByteReadPacket.decode(bot: QQAndroidBot): Response {
@ -132,7 +134,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
MessageSvcPbGetMsg(bot.client, syncCookie = null).sendWithoutExpect()
}
}
return EmptyResponse
return EmptyResponse(bot)
}
when (resp.msgRspType) {
0 -> {
@ -155,7 +157,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
bot.client.syncingController.msgCtrlBuf = resp.msgCtrlBuf
if (resp.uinPairMsgs.isEmpty()) {
return EmptyResponse
return EmptyResponse(bot)
}
val messages = resp.uinPairMsgs.asFlow()
@ -182,7 +184,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
val list: List<Packet> = messages.toList()
if (resp.syncFlag == MsgSvc.SyncFlag.STOP) {
return GetMsgSuccess(list, resp.syncCookie)
return GetMsgSuccess(list, resp.syncCookie, bot)
}
return Response(resp.syncFlag, list, resp.syncCookie)
}