Improve group message sending (#892)

This commit is contained in:
Karlatemp 2021-01-23 14:31:26 +08:00
parent c762ecd0a2
commit 4e2ba2f7e3
No known key found for this signature in database
GPG Key ID: 21FBDDF664FF06F8
4 changed files with 172 additions and 71 deletions

View File

@ -116,7 +116,7 @@ internal class GroupImpl(
val chain = broadcastGroupMessagePreSendEvent(message)
val result = sendMessageImpl(message, chain, false)
val result = sendMessageImpl(message, chain, GroupMessageSendingStep.FIRST)
// logMessageSent(result.getOrNull()?.source?.plus(chain) ?: chain) // log with source
logMessageSent(chain)

View File

@ -34,11 +34,11 @@ import net.mamoe.mirai.utils.currentTimeSeconds
internal suspend fun GroupImpl.sendMessageImpl(
originalMessage: Message,
transformedMessage: Message,
forceAsLongMessage: Boolean,
step: GroupMessageSendingStep,
): Result<MessageReceipt<Group>> { // Result<MessageReceipt<Group>>
val chain = transformedMessage
.transformSpecialMessages(this)
.convertToLongMessageIfNeeded(originalMessage, forceAsLongMessage, this)
.convertToLongMessageIfNeeded(step, this)
chain.findIsInstance<QuoteReply>()?.source?.ensureSequenceIdAvailable()
@ -49,8 +49,9 @@ internal suspend fun GroupImpl.sendMessageImpl(
return kotlin.runCatching {
sendMessagePacket(
originalMessage,
transformedMessage,
chain,
allowResendAsLongMessage = transformedMessage.takeSingleContent<LongMessageInternal>() == null
step
)
}
}
@ -94,37 +95,68 @@ private suspend fun Message.transformSpecialMessages(contact: Contact): MessageC
}?.toMessageChain() ?: toMessageChain()
}
internal enum class GroupMessageSendingStep {
FIRST, LONG_MESSAGE, FRAGMENTED
}
/**
* Final process
*/
private suspend fun GroupImpl.sendMessagePacket(
originalMessage: Message,
transformedMessage: Message,
finalMessage: MessageChain,
allowResendAsLongMessage: Boolean,
step: GroupMessageSendingStep,
): MessageReceipt<Group> {
println("SENDING")
println(originalMessage)
println(finalMessage)
val group = this
val source: OnlineMessageSourceToGroupImpl
bot.network.run {
MessageSvcPbSendMsg.createToGroup(bot.client, group, finalMessage) { source = it }
.sendAndExpect<MessageSvcPbSendMsg.Response>().let { resp ->
MessageSvcPbSendMsg.createToGroup(
bot.client,
group,
finalMessage,
step == GroupMessageSendingStep.FRAGMENTED
) { source = it }.forEach { packet ->
packet.sendAndExpect<MessageSvcPbSendMsg.Response>().let { resp ->
if (resp is MessageSvcPbSendMsg.Response.MessageTooLarge) {
if (allowResendAsLongMessage) {
return sendMessageImpl(originalMessage, finalMessage, true).getOrThrow()
} else {
throw MessageTooLargeException(
group,
originalMessage,
finalMessage,
"Message '${finalMessage.content.take(10)}' is too large."
)
}
return when (step) {
GroupMessageSendingStep.FIRST -> {
sendMessageImpl(
originalMessage,
transformedMessage,
GroupMessageSendingStep.LONG_MESSAGE
)
}
GroupMessageSendingStep.LONG_MESSAGE -> {
sendMessageImpl(
originalMessage,
transformedMessage,
GroupMessageSendingStep.FRAGMENTED
)
}
else -> {
throw MessageTooLargeException(
group,
originalMessage,
finalMessage,
"Message '${finalMessage.content.take(10)}' is too large."
)
}
}.getOrThrow()
}
check(resp is MessageSvcPbSendMsg.Response.SUCCESS) {
"Send group message failed: $resp"
}
}
}
}
try {
@ -156,21 +188,26 @@ private suspend fun GroupImpl.uploadGroupLongMessageHighway(
)
private suspend fun MessageChain.convertToLongMessageIfNeeded(
originalMessage: Message,
forceAsLongMessage: Boolean,
step: GroupMessageSendingStep,
groupImpl: GroupImpl,
): MessageChain {
if (forceAsLongMessage || this.shouldSendAsLongMessage(originalMessage, groupImpl)) {
val resId = groupImpl.uploadGroupLongMessageHighway(this)
return this + RichMessage.longMessage(
brief = takeContent(27),
resId = resId,
timeSeconds = currentTimeSeconds()
) // LongMessageInternal replaces all contents and preserves metadata
return when (step) {
GroupMessageSendingStep.FIRST -> {
// 只需要在第一次发送的时候验证长度
// 后续重试直接跳过
verityLength(this, groupImpl)
this
}
GroupMessageSendingStep.LONG_MESSAGE -> {
val resId = groupImpl.uploadGroupLongMessageHighway(this)
this + RichMessage.longMessage(
brief = takeContent(27),
resId = resId,
timeSeconds = currentTimeSeconds()
) // LongMessageInternal replaces all contents and preserves metadata
}
GroupMessageSendingStep.FRAGMENTED -> this
}
return this
}
/**
@ -187,9 +224,3 @@ private suspend fun GroupImpl.updateFriendImageForGroupMessage(image: FriendImag
).sendAndExpect<ImgStore.GroupPicUp.Response>()
}
}
private fun MessageChain.shouldSendAsLongMessage(originalMessage: Message, target: Contact): Boolean {
val length = verityLength(originalMessage, target)
return length > 700 || countImages() > 1
}

View File

@ -56,9 +56,9 @@ private fun <T> T.toJceDataImpl(subject: ContactOrBot?): ImMsgBody.SourceMsg
toUin = targetId, // group
msgType = 9, // 82?
c2cCmd = 11,
msgSeq = sequenceIds.single(), // TODO !!
msgSeq = sequenceIds.first(),
msgTime = time,
msgUid = pdReserve.origUids!!.single(), // TODO !!
msgUid = pdReserve.origUids!!.first(),
// groupInfo = MsgComm.GroupInfo(groupCode = delegate.msgHead.groupInfo.groupCode),
isSrcMsg = true
),
@ -150,24 +150,30 @@ internal class OnlineMessageSourceToGroupImpl(
get() = sender
override var isRecalledOrPlanned: AtomicBoolean = AtomicBoolean(false)
private val sequenceIdDeferred: Deferred<Int?> =
coroutineScope.asyncFromEventOrNull<SendGroupMessageReceipt, Int>(
timeoutMillis = 3000
private val sequenceIdDeferred: Deferred<IntArray?> = run {
val multi = mutableMapOf<Int, Int>()
coroutineScope.asyncFromEventOrNull<SendGroupMessageReceipt, IntArray>(
timeoutMillis = 3000L * this@OnlineMessageSourceToGroupImpl.internalIds.size
) {
if (it.messageRandom in this@OnlineMessageSourceToGroupImpl.internalIds) {
it.sequenceId
multi[it.messageRandom] = it.sequenceId
if (multi.size == this@OnlineMessageSourceToGroupImpl.internalIds.size) {
IntArray(multi.size) { index ->
multi[this@OnlineMessageSourceToGroupImpl.internalIds[index]]!!
}
} else null
} else null
}
}
@OptIn(ExperimentalCoroutinesApi::class)
override val sequenceIds: IntArray
get() = intArrayOf(
when {
sequenceIdDeferred.isCompleted -> sequenceIdDeferred.getCompleted() ?: -1
!sequenceIdDeferred.isActive -> -1
get() = when {
sequenceIdDeferred.isCompleted -> sequenceIdDeferred.getCompleted() ?: intArrayOf()
!sequenceIdDeferred.isActive -> intArrayOf()
else -> error("sequenceIds not yet available")
}
)
suspend fun ensureSequenceIdAvailable() = kotlin.run { sequenceIdDeferred.await() }

View File

@ -26,7 +26,6 @@ import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgComm
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgCtrl
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgSvc
import net.mamoe.mirai.internal.network.protocol.packet.EMPTY_BYTE_ARRAY
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketFactory
import net.mamoe.mirai.internal.network.protocol.packet.buildOutgoingUniPacket
@ -107,9 +106,13 @@ internal object MessageSvcPbSendMsg : OutgoingPacketFactory<MessageSvcPbSendMsg.
sequenceIds: AtomicReference<IntArray>,
sequenceIdsInitializer: (Int) -> IntArray,
randIds: AtomicReference<IntArray>,
doFragmented: Boolean = true,
postInit: () -> Unit
): List<OutgoingPacket> {
val fragmented = message.fragmented()
val fragmented = if (doFragmented)
message.fragmented()
else listOf(message)
val response = mutableListOf<OutgoingPacket>()
val div = if (fragmented.size == 1) 0 else Random.nextInt().absoluteValue
val pkgNum = fragmented.size
@ -291,12 +294,88 @@ internal object MessageSvcPbSendMsg : OutgoingPacketFactory<MessageSvcPbSendMsg.
* 发送群消息
*/
@Suppress("FunctionName")
internal fun createToGroupImpl(
internal inline fun createToGroupImpl(
client: QQAndroidClient,
targetGroup: Group,
message: MessageChain,
source: OnlineMessageSourceToGroupImpl
): OutgoingPacket = buildOutgoingUniPacket(client) {
fragmented: Boolean,
crossinline sourceCallback: (OnlineMessageSourceToGroupImpl) -> Unit
): List<OutgoingPacket> {
val sequenceIds = AtomicReference<IntArray>()
val randIds = AtomicReference<IntArray>()
return buildOutgoingMessageCommon(
client = client,
message = message,
fragmentTranslator = { subChain ->
ImMsgBody.MsgBody(
richText = ImMsgBody.RichText(
elems = subChain.toRichTextElems(messageTarget = targetGroup, withGeneralFlags = true),
ptt = subChain[PttMessage]?.run {
ImMsgBody.Ptt(
fileName = fileName.toByteArray(),
fileMd5 = md5,
boolValid = true,
fileSize = fileSize.toInt(),
fileType = 4,
pbReserve = byteArrayOf(0),
format = let {
if (it is Voice) {
it.codec
} else {
0
}
}
)
}
)
)
},
pbSendMsgReq = { msgBody, msgSeq, msgRand, contentHead ->
MsgSvc.PbSendMsgReq(
routingHead = MsgSvc.RoutingHead(grp = MsgSvc.Grp(groupCode = targetGroup.groupCode)),
contentHead = contentHead,
msgBody = msgBody,
msgSeq = msgSeq,
msgRand = msgRand,
syncCookie = client.syncingController.syncCookie ?: byteArrayOf(),
msgVia = 1,
msgCtrl =
if (message[ForwardMessageInternal] != null)
MsgCtrl.MsgCtrl(msgFlag = 4)
else null
)
},
sequenceIds = sequenceIds,
randIds = randIds,
sequenceIdsInitializer = { size ->
IntArray(size) { client.nextFriendSeq() }
},
postInit = {
randIds.get().forEach { id ->
client.syncingController.pendingGroupMessageReceiptCacheList.addCache(
PendingGroupMessageReceiptSyncId(
messageRandom = id,
)
)
}
sourceCallback(
OnlineMessageSourceToGroupImpl(
targetGroup,
internalIds = randIds.get(),
sender = client.bot,
target = targetGroup,
time = currentTimeSeconds().toInt(),
originalMessage = message//,
// sourceMessage = message
)
)
},
doFragmented = fragmented
)
}
/*
= buildOutgoingUniPacket(client) {
///writeFully("0A 08 0A 06 08 89 FC A6 8C 0B 12 06 08 01 10 00 18 00 1A 1F 0A 1D 12 08 0A 06 0A 04 F0 9F 92 A9 12 11 AA 02 0E 88 01 00 9A 01 08 78 00 F8 01 00 C8 02 00 20 9B 7A 28 F4 CA 9B B8 03 32 34 08 92 C2 C4 F1 05 10 92 C2 C4 F1 05 18 E6 ED B9 C3 02 20 89 FE BE A4 06 28 89 84 F9 A2 06 48 DE 8C EA E5 0E 58 D9 BD BB A0 09 60 1D 68 92 C2 C4 F1 05 70 00 40 01".hexToBytes())
// DebugLogger.debug("sending group message: " + message.toRichTextElems().contentToString())
@ -339,6 +418,7 @@ internal object MessageSvcPbSendMsg : OutgoingPacketFactory<MessageSvcPbSendMsg.
)
)
}
*/
override suspend fun ByteReadPacket.decode(bot: QQAndroidBot): Response {
val response = readProtoBuf(MsgSvc.PbSendMsgResp.serializer())
@ -429,33 +509,17 @@ internal inline fun MessageSvcPbSendMsg.createToGroup(
client: QQAndroidClient,
group: Group,
message: MessageChain,
fragmented: Boolean,
crossinline sourceCallback: (OnlineMessageSourceToGroupImpl) -> Unit
): OutgoingPacket {
): List<OutgoingPacket> {
contract {
callsInPlace(sourceCallback, InvocationKind.EXACTLY_ONCE)
}
val messageRandom = Random.nextInt().absoluteValue
val source = OnlineMessageSourceToGroupImpl(
group,
internalIds = intArrayOf(messageRandom),
sender = client.bot,
target = group,
time = currentTimeSeconds().toInt(),
originalMessage = message//,
// sourceMessage = message
)
sourceCallback(source)
client.syncingController.pendingGroupMessageReceiptCacheList.addCache(
PendingGroupMessageReceiptSyncId(
messageRandom = messageRandom,
)
)
return createToGroupImpl(
client,
group,
message,
source
fragmented,
sourceCallback
)
}