Fix receiving messages repeatedly

This commit is contained in:
mzdluo123 2020-08-21 17:57:34 +08:00
parent 72cccc74a0
commit 8e3fb15556
No known key found for this signature in database
GPG Key ID: 9F7BC2C154107A1D
2 changed files with 50 additions and 19 deletions

View File

@ -19,7 +19,7 @@ internal class MsgSvc : ProtoBuf {
@ProtoNumber(1) @JvmField val result: Int = 0, @ProtoNumber(1) @JvmField val result: Int = 0,
@ProtoNumber(2) @JvmField val errmsg: String = "", @ProtoNumber(2) @JvmField val errmsg: String = "",
@ProtoNumber(3) @JvmField val syncCookie: ByteArray? = EMPTY_BYTE_ARRAY, @ProtoNumber(3) @JvmField val syncCookie: ByteArray? = EMPTY_BYTE_ARRAY,
@ProtoNumber(4) @JvmField val syncFlag: SyncFlag, @ProtoNumber(4) @JvmField val syncFlag: SyncFlag = SyncFlag.CONTINUE,
@ProtoNumber(5) @JvmField val uinPairMsgs: List<MsgComm.UinPairMsg>? = null, @ProtoNumber(5) @JvmField val uinPairMsgs: List<MsgComm.UinPairMsg>? = null,
@ProtoNumber(6) @JvmField val bindUin: Long = 0L, @ProtoNumber(6) @JvmField val bindUin: Long = 0L,
@ProtoNumber(7) @JvmField val msgRspType: Int = 0, @ProtoNumber(7) @JvmField val msgRspType: Int = 0,

View File

@ -14,6 +14,7 @@ package net.mamoe.mirai.qqandroid.network.protocol.packet.chat.receive
import kotlinx.atomicfu.loop import kotlinx.atomicfu.loop
import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.io.core.ByteReadPacket import kotlinx.io.core.ByteReadPacket
import kotlinx.io.core.discardExact import kotlinx.io.core.discardExact
@ -43,6 +44,7 @@ import net.mamoe.mirai.qqandroid.network.protocol.packet.buildOutgoingUniPacket
import net.mamoe.mirai.qqandroid.network.protocol.packet.chat.GroupInfoImpl import net.mamoe.mirai.qqandroid.network.protocol.packet.chat.GroupInfoImpl
import net.mamoe.mirai.qqandroid.network.protocol.packet.chat.NewContact import net.mamoe.mirai.qqandroid.network.protocol.packet.chat.NewContact
import net.mamoe.mirai.qqandroid.network.protocol.packet.list.FriendList import net.mamoe.mirai.qqandroid.network.protocol.packet.list.FriendList
import net.mamoe.mirai.qqandroid.utils._miraiContentToString
import net.mamoe.mirai.qqandroid.utils.io.serialization.readProtoBuf import net.mamoe.mirai.qqandroid.utils.io.serialization.readProtoBuf
import net.mamoe.mirai.qqandroid.utils.io.serialization.writeProtoBuf import net.mamoe.mirai.qqandroid.utils.io.serialization.writeProtoBuf
import net.mamoe.mirai.qqandroid.utils.read import net.mamoe.mirai.qqandroid.utils.read
@ -56,6 +58,12 @@ import net.mamoe.mirai.utils.warning
* 获取好友消息和消息记录 * 获取好友消息和消息记录
*/ */
internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Response>("MessageSvc.PbGetMsg") { internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Response>("MessageSvc.PbGetMsg") {
private val msgUidQueue = ArrayDeque<Long>()
private val msgUidSet = hashSetOf<Long>()
private val msgQueueMutex = Mutex()
@Suppress("SpellCheckingInspection") @Suppress("SpellCheckingInspection")
operator fun invoke( operator fun invoke(
client: QQAndroidClient, client: QQAndroidClient,
@ -114,7 +122,8 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
private fun MsgComm.Msg.getNewMemberInfo(): MemberInfo { private fun MsgComm.Msg.getNewMemberInfo(): MemberInfo {
return object : MemberInfo { return object : MemberInfo {
override val nameCard: String get() = msgHead.authNick.takeIf { it.isNotEmpty() } override val nameCard: String
get() = msgHead.authNick.takeIf { it.isNotEmpty() }
?: msgHead.fromNick ?: msgHead.fromNick
override val permission: MemberPermission get() = MemberPermission.MEMBER override val permission: MemberPermission get() = MemberPermission.MEMBER
override val specialTitle: String get() = "" override val specialTitle: String get() = ""
@ -135,9 +144,23 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
.warning { "MessageSvcPushNotify: result != 0, result = ${resp.result}, errorMsg=${resp.errmsg}" } .warning { "MessageSvcPushNotify: result != 0, result = ${resp.result}, errorMsg=${resp.errmsg}" }
return EmptyResponse return EmptyResponse
} }
when (resp.msgRspType) {
0 -> {
bot.client.c2cMessageSync.syncCookie = resp.syncCookie bot.client.c2cMessageSync.syncCookie = resp.syncCookie
bot.client.c2cMessageSync.pubAccountCookie = resp.pubAccountCookie bot.client.c2cMessageSync.pubAccountCookie = resp.pubAccountCookie
}
1 -> {
bot.client.c2cMessageSync.syncCookie = resp.syncCookie
}
2 -> {
bot.client.c2cMessageSync.pubAccountCookie = resp.pubAccountCookie
}
}
// bot.logger.debug(resp.msgRspType._miraiContentToString())
// bot.logger.debug(resp.syncCookie._miraiContentToString())
bot.client.c2cMessageSync.msgCtrlBuf = resp.msgCtrlBuf bot.client.c2cMessageSync.msgCtrlBuf = resp.msgCtrlBuf
if (resp.uinPairMsgs == null) { if (resp.uinPairMsgs == null) {
@ -151,10 +174,21 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
.filter { msg: MsgComm.Msg -> msg.msgHead.msgTime > it.lastReadTime.toLong() and 4294967295L } .filter { msg: MsgComm.Msg -> msg.msgHead.msgTime > it.lastReadTime.toLong() and 4294967295L }
}.also { }.also {
MessageSvcPbDeleteMsg.delete(bot, it) // 删除消息 MessageSvcPbDeleteMsg.delete(bot, it) // 删除消息
// todo 实现一个锁来防止重复收到消息
} }
.mapNotNull<MsgComm.Msg, Packet> { msg -> .mapNotNull<MsgComm.Msg, Packet> { msg ->
msgQueueMutex.lock()
val msgUid = msg.msgHead.msgUid
if (msgUidSet.size > 50) {
msgUidSet.remove(msgUidQueue.removeFirst())
}
if (!msgUidSet.add(msgUid)) {
msgQueueMutex.unlock()
return@mapNotNull null
}
msgQueueMutex.unlock()
msgUidQueue.addLast(msgUid)
suspend fun createGroupForBot(groupUin: Long): Group? { suspend fun createGroupForBot(groupUin: Long): Group? {
val group = bot.getGroupByUinOrNull(groupUin) val group = bot.getGroupByUinOrNull(groupUin)
if (group != null) { if (group != null) {
@ -294,18 +328,15 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
return@mapNotNull null return@mapNotNull null
} }
if (friend.lastMessageSequence.compareAndSet( friend.lastMessageSequence.loop {
friend.lastMessageSequence.value, if (friend.lastMessageSequence.compareAndSet(it, msg.msgHead.msgSeq)) {
msg.msgHead.msgSeq
)
) {
return@mapNotNull FriendMessageEvent( return@mapNotNull FriendMessageEvent(
friend, friend,
msg.toMessageChain(bot, groupIdOrZero = 0, onlineSource = true), msg.toMessageChain(bot, groupIdOrZero = 0, onlineSource = true),
msg.msgHead.msgTime msg.msgHead.msgTime
) )
} else return@mapNotNull null
} }
return@mapNotNull null
} }
208 -> { 208 -> {
// friend ptt // friend ptt
@ -387,7 +418,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
MessageSvcPbGetMsg( MessageSvcPbGetMsg(
client, client,
MsgSvc.SyncFlag.CONTINUE, MsgSvc.SyncFlag.CONTINUE,
packet.syncCookie bot.client.c2cMessageSync.syncCookie
).sendAndExpect<Packet>() ).sendAndExpect<Packet>()
} }
return return
@ -398,7 +429,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
MessageSvcPbGetMsg( MessageSvcPbGetMsg(
client, client,
MsgSvc.SyncFlag.CONTINUE, MsgSvc.SyncFlag.CONTINUE,
packet.syncCookie bot.client.c2cMessageSync.syncCookie
).sendAndExpect<Packet>() ).sendAndExpect<Packet>()
} }
return return