Remove SendMessageHandler and introduce OutgoingMessagePipeline.

This commit is contained in:
Him188 2022-05-22 16:10:49 +01:00
parent e6b094031c
commit 3270192715
44 changed files with 1470 additions and 833 deletions

View File

@ -649,25 +649,6 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
return jsonText?.let { json.decodeFromString(GroupHonorListData.serializer(), it) }
}
internal open suspend fun uploadMessageHighway(
bot: Bot,
sendMessageHandler: SendMessageHandler<*>,
message: Collection<ForwardMessage.INode>,
isLong: Boolean,
): String {
bot.asQQAndroidBot()
message.forEach {
it.messageChain.ensureSequenceIdAvailable()
}
val uploader = MultiMsgUploader(
client = bot.client,
isLong = isLong,
handler = sendMessageHandler,
).also { it.emitMain(message) }
return uploader.uploadAndReturnResId()
}
override suspend fun solveNewFriendRequestEvent(
bot: Bot,
eventId: Long,

View File

@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
@ -9,9 +9,11 @@
package net.mamoe.mirai.internal.contact
import net.mamoe.mirai.contact.Contact
import net.mamoe.mirai.Bot
import net.mamoe.mirai.contact.*
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.utils.childScopeContext
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
internal abstract class AbstractContact(
@ -19,4 +21,21 @@ internal abstract class AbstractContact(
parentCoroutineContext: CoroutineContext,
) : Contact {
final override val coroutineContext: CoroutineContext = parentCoroutineContext.childScopeContext()
}
internal val Contact.userIdOrNull: Long? get() = if (this is User) this.id else null
internal val Contact.groupIdOrNull: Long? get() = if (this is Group) this.id else null
internal val Contact.groupUinOrNull: Long? get() = if (this is Group) this.uin else null
internal val ContactOrBot.uin: Long
get() = when (this) {
is Group -> uin
is User -> uin
is OtherClient -> bot.uin
is Bot -> id
else -> this.id
}
internal fun Contact.impl(): AbstractContact {
contract { returns() implies (this@impl is AbstractContact) }
return this as AbstractContact
}

View File

@ -21,6 +21,10 @@ import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.message.contextualBugReportException
import net.mamoe.mirai.internal.message.flags.MiraiInternalMessageFlag
import net.mamoe.mirai.internal.message.image.*
import net.mamoe.mirai.internal.message.protocol.MessageProtocolFacade
import net.mamoe.mirai.internal.message.protocol.outgoing.HighwayUploader
import net.mamoe.mirai.internal.message.protocol.outgoing.MessageProtocolStrategy
import net.mamoe.mirai.internal.network.component.buildComponentStorage
import net.mamoe.mirai.internal.network.components.BdhSession
import net.mamoe.mirai.internal.network.highway.ChannelKind
import net.mamoe.mirai.internal.network.highway.Highway
@ -98,15 +102,14 @@ internal sealed class AbstractUser(
return when (resp) {
is LongConn.OffPicUp.Response.FileExists -> {
val imageType = getImageType(resp.imageInfo.fileType)
.takeIf { it != ExternalResource.DEFAULT_FORMAT_NAME }
?: resource.formatName
val imageType =
getImageType(resp.imageInfo.fileType).takeIf { it != ExternalResource.DEFAULT_FORMAT_NAME }
?: resource.formatName
resp.imageInfo.run {
OfflineFriendImage(
imageId = generateImageIdFromResourceId(
resourceId = resp.resourceId,
format = imageType
resourceId = resp.resourceId, format = imageType
) ?: kotlin.run {
if (resp.imageInfo.fileMd5.size == 16) {
generateImageId(resp.imageInfo.fileMd5, imageType)
@ -164,8 +167,7 @@ internal sealed class AbstractUser(
}
is ImgStore.GroupPicUp.Response.RequireUpload -> {
// val servers = response.uploadIpList.zip(response.uploadPortList)
Highway.uploadResourceBdh(
bot = bot,
Highway.uploadResourceBdh(bot = bot,
resource = resource,
kind = PRIVATE_IMAGE,
commandId = 2,
@ -176,8 +178,7 @@ internal sealed class AbstractUser(
ssoAddresses = response.uploadIpList.zip(response.uploadPortList)
.toMutableSet(),
)
}
)
})
}
}
}.recoverCatchingSuppressed {
@ -198,9 +199,9 @@ internal sealed class AbstractUser(
resourceKind = PRIVATE_IMAGE,
channelKind = ChannelKind.HTTP
) { ip, port ->
@Suppress("DEPRECATION", "DEPRECATION_ERROR")
Mirai.Http.postImage(
serverIp = ip, serverPort = port,
@Suppress("DEPRECATION", "DEPRECATION_ERROR") Mirai.Http.postImage(
serverIp = ip,
serverPort = port,
htcmd = "0x6ff0070",
uin = bot.id,
groupcode = null,
@ -210,8 +211,7 @@ internal sealed class AbstractUser(
}
}.recoverCatchingSuppressed {
// try upload by http on fallback server
@Suppress("DEPRECATION", "DEPRECATION_ERROR")
Mirai.Http.postImage(
@Suppress("DEPRECATION", "DEPRECATION_ERROR") Mirai.Http.postImage(
serverIp = "htdata2.qq.com",
htcmd = "0x6ff0070",
uin = bot.id,
@ -242,9 +242,10 @@ internal sealed class AbstractUser(
}
}
@Suppress("DuplicatedCode")
internal suspend fun <C : User> SendMessageHandler<out C>.sendMessageImpl(
internal suspend fun <C : AbstractContact> C.sendMessageImpl(
message: Message,
messageProtocolStrategy: MessageProtocolStrategy<C>,
preSendEventConstructor: (C, Message) -> MessagePreSendEvent,
postSendEventConstructor: (C, MessageChain, Throwable?, MessageReceipt<C>?) -> MessagePostSendEvent<C>,
): MessageReceipt<C> {
@ -252,20 +253,24 @@ internal suspend fun <C : User> SendMessageHandler<out C>.sendMessageImpl(
message.anyIsInstance<MiraiInternalMessageFlag>()
} else false
require(isMiraiInternal || !message.isContentEmpty()) { "message is empty" }
require(!message.isContentEmpty()) { "message is empty" }
val chain = contact.broadcastMessagePreSendEvent(message, isMiraiInternal, preSendEventConstructor)
val chain = broadcastMessagePreSendEvent(message, isMiraiInternal, preSendEventConstructor)
val result = this
.runCatching { sendMessage(message, chain, isMiraiInternal, SendMessageStep.FIRST) }
val result = kotlin.runCatching {
MessageProtocolFacade.preprocessAndSendOutgoing(this, message, buildComponentStorage {
set(MessageProtocolStrategy, messageProtocolStrategy)
set(HighwayUploader, HighwayUploader.Default)
})
}
if (result.isSuccess) {
// logMessageSent(result.getOrNull()?.source?.plus(chain) ?: chain) // log with source
contact.logMessageSent(chain)
bot.logger.verbose("$this <- $chain".replaceMagicCodes())
}
if (!isMiraiInternal) {
postSendEventConstructor(contact, chain, result.exceptionOrNull(), result.getOrNull()).broadcast()
postSendEventConstructor(this, chain, result.exceptionOrNull(), result.getOrNull()).broadcast()
}
return result.getOrThrow()

View File

@ -24,6 +24,8 @@ import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.contact.info.FriendInfoImpl
import net.mamoe.mirai.internal.contact.roaming.RoamingMessagesImplFriend
import net.mamoe.mirai.internal.message.data.OfflineAudioImpl
import net.mamoe.mirai.internal.message.protocol.outgoing.FriendMessageProtocolStrategy
import net.mamoe.mirai.internal.message.protocol.outgoing.MessageProtocolStrategy
import net.mamoe.mirai.internal.network.highway.*
import net.mamoe.mirai.internal.network.protocol.data.proto.Cmd0x346
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
@ -64,6 +66,9 @@ internal class FriendImpl(
) : Friend, AbstractUser(bot, parentCoroutineContext, info) {
override var nick: String by info::nick
override var remark: String by info::remark
private val messageProtocolStrategy: MessageProtocolStrategy<FriendImpl> = FriendMessageProtocolStrategy(this)
override suspend fun delete() {
check(bot.friends[id] != null) {
"Friend $id had already been deleted"
@ -73,12 +78,13 @@ internal class FriendImpl(
}
}
private val handler: FriendSendMessageHandler by lazy { FriendSendMessageHandler(this) }
@Suppress("DuplicatedCode")
override suspend fun sendMessage(message: Message): MessageReceipt<Friend> {
return handler.sendMessageImpl(message, ::FriendMessagePreSendEvent, ::FriendMessagePostSendEvent)
return sendMessageImpl(
message,
messageProtocolStrategy,
::FriendMessagePreSendEvent,
::FriendMessagePostSendEvent.cast()
)
}
override fun toString(): String = "Friend($id)"

View File

@ -13,6 +13,7 @@
package net.mamoe.mirai.internal.contact
import kotlinx.atomicfu.atomic
import net.mamoe.mirai.Bot
import net.mamoe.mirai.LowLevelApi
import net.mamoe.mirai.Mirai
import net.mamoe.mirai.contact.*
@ -28,11 +29,12 @@ import net.mamoe.mirai.internal.contact.file.RemoteFilesImpl
import net.mamoe.mirai.internal.contact.info.MemberInfoImpl
import net.mamoe.mirai.internal.message.contextualBugReportException
import net.mamoe.mirai.internal.message.data.OfflineAudioImpl
import net.mamoe.mirai.internal.message.flags.MiraiInternalMessageFlag
import net.mamoe.mirai.internal.message.image.OfflineGroupImage
import net.mamoe.mirai.internal.message.image.calculateImageInfo
import net.mamoe.mirai.internal.message.image.getIdByImageType
import net.mamoe.mirai.internal.message.image.getImageTypeById
import net.mamoe.mirai.internal.message.protocol.outgoing.GroupMessageProtocolStrategy
import net.mamoe.mirai.internal.message.protocol.outgoing.MessageProtocolStrategy
import net.mamoe.mirai.internal.network.components.BdhSession
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.internal.network.highway.ChannelKind
@ -60,7 +62,6 @@ import net.mamoe.mirai.utils.*
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
import kotlin.time.ExperimentalTime
internal fun GroupImpl.Companion.checkIsInstance(instance: Group) {
contract { returns() implies (instance is GroupImpl) }
@ -114,6 +115,9 @@ private val logger by lazy {
MiraiLogger.Factory.create(GroupImpl::class.java, "Group")
}
internal fun Bot.nickIn(context: Contact): String =
if (context is Group) context.botAsMember.nameCardOrNick else bot.nick
@Suppress("PropertyName")
internal class GroupImpl constructor(
bot: QQAndroidBot,
@ -149,6 +153,8 @@ internal class GroupImpl constructor(
val groupPkgMsgParsingCache = GroupPkgMsgParsingCache()
private val messageProtocolStrategy: MessageProtocolStrategy<GroupImpl> = GroupMessageProtocolStrategy(this)
override suspend fun quit(): Boolean {
check(botPermission != MemberPermission.OWNER) { "An owner cannot quit from a owning group" }
@ -178,29 +184,15 @@ internal class GroupImpl constructor(
}
override suspend fun sendMessage(message: Message): MessageReceipt<Group> {
val isMiraiInternal = if (message is MessageChain) {
message.anyIsInstance<MiraiInternalMessageFlag>()
} else false
require(isMiraiInternal || !message.isContentEmpty()) { "message is empty" }
check(!isBotMuted) { throw BotIsBeingMutedException(this, message) }
val chain = broadcastMessagePreSendEvent(message, isMiraiInternal, ::GroupMessagePreSendEvent)
val result = GroupSendMessageHandler(this)
.runCatching { sendMessage(message, chain, isMiraiInternal, SendMessageStep.FIRST) }
if (result.isSuccess) {
// logMessageSent(result.getOrNull()?.source?.plus(chain) ?: chain) // log with source
logMessageSent(chain)
}
if (!isMiraiInternal) {
GroupMessagePostSendEvent(this, chain, result.exceptionOrNull(), result.getOrNull()).broadcast()
}
return result.getOrThrow()
return sendMessageImpl(
message,
messageProtocolStrategy,
::GroupMessagePreSendEvent,
::GroupMessagePostSendEvent.cast()
)
}
@OptIn(ExperimentalTime::class)
override suspend fun uploadImage(resource: ExternalResource): Image = resource.withAutoClose {
if (BeforeImageUploadEvent(this, resource).broadcast().isCancelled) {
throw EventCancelledException("cancelled by BeforeImageUploadEvent.ToGroup")

View File

@ -45,5 +45,21 @@ internal suspend fun <C : Contact> C.broadcastMessagePreSendEvent(
internal enum class SendMessageStep {
FIRST, LONG_MESSAGE, FRAGMENTED
FIRST {
override fun nextStepOrNull(): SendMessageStep {
return LONG_MESSAGE
}
},
LONG_MESSAGE {
override fun nextStepOrNull(): SendMessageStep {
return FRAGMENTED
}
},
FRAGMENTED {
override fun nextStepOrNull(): SendMessageStep? {
return null
}
};
abstract fun nextStepOrNull(): SendMessageStep?
}

View File

@ -19,6 +19,8 @@ import net.mamoe.mirai.contact.*
import net.mamoe.mirai.data.MemberInfo
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.events.*
import net.mamoe.mirai.internal.message.protocol.outgoing.GroupTempMessageProtocolStrategy
import net.mamoe.mirai.internal.message.protocol.outgoing.MessageProtocolStrategy
import net.mamoe.mirai.internal.message.source.OnlineMessageSourceToTempImpl
import net.mamoe.mirai.internal.message.source.createMessageReceipt
import net.mamoe.mirai.internal.network.protocol.packet.chat.TroopManagement
@ -41,18 +43,19 @@ internal class NormalMemberImpl constructor(
override val joinTimestamp: Int get() = info.joinTimestamp
override val lastSpeakTimestamp: Int get() = info.lastSpeakTimestamp
override fun toString(): String = "NormalMember($id)"
private val messageProtocolStrategy: MessageProtocolStrategy<NormalMemberImpl> = GroupTempMessageProtocolStrategy
private val handler: GroupTempSendMessageHandler by lazy { GroupTempSendMessageHandler(this) }
override fun toString(): String = "NormalMember($id)"
@Suppress("DuplicatedCode")
override suspend fun sendMessage(message: Message): MessageReceipt<NormalMember> {
return asFriendOrNull()?.sendMessage(message)?.convert()
?: asStrangerOrNull()?.sendMessage(message)?.convert()
?: handler.sendMessageImpl<NormalMember>(
?: sendMessageImpl(
message = message,
preSendEventConstructor = ::GroupTempMessagePreSendEvent,
postSendEventConstructor = ::GroupTempMessagePostSendEvent.cast(),
messageProtocolStrategy = messageProtocolStrategy
)
}

View File

@ -1,496 +0,0 @@
/*
* Copyright 2020 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.contact
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.withTimeoutOrNull
import net.mamoe.mirai.Bot
import net.mamoe.mirai.contact.*
import net.mamoe.mirai.event.EventPriority
import net.mamoe.mirai.event.GlobalEventChannel
import net.mamoe.mirai.event.nextEvent
import net.mamoe.mirai.internal.asQQAndroidBot
import net.mamoe.mirai.internal.getMiraiImpl
import net.mamoe.mirai.internal.message.*
import net.mamoe.mirai.internal.message.data.ForwardMessageInternal
import net.mamoe.mirai.internal.message.data.checkIsImpl
import net.mamoe.mirai.internal.message.data.forwardMessage
import net.mamoe.mirai.internal.message.data.longMessage
import net.mamoe.mirai.internal.message.flags.DontAsLongMessage
import net.mamoe.mirai.internal.message.flags.ForceAsLongMessage
import net.mamoe.mirai.internal.message.flags.IgnoreLengthCheck
import net.mamoe.mirai.internal.message.image.FriendImage
import net.mamoe.mirai.internal.message.image.OfflineGroupImage
import net.mamoe.mirai.internal.message.source.OnlineMessageSourceToFriendImpl
import net.mamoe.mirai.internal.message.source.OnlineMessageSourceToGroupImpl
import net.mamoe.mirai.internal.message.source.createMessageReceipt
import net.mamoe.mirai.internal.message.source.ensureSequenceIdAvailable
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.QQAndroidClient
import net.mamoe.mirai.internal.network.components.ClockHolder.Companion.clock
import net.mamoe.mirai.internal.network.components.MessageSvcSyncer
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.internal.network.notice.group.GroupMessageProcessor.SendGroupMessageReceipt
import net.mamoe.mirai.internal.network.notice.priv.PrivateMessageProcessor.SendPrivateMessageReceipt
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgComm
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.chat.FileManagement
import net.mamoe.mirai.internal.network.protocol.packet.chat.MusicSharePacket
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.*
import net.mamoe.mirai.internal.utils.ImagePatcher
import net.mamoe.mirai.message.MessageReceipt
import net.mamoe.mirai.message.data.*
import net.mamoe.mirai.utils.castOrNull
import net.mamoe.mirai.utils.currentTimeSeconds
internal fun ContactOrBot.inferMessageSourceKind(): MessageSourceKind {
return when (this) {
is Group -> MessageSourceKind.GROUP
is Friend -> MessageSourceKind.FRIEND
is Member -> MessageSourceKind.TEMP
is Stranger -> MessageSourceKind.STRANGER
is Bot -> MessageSourceKind.FRIEND
else -> error("Unsupported contact: $this")
}
}
/**
* 处理 mirai 消息系统 `Message` 到协议数据结构的转换.
*
* 外部调用 [sendMessageImpl]
*/
internal abstract class SendMessageHandler<C : Contact> {
abstract val contact: C
abstract val senderName: String
val messageSourceKind: MessageSourceKind
get() {
return contact.inferMessageSourceKind()
}
val bot get() = contact.bot.asQQAndroidBot()
val targetUserUin: Long? get() = contact.castOrNull<User>()?.uin
val targetGroupUin: Long? get() = contact.castOrNull<Group>()?.uin
val targetGroupCode: Long? get() = contact.castOrNull<Group>()?.groupCode
val targetOtherClientBotUin: Long? get() = contact.castOrNull<OtherClient>()?.bot?.id
val targetUin: Long get() = targetGroupUin ?: targetOtherClientBotUin ?: contact.id
val groupInfo: MsgComm.GroupInfo?
get() = if (isToGroup) MsgComm.GroupInfo(
groupCode = targetGroupCode!!,
groupCard = senderName // Cinnamon
) else null
// For ForwardMessage display
val ForwardMessage.INode.groupInfo: MsgComm.GroupInfo
get() = MsgComm.GroupInfo(
groupCode = if (isToGroup) targetGroupCode!! else 0,
groupCard = senderName
)
val isToGroup: Boolean get() = contact is Group
suspend fun MessageChain.convertToLongMessageIfNeeded(
step: SendMessageStep,
): MessageChain {
suspend fun sendLongImpl(): MessageChain {
val resId = uploadLongMessageHighway(this)
return this + RichMessage.longMessage(
brief = takeContent(27),
resId = resId,
timeSeconds = currentTimeSeconds()
) // LongMessageInternal replaces all contents and preserves metadata
}
return when (step) {
SendMessageStep.FIRST -> {
// 只需要在第一次发送的时候验证长度
// 后续重试直接跳过
if (contains(ForceAsLongMessage)) {
return sendLongImpl()
}
if (!contains(IgnoreLengthCheck)) {
verifyLength(this, contact)
}
this
}
SendMessageStep.LONG_MESSAGE -> {
if (contains(DontAsLongMessage)) this // fragmented
else sendLongImpl()
}
SendMessageStep.FRAGMENTED -> this
}
}
/**
* Final process. Convert transformed message to protocol internals and transfer to server
*/
suspend fun sendMessagePacket(
originalMessage: Message,
transformedMessage: MessageChain,
finalMessage: MessageChain,
isMiraiInternal: Boolean,
step: SendMessageStep,
): MessageReceipt<C> {
bot.components[MessageSvcSyncer].joinSync()
val group = contact
var source: Deferred<OnlineMessageSource.Outgoing>? = null
bot.network.run {
sendMessageMultiProtocol(
bot.client, finalMessage,
fragmented = step == SendMessageStep.FRAGMENTED
) { source = it }.forEach { packet ->
when (val resp = bot.network.sendAndExpect<Packet>(packet, 5000, 2)) {
is MessageSvcPbSendMsg.Response -> {
if (resp is MessageSvcPbSendMsg.Response.MessageTooLarge) {
return when (step) {
SendMessageStep.FIRST -> {
sendMessageImpl(
originalMessage,
transformedMessage,
isMiraiInternal,
SendMessageStep.LONG_MESSAGE,
)
}
SendMessageStep.LONG_MESSAGE -> {
sendMessageImpl(
originalMessage,
transformedMessage,
isMiraiInternal,
SendMessageStep.FRAGMENTED,
)
}
else -> {
throw MessageTooLargeException(
group,
originalMessage,
finalMessage,
"Message '${finalMessage.content.take(10)}' is too large."
)
}
}
}
if (resp is MessageSvcPbSendMsg.Response.ServiceUnavailable) {
throw IllegalStateException("Send message to $contact failed, server service is unavailable.")
}
if (resp is MessageSvcPbSendMsg.Response.Failed) {
val contact = contact
when (resp.resultType) {
120 -> if (contact is Group) throw BotIsBeingMutedException(contact, originalMessage)
121 -> if (AtAll in finalMessage) throw SendMessageFailedException(
contact,
SendMessageFailedException.Reason.AT_ALL_LIMITED,
originalMessage
)
299 -> if (contact is Group) throw SendMessageFailedException(
contact,
SendMessageFailedException.Reason.GROUP_CHAT_LIMITED,
originalMessage
)
}
}
check(resp is MessageSvcPbSendMsg.Response.SUCCESS) {
"Send message failed: $resp"
}
}
is MusicSharePacket.Response -> {
resp.pkg.checkSuccess("send music share")
source = CompletableDeferred(constructSourceForSpecialMessage(finalMessage, 3116))
}
// is CommonOidbResponse<*> -> {
// when (resp.toResult("send message").getOrThrow()) {
// is Oidb0x6d9.FeedsRspBody -> {
// }
// }
// }
}
}
val sourceAwait = source?.await() ?: error("Internal error: source is not initialized")
try {
sourceAwait.ensureSequenceIdAvailable()
} catch (e: Exception) {
bot.network.logger.warning(
"Timeout awaiting sequenceId for message(${finalMessage.content.take(10)}). Some features may not work properly",
e
)
}
return sourceAwait.createMessageReceipt(contact, true)
}
}
private suspend fun sendMessageMultiProtocol(
client: QQAndroidClient,
message: MessageChain,
fragmented: Boolean,
sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit,
): List<OutgoingPacket> {
message.takeSingleContent<MusicShare>()?.let { musicShare ->
return listOf(
MusicSharePacket(
client, musicShare, contact.id,
targetKind = if (isToGroup) MessageSourceKind.GROUP else MessageSourceKind.FRIEND // always FRIEND
)
)
}
message.takeSingleContent<FileMessage>()?.let { file ->
file.checkIsImpl()
sourceCallback(contact.async { constructSourceForSpecialMessage(message, 2021) })
return listOf(FileManagement.Feed(client, contact.id, file.busId, file.id))
}
return messageSvcSendMessage(client, contact, message, fragmented, sourceCallback)
}
abstract val messageSvcSendMessage: (
client: QQAndroidClient,
contact: C,
message: MessageChain,
fragmented: Boolean,
sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit,
) -> List<OutgoingPacket>
abstract suspend fun constructSourceForSpecialMessage(
finalMessage: MessageChain,
fromAppId: Int,
): OnlineMessageSource.Outgoing
open suspend fun uploadLongMessageHighway(
chain: MessageChain,
): String = with(contact) {
return getMiraiImpl().uploadMessageHighway(
bot, this@SendMessageHandler,
listOf(
ForwardMessage.Node(
senderId = bot.id,
time = currentTimeSeconds().toInt(),
messageChain = chain,
senderName = bot.nick
)
),
true
)
}
open suspend fun preConversionTransformedMessage(message: Message): Message = message
open suspend fun conversionMessageChain(chain: MessageChain): MessageChain = chain
open suspend fun postTransformActions(chain: MessageChain) {
}
}
/**
* 处理需要 `suspend` 操作的消息转换. 这个转换只会在发送消息时进行, 而不会在处理合并转发 [net.mamoe.mirai.internal.network.protocol.packet.chat.calculateValidationData] 等其他操作时进行.
* 在发包前还会进行最后的 [net.mamoe.mirai.internal.message.protocol.MessageProtocolFacade.encode] 转换, 这个转换会为所有操作使用.
*
* - [ForwardMessage] -> [ForwardMessageInternal] (by uploading through highway)
* - ... any others for future
*/
internal suspend fun <C : Contact> SendMessageHandler<C>.transformSpecialMessages(message: Message): MessageChain {
suspend fun processForwardMessage(
forward: ForwardMessage,
): ForwardMessageInternal {
if (!(message is MessageChain && message.contains(IgnoreLengthCheck))) {
check(forward.nodeList.size <= 200) {
throw MessageTooLargeException(
contact, forward, forward,
"ForwardMessage allows up to 200 nodes, but found ${forward.nodeList.size}"
)
}
sequence {
forward.nodeList.forEach { yieldAll(it.messageChain) }
}.asIterable().verifyLength(forward, contact)
}
val resId = getMiraiImpl().uploadMessageHighway(
bot = contact.bot,
sendMessageHandler = this,
message = forward.nodeList,
isLong = false,
)
return RichMessage.forwardMessage(
resId = resId,
fileName = currentTimeSeconds().toString(),
forwardMessage = forward,
)
}
// loses MessageMetadata and other message types but fine for now.
return message.takeSingleContent<ForwardMessage>()?.let { processForwardMessage(it) }?.toMessageChain()
?: message.toMessageChain()
}
/**
* Send a message, and covert messages
*
* Don't recall this function.
*/
internal suspend fun <C : Contact> SendMessageHandler<C>.sendMessage(
originalMessage: Message,
transformedMessage: Message,
isMiraiInternal: Boolean,
step: SendMessageStep,
): MessageReceipt<C> = sendMessageImpl(
originalMessage,
conversionMessageChain(
transformSpecialMessages(
preConversionTransformedMessage(transformedMessage)
)
),
isMiraiInternal,
step
)
/**
* Might be recalled with [transformedMessage] `is` [LongMessageInternal] if length estimation failed (sendMessagePacket)
*/
private suspend fun <C : Contact> SendMessageHandler<C>.sendMessageImpl(
originalMessage: Message,
transformedMessage: MessageChain,
isMiraiInternal: Boolean,
step: SendMessageStep,
): MessageReceipt<C> { // Result cannot be in interface.
if (!isMiraiInternal && step == SendMessageStep.FIRST) {
transformedMessage.verifySendingValid()
}
val chain = transformedMessage.convertToLongMessageIfNeeded(step)
chain.findIsInstance<QuoteReply>()?.source?.ensureSequenceIdAvailable()
postTransformActions(chain)
return sendMessagePacket(originalMessage, transformedMessage, chain, isMiraiInternal, step)
}
internal sealed class UserSendMessageHandler<C : AbstractUser>(
override val contact: C,
) : SendMessageHandler<C>() {
override val senderName: String get() = bot.nick
override suspend fun constructSourceForSpecialMessage(
finalMessage: MessageChain,
fromAppId: Int,
): OnlineMessageSource.Outgoing {
throw UnsupportedOperationException("Sending MusicShare or FileMessage to User is not yet supported")
}
}
internal class FriendSendMessageHandler(
contact: FriendImpl,
) : UserSendMessageHandler<FriendImpl>(contact) {
override val messageSvcSendMessage: (client: QQAndroidClient, contact: FriendImpl, message: MessageChain, fragmented: Boolean, sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit) -> List<OutgoingPacket> =
MessageSvcPbSendMsg::createToFriend
override suspend fun constructSourceForSpecialMessage(
finalMessage: MessageChain,
fromAppId: Int
): OnlineMessageSource.Outgoing {
val receipt: SendPrivateMessageReceipt = withTimeoutOrNull(3000) {
GlobalEventChannel.parentScope(this).nextEvent(EventPriority.MONITOR) {
it.bot === bot && it.fromAppId == fromAppId
}
} ?: SendPrivateMessageReceipt.EMPTY
return OnlineMessageSourceToFriendImpl(
internalIds = intArrayOf(receipt.messageRandom),
sequenceIds = intArrayOf(receipt.sequenceId),
sender = bot,
target = contact,
time = bot.clock.server.currentTimeSeconds().toInt(),
originalMessage = finalMessage
)
}
}
internal class StrangerSendMessageHandler(
contact: StrangerImpl,
) : UserSendMessageHandler<StrangerImpl>(contact) {
override val messageSvcSendMessage: (client: QQAndroidClient, contact: StrangerImpl, message: MessageChain, fragmented: Boolean, sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit) -> List<OutgoingPacket> =
MessageSvcPbSendMsg::createToStranger
}
internal class GroupTempSendMessageHandler(
contact: NormalMemberImpl,
) : UserSendMessageHandler<NormalMemberImpl>(contact) {
override val messageSvcSendMessage: (client: QQAndroidClient, contact: NormalMemberImpl, message: MessageChain, fragmented: Boolean, sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit) -> List<OutgoingPacket> =
MessageSvcPbSendMsg::createToTemp
}
internal open class GroupSendMessageHandler(
override val contact: GroupImpl,
) : SendMessageHandler<GroupImpl>() {
override val messageSvcSendMessage: (client: QQAndroidClient, contact: GroupImpl, message: MessageChain, fragmented: Boolean, sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit) -> List<OutgoingPacket> =
MessageSvcPbSendMsg::createToGroup
override val senderName: String
get() = contact.botAsMember.nameCardOrNick
override suspend fun conversionMessageChain(chain: MessageChain): MessageChain = chain.map { element ->
when (element) {
is OfflineGroupImage -> {
contact.fixImageFileId(element)
element
}
is FriendImage -> {
contact.updateFriendImageForGroupMessage(element)
}
else -> element
}
}.toMessageChain()
override suspend fun constructSourceForSpecialMessage(
finalMessage: MessageChain,
fromAppId: Int,
): OnlineMessageSource.Outgoing {
val receipt: SendGroupMessageReceipt = withTimeoutOrNull(3000) {
GlobalEventChannel.parentScope(this).nextEvent(EventPriority.MONITOR) {
it.bot === bot && it.fromAppId == fromAppId
}
} ?: SendGroupMessageReceipt.EMPTY
return OnlineMessageSourceToGroupImpl(
contact,
internalIds = intArrayOf(receipt.messageRandom),
providedSequenceIds = intArrayOf(receipt.sequenceId),
sender = bot,
target = contact,
time = bot.clock.server.currentTimeSeconds().toInt(),
originalMessage = finalMessage
)
}
companion object {
private suspend fun GroupImpl.fixImageFileId(image: OfflineGroupImage) {
bot.components[ImagePatcher].patchOfflineGroupImage(this, image)
}
private suspend fun GroupImpl.updateFriendImageForGroupMessage(image: FriendImage): OfflineGroupImage {
return bot.components[ImagePatcher].patchFriendImageToGroupImage(this, image)
}
}
}

View File

@ -25,6 +25,8 @@ import net.mamoe.mirai.data.StrangerInfo
import net.mamoe.mirai.event.events.StrangerMessagePostSendEvent
import net.mamoe.mirai.event.events.StrangerMessagePreSendEvent
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.message.protocol.outgoing.MessageProtocolStrategy
import net.mamoe.mirai.internal.message.protocol.outgoing.StrangerMessageProtocolStrategy
import net.mamoe.mirai.internal.message.source.OnlineMessageSourceToStrangerImpl
import net.mamoe.mirai.internal.message.source.createMessageReceipt
import net.mamoe.mirai.internal.network.protocol.packet.list.StrangerList
@ -59,13 +61,14 @@ internal class StrangerImpl(
}
}
private val handler: StrangerSendMessageHandler by lazy { StrangerSendMessageHandler(this) }
private val messageProtocolStrategy: MessageProtocolStrategy<StrangerImpl> = StrangerMessageProtocolStrategy
@Suppress("DuplicatedCode")
override suspend fun sendMessage(message: Message): MessageReceipt<Stranger> {
return asFriendOrNull()?.sendMessage(message)?.convert()
?: handler.sendMessageImpl<Stranger>(
?: sendMessageImpl(
message = message,
messageProtocolStrategy = messageProtocolStrategy,
preSendEventConstructor = ::StrangerMessagePreSendEvent,
postSendEventConstructor = ::StrangerMessagePostSendEvent.cast()
)

View File

@ -13,68 +13,15 @@ package net.mamoe.mirai.internal.contact
import net.mamoe.mirai.Bot
import net.mamoe.mirai.contact.*
import net.mamoe.mirai.internal.message.data.FileMessageImpl
import net.mamoe.mirai.internal.message.data.LongMessageInternal
import net.mamoe.mirai.internal.utils.estimateLength
import net.mamoe.mirai.message.data.*
import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.cast
import net.mamoe.mirai.utils.verbose
internal inline val Group.uin: Long get() = this.cast<GroupImpl>().uin
internal inline val Group.groupCode: Long get() = this.id
internal inline val User.uin: Long get() = this.id
internal inline val Bot.uin: Long get() = this.id
internal fun Contact.logMessageSent(message: Message) {
if (message !is LongMessageInternal) {
bot.logger.verbose("$this <- $message".replaceMagicCodes())
}
}
internal fun Iterable<SingleMessage>.countImages(): Int = this.count { it is Image }
private val logger by lazy { MiraiLogger.Factory.create(SendMessageHandler::class) }
private val ALLOW_SENDING_FILE_MESSAGE = systemProp("mirai.message.allow.sending.file.message", false)
internal fun Message.verifySendingValid() {
// fun fail(msg: String): Nothing = throw IllegalArgumentException(msg)
when (this) {
is MessageChain -> {
this.forEach { it.verifySendingValid() }
}
is FileMessage -> {
if (!ALLOW_SENDING_FILE_MESSAGE) { // #1715
if (this !is FileMessageImpl) error("Customized FileMessage cannot be send")
if (!this.allowSend) error(
"Sending FileMessage is not allowed, as it may cause unexpected results. " +
"Add JVM argument `-Dmirai.message.allow.sending.file.message=true` to disable this check. " +
"Do this only for compatibility!"
)
}
}
}
}
internal fun Iterable<SingleMessage>.verifyLength(
originalMessage: Message, target: Contact,
): Int {
val chain = this
val length = estimateLength(target, 15001)
if (length > 15000 || countImages() > 50) {
throw MessageTooLargeException(
target, originalMessage, this.toMessageChain(),
"message(${
chain.joinToString("", limit = 10).let { rsp ->
if (rsp.length > 100) {
rsp.take(100) + "..."
} else rsp
}
}) is too large. Allow up to 50 images or 5000 chars"
)
}
return length
}
@Suppress("RemoveRedundantQualifierName") // compiler bug
internal fun net.mamoe.mirai.event.events.MessageEvent.logMessageReceived() {
fun renderMessage(message: MessageChain): String {
@ -168,7 +115,3 @@ internal fun String.replaceMagicCodes(): String = this
internal fun Message.takeContent(length: Int): String =
this.toMessageChain().joinToString("", limit = length) { it.content }
internal inline fun <reified T : MessageContent> Message.takeSingleContent(): T? {
return this as? T ?: this.castOrNull<MessageChain>()?.findIsInstance()
}

View File

@ -19,7 +19,7 @@ internal data class ContextualBugReportException(
internal fun contextualBugReportException(
context: String,
forDebug: String,
forDebug: String?,
e: Throwable? = null,
additional: String = "",
): ContextualBugReportException {

View File

@ -0,0 +1,32 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.data
import net.mamoe.mirai.message.data.MessageChain
import net.mamoe.mirai.message.data.MessageChainBuilder
import net.mamoe.mirai.message.data.SingleMessage
import net.mamoe.mirai.message.data.toMessageChain
import net.mamoe.mirai.message.data.visitor.MessageVisitor
import net.mamoe.mirai.message.data.visitor.MessageVisitorUnit
import net.mamoe.mirai.message.data.visitor.accept
import net.mamoe.mirai.utils.replaceAllKotlin
internal fun MessageChainBuilder.acceptChildren(visitor: MessageVisitorUnit) {
forEach { it.accept(visitor) }
}
internal fun MessageChainBuilder.transformChildren(visitor: MessageVisitor<MessageChainBuilder, SingleMessage>) {
replaceAllKotlin { it.accept(visitor, this) }
}
internal inline fun MessageChain.transform(trans: (SingleMessage) -> SingleMessage?): MessageChain {
return this.mapNotNull(trans).toMessageChain()
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.data
import net.mamoe.mirai.Bot
import net.mamoe.mirai.contact.*
import net.mamoe.mirai.message.data.MessageSourceKind
internal fun ContactOrBot.inferMessageSourceKind(): MessageSourceKind {
return when (this) {
is Group -> MessageSourceKind.GROUP
is Member -> MessageSourceKind.TEMP
is Friend -> MessageSourceKind.FRIEND
is Stranger -> MessageSourceKind.STRANGER
is Bot -> MessageSourceKind.FRIEND
else -> error("Unsupported contact: $this")
}
}

View File

@ -10,11 +10,18 @@
package net.mamoe.mirai.internal.message.data
import io.ktor.utils.io.core.*
import net.mamoe.mirai.internal.contact.SendMessageHandler
import net.mamoe.mirai.internal.contact.takeSingleContent
import net.mamoe.mirai.contact.Contact
import net.mamoe.mirai.contact.Group
import net.mamoe.mirai.internal.contact.groupCode
import net.mamoe.mirai.internal.contact.impl
import net.mamoe.mirai.internal.contact.uin
import net.mamoe.mirai.internal.contact.userIdOrNull
import net.mamoe.mirai.internal.message.protocol.MessageProtocolFacade
import net.mamoe.mirai.internal.message.protocol.outgoing.HighwayUploader
import net.mamoe.mirai.internal.message.protocol.outgoing.MessageProtocolStrategy
import net.mamoe.mirai.internal.message.source.MessageSourceInternal
import net.mamoe.mirai.internal.network.QQAndroidClient
import net.mamoe.mirai.internal.network.component.buildComponentStorage
import net.mamoe.mirai.internal.network.highway.Highway
import net.mamoe.mirai.internal.network.highway.ResourceKind
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
@ -35,15 +42,21 @@ import kotlin.random.Random
internal open class MultiMsgUploader(
val client: QQAndroidClient,
val isLong: Boolean,
val handler: SendMessageHandler<*>,
val tmpRand: Random = Random.Default,
val facade: MessageProtocolFacade,
val contact: Contact,
val strategy: MessageProtocolStrategy<*>,
val senderName: String
) {
protected open fun newUploader(): MultiMsgUploader = MultiMsgUploader(
isLong = isLong,
handler = handler,
client = client,
tmpRand = tmpRand,
facade = facade,
contact = contact,
senderName = senderName,
strategy = strategy
)
val mainMsg = mutableListOf<MsgComm.Msg>()
@ -123,11 +136,14 @@ internal open class MultiMsgUploader(
msgs.forEach { msg ->
var msgChain = msg.messageChain
msgChain.takeSingleContent<ForwardMessage>()?.let { nestedForward ->
msgChain[ForwardMessage]?.let { nestedForward ->
msgChain = convertNestedForwardMessage(nestedForward, msgChain)
}
msgChain = handler.conversionMessageChain(msgChain)
msgChain = facade.preprocess(contact.impl(), msgChain, buildComponentStorage {
set(MessageProtocolStrategy, strategy)
set(HighwayUploader, HighwayUploader.Default)
})
var seq: Int = -1
var uid: Int = -1
@ -149,7 +165,7 @@ internal open class MultiMsgUploader(
msgHead = MsgComm.MsgHead(
fromUin = msg.senderId,
toUin = if (isLong) {
handler.targetUserUin ?: 0
contact.userIdOrNull ?: 0
} else 0,
msgSeq = seq,
msgTime = msg.time,
@ -159,13 +175,17 @@ internal open class MultiMsgUploader(
msgId = 1,
),
msgType = 82, // troop,
groupInfo = handler.run { msg.groupInfo },
groupInfo = if (contact is Group) MsgComm.GroupInfo(
groupCode = contact.groupCode,
groupCard = senderName // Cinnamon
) else null,
isSrcMsg = false,
),
msgBody = ImMsgBody.MsgBody(
richText = ImMsgBody.RichText(
elems = MessageProtocolFacade.encode(
msgChain, messageTarget = handler.contact,
msgChain,
messageTarget = contact,
withGeneralFlags = false,
isForward = true
)
@ -201,11 +221,11 @@ internal open class MultiMsgUploader(
buType = if (isLong) 1 else 2,
client = client,
messageData = data,
dstUin = handler.targetUin
), 5000, 2
dstUin = contact.uin
)
)
val resId: String
lateinit var resId: String
when (response) {
is MultiMsg.ApplyUp.Response.MessageTooLarge ->
error(
@ -221,7 +241,7 @@ internal open class MultiMsgUploader(
msgUpReq = listOf(
LongMsg.MsgUpReq(
msgType = 3, // group
dstUin = handler.targetUin,
dstUin = contact.uin,
msgId = 0,
msgUkey = response.proto.msgUkey,
needCache = 0,
@ -246,6 +266,6 @@ internal open class MultiMsgUploader(
}
}
return resId
return resId // this must be initialized, 'lateinit' due to IDE complaint
}
}

View File

@ -11,6 +11,10 @@ package net.mamoe.mirai.internal.message.protocol
import net.mamoe.mirai.internal.message.protocol.decode.MessageDecoder
import net.mamoe.mirai.internal.message.protocol.encode.MessageEncoder
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePostprocessor
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePreprocessor
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessageSender
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessageTransformer
import net.mamoe.mirai.message.data.SingleMessage
import kotlin.reflect.KClass
@ -29,6 +33,7 @@ internal abstract class MessageProtocol(
const val PRIORITY_CONTENT: UInt = 1000u
const val PRIORITY_IGNORE: UInt = 500u
const val PRIORITY_UNSUPPORTED: UInt = 100u
const val PRIORITY_GENERAL_SENDER: UInt = 100u
}
object PriorityComparator : Comparator<MessageProtocol> {
@ -55,6 +60,12 @@ internal abstract class ProcessorCollector {
abstract fun <T : SingleMessage> add(encoder: MessageEncoder<T>, elementType: KClass<T>)
abstract fun add(decoder: MessageDecoder)
abstract fun add(preprocessor: OutgoingMessagePreprocessor)
abstract fun add(transformer: OutgoingMessageTransformer)
abstract fun add(sender: OutgoingMessageSender)
abstract fun add(postprocessor: OutgoingMessagePostprocessor)
}

View File

@ -11,26 +11,42 @@ package net.mamoe.mirai.internal.message.protocol
import net.mamoe.mirai.Bot
import net.mamoe.mirai.contact.ContactOrBot
import net.mamoe.mirai.internal.contact.AbstractContact
import net.mamoe.mirai.internal.contact.SendMessageStep
import net.mamoe.mirai.internal.contact.impl
import net.mamoe.mirai.internal.message.DeepMessageRefiner.refineDeep
import net.mamoe.mirai.internal.message.EmptyRefineContext
import net.mamoe.mirai.internal.message.LightMessageRefiner.refineLight
import net.mamoe.mirai.internal.message.RefineContext
import net.mamoe.mirai.internal.message.contextualBugReportException
import net.mamoe.mirai.internal.message.protocol.decode.*
import net.mamoe.mirai.internal.message.protocol.encode.*
import net.mamoe.mirai.internal.message.protocol.outgoing.*
import net.mamoe.mirai.internal.network.component.ComponentStorage
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.internal.utils.runCoroutineInPlace
import net.mamoe.mirai.internal.utils.structureToString
import net.mamoe.mirai.message.MessageReceipt
import net.mamoe.mirai.message.data.*
import net.mamoe.mirai.message.data.visitor.RecursiveMessageVisitor
import net.mamoe.mirai.message.data.visitor.accept
import net.mamoe.mirai.utils.MutableTypeSafeMap
import net.mamoe.mirai.utils.buildTypeSafeMap
import net.mamoe.mirai.utils.castUp
import java.util.*
import kotlin.reflect.KClass
internal interface MessageProtocolFacade {
val encoderPipeline: MessageEncoderPipeline
val decoderPipeline: MessageDecoderPipeline
val preprocessorPipeline: OutgoingMessagePipeline
val outgoingPipeline: OutgoingMessagePipeline
val loaded: List<MessageProtocol>
/**
* Encode high-level [MessageChain] to give list of low-level and protocol-specific [ImMsgBody.Elem]s.
*/
fun encode(
chain: MessageChain,
messageTarget: ContactOrBot?, // for At.display, QuoteReply, Image, and more.
@ -38,6 +54,11 @@ internal interface MessageProtocolFacade {
isForward: Boolean = false, // is inside forward, for At.display
): List<ImMsgBody.Elem>
/**
* Decode list of low-level and protocol-specific [ImMsgBody.Elem]s to give a high-level [MessageChain].
*
* [SingleMessage]s are appended to the [builder].
*/
fun decode(
elements: List<ImMsgBody.Elem>,
groupIdOrZero: Long,
@ -46,6 +67,41 @@ internal interface MessageProtocolFacade {
builder: MessageChainBuilder,
)
/**
* Pre-process a message
* @see OutgoingMessagePreprocessor
*/
suspend fun <C : AbstractContact> preprocess(
target: C,
message: Message,
components: ComponentStorage,
): MessageChain
/**
* Send a message
* @see OutgoingMessageProcessor
*/
suspend fun <C : AbstractContact> sendOutgoing(
target: C,
message: Message,
components: ComponentStorage,
): MessageReceipt<C>
/**
* Preprocess and send a message
* @see OutgoingMessagePreprocessor
* @see OutgoingMessageProcessor
*/
suspend fun <C : AbstractContact> preprocessAndSendOutgoing(
target: C,
message: Message,
components: ComponentStorage,
): MessageReceipt<C>
/**
* Decode list of low-level and protocol-specific [ImMsgBody.Elem]s to give a high-level [MessageChain].
*/
fun decode(
elements: List<ImMsgBody.Elem>,
groupIdOrZero: Long,
@ -53,6 +109,11 @@ internal interface MessageProtocolFacade {
bot: Bot,
): MessageChain = buildMessageChain { decode(elements, groupIdOrZero, messageSourceKind, bot, this) }
fun copy(): MessageProtocolFacade
/**
* The default global instance.
*/
companion object INSTANCE : MessageProtocolFacade by MessageProtocolFacadeImpl()
}
@ -74,10 +135,12 @@ internal suspend fun MessageProtocolFacade.decodeAndRefineDeep(
internal class MessageProtocolFacadeImpl(
protocols: Iterable<MessageProtocol> = ServiceLoader.load(MessageProtocol::class.java)
private val protocols: Iterable<MessageProtocol> = ServiceLoader.load(MessageProtocol::class.java)
) : MessageProtocolFacade {
override val encoderPipeline: MessageEncoderPipeline = MessageEncoderPipelineImpl()
override val decoderPipeline: MessageDecoderPipeline = MessageDecoderPipelineImpl()
override val preprocessorPipeline: OutgoingMessagePipeline = OutgoingMessagePipelineImpl()
override val outgoingPipeline: OutgoingMessagePipeline = OutgoingMessagePipelineImpl()
override val loaded: List<MessageProtocol> = kotlin.run {
val instances: PriorityQueue<MessageProtocol> = protocols
@ -97,6 +160,21 @@ internal class MessageProtocolFacadeImpl(
this@MessageProtocolFacadeImpl.decoderPipeline.registerProcessor(MessageDecoderProcessor(decoder))
}
override fun add(preprocessor: OutgoingMessagePreprocessor) {
preprocessorPipeline.registerProcessor(OutgoingMessageProcessorAdapter(preprocessor))
}
override fun add(transformer: OutgoingMessageTransformer) {
outgoingPipeline.registerProcessor(OutgoingMessageProcessorAdapter(transformer))
}
override fun add(sender: OutgoingMessageSender) {
outgoingPipeline.registerProcessor(OutgoingMessageProcessorAdapter(sender))
}
override fun add(postprocessor: OutgoingMessagePostprocessor) {
outgoingPipeline.registerProcessor(OutgoingMessageProcessorAdapter(postprocessor))
}
})
}
instances.toList()
@ -122,7 +200,7 @@ internal class MessageProtocolFacadeImpl(
chain.accept(object : RecursiveMessageVisitor<Unit>() {
override fun visitSingleMessage(message: SingleMessage, data: Unit) {
runCoroutineInPlace {
builder.addAll(pipeline.process(message, attributes))
builder.addAll(pipeline.process(message, attributes).collected)
}
}
})
@ -146,7 +224,77 @@ internal class MessageProtocolFacadeImpl(
}
runCoroutineInPlace {
elements.forEach { builder.addAll(pipeline.process(it, attributes)) }
elements.forEach { builder.addAll(pipeline.process(it, attributes).collected) }
}
}
override suspend fun <C : AbstractContact> preprocess(
target: C,
message: Message,
components: ComponentStorage
): MessageChain {
val attributes = createAttributesForOutgoingMessage(target, message, components)
return preprocessorPipeline.process(message.toMessageChain(), attributes).context.currentMessageChain
}
override suspend fun <C : AbstractContact> sendOutgoing(
target: C, message: Message,
components: ComponentStorage
): MessageReceipt<C> {
val attributes = createAttributesForOutgoingMessage(target, message, components)
val (_, result) = outgoingPipeline.process(message.toMessageChain(), attributes)
return getSingleReceipt(result, message)
}
override suspend fun <C : AbstractContact> preprocessAndSendOutgoing(
target: C,
message: Message,
components: ComponentStorage
): MessageReceipt<C> {
val attributes = createAttributesForOutgoingMessage(target, message, components)
val (context, _) = preprocessorPipeline.process(message.toMessageChain(), attributes)
val (_, result) = outgoingPipeline.process(message.toMessageChain(), context, attributes)
return getSingleReceipt(result, message)
}
override fun copy(): MessageProtocolFacade {
return MessageProtocolFacadeImpl(protocols)
}
private fun <C : AbstractContact> getSingleReceipt(
result: Collection<MessageReceipt<*>>,
message: Message
): MessageReceipt<C> {
when (result.size) {
0 -> throw contextualBugReportException(
"Internal error: no MessageReceipt was returned from OutgoingMessagePipeline for message",
forDebug = message.structureToString()
)
1 -> return result.single().castUp()
else -> throw contextualBugReportException(
"Internal error: multiple MessageReceipts were returned from OutgoingMessagePipeline: $result",
forDebug = message.structureToString()
)
}
}
private fun <C : AbstractContact> createAttributesForOutgoingMessage(
target: C,
message: Message,
context: ComponentStorage
): MutableTypeSafeMap {
val attributes = buildTypeSafeMap {
set(OutgoingMessagePipelineContext.CONTACT, target.impl())
set(OutgoingMessagePipelineContext.ORIGINAL_MESSAGE, message)
set(OutgoingMessagePipelineContext.STEP, SendMessageStep.FIRST)
set(OutgoingMessagePipelineContext.PROTOCOL_STRATEGY, context[MessageProtocolStrategy].castUp())
set(OutgoingMessagePipelineContext.HIGHWAY_UPLOADER, context[HighwayUploader])
}
return attributes
}
}

View File

@ -1,20 +0,0 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
internal interface OutgoingMessageContext {
fun collect(elem: ImMsgBody.Elem)
fun processAlso()
}

View File

@ -24,6 +24,8 @@ internal interface MessageDecoder : PipelineConsumptionMarker {
internal class MessageDecoderProcessor(
private val decoder: MessageDecoder,
) : Processor<MessageDecoderContext, ImMsgBody.Elem> {
override val origin: Any get() = this
override suspend fun process(context: MessageDecoderContext, data: ImMsgBody.Elem) {
@Suppress("ILLEGAL_RESTRICTED_SUSPENDING_FUNCTION_CALL")
decoder.run { context.process(data) }

View File

@ -20,7 +20,8 @@ import net.mamoe.mirai.message.data.MessageSourceKind
import net.mamoe.mirai.utils.*
import kotlin.coroutines.RestrictsSuspension
internal interface MessageDecoderPipeline : ProcessorPipeline<MessageDecoderProcessor, ImMsgBody.Elem, Message>
internal interface MessageDecoderPipeline :
ProcessorPipeline<MessageDecoderProcessor, MessageDecoderContext, ImMsgBody.Elem, Message>
@RestrictsSuspension // Implementor can only call `MessageDecoderContext.process` and `processAlso` so there will be no suspension point
internal interface MessageDecoderContext : ProcessorPipelineContext<ImMsgBody.Elem, Message> {
@ -41,7 +42,8 @@ internal open class MessageDecoderPipelineImpl :
inner class MessageDecoderContextImpl(attributes: TypeSafeMap) : MessageDecoderContext, BaseContextImpl(attributes)
override fun createContext(attributes: TypeSafeMap): MessageDecoderContext = MessageDecoderContextImpl(attributes)
override fun createContext(data: ImMsgBody.Elem, attributes: TypeSafeMap): MessageDecoderContext =
MessageDecoderContextImpl(attributes)
companion object {
@TestOnly

View File

@ -27,6 +27,8 @@ internal class MessageEncoderProcessor<T : SingleMessage>(
private val encoder: MessageEncoder<T>,
private val elementType: KClass<T>,
) : Processor<MessageEncoderContext, SingleMessage> {
override val origin: Any get() = this
override suspend fun process(context: MessageEncoderContext, data: SingleMessage) {
if (elementType.isInstance(data)) {
@Suppress("ILLEGAL_RESTRICTED_SUSPENDING_FUNCTION_CALL")

View File

@ -21,7 +21,7 @@ import net.mamoe.mirai.utils.*
import kotlin.coroutines.RestrictsSuspension
internal interface MessageEncoderPipeline :
ProcessorPipeline<MessageEncoderProcessor<*>, SingleMessage, ImMsgBody.Elem> {
ProcessorPipeline<MessageEncoderProcessor<*>, MessageEncoderContext, SingleMessage, ImMsgBody.Elem> {
}
@RestrictsSuspension
@ -73,7 +73,8 @@ internal open class MessageEncoderPipelineImpl :
}
}
override fun createContext(attributes: TypeSafeMap): MessageEncoderContext = MessageEncoderContextImpl(attributes)
override fun createContext(data: SingleMessage, attributes: TypeSafeMap): MessageEncoderContext =
MessageEncoderContextImpl(attributes)
companion object {
private val PB_RESERVE_FOR_ELSE = "78 00 F8 01 00 C8 02 00".hexToBytes()

View File

@ -9,21 +9,92 @@
package net.mamoe.mirai.internal.message.protocol.impl
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.io.core.readUShort
import net.mamoe.mirai.internal.contact.SendMessageStep
import net.mamoe.mirai.internal.message.data.FileMessageImpl
import net.mamoe.mirai.internal.message.data.checkIsImpl
import net.mamoe.mirai.internal.message.protocol.MessageProtocol
import net.mamoe.mirai.internal.message.protocol.ProcessorCollector
import net.mamoe.mirai.internal.message.protocol.decode.MessageDecoder
import net.mamoe.mirai.internal.message.protocol.decode.MessageDecoderContext
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.CONTACT
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.PROTOCOL_STRATEGY
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessageSender
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessageTransformer
import net.mamoe.mirai.internal.message.source.createMessageReceipt
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.internal.network.protocol.data.proto.ObjMsg
import net.mamoe.mirai.internal.network.protocol.packet.chat.FileManagement
import net.mamoe.mirai.internal.utils.io.serialization.readProtoBuf
import net.mamoe.mirai.message.data.FileMessage
import net.mamoe.mirai.message.data.MessageChain
import net.mamoe.mirai.message.data.visitor.RecursiveMessageVisitor
import net.mamoe.mirai.message.data.visitor.acceptChildren
import net.mamoe.mirai.utils.read
import net.mamoe.mirai.utils.systemProp
internal class FileMessageProtocol : MessageProtocol() {
override fun ProcessorCollector.collectProcessorsImpl() {
// no encoder
add(Decoder())
add(OutgoingMessageTransformer {
if (attributes[OutgoingMessagePipelineContext.STEP] == SendMessageStep.FIRST
) {
verifyFileMessage(currentMessageChain)
}
})
add(FileMessageSender())
}
companion object {
private val ALLOW_SENDING_FILE_MESSAGE = systemProp("mirai.message.allow.sending.file.message", false)
fun verifyFileMessage(message: MessageChain) {
message.acceptChildren(object : RecursiveMessageVisitor<Unit>() {
override fun visitFileMessage(message: FileMessage, data: Unit) {
if (ALLOW_SENDING_FILE_MESSAGE) return
// #1715
if (message !is FileMessageImpl) error("Customized FileMessage cannot be send")
if (!message.allowSend) error(
"Sending FileMessage is not allowed, as it may cause unexpected results. " +
"Add JVM argument `-Dmirai.message.allow.sending.file.message=true` to disable this check. " +
"Do this only for compatibility!"
)
}
})
}
}
private class FileMessageSender : OutgoingMessageSender {
override suspend fun OutgoingMessagePipelineContext.process() {
val file = currentMessageChain[FileMessage] ?: return
markAsConsumed()
file.checkIsImpl()
val contact = attributes[CONTACT]
val bot = contact.bot
val strategy = attributes[PROTOCOL_STRATEGY]
val source = coroutineScope {
val source = async {
strategy.constructSourceForSpecialMessage(currentMessageChain, 2021)
}
bot.network.sendAndExpect(FileManagement.Feed(bot.client, contact.id, file.busId, file.id))
source.await()
}
collect(source.createMessageReceipt(contact, true))
}
}
private class Decoder : MessageDecoder {

View File

@ -0,0 +1,63 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol.impl
import net.mamoe.mirai.contact.MessageTooLargeException
import net.mamoe.mirai.internal.message.data.forwardMessage
import net.mamoe.mirai.internal.message.flags.IgnoreLengthCheck
import net.mamoe.mirai.internal.message.protocol.MessageProtocol
import net.mamoe.mirai.internal.message.protocol.ProcessorCollector
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.CONTACT
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.HIGHWAY_UPLOADER
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.PROTOCOL_STRATEGY
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePreprocessor
import net.mamoe.mirai.message.data.ForwardMessage
import net.mamoe.mirai.message.data.RichMessage
import net.mamoe.mirai.message.data.toMessageChain
import net.mamoe.mirai.utils.currentTimeSeconds
internal class ForwardMessageProtocol : MessageProtocol() {
override fun ProcessorCollector.collectProcessorsImpl() {
add(ForwardMessageUploader())
}
class ForwardMessageUploader : OutgoingMessagePreprocessor {
override suspend fun OutgoingMessagePipelineContext.process() {
val forward = currentMessageChain[ForwardMessage] ?: return
val contact = attributes[CONTACT]
if (!currentMessageChain.contains(IgnoreLengthCheck)) {
check(forward.nodeList.size <= 200) {
throw MessageTooLargeException(
contact, forward, forward,
"ForwardMessage allows up to 200 nodes, but found ${forward.nodeList.size}"
)
}
sequence {
forward.nodeList.forEach { yieldAll(it.messageChain) }
}.asIterable().verifyLength(forward, contact)
}
val resId = attributes[HIGHWAY_UPLOADER].uploadMessages(
contact,
attributes[PROTOCOL_STRATEGY],
forward.nodeList,
false
)
currentMessageChain = RichMessage.forwardMessage(
resId = resId,
fileName = currentTimeSeconds().toString(),
forwardMessage = forward,
).toMessageChain()
}
}
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol.impl
import kotlinx.coroutines.Deferred
import net.mamoe.mirai.contact.*
import net.mamoe.mirai.internal.AbstractBot
import net.mamoe.mirai.internal.contact.SendMessageStep
import net.mamoe.mirai.internal.message.protocol.MessageProtocol
import net.mamoe.mirai.internal.message.protocol.ProcessorCollector
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.CONTACT
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.PROTOCOL_STRATEGY
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.STEP
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessageSender
import net.mamoe.mirai.internal.message.source.createMessageReceipt
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbSendMsg
import net.mamoe.mirai.message.data.AtAll
import net.mamoe.mirai.message.data.OnlineMessageSource
import net.mamoe.mirai.message.data.content
import net.mamoe.mirai.message.data.toMessageChain
import net.mamoe.mirai.utils.buildTypeSafeMap
import net.mamoe.mirai.utils.truncated
internal class GeneralMessageSenderProtocol : MessageProtocol(PRIORITY_GENERAL_SENDER) {
override fun ProcessorCollector.collectProcessorsImpl() {
add(GeneralMessageSender())
}
class GeneralMessageSender : OutgoingMessageSender {
override suspend fun OutgoingMessagePipelineContext.process() {
markAsConsumed()
val strategy = attributes[PROTOCOL_STRATEGY]
val step = attributes[STEP]
val contact = attributes[CONTACT]
val bot = contact.bot
var source: Deferred<OnlineMessageSource.Outgoing>? = null
val packets = strategy.createPacketsForGeneralMessage(
client = bot.client,
contact = contact,
message = currentMessageChain,
fragmented = step == SendMessageStep.FRAGMENTED,
sourceCallback = { source = it }
)
sendAllPackets(bot, step, contact, packets)
val sourceAwait = source?.await() ?: error("Internal error: source is not initialized")
sourceAwait.tryEnsureSequenceIdAvailable()
collect(sourceAwait.createMessageReceipt(contact, true))
}
private suspend fun OutgoingMessagePipelineContext.sendAllPackets(
bot: AbstractBot,
step: SendMessageStep,
contact: Contact,
packets: List<OutgoingPacket>
) = packets.forEach { packet ->
val originalMessage = attributes[OutgoingMessagePipelineContext.ORIGINAL_MESSAGE]
val finalMessage = currentMessageChain
val resp = bot.network.sendAndExpect(packet) as MessageSvcPbSendMsg.Response
if (resp is MessageSvcPbSendMsg.Response.MessageTooLarge) {
val next = step.nextStepOrNull()
?: throw MessageTooLargeException(
contact,
originalMessage,
finalMessage,
"Message '${finalMessage.content.truncated(10)}' is too large."
)
// retry with next step
processAlso(
originalMessage.toMessageChain(),
extraAttributes = buildTypeSafeMap {
set(STEP, next)
},
) // We expect to get a Receipt from processAlso
return@forEach
}
if (resp is MessageSvcPbSendMsg.Response.ServiceUnavailable) {
throw IllegalStateException("Send message to $contact failed, server service is unavailable.")
}
if (resp is MessageSvcPbSendMsg.Response.Failed) {
when (resp.resultType) {
120 -> if (contact is Group) throw BotIsBeingMutedException(contact, originalMessage)
121 -> if (AtAll in currentMessageChain) throw SendMessageFailedException(
contact,
SendMessageFailedException.Reason.AT_ALL_LIMITED,
originalMessage
)
299 -> if (contact is Group) throw SendMessageFailedException(
contact,
SendMessageFailedException.Reason.GROUP_CHAT_LIMITED,
originalMessage
)
}
}
check(resp is MessageSvcPbSendMsg.Response.SUCCESS) {
"Send message failed: $resp"
}
}
}
}

View File

@ -10,6 +10,9 @@
package net.mamoe.mirai.internal.message.protocol.impl
import net.mamoe.mirai.contact.User
import net.mamoe.mirai.internal.asQQAndroidBot
import net.mamoe.mirai.internal.contact.GroupImpl
import net.mamoe.mirai.internal.message.data.transform
import net.mamoe.mirai.internal.message.image.*
import net.mamoe.mirai.internal.message.protocol.MessageProtocol
import net.mamoe.mirai.internal.message.protocol.ProcessorCollector
@ -18,8 +21,12 @@ import net.mamoe.mirai.internal.message.protocol.decode.MessageDecoderContext
import net.mamoe.mirai.internal.message.protocol.encode.MessageEncoder
import net.mamoe.mirai.internal.message.protocol.encode.MessageEncoderContext
import net.mamoe.mirai.internal.message.protocol.encode.MessageEncoderContext.Companion.contact
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.CONTACT
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePreprocessor
import net.mamoe.mirai.internal.network.protocol.data.proto.CustomFace
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.internal.utils.ImagePatcher
import net.mamoe.mirai.internal.utils.io.serialization.loadAs
import net.mamoe.mirai.message.data.ImageType
import net.mamoe.mirai.message.data.ShowImageFlag
@ -30,6 +37,29 @@ internal class ImageProtocol : MessageProtocol() {
override fun ProcessorCollector.collectProcessorsImpl() {
add(ImageEncoder())
add(ImageDecoder())
add(ImagePatcherForGroup())
}
private class ImagePatcherForGroup : OutgoingMessagePreprocessor {
override suspend fun OutgoingMessagePipelineContext.process() {
val contact = attributes[CONTACT]
if (contact !is GroupImpl) return
val patcher = contact.bot.asQQAndroidBot().components[ImagePatcher]
currentMessageChain = currentMessageChain.transform { element ->
when (element) {
is OfflineGroupImage -> {
patcher.patchOfflineGroupImage(contact, element)
element
}
is FriendImage -> {
patcher.patchFriendImageToGroupImage(contact, element)
}
else -> element
}
}
}
}
private class ImageDecoder : MessageDecoder {

View File

@ -0,0 +1,84 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol.impl
import net.mamoe.mirai.internal.contact.AbstractContact
import net.mamoe.mirai.internal.contact.SendMessageStep
import net.mamoe.mirai.internal.contact.takeContent
import net.mamoe.mirai.internal.message.data.longMessage
import net.mamoe.mirai.internal.message.flags.DontAsLongMessage
import net.mamoe.mirai.internal.message.flags.ForceAsLongMessage
import net.mamoe.mirai.internal.message.flags.IgnoreLengthCheck
import net.mamoe.mirai.internal.message.protocol.MessageProtocol
import net.mamoe.mirai.internal.message.protocol.ProcessorCollector
import net.mamoe.mirai.internal.message.protocol.outgoing.MessageProtocolStrategy
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.CONTACT
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.HIGHWAY_UPLOADER
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.PROTOCOL_STRATEGY
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.STEP
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessageTransformer
import net.mamoe.mirai.message.data.MessageChain
import net.mamoe.mirai.message.data.RichMessage
import net.mamoe.mirai.utils.currentTimeSeconds
internal class LongMessageProtocol : MessageProtocol() {
override fun ProcessorCollector.collectProcessorsImpl() {
add(OutgoingMessageTransformer {
currentMessageChain =
convertToLongMessageIfNeeded(
currentMessageChain,
attributes[STEP],
attributes[CONTACT],
attributes[PROTOCOL_STRATEGY]
)
})
}
private suspend fun OutgoingMessagePipelineContext.convertToLongMessageIfNeeded(
chain: MessageChain,
step: SendMessageStep,
contact: AbstractContact,
strategy: MessageProtocolStrategy<*>
): MessageChain {
val uploader = attributes[HIGHWAY_UPLOADER]
suspend fun sendLongImpl(): MessageChain {
val time = currentTimeSeconds()
val resId = uploader.uploadLongMessage(contact, strategy, chain, time.toInt())
return chain + RichMessage.longMessage(
brief = chain.takeContent(27),
resId = resId,
timeSeconds = time
) // LongMessageInternal replaces all contents and preserves metadata
}
return when (step) {
SendMessageStep.FIRST -> {
// 只需要在第一次发送的时候验证长度
// 后续重试直接跳过
if (chain.contains(ForceAsLongMessage)) {
return sendLongImpl()
}
if (!chain.contains(IgnoreLengthCheck)) {
chain.verifyLength(chain, contact)
}
chain
}
SendMessageStep.LONG_MESSAGE -> {
if (chain.contains(DontAsLongMessage)) chain // fragmented
else sendLongImpl()
}
SendMessageStep.FRAGMENTED -> chain
}
}
}

View File

@ -9,10 +9,17 @@
package net.mamoe.mirai.internal.message.protocol.impl
import net.mamoe.mirai.contact.Group
import net.mamoe.mirai.internal.message.protocol.MessageProtocol
import net.mamoe.mirai.internal.message.protocol.ProcessorCollector
import net.mamoe.mirai.internal.message.protocol.encode.MessageEncoder
import net.mamoe.mirai.internal.message.protocol.encode.MessageEncoderContext
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePipelineContext.Companion.CONTACT
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessageSender
import net.mamoe.mirai.internal.message.source.createMessageReceipt
import net.mamoe.mirai.internal.network.protocol.packet.chat.MusicSharePacket
import net.mamoe.mirai.message.data.MessageSourceKind
import net.mamoe.mirai.message.data.MusicShare
import net.mamoe.mirai.message.data.PlainText
import net.mamoe.mirai.message.data.content
@ -22,6 +29,8 @@ internal class MusicShareProtocol : MessageProtocol() {
add(Encoder())
// no decoder. refined from LightApp
// add(Decoder())
add(Sender())
}
private class Encoder : MessageEncoder<MusicShare> {
@ -32,4 +41,25 @@ internal class MusicShareProtocol : MessageProtocol() {
processAlso(PlainText(data.content))
}
}
private class Sender : OutgoingMessageSender {
override suspend fun OutgoingMessagePipelineContext.process() {
val contact = attributes[CONTACT]
val bot = contact.bot
val musicShare = currentMessageChain[MusicShare] ?: return
val packet = MusicSharePacket(
bot.client, musicShare, contact.id,
targetKind = if (contact is Group) MessageSourceKind.GROUP else MessageSourceKind.FRIEND // always FRIEND
)
val result = bot.network.sendAndExpect(packet)
result.pkg.checkSuccess("send music share")
val strategy = attributes[OutgoingMessagePipelineContext.PROTOCOL_STRATEGY]
val source = strategy.constructSourceForSpecialMessage(currentMessageChain, 3116)
source.tryEnsureSequenceIdAvailable()
collect(source.createMessageReceipt(contact, true))
}
}
}

View File

@ -21,8 +21,10 @@ import net.mamoe.mirai.internal.message.protocol.decode.MessageDecoderContext.Co
import net.mamoe.mirai.internal.message.protocol.encode.MessageEncoder
import net.mamoe.mirai.internal.message.protocol.encode.MessageEncoderContext
import net.mamoe.mirai.internal.message.protocol.encode.MessageEncoderContext.Companion.contact
import net.mamoe.mirai.internal.message.protocol.outgoing.OutgoingMessagePreprocessor
import net.mamoe.mirai.internal.message.source.MessageSourceInternal
import net.mamoe.mirai.internal.message.source.OfflineMessageSourceImplData
import net.mamoe.mirai.internal.message.source.ensureSequenceIdAvailable
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.message.data.At
import net.mamoe.mirai.message.data.OnlineMessageSource
@ -32,6 +34,10 @@ internal class QuoteReplyProtocol : MessageProtocol(PRIORITY_METADATA) {
override fun ProcessorCollector.collectProcessorsImpl() {
add(Encoder())
add(Decoder())
add(OutgoingMessagePreprocessor {
currentMessageChain[QuoteReply]?.source?.ensureSequenceIdAvailable()
})
}
private class Decoder : MessageDecoder {

View File

@ -0,0 +1,71 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol.outgoing
import net.mamoe.mirai.internal.contact.AbstractContact
import net.mamoe.mirai.internal.contact.nickIn
import net.mamoe.mirai.internal.message.data.MultiMsgUploader
import net.mamoe.mirai.internal.message.protocol.MessageProtocolFacade
import net.mamoe.mirai.internal.message.source.ensureSequenceIdAvailable
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.message.data.ForwardMessage
import net.mamoe.mirai.message.data.MessageChain
internal interface HighwayUploader {
suspend fun uploadMessages(
contact: AbstractContact,
strategy: MessageProtocolStrategy<*>,
nodes: Collection<ForwardMessage.INode>,
isLong: Boolean,
facade: MessageProtocolFacade = MessageProtocolFacade,
senderName: String = contact.bot.nickIn(contact),
): String {
nodes.forEach { it.messageChain.ensureSequenceIdAvailable() }
val uploader = MultiMsgUploader(
client = contact.bot.client,
isLong = isLong,
facade = facade,
contact = contact,
senderName = senderName,
strategy = strategy
).also { it.emitMain(nodes) }
return uploader.uploadAndReturnResId()
}
suspend fun uploadLongMessage(
contact: AbstractContact,
strategy: MessageProtocolStrategy<*>,
chain: MessageChain,
timeSeconds: Int,
senderName: String = contact.bot.nickIn(contact),
): String {
val bot = contact.bot
return uploadMessages(
contact,
strategy,
listOf(
ForwardMessage.Node(
senderId = bot.id,
time = timeSeconds,
messageChain = chain,
senderName = senderName
)
),
true,
senderName = senderName
)
}
companion object : ComponentKey<HighwayUploader>
object Default : HighwayUploader
}

View File

@ -0,0 +1,149 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol.outgoing
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.withTimeoutOrNull
import net.mamoe.mirai.event.EventPriority
import net.mamoe.mirai.event.GlobalEventChannel
import net.mamoe.mirai.event.nextEvent
import net.mamoe.mirai.internal.contact.*
import net.mamoe.mirai.internal.message.source.OnlineMessageSourceToFriendImpl
import net.mamoe.mirai.internal.message.source.OnlineMessageSourceToGroupImpl
import net.mamoe.mirai.internal.network.QQAndroidClient
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.components.ClockHolder.Companion.clock
import net.mamoe.mirai.internal.network.notice.group.GroupMessageProcessor
import net.mamoe.mirai.internal.network.notice.priv.PrivateMessageProcessor
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.*
import net.mamoe.mirai.message.data.MessageChain
import net.mamoe.mirai.message.data.OnlineMessageSource
internal sealed interface MessageProtocolStrategy<in C : AbstractContact> {
suspend fun createPacketsForGeneralMessage(
client: QQAndroidClient,
contact: C,
message: MessageChain,
fragmented: Boolean,
sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit,
): List<OutgoingPacket>
suspend fun constructSourceForSpecialMessage(
finalMessage: MessageChain,
fromAppId: Int,
): OnlineMessageSource.Outgoing
companion object : ComponentKey<MessageProtocolStrategy<*>>
}
internal sealed class UserMessageProtocolStrategy<C : AbstractUser> : MessageProtocolStrategy<C> {
override suspend fun constructSourceForSpecialMessage(
finalMessage: MessageChain,
fromAppId: Int
): OnlineMessageSource.Outgoing {
throw UnsupportedOperationException("Sending MusicShare or FileMessage to User is not yet supported")
}
}
internal class FriendMessageProtocolStrategy(
private val contact: FriendImpl,
) : UserMessageProtocolStrategy<FriendImpl>() {
override suspend fun createPacketsForGeneralMessage(
client: QQAndroidClient,
contact: FriendImpl,
message: MessageChain,
fragmented: Boolean,
sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit
): List<OutgoingPacket> {
return MessageSvcPbSendMsg.createToFriend(client, contact, message, fragmented, sourceCallback)
}
override suspend fun constructSourceForSpecialMessage(
finalMessage: MessageChain,
fromAppId: Int
): OnlineMessageSource.Outgoing {
val receipt: PrivateMessageProcessor.SendPrivateMessageReceipt = withTimeoutOrNull(3000) {
GlobalEventChannel.parentScope(this).nextEvent(EventPriority.MONITOR) {
it.bot === contact.bot && it.fromAppId == fromAppId
}
} ?: PrivateMessageProcessor.SendPrivateMessageReceipt.EMPTY
return OnlineMessageSourceToFriendImpl(
internalIds = intArrayOf(receipt.messageRandom),
sequenceIds = intArrayOf(receipt.sequenceId),
sender = contact.bot,
target = contact,
time = contact.bot.clock.server.currentTimeSeconds().toInt(),
originalMessage = finalMessage
)
}
}
internal object StrangerMessageProtocolStrategy : UserMessageProtocolStrategy<StrangerImpl>() {
override suspend fun createPacketsForGeneralMessage(
client: QQAndroidClient,
contact: StrangerImpl,
message: MessageChain,
fragmented: Boolean,
sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit
): List<OutgoingPacket> {
return MessageSvcPbSendMsg.createToStranger(client, contact, message, fragmented, sourceCallback)
}
}
internal object GroupTempMessageProtocolStrategy : UserMessageProtocolStrategy<NormalMemberImpl>() {
override suspend fun createPacketsForGeneralMessage(
client: QQAndroidClient,
contact: NormalMemberImpl,
message: MessageChain,
fragmented: Boolean,
sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit
): List<OutgoingPacket> {
return MessageSvcPbSendMsg.createToTemp(client, contact, message, fragmented, sourceCallback)
}
}
internal open class GroupMessageProtocolStrategy(
private val contact: GroupImpl,
) : MessageProtocolStrategy<GroupImpl> {
override suspend fun createPacketsForGeneralMessage(
client: QQAndroidClient,
contact: GroupImpl,
message: MessageChain,
fragmented: Boolean,
sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit
): List<OutgoingPacket> {
return MessageSvcPbSendMsg.createToGroup(client, contact, message, fragmented, sourceCallback)
}
override suspend fun constructSourceForSpecialMessage(
finalMessage: MessageChain,
fromAppId: Int
): OnlineMessageSource.Outgoing {
val receipt: GroupMessageProcessor.SendGroupMessageReceipt = withTimeoutOrNull(3000) {
GlobalEventChannel.parentScope(this).nextEvent(EventPriority.MONITOR) {
it.bot === contact.bot && it.fromAppId == fromAppId
}
} ?: GroupMessageProcessor.SendGroupMessageReceipt.EMPTY
return OnlineMessageSourceToGroupImpl(
contact,
internalIds = intArrayOf(receipt.messageRandom),
providedSequenceIds = intArrayOf(receipt.sequenceId),
sender = contact.bot,
target = contact,
time = contact.bot.clock.server.currentTimeSeconds().toInt(),
originalMessage = finalMessage
)
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol.outgoing
import net.mamoe.mirai.contact.Contact
import net.mamoe.mirai.contact.MessageTooLargeException
import net.mamoe.mirai.internal.contact.AbstractContact
import net.mamoe.mirai.internal.contact.SendMessageStep
import net.mamoe.mirai.internal.message.source.ensureSequenceIdAvailable
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.internal.pipeline.AbstractProcessorPipeline
import net.mamoe.mirai.internal.pipeline.PipelineConfiguration
import net.mamoe.mirai.internal.pipeline.ProcessorPipeline
import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext
import net.mamoe.mirai.internal.utils.estimateLength
import net.mamoe.mirai.message.MessageReceipt
import net.mamoe.mirai.message.data.*
import net.mamoe.mirai.utils.*
///////////////////////////////////////////////////////////////////////////
// Infrastructure for a ProcessorPipeline
///////////////////////////////////////////////////////////////////////////
// Just to change this easily — you'd know it's actually Unit — a placeholder.
internal typealias OutgoingMessagePipelineInput = MessageChain
internal interface OutgoingMessagePipeline :
ProcessorPipeline<OutgoingMessagePipelineProcessor, OutgoingMessagePipelineContext, OutgoingMessagePipelineInput, MessageReceipt<*>>
internal open class OutgoingMessagePipelineImpl :
AbstractProcessorPipeline<OutgoingMessagePipelineProcessor, OutgoingMessagePipelineContext, OutgoingMessagePipelineInput, MessageReceipt<*>>(
PipelineConfiguration(stopWhenConsumed = false), @OptIn(TestOnly::class) defaultTraceLogging
), OutgoingMessagePipeline {
inner class OutgoingMessagePipelineContextImpl(
attributes: TypeSafeMap, override var currentMessageChain: MessageChain
) : OutgoingMessagePipelineContext, BaseContextImpl(attributes)
override fun createContext(
data: OutgoingMessagePipelineInput, attributes: TypeSafeMap
): OutgoingMessagePipelineContext = OutgoingMessagePipelineContextImpl(attributes, data)
companion object {
@TestOnly
val defaultTraceLogging: MiraiLoggerWithSwitch by lazy {
MiraiLogger.Factory.create(OutgoingMessagePipelineImpl::class, "OutgoingMessagePipeline")
.withSwitch(systemProp("mirai.message.outgoing.pipeline.log.full", false))
}
}
}
internal interface OutgoingMessagePipelineContext :
ProcessorPipelineContext<OutgoingMessagePipelineInput, MessageReceipt<*>> {
var currentMessageChain: MessageChain
suspend fun MessageSource.tryEnsureSequenceIdAvailable() {
val contact = attributes[CONTACT]
val bot = contact.bot
try {
ensureSequenceIdAvailable()
} catch (e: Exception) {
bot.network.logger.warning(
"Timeout awaiting sequenceId for message(${currentMessageChain.content.take(10)}). Some features may not work properly.",
e
)
}
}
fun Iterable<SingleMessage>.countImages(): Int = this.count { it is Image }
fun Iterable<SingleMessage>.verifyLength(
originalMessage: Message, target: Contact,
): Int {
val chain = this
val length = estimateLength(target, 15001)
if (length > 15000 || countImages() > 50) {
throw MessageTooLargeException(
target, originalMessage, this.toMessageChain(),
"message(${
chain.joinToString("", limit = 10).let { rsp ->
if (rsp.length > 100) {
rsp.take(100) + "..."
} else rsp
}
}) is too large. Allow up to 50 images or 5000 chars"
)
}
return length
}
companion object {
/**
* Original
*/
val ORIGINAL_MESSAGE = TypeKey<Message>("originalMessage")
/**
* Message target
*/
val CONTACT = TypeKey<AbstractContact>("contact")
val STEP = TypeKey<SendMessageStep>("step")
val PROTOCOL_STRATEGY = TypeKey<MessageProtocolStrategy<AbstractContact>>("protocolStrategy")
val HIGHWAY_UPLOADER = TypeKey<HighwayUploader>("highwayUploader")
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol.outgoing
import net.mamoe.mirai.internal.pipeline.Processor
internal sealed interface OutgoingMessagePipelineProcessor :
Processor<OutgoingMessagePipelineContext, OutgoingMessagePipelineInput>
/**
* Adapter for [OutgoingMessageProcessor] to be used as [Processor].
*/
internal class OutgoingMessageProcessorAdapter(
private val processor: OutgoingMessageProcessor,
) : OutgoingMessagePipelineProcessor {
override val origin: OutgoingMessageProcessor get() = processor
override suspend fun process(context: OutgoingMessagePipelineContext, data: OutgoingMessagePipelineInput) {
processor.run { context.process() }
}
override fun toString(): String {
return "OutgoingMessageProcessorAdapter(transformer=$processor)"
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol.outgoing
import net.mamoe.mirai.contact.Contact
import net.mamoe.mirai.internal.message.data.LongMessageInternal
import net.mamoe.mirai.internal.pipeline.PipelineConsumptionMarker
import net.mamoe.mirai.message.MessageReceipt
import net.mamoe.mirai.message.data.MessageChain
/**
* A handler that is responsible for some logic in sending a [MessageChain] to a target [Contact].
*
* Broadcasting events is not of responsibility of [OutgoingMessageProcessor].
*/
internal sealed interface OutgoingMessageProcessor {
suspend fun OutgoingMessagePipelineContext.process()
}
/**
* A preprocessor will be called in the first place.
*
* It is designed to do type conversions, and pre-conditional checks.
*
* **Preprocessors are not called in nested processing.**
*/
internal fun interface OutgoingMessagePreprocessor : OutgoingMessageProcessor
/**
* A transformer will be called after [Preprocessors][OutgoingMessagePreprocessor] and before [Postprocessors][OutgoingMessagePostprocessor].
*
* It is capable for handling some special messages, e.g. transform long messages as [LongMessageInternal].
*/
internal fun interface OutgoingMessageTransformer : OutgoingMessageProcessor
/**
* A transformer will be called after [Preprocessors][OutgoingMessagePreprocessor] and before [Postprocessors][OutgoingMessagePostprocessor].
*
* It is capable for sending packets, and create [MessageReceipt].
* Senders can finish the pipeline by [OutgoingMessagePipelineContext.markAsConsumed].
*/
internal fun interface OutgoingMessageSender : OutgoingMessageProcessor, PipelineConsumptionMarker
/**
* A postprocessor will be called in the last place.
*
* It is designed to do cleanup.
*
* Note that if an exception was thrown by previous processors, postprocessors may not be called,
* so do not reply on the postprocessor to close resources.
*/
internal fun interface OutgoingMessagePostprocessor : OutgoingMessageProcessor
//internal interface MessageRefiner
//
//internal fun interface DeepMessageRefiner : MessageRefiner, OutgoingMessageProcessor
//
//@RestrictsSuspension // only allowed to call `processAlso`
//internal fun interface LightMessageRefiner : MessageRefiner {
// suspend fun OutgoingMessagePipelineContext.process()
//}

View File

@ -11,7 +11,6 @@ package net.mamoe.mirai.internal.message.source
import kotlinx.serialization.Transient
import net.mamoe.mirai.contact.Contact
import net.mamoe.mirai.internal.contact.SendMessageHandler
import net.mamoe.mirai.internal.message.LightMessageRefiner.dropMiraiInternalFlags
import net.mamoe.mirai.internal.message.LightMessageRefiner.refineLight
import net.mamoe.mirai.internal.message.visitor.ex
@ -60,8 +59,6 @@ internal interface OutgoingMessageSourceInternal : MessageSourceInternal {
/**
* This 'overrides' [MessageSource.originalMessage].
*
* @see SendMessageHandler.sendMessagePacket
*/
var originalMessage: MessageChain
}

View File

@ -1,10 +1,10 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.network.component
@ -37,6 +37,10 @@ internal interface ComponentStorage {
}
}
internal fun buildComponentStorage(builderAction: MutableComponentStorage.() -> Unit): ComponentStorage {
return ConcurrentComponentStorage(builderAction)
}
internal fun ComponentStorage?.withFallback(fallback: ComponentStorage?): ComponentStorage {
if (this == null) return fallback ?: return ComponentStorage.EMPTY
if (fallback == null) return this

View File

@ -37,7 +37,8 @@ import kotlin.reflect.KClass
/**
* Centralized processor pipeline for [MessageSvcPbGetMsg] and [OnlinePushPbPushTransMsg]
*/
internal interface NoticeProcessorPipeline : ProcessorPipeline<NoticeProcessor, ProtocolStruct, Packet> {
internal interface NoticeProcessorPipeline :
ProcessorPipeline<NoticeProcessor, NoticePipelineContext, ProtocolStruct, Packet> {
companion object : ComponentKey<NoticeProcessorPipeline> {
val ComponentStorage.noticeProcessorPipeline get() = get(NoticeProcessorPipeline)
@ -46,7 +47,7 @@ internal interface NoticeProcessorPipeline : ProcessorPipeline<NoticeProcessor,
data: ProtocolStruct,
attributes: TypeSafeMap = TypeSafeMap.EMPTY,
): Packet {
return components.noticeProcessorPipeline.process(data, attributes).toPacket()
return components.noticeProcessorPipeline.process(data, attributes).collected.toPacket()
}
}
}
@ -93,10 +94,10 @@ internal open class NoticeProcessorPipelineImpl protected constructor(
override suspend fun processAlso(
data: ProtocolStruct,
attributes: TypeSafeMap
): Collection<Packet> {
): ProcessResult<out ProcessorPipelineContext<ProtocolStruct, Packet>, Packet> {
traceLogging.info { "processAlso: data=${data.structureToStringAndDesensitizeIfAvailable()}" }
return process(data, this.attributes + attributes).also { packets ->
this.collected.data += packets
this.collected.data += packets.collected
traceLogging.info { "processAlso: result=$packets" }
}
}
@ -120,7 +121,8 @@ internal open class NoticeProcessorPipelineImpl protected constructor(
)
}
override fun createContext(attributes: TypeSafeMap): NoticePipelineContext = ContextImpl(attributes)
override fun createContext(data: ProtocolStruct, attributes: TypeSafeMap): NoticePipelineContext =
ContextImpl(attributes)
protected open fun packetToString(data: Any?): String =
data.toDebugString("mirai.network.notice.pipeline.log.full")

View File

@ -147,7 +147,6 @@ internal interface NetworkHandler : CoroutineScope {
*/
suspend fun <P : Packet?> sendAndExpect(packet: OutgoingPacket, timeout: Long = 5000, attempts: Int = 2): P
/**
* Sends [packet] and does not expect any response.
*

View File

@ -13,29 +13,17 @@ package net.mamoe.mirai.internal.network.protocol.packet.chat
import kotlinx.io.core.ByteReadPacket
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.contact.SendMessageHandler
import net.mamoe.mirai.internal.message.contextualBugReportException
import net.mamoe.mirai.internal.message.protocol.MessageProtocolFacade
import net.mamoe.mirai.internal.message.source.MessageSourceInternal
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.QQAndroidClient
import net.mamoe.mirai.internal.network.components.PacketCodec
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgComm
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgTransmit
import net.mamoe.mirai.internal.network.protocol.data.proto.MultiMsg
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketFactory
import net.mamoe.mirai.internal.network.protocol.packet.buildOutgoingUniPacket
import net.mamoe.mirai.internal.utils.io.serialization.readProtoBuf
import net.mamoe.mirai.internal.utils.io.serialization.toByteArray
import net.mamoe.mirai.internal.utils.io.serialization.writeProtoBuf
import net.mamoe.mirai.internal.utils.structureToString
import net.mamoe.mirai.message.data.ForwardMessage
import net.mamoe.mirai.message.data.MessageSource
import net.mamoe.mirai.message.data.toMessageChain
import net.mamoe.mirai.utils.gzip
import net.mamoe.mirai.utils.md5
import net.mamoe.mirai.utils.toLongUnsigned
internal class MessageValidationData(
val data: ByteArray,
@ -46,71 +34,6 @@ internal class MessageValidationData(
}
}
internal fun Collection<ForwardMessage.INode>.calculateValidationData(
client: QQAndroidClient,
random: Int,
handler: SendMessageHandler<*>,
isLong: Boolean,
): MessageValidationData {
val offeredSourceIds = mutableSetOf<Int>()
fun calculateMsgSeq(node: ForwardMessage.INode): Int {
node.messageChain[MessageSource]?.let { source ->
source as MessageSourceInternal
val sid = source.sequenceIds.first()
// Duplicate message added
if (offeredSourceIds.add(sid)) {
return sid
}
}
return client.atomicNextMessageSequenceId()
}
val msgList = map { chain ->
MsgComm.Msg(
msgHead = MsgComm.MsgHead(
fromUin = chain.senderId,
toUin = if (isLong) {
handler.targetUserUin ?: 0
} else 0,
msgSeq = calculateMsgSeq(chain),
msgTime = chain.time,
msgUid = 0x01000000000000000L or random.toLongUnsigned(),
mutiltransHead = MsgComm.MutilTransHead(
status = 0,
msgId = 1
),
msgType = 82, // troop
groupInfo = handler.run { chain.groupInfo },
isSrcMsg = false
),
msgBody = ImMsgBody.MsgBody(
richText = ImMsgBody.RichText(
elems = MessageProtocolFacade.encode(
chain.messageChain.toMessageChain(),
handler.contact,
withGeneralFlags = false,
isForward = true
)
)
)
)
}
val msgTransmit = MsgTransmit.PbMultiMsgTransmit(
msg = msgList,
pbItemList = listOf(
MsgTransmit.PbMultiMsgItem(
fileName = "MultiMsg",
buffer = MsgTransmit.PbMultiMsgNew(msgList).toByteArray(MsgTransmit.PbMultiMsgNew.serializer())
)
)
)
val bytes = msgTransmit.toByteArray(MsgTransmit.PbMultiMsgTransmit.serializer())
return MessageValidationData(bytes.gzip())
}
internal class MultiMsg {
object ApplyUp : OutgoingPacketFactory<ApplyUp.Response>("MultiMsg.ApplyUp") {

View File

@ -19,11 +19,13 @@ import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.ConcurrentLinkedQueue
internal interface Processor<C : ProcessorPipelineContext<D, *>, D> : PipelineConsumptionMarker {
val origin: Any get() = this
suspend fun process(context: C, data: D)
}
internal interface ProcessorPipeline<P : Processor<out ProcessorPipelineContext<D, *>, D>, D, R> {
val processors: Collection<P>
internal interface ProcessorPipeline<P : Processor<C, D>, C : ProcessorPipelineContext<D, R>, D, R> {
val processors: MutableCollection<ProcessorBox<P>>
fun interface DisposableRegistry : Closeable {
fun dispose()
@ -37,15 +39,51 @@ internal interface ProcessorPipeline<P : Processor<out ProcessorPipelineContext<
fun registerBefore(processor: P): DisposableRegistry
/**
* Process using the [context].
*/
suspend fun process(
data: D,
context: C,
attributes: TypeSafeMap = TypeSafeMap.EMPTY,
): ProcessResult<C, R>
/**
* Process with a new context
*/
suspend fun process(
data: D,
attributes: TypeSafeMap = TypeSafeMap.EMPTY
): Collection<R>
): ProcessResult<C, R>
}
internal inline fun <P : Processor<*, *>, Pip : ProcessorPipeline<P, *, *, *>> Pip.replaceProcessor(
predicate: (origin: Any) -> Boolean,
processor: P
): Boolean {
for (box in processors) {
val value = box.value
if (predicate(value.origin)) {
box.value = processor
return true
}
}
return false
}
internal data class ProcessorBox<P : Processor<*, *>>(
var value: P
)
internal data class ProcessResult<C : ProcessorPipelineContext<*, R>, R>(
val context: C,
val collected: Collection<R>,
)
@JvmInline
internal value class MutableProcessResult<R>(
internal value class MutablePipelineResult<R>(
val data: MutableCollection<R>
)
@ -59,10 +97,10 @@ internal interface ProcessorPipelineContext<D, R> {
*/
val attributes: TypeSafeMap
val collected: MutableProcessResult<R>
val collected: MutablePipelineResult<R>
// DSL to simplify some expressions
operator fun MutableProcessResult<R>.plusAssign(result: R?) {
operator fun MutablePipelineResult<R>.plusAssign(result: R?) {
if (result != null) collect(result)
}
@ -103,10 +141,13 @@ internal interface ProcessorPipelineContext<D, R> {
/**
* Fire the [data] into the processor pipeline, and collect the results to current [collected].
*
* @param attributes extra attributes
* @param extraAttributes extra attributes
* @return result collected from processors. This would also have been collected to this context (where you call [processAlso]).
*/
suspend fun processAlso(data: D, attributes: TypeSafeMap = TypeSafeMap.EMPTY): Collection<R>
suspend fun processAlso(
data: D,
extraAttributes: TypeSafeMap = TypeSafeMap.EMPTY
): ProcessResult<out ProcessorPipelineContext<D, R>, R>
}
internal abstract class AbstractProcessorPipelineContext<D, R>(
@ -130,7 +171,7 @@ internal abstract class AbstractProcessorPipelineContext<D, R>(
}
}
override val collected: MutableProcessResult<R> = MutableProcessResult(ConcurrentLinkedQueue())
override val collected: MutablePipelineResult<R> = MutablePipelineResult(ConcurrentLinkedQueue())
override fun collect(result: R) {
collected.data.add(result)
@ -151,37 +192,42 @@ internal abstract class AbstractProcessorPipeline<P : Processor<C, D>, C : Proce
protected constructor(
val configuration: PipelineConfiguration,
val traceLogging: MiraiLogger,
) : ProcessorPipeline<P, D, R> {
) : ProcessorPipeline<P, C, D, R> {
constructor(configuration: PipelineConfiguration) : this(configuration, SilentLogger)
/**
* Must be ordered
*/
override val processors = ConcurrentLinkedDeque<P>()
override val processors: ConcurrentLinkedDeque<ProcessorBox<P>> = ConcurrentLinkedDeque()
override fun registerProcessor(processor: P): ProcessorPipeline.DisposableRegistry {
processors.add(processor)
val box = ProcessorBox(processor)
processors.add(box)
return ProcessorPipeline.DisposableRegistry {
processors.remove(processor)
processors.remove(box)
}
}
override fun registerBefore(processor: P): ProcessorPipeline.DisposableRegistry {
processors.addFirst(processor)
val box = ProcessorBox(processor)
processors.addFirst(box)
return ProcessorPipeline.DisposableRegistry {
processors.remove(processor)
processors.remove(box)
}
}
protected abstract fun createContext(attributes: TypeSafeMap): C
protected abstract fun createContext(data: D, attributes: TypeSafeMap): C
abstract inner class BaseContextImpl(
attributes: TypeSafeMap,
) : AbstractProcessorPipelineContext<D, R>(attributes, traceLogging) {
override suspend fun processAlso(data: D, attributes: TypeSafeMap): Collection<R> {
override suspend fun processAlso(
data: D,
extraAttributes: TypeSafeMap
): ProcessResult<out ProcessorPipelineContext<D, R>, R> {
traceLogging.info { "processAlso: data=${data.structureToStringAndDesensitizeIfAvailable()}" }
return process(data, this.attributes + attributes).also {
this.collected.data += it
return process(data, this.attributes + extraAttributes).also {
this.collected.data += it.collected
traceLogging.info { "processAlso: result=$it" }
}
}
@ -195,14 +241,17 @@ protected constructor(
e: Throwable
): Unit = throw e
override suspend fun process(data: D, attributes: TypeSafeMap): Collection<R> {
override suspend fun process(data: D, attributes: TypeSafeMap): ProcessResult<C, R> {
return process(data, createContext(data, attributes), attributes)
}
override suspend fun process(data: D, context: C, attributes: TypeSafeMap): ProcessResult<C, R> {
traceLogging.info { "process: data=${data.structureToStringAndDesensitizeIfAvailable()}" }
val context = createContext(attributes)
val diff = if (traceLogging.isEnabled) CollectionDiff<R>() else null
diff?.save(context.collected.data)
for (processor in processors) {
for ((processor) in processors) {
val result = kotlin.runCatching {
processor.process(context, data)
@ -226,6 +275,6 @@ protected constructor(
break
}
}
return context.collected.data
return ProcessResult(context, context.collected.data)
}
}

View File

@ -21,4 +21,7 @@ net.mamoe.mirai.internal.message.protocol.impl.QuoteReplyProtocol
net.mamoe.mirai.internal.message.protocol.impl.RichMessageProtocol
net.mamoe.mirai.internal.message.protocol.impl.TextProtocol
net.mamoe.mirai.internal.message.protocol.impl.VipFaceProtocol
net.mamoe.mirai.internal.message.protocol.impl.ForwardMessageProtocol
net.mamoe.mirai.internal.message.protocol.impl.LongMessageProtocol
net.mamoe.mirai.internal.message.protocol.impl.UnsupportedMessageProtocol
net.mamoe.mirai.internal.message.protocol.impl.GeneralMessageSenderProtocol

View File

@ -14,7 +14,7 @@ import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import net.mamoe.mirai.contact.ContactOrBot
import net.mamoe.mirai.contact.Group
import net.mamoe.mirai.internal.contact.inferMessageSourceKind
import net.mamoe.mirai.internal.message.data.inferMessageSourceKind
import net.mamoe.mirai.internal.message.protocol.MessageProtocol
import net.mamoe.mirai.internal.message.protocol.MessageProtocolFacade
import net.mamoe.mirai.internal.message.protocol.MessageProtocolFacadeImpl

View File

@ -0,0 +1,110 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol.impl
import net.mamoe.mirai.internal.message.protocol.MessageProtocol
import net.mamoe.mirai.internal.message.protocol.decodeAndRefineLight
import net.mamoe.mirai.message.data.Face
import net.mamoe.mirai.message.data.MessageSourceKind
import net.mamoe.mirai.message.data.messageChainOf
import net.mamoe.mirai.utils.hexToBytes
import org.junit.jupiter.api.Test
internal class QuoteReplyProtocolTest : AbstractMessageProtocolTest() {
override val protocols: Array<out MessageProtocol> = arrayOf(QuoteReplyProtocol(), TextProtocol())
@Test
fun `decode group reference group`() {
buildChecks {
elem(
net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody.Elem(
srcMsg = net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody.SourceMsg(
origSeqs = intArrayOf(1803),
senderUin = 1230001,
time = 1653147259,
flag = 1,
elems = mutableListOf(
net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody.Elem(
text = net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody.Text(
str = "a",
),
),
),
pbReserve = "18 AB 85 9D 81 82 80 80 80 01".hexToBytes(),
),
),
net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody.Elem(
text = net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody.Text(
str = "s",
),
),
)
// message(
// QuoteReply(
//// OfflineMessageSourceImplData(
////
//// )
// )
// )
targetGroup()
useOrdinaryEquality()
}.doDecoderChecks()
}
@Test
fun `can decode`() {
doDecoderChecks(
messageChainOf(Face(Face.YIN_XIAN)),
) {
decodeAndRefineLight(
listOf(
net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody.Elem(
face = net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody.Face(
index = 108,
old = "14 AD".hexToBytes(),
),
)
),
groupIdOrZero = 0,
MessageSourceKind.GROUP,
bot,
)
}
}
private fun ChecksBuilder.targetGroup() {
target(bot.addGroup(1, 1))
}
private fun ChecksBuilder.targetFriend() {
target(bot.addFriend(1))
}
}

View File

@ -63,7 +63,7 @@ internal abstract class AbstractNoticeProcessorTest : AbstractNettyNHTest(), Gro
bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED
val handler = LoggingPacketHandlerAdapter(PacketLoggingStrategyImpl(bot), bot.logger)
val context = UseTestContext(attributes.toMutableTypeSafeMap())
return pipeline.process(block(context), context.attributes).also { list ->
return pipeline.process(block(context), context.attributes).collected.also { list ->
for (packet in list) {
handler.handlePacket(IncomingPacket("test", packet))
}
@ -77,10 +77,10 @@ internal abstract class AbstractNoticeProcessorTest : AbstractNettyNHTest(), Gro
): Collection<Packet> =
use(attributes, pipeline = object : NoticeProcessorPipelineImpl(bot) {
init {
bot.components.noticeProcessorPipeline.processors.forEach { registerProcessor(it) }
bot.components.noticeProcessorPipeline.processors.forEach { registerProcessor(it.value) }
}
override fun createContext(attributes: TypeSafeMap): NoticePipelineContext =
override fun createContext(data: ProtocolStruct, attributes: TypeSafeMap): NoticePipelineContext =
createContext(this, attributes)
}, block)

View File

@ -9,35 +9,29 @@
package net.mamoe.mirai.internal.message.data
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import net.mamoe.mirai.Bot
import net.mamoe.mirai.internal.AbstractTestWithMiraiImpl
import net.mamoe.mirai.internal.MockBot
import net.mamoe.mirai.internal.contact.*
import net.mamoe.mirai.internal.contact.info.GroupInfoImpl
import net.mamoe.mirai.internal.contact.AbstractContact
import net.mamoe.mirai.internal.message.protocol.MessageProtocolFacade
import net.mamoe.mirai.internal.message.protocol.impl.GeneralMessageSenderProtocol
import net.mamoe.mirai.internal.message.protocol.outgoing.*
import net.mamoe.mirai.internal.message.source.OnlineMessageSourceToGroupImpl
import net.mamoe.mirai.internal.network.QQAndroidClient
import net.mamoe.mirai.internal.network.protocol.data.jce.StTroopNum
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.message.source.createMessageReceipt
import net.mamoe.mirai.internal.network.component.ConcurrentComponentStorage
import net.mamoe.mirai.internal.notice.processors.GroupExtensions
import net.mamoe.mirai.internal.pipeline.replaceProcessor
import net.mamoe.mirai.internal.test.AbstractTest
import net.mamoe.mirai.internal.test.runBlockingUnit
import net.mamoe.mirai.message.data.*
import net.mamoe.mirai.message.data.ForwardMessage
import net.mamoe.mirai.message.data.buildForwardMessage
import net.mamoe.mirai.message.data.toMessageChain
import net.mamoe.mirai.utils.currentTimeSeconds
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertSame
import kotlin.test.assertTrue
internal class MessageReceiptTest : AbstractTestWithMiraiImpl() {
override suspend fun uploadMessageHighway(
bot: Bot,
sendMessageHandler: SendMessageHandler<*>,
message: Collection<ForwardMessage.INode>,
isLong: Boolean,
): String {
return "id"
}
internal class MessageReceiptTest : AbstractTest(), GroupExtensions {
private val bot = MockBot()
/**
@ -45,38 +39,56 @@ internal class MessageReceiptTest : AbstractTestWithMiraiImpl() {
*/ // We need #1304
@Test
fun `refine ForwardMessageInternal for MessageReceipt`() = runBlockingUnit {
val group =
GroupImpl(bot, bot.coroutineContext, 1, GroupInfoImpl(StTroopNum(1, 1, dwGroupOwnerUin = 2)), sequenceOf())
val group = bot.addGroup(123, 2)
val forward = buildForwardMessage(group) {
2 says "ok"
}
val message = forward.toMessageChain()
val handler = object : GroupSendMessageHandler(group) {
override val messageSvcSendMessage: (client: QQAndroidClient, contact: GroupImpl, message: MessageChain, fragmented: Boolean, sourceCallback: (Deferred<OnlineMessageSource.Outgoing>) -> Unit) -> List<OutgoingPacket> =
{ _, _, message, _, sourceCallback ->
val facade = MessageProtocolFacade.INSTANCE.copy()
assertIs<ForwardMessageInternal>(message[ForwardMessageInternal])
assertSame(forward, message[ForwardMessageInternal]?.origin)
assertTrue {
facade.preprocessorPipeline.replaceProcessor(
{ it is GeneralMessageSenderProtocol.GeneralMessageSender },
OutgoingMessageProcessorAdapter(object : OutgoingMessageSender {
override suspend fun OutgoingMessagePipelineContext.process() {
assertIs<ForwardMessageInternal>(currentMessageChain[ForwardMessageInternal])
assertSame(forward, currentMessageChain[ForwardMessageInternal]?.origin)
sourceCallback(
CompletableDeferred(
OnlineMessageSourceToGroupImpl(
group,
internalIds = intArrayOf(1),
sender = bot,
target = group,
time = currentTimeSeconds().toInt(),
originalMessage = message //,
// sourceMessage = message
)
val source = OnlineMessageSourceToGroupImpl(
group,
internalIds = intArrayOf(1),
sender = bot,
target = group,
time = currentTimeSeconds().toInt(),
originalMessage = currentMessageChain //,
// sourceMessage = message
)
)
listOf()
}
collect(source.createMessageReceipt(group, true))
}
})
)
}
val result = handler.sendMessage(message, message, false, SendMessageStep.FIRST)
val result = facade.preprocessAndSendOutgoing(group, message, ConcurrentComponentStorage {
set(MessageProtocolStrategy, object : GroupMessageProtocolStrategy(group) {
})
set(HighwayUploader, object : HighwayUploader {
override suspend fun uploadMessages(
contact: AbstractContact,
strategy: MessageProtocolStrategy<*>,
nodes: Collection<ForwardMessage.INode>,
isLong: Boolean,
facade: MessageProtocolFacade,
senderName: String
): String {
return "id"
}
})
})
assertIs<ForwardMessage>(result.source.originalMessage[ForwardMessage])
assertEquals(message, result.source.originalMessage)