[core] Implement a more efficient algorithm to fetch roaming messages for group:

- Added `RoamingMessagesImplGroup`.
- Dump API changes for Group RoamingMessages.
- [mock] Fix MockRoamingMessages missing MessageSource
- [core] Convert hierarchical TimeBasedRoamingMessagesImpl to common, to reduce code complexity
This commit is contained in:
Him188 2022-12-17 22:36:38 +00:00
parent db601928ea
commit f04c623658
12 changed files with 143 additions and 233 deletions

View File

@ -366,7 +366,7 @@ public abstract interface class net/mamoe/mirai/contact/Friend : kotlinx/corouti
public abstract fun setRemark (Ljava/lang/String;)V
}
public abstract interface class net/mamoe/mirai/contact/Group : kotlinx/coroutines/CoroutineScope, net/mamoe/mirai/contact/AudioSupported, net/mamoe/mirai/contact/Contact, net/mamoe/mirai/contact/FileSupported {
public abstract interface class net/mamoe/mirai/contact/Group : kotlinx/coroutines/CoroutineScope, net/mamoe/mirai/contact/AudioSupported, net/mamoe/mirai/contact/Contact, net/mamoe/mirai/contact/FileSupported, net/mamoe/mirai/contact/roaming/RoamingSupported {
public static final field Companion Lnet/mamoe/mirai/contact/Group$Companion;
public fun avatarUrl (Lnet/mamoe/mirai/contact/AvatarSpec;)Ljava/lang/String;
public abstract fun contains (J)Z

View File

@ -366,7 +366,7 @@ public abstract interface class net/mamoe/mirai/contact/Friend : kotlinx/corouti
public abstract fun setRemark (Ljava/lang/String;)V
}
public abstract interface class net/mamoe/mirai/contact/Group : kotlinx/coroutines/CoroutineScope, net/mamoe/mirai/contact/AudioSupported, net/mamoe/mirai/contact/Contact, net/mamoe/mirai/contact/FileSupported {
public abstract interface class net/mamoe/mirai/contact/Group : kotlinx/coroutines/CoroutineScope, net/mamoe/mirai/contact/AudioSupported, net/mamoe/mirai/contact/Contact, net/mamoe/mirai/contact/FileSupported, net/mamoe/mirai/contact/roaming/RoamingSupported {
public static final field Companion Lnet/mamoe/mirai/contact/Group$Companion;
public fun avatarUrl (Lnet/mamoe/mirai/contact/AvatarSpec;)Ljava/lang/String;
public abstract fun contains (J)Z

View File

@ -11,9 +11,7 @@ package net.mamoe.mirai.mock.database
import net.mamoe.mirai.contact.Contact
import net.mamoe.mirai.contact.roaming.RoamingMessageFilter
import net.mamoe.mirai.message.data.MessageChain
import net.mamoe.mirai.message.data.MessageSource
import net.mamoe.mirai.message.data.MessageSourceKind
import net.mamoe.mirai.message.data.*
import net.mamoe.mirai.mock.MockBot
import net.mamoe.mirai.mock.internal.db.MsgDatabaseImpl
import net.mamoe.mirai.utils.concatAsLong
@ -80,6 +78,29 @@ public data class MessageInfo(
public val time: Long, // seconds
public val message: MessageChain,
) {
public fun buildSource(bot: MockBot): MessageSource {
return bot.buildMessageSource(kind = kind) {
val info = this@MessageInfo
sender(info.sender)
time(info.time.toInt())
if (kind == MessageSourceKind.GROUP) {
target(subject)
} else {
if (info.sender == info.subject) {
target(bot.id)
} else {
target(info.subject)
}
}
ids = intArrayOf(info.id)
internalIds = intArrayOf(info.internal)
messages(info.message as Iterable<Message>)
}
}
// ids
public val id: Int get() = (mixinedMsgId shr 32).toInt()

View File

@ -20,6 +20,7 @@ import net.mamoe.mirai.contact.roaming.RoamingSupported
import net.mamoe.mirai.message.data.MessageChain
import net.mamoe.mirai.message.data.MessageSourceKind
import net.mamoe.mirai.mock.internal.MockBotImpl
import net.mamoe.mirai.mock.utils.mock
import net.mamoe.mirai.utils.JavaFriendlyAPI
import net.mamoe.mirai.utils.cast
import java.util.stream.Stream
@ -54,7 +55,7 @@ internal class MockRoamingMessages(
timeStart,
timeEnd,
filter ?: RoamingMessageFilter.ANY
).map { it.message }
).map { it.buildSource(contact.bot.mock()) + it.message }
}
@JavaFriendlyAPI

View File

@ -24,7 +24,7 @@ import kotlin.test.assertFails
import kotlin.test.assertNull
import kotlin.test.assertSame
internal class MessagingTest: MockBotTestBase() {
internal class MessagingTest : MockBotTestBase() {
@Test
internal fun testMessageEventBroadcast() = runTest {
@ -133,35 +133,51 @@ internal class MessagingTest: MockBotTestBase() {
@Test
internal fun testRoamingMessages() = runTest {
val mockFriend = bot.addFriend(1, "1")
broadcastMockEvents {
mockFriend says { append("Testing!") }
mockFriend says { append("Test2!") }
val allSent = mutableListOf<MessageSource>()
fun MutableList<MessageSource>.add(msg: MessageChain) {
add(msg.source)
}
mockFriend.sendMessage("Pong!")
fun MutableList<MessageSource>.convertToOffline() {
replaceAll { src ->
bot.buildMessageSource(src.kind) { allFrom(src) }
}
}
broadcastMockEvents {
allSent.add(mockFriend says { append("Testing!") })
allSent.add(mockFriend says { append("Test2!") })
}
allSent.add(mockFriend.sendMessage("Pong!").source)
allSent.convertToOffline()
mockFriend.roamingMessages.getAllMessages().toList().let { messages ->
assertEquals(3, messages.size)
assertEquals(messageChainOf(PlainText("Testing!")), messages[0])
assertEquals(messageChainOf(PlainText("Test2!")), messages[1])
assertEquals(messageChainOf(PlainText("Pong!")), messages[2])
assertEquals(messageChainOf(allSent[0] + PlainText("Testing!")), messages[0])
assertEquals(messageChainOf(allSent[1] + PlainText("Test2!")), messages[1])
assertEquals(messageChainOf(allSent[2] + PlainText("Pong!")), messages[2])
}
allSent.clear()
val mockGroup = bot.addGroup(2, "2")
val mockGroupMember1 = mockGroup.addMember(123, "123")
val mockGroupMember2 = mockGroup.addMember(124, "124")
val mockGroupMember3 = mockGroup.addMember(125, "125")
broadcastMockEvents {
mockGroupMember1 says { append("msg1") }
mockGroupMember2 says { append("msg2") }
mockGroupMember3 says { append("msg3") }
allSent.add(mockGroupMember1 says { append("msg1") })
allSent.add(mockGroupMember2 says { append("msg2") })
allSent.add(mockGroupMember3 says { append("msg3") })
}
allSent.convertToOffline()
with(mockGroup.roamingMessages.getAllMessages().toList()) {
assertEquals(3, size)
assertEquals(messageChainOf(PlainText("msg1")), get(0))
assertEquals(messageChainOf(PlainText("msg2")), get(1))
assertEquals(messageChainOf(PlainText("msg3")), get(2))
assertEquals(messageChainOf(allSent[0] + PlainText("msg1")), get(0))
assertEquals(messageChainOf(allSent[1] + PlainText("msg2")), get(1))
assertEquals(messageChainOf(allSent[2] + PlainText("msg3")), get(2))
}
}

View File

@ -9,79 +9,84 @@
package net.mamoe.mirai.internal.contact.roaming
import kotlinx.coroutines.flow.*
import net.mamoe.mirai.contact.roaming.RoamingMessageFilter
import net.mamoe.mirai.internal.contact.CommonGroupImpl
import net.mamoe.mirai.internal.message.getMessageSourceKindFromC2cCmdOrNull
import net.mamoe.mirai.internal.message.toMessageChainOnline
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgComm
import net.mamoe.mirai.internal.network.protocol.packet.chat.TroopManagement
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetGroupMsg
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetRoamMsgReq
import net.mamoe.mirai.internal.utils.indexFirstBE
import net.mamoe.mirai.message.data.MessageChain
internal class RoamingMessagesImplGroup(
override val contact: CommonGroupImpl
) : TimeBasedRoamingMessagesImpl() {
override suspend fun requestRoamMsg(
) : AbstractRoamingMessages() {
private val bot get() = contact.bot
override suspend fun getMessagesIn(
timeStart: Long,
lastMessageTime: Long,
random: Long // unused field
): MessageSvcPbGetRoamMsgReq.Response {
val lastMsgSeq = contact.bot.network.sendAndExpect(
TroopManagement.GetGroupLastMsgSeq(
client = contact.bot.client,
groupUin = contact.uin
)
)
return when (lastMsgSeq) {
is TroopManagement.GetGroupLastMsgSeq.Response.Success -> {
val results = mutableListOf<MsgComm.Msg>()
var currentSeq = lastMsgSeq.seq
timeEnd: Long,
filter: RoamingMessageFilter?
): Flow<MessageChain> {
var currentSeq: Int = getLastMsgSeq() ?: return emptyFlow()
while (true) {
if (currentSeq <= 0) break
val resp = contact.bot.network.sendAndExpect(
MessageSvcPbGetGroupMsg(
client = contact.bot.client,
groupUin = contact.uin,
messageSequence = currentSeq,
20 // maximum 20
)
return flow {
while (true) {
val resp = contact.bot.network.sendAndExpect(
MessageSvcPbGetGroupMsg(
client = contact.bot.client,
groupUin = contact.uin,
messageSequence = currentSeq.toLong(),
count = 20 // maximum 20
)
if (resp is MessageSvcPbGetGroupMsg.Failed) break
if ((resp as MessageSvcPbGetGroupMsg.Success).msgElem.isEmpty()) break
// the message may be sorted increasing by message time,
// if so, additional sortBy will not take cost.
val msgElems = resp.msgElem.sortedBy { it.msgHead.msgTime }
results.addAll(0, msgElems)
val firstMsgElem = msgElems.first()
if (firstMsgElem.msgHead.msgTime < timeStart) {
break
} else {
currentSeq = (firstMsgElem.msgHead.msgSeq - 1).toLong()
}
}
// use binary search to find the first message that message time is lager than lastMessageTime
var right = results.indexFirstBE(lastMessageTime) { it.msgHead.msgTime.toLong() }
// check messages with same time
if (results[right].msgHead.msgTime.toLong() == lastMessageTime) {
do {
right++
} while (right <= results.size - 1 && results[right].msgHead.msgTime <= lastMessageTime)
}
// loops at most 20 times, just traverse
val left = results.indexOfFirst { it.msgHead.msgTime >= timeStart }
MessageSvcPbGetRoamMsgReq.Response(
if (left == right) null else results.subList(left, right),
if (left == right) -1L else results[right - 1].msgHead.msgTime.toLong(), -1L, byteArrayOf()
)
}
is TroopManagement.GetGroupLastMsgSeq.Response.Failed -> {
MessageSvcPbGetRoamMsgReq.Response(null, -1L, -1L, byteArrayOf())
if (resp is MessageSvcPbGetGroupMsg.Failed) break
resp as MessageSvcPbGetGroupMsg.Success // stupid smart cast
if (resp.msgElem.isEmpty()) break
// the message may be sorted increasing by message time,
// if so, additional sortBy will not take cost.
val messageTimeSequence = resp.msgElem.asSequence().map { it.time }
val maxTime = messageTimeSequence.max()
if (maxTime < timeStart) break // we have fetched all messages
emitAll(
resp.msgElem.asSequence()
.filter { getMessageSourceKindFromC2cCmdOrNull(it.msgHead.c2cCmd) != null } // ignore unsupported messages
.filter { it.time in timeStart..timeEnd }
.sortedByDescending { it.time } // Ensure caller receiver newer messages first
.filter { filter.apply(it) } // Call filter after sort
.asFlow()
.map { it.toMessageChainOnline(bot) }
)
currentSeq = resp.msgElem.minBy { it.time }.msgHead.msgSeq
}
}
}
private val MsgComm.Msg.time get() = msgHead.msgTime
private fun RoamingMessageFilter?.apply(
it: MsgComm.Msg
) = this?.invoke(createRoamingMessage(it, listOf())) != false
private suspend fun getLastMsgSeq(): Int? {
// Iterate from the newest message to find messages within [timeStart] and [timeEnd]
val lastMsgSeqResp = bot.network.sendAndExpect(
TroopManagement.GetGroupLastMsgSeq(
client = bot.client,
groupUin = contact.uin
)
)
return when (lastMsgSeqResp) {
TroopManagement.GetGroupLastMsgSeq.Response.Failed -> null
is TroopManagement.GetGroupLastMsgSeq.Response.Success -> lastMsgSeqResp.seq
}
}
}

View File

@ -1,71 +0,0 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.contact.roaming
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import net.mamoe.mirai.contact.roaming.RoamingMessageFilter
import net.mamoe.mirai.internal.message.toMessageChainOnline
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgComm
import net.mamoe.mirai.message.data.MessageChain
private typealias Seq = Long
internal sealed class SeqBasedRoamingMessageImpl : AbstractRoamingMessages() {
final override suspend fun getMessagesIn(
timeStart: Long,
timeEnd: Long,
filter: RoamingMessageFilter?
): Flow<MessageChain> {
val (seqStart, seqEnd) = getSeqForTime(timeStart, timeEnd)
return getMessageImpl(seqStart, seqEnd, filter)
}
protected abstract suspend fun getSeqForTime(timeStart: Long, timeEnd: Long): Pair<Seq, Seq>
@Suppress("DuplicatedCode") // Generalizing this code would even complicate logic
private suspend fun getMessageImpl(
seqStart: Seq,
seqEnd: Seq,
filter: RoamingMessageFilter?,
): Flow<MessageChain> {
return flow {
var currentSeqStart = seqEnd.coerceAtMost(seqStart)
while (currentCoroutineContext().isActive) {
val resp = requestRoamMsg(currentSeqStart, seqEnd)
val messages = resp.messages ?: break
if (filter == null || filter === RoamingMessageFilter.ANY) {
// fast path
messages.forEach { emit(it.toMessageChainOnline(contact.bot)) }
} else {
for (message in messages) {
if (filter.invoke(createRoamingMessage(message, messages))) {
emit(message.toMessageChainOnline(contact.bot))
}
}
}
currentSeqStart = resp.nextSeqStart
}
}
}
abstract suspend fun requestRoamMsg(
seqStart: Seq,
seqEnd: Seq,
): SeqBasedRoamingMessageChunk
}
internal interface SeqBasedRoamingMessageChunk {
val messages: List<MsgComm.Msg>?
val nextSeqStart: Seq
}

View File

@ -20,8 +20,7 @@ import net.mamoe.mirai.internal.message.toMessageChainOnline
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetRoamMsgReq
import net.mamoe.mirai.message.data.MessageChain
// Can't make sealed, used by actuals
internal abstract class CommonTimeBasedRoamingMessagesImpl : AbstractRoamingMessages() {
internal sealed class TimeBasedRoamingMessagesImpl : AbstractRoamingMessages() {
override suspend fun getMessagesIn(
timeStart: Long,
timeEnd: Long,
@ -56,6 +55,3 @@ internal abstract class CommonTimeBasedRoamingMessagesImpl : AbstractRoamingMess
random: Long
): MessageSvcPbGetRoamMsgReq.Response
}
internal expect sealed class TimeBasedRoamingMessagesImpl() : CommonTimeBasedRoamingMessagesImpl

View File

@ -53,21 +53,26 @@ internal suspend fun List<MsgComm.Msg>.toMessageChainOnline(
return toMessageChain(bot, groupIdOrZero, true, messageSourceKind, facade).refineDeep(bot, refineContext)
}
internal fun getMessageSourceKindFromC2cCmdOrNull(c2cCmd: Int): MessageSourceKind? {
return when (c2cCmd) {
11 -> MessageSourceKind.FRIEND // bot 给其他人发消息
4 -> MessageSourceKind.FRIEND // bot 给自己作为好友发消息 (非 other client)
1 -> MessageSourceKind.GROUP
else -> null
}
}
internal fun getMessageSourceKindFromC2cCmd(c2cCmd: Int): MessageSourceKind {
return getMessageSourceKindFromC2cCmdOrNull(c2cCmd) ?: error("Could not get source kind from c2cCmd: $c2cCmd")
}
internal suspend fun MsgComm.Msg.toMessageChainOnline(
bot: Bot,
refineContext: RefineContext = EmptyRefineContext,
facade: MessageProtocolFacade = MessageProtocolFacade,
): MessageChain {
fun getSourceKind(c2cCmd: Int): MessageSourceKind {
return when (c2cCmd) {
11 -> MessageSourceKind.FRIEND // bot 给其他人发消息
4 -> MessageSourceKind.FRIEND // bot 给自己作为好友发消息 (非 other client)
1 -> MessageSourceKind.GROUP
else -> error("Could not get source kind from c2cCmd: $c2cCmd")
}
}
val kind = getSourceKind(msgHead.c2cCmd)
val kind = getMessageSourceKindFromC2cCmd(msgHead.c2cCmd)
val groupId = when (kind) {
MessageSourceKind.GROUP -> msgHead.groupInfo?.groupCode ?: 0
else -> 0
@ -141,6 +146,7 @@ internal object ReceiveMessageTransformer {
MessageSourceKind.STRANGER -> OnlineMessageSourceFromStrangerImpl(bot, messageList)
}
}
false -> {
OfflineMessageSourceImplData(bot, messageList, messageSourceKind)
}

View File

@ -466,14 +466,14 @@ internal class TroopManagement {
}
internal object GetGroupLastMsgSeq : OutgoingPacketFactory<GetGroupLastMsgSeq.Response>("OidbSvc.0x88d_0") {
sealed class Response(val groupUin: Long, val seq: Long) : Packet {
sealed class Response(val groupUin: Long, val seq: Int) : Packet {
object Failed : Response(-1, -1) {
override fun toString(): String {
return "TroopManagement.GetGroupLastMsgSeq.Failed"
}
}
class Success(groupUin: Long, seq: Long) : Response(groupUin, seq) {
class Success(groupUin: Long, seq: Int) : Response(groupUin, seq) {
override fun toString(): String {
return "TroopManagement.GetGroupLastMsgSeq.Response(groupUin=${groupUin}, seq=${seq})"
}
@ -511,7 +511,7 @@ internal class TroopManagement {
val info = group.stgroupinfo ?: return Response.Failed
val seq = info.groupCurMsgSeq ?: return Response.Failed
return Response.Success(group.groupCode, seq.toLong())
return Response.Success(group.groupCode, seq)
}
}
}

View File

@ -1,51 +0,0 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.contact.roaming
import kotlinx.coroutines.runBlocking
import net.mamoe.mirai.contact.roaming.RoamingMessageFilter
import net.mamoe.mirai.internal.message.toMessageChainOnline
import net.mamoe.mirai.message.data.MessageChain
import net.mamoe.mirai.utils.JavaFriendlyAPI
import net.mamoe.mirai.utils.stream
import java.util.stream.Stream
internal actual sealed class TimeBasedRoamingMessagesImpl : CommonTimeBasedRoamingMessagesImpl() {
@JavaFriendlyAPI
override suspend fun getMessagesStream(
timeStart: Long,
timeEnd: Long,
filter: RoamingMessageFilter?,
): Stream<MessageChain> {
return stream {
var lastMessageTime = timeEnd
var random = 0L
while (true) {
val resp = runBlocking {
requestRoamMsg(timeStart, lastMessageTime, random)
}
val messages = resp.messages ?: break
if (filter == null || filter === RoamingMessageFilter.ANY) {
messages.forEach { yield(runBlocking { it.toMessageChainOnline(contact.bot) }) }
} else {
for (message in messages) {
if (filter.invoke(createRoamingMessage(message, messages))) {
yield(runBlocking { message.toMessageChainOnline(contact.bot) })
}
}
}
lastMessageTime = resp.lastMessageTime
random = resp.random
}
}
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.contact.roaming
internal actual sealed class TimeBasedRoamingMessagesImpl actual constructor() :
CommonTimeBasedRoamingMessagesImpl()