diff --git a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt index d194fd89a..2960910e5 100644 --- a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt +++ b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt @@ -190,6 +190,7 @@ internal open class QQAndroidBot constructor( set( NoticeProcessorPipeline, NoticeProcessorPipelineImpl.create( + bot, MsgInfoDecoder(pipelineLogger.subLogger("MsgInfoDecoder")), GroupNotificationDecoder(), diff --git a/mirai-core/src/commonMain/kotlin/message/protocol/OutgoingMessageContext.kt b/mirai-core/src/commonMain/kotlin/message/protocol/OutgoingMessageContext.kt new file mode 100644 index 000000000..dceef13f5 --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/message/protocol/OutgoingMessageContext.kt @@ -0,0 +1,20 @@ +/* + * Copyright 2019-2022 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/dev/LICENSE + */ + +package net.mamoe.mirai.internal.message.protocol + +import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody + +internal interface OutgoingMessageContext { + + fun collect(elem: ImMsgBody.Elem) + + fun processAlso() +} + diff --git a/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt b/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt index e1292665d..98af33013 100644 --- a/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt +++ b/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt @@ -1,16 +1,15 @@ /* - * Copyright 2019-2021 Mamoe Technologies and contributors. + * Copyright 2019-2022 Mamoe Technologies and contributors. * - * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. - * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. * - * https://github.com/mamoe/mirai/blob/master/LICENSE + * https://github.com/mamoe/mirai/blob/dev/LICENSE */ package net.mamoe.mirai.internal.network.components import net.mamoe.mirai.internal.QQAndroidBot -import net.mamoe.mirai.internal.message.contextualBugReportException import net.mamoe.mirai.internal.network.Packet import net.mamoe.mirai.internal.network.ParseErrorPacket import net.mamoe.mirai.internal.network.component.ComponentKey @@ -29,40 +28,19 @@ import net.mamoe.mirai.internal.network.protocol.data.proto.Structmsg import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetMsg import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.OnlinePushPbPushTransMsg import net.mamoe.mirai.internal.network.toPacket +import net.mamoe.mirai.internal.pipeline.AbstractProcessorPipeline +import net.mamoe.mirai.internal.pipeline.Processor +import net.mamoe.mirai.internal.pipeline.ProcessorPipeline +import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext import net.mamoe.mirai.internal.utils.io.ProtocolStruct import net.mamoe.mirai.utils.* -import java.io.Closeable -import java.util.* import java.util.concurrent.ConcurrentLinkedQueue import kotlin.reflect.KClass -internal typealias ProcessResult = Collection - /** * Centralized processor pipeline for [MessageSvcPbGetMsg] and [OnlinePushPbPushTransMsg] */ -internal interface NoticeProcessorPipeline { - val processors: Collection - - fun interface DisposableRegistry : Closeable { - fun dispose() - - override fun close() { - dispose() - } - } - - fun registerProcessor(processor: NoticeProcessor): DisposableRegistry - - /** - * Process [data] into [Packet]s. Exceptions are wrapped into [ParseErrorPacket] - */ - suspend fun process( - bot: QQAndroidBot, - data: ProtocolStruct, - attributes: TypeSafeMap = TypeSafeMap.EMPTY - ): ProcessResult - +internal interface NoticeProcessorPipeline : ProcessorPipeline { companion object : ComponentKey { val ComponentStorage.noticeProcessorPipeline get() = get(NoticeProcessorPipeline) @@ -71,71 +49,15 @@ internal interface NoticeProcessorPipeline { data: ProtocolStruct, attributes: TypeSafeMap = TypeSafeMap.EMPTY, ): Packet { - return components.noticeProcessorPipeline.process(this, data, attributes).toPacket() + return components.noticeProcessorPipeline.process(data, attributes).toPacket() } } } -@JvmInline -internal value class MutableProcessResult( - val data: MutableCollection -) - -internal interface NoticePipelineContext : BotAware, NewContactSupport { +internal interface NoticePipelineContext : BotAware, NewContactSupport, + ProcessorPipelineContext { override val bot: QQAndroidBot - val attributes: TypeSafeMap - - - val isConsumed: Boolean - - /** - * Marks the input as consumed so that there will not be warnings like 'Unknown type xxx'. This will not stop the pipeline. - * - * If this is executed, make sure you provided all information important for debugging. - * - * You need to invoke [markAsConsumed] if your implementation includes some `else` branch which covers all situations, - * and throws a [contextualBugReportException] or logs something. - */ - @ConsumptionMarker - fun NoticeProcessor.markAsConsumed(marker: Any = this) - - /** - * Marks the input as not consumed, if it was marked by this [NoticeProcessor]. - */ - @ConsumptionMarker - fun NoticeProcessor.markNotConsumed(marker: Any = this) - - @DslMarker - annotation class ConsumptionMarker // to give an explicit color. - - - val collected: MutableProcessResult - - // DSL to simplify some expressions - operator fun MutableProcessResult.plusAssign(packet: Packet?) { - if (packet != null) collect(packet) - } - - - /** - * Collect a result. - */ - fun collect(packet: Packet) - - /** - * Collect results. - */ - fun collect(packets: Iterable) - - /** - * Fire the [data] into the processor pipeline, and collect the results to current [collected]. - * - * @param attributes extra attributes - * @return result collected from processors. This would also have been collected to this context (where you call [processAlso]). - */ - suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap = TypeSafeMap.EMPTY): ProcessResult - companion object { val KEY_FROM_SYNC = TypeKey("fromSync") val KEY_MSG_INFO = TypeKey("msgInfo") @@ -150,82 +72,73 @@ internal interface NoticePipelineContext : BotAware, NewContactSupport { } } -internal abstract class AbstractNoticePipelineContext( - override val bot: QQAndroidBot, override val attributes: TypeSafeMap, -) : NoticePipelineContext { - private val consumers: Stack = Stack() - - override val isConsumed: Boolean get() = consumers.isNotEmpty() - override fun NoticeProcessor.markAsConsumed(marker: Any) { - traceLogging.info { "markAsConsumed: marker=$marker" } - consumers.push(marker) - } - - override fun NoticeProcessor.markNotConsumed(marker: Any) { - if (consumers.peek() === marker) { - consumers.pop() - traceLogging.info { "markNotConsumed: Y, marker=$marker" } - } else { - traceLogging.info { "markNotConsumed: N, marker=$marker" } - } - } - - override val collected = MutableProcessResult(ConcurrentLinkedQueue()) - - override fun collect(packet: Packet) { - collected.data.add(packet) - traceLogging.info { "collect: $packet" } - } - - override fun collect(packets: Iterable) { - this.collected.data.addAll(packets) - traceLogging.info { - val list = packets.toList() - "collect: [${list.size}] ${list.joinToString()}" - } - } - - abstract override suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult -} - - internal inline val NoticePipelineContext.context get() = this -private val traceLogging: MiraiLogger by lazy { +private val defaultTraceLogging: MiraiLogger by lazy { MiraiLogger.Factory.create(NoticeProcessorPipelineImpl::class, "NoticeProcessorPipeline") .withSwitch(systemProp("mirai.network.notice.pipeline.log.full", false)) } -internal open class NoticeProcessorPipelineImpl protected constructor() : NoticeProcessorPipeline { +internal open class NoticeProcessorPipelineImpl protected constructor( + private val bot: QQAndroidBot, + traceLogging: MiraiLogger = defaultTraceLogging, +) : NoticeProcessorPipeline, + AbstractProcessorPipeline(traceLogging) { /** * Must be ordered */ override val processors = ConcurrentLinkedQueue() - override fun registerProcessor(processor: NoticeProcessor): NoticeProcessorPipeline.DisposableRegistry { + override fun registerProcessor(processor: NoticeProcessor): ProcessorPipeline.DisposableRegistry { processors.add(processor) - return NoticeProcessorPipeline.DisposableRegistry { + return ProcessorPipeline.DisposableRegistry { processors.remove(processor) } } open inner class ContextImpl( - bot: QQAndroidBot, attributes: TypeSafeMap, - ) : AbstractNoticePipelineContext(bot, attributes) { - override suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult { + attributes: TypeSafeMap, + ) : BaseContextImpl(attributes), NoticePipelineContext { + override val bot: QQAndroidBot + get() = this@NoticeProcessorPipelineImpl.bot + + override suspend fun processAlso( + data: ProtocolStruct, + attributes: TypeSafeMap + ): Collection { traceLogging.info { "processAlso: data=$data" } - return process(bot, data, this.attributes + attributes).also { + return process(data, this.attributes + attributes).also { this.collected.data += it traceLogging.info { "processAlso: result=$it" } } } } + override fun handleExceptionInProcess( + data: ProtocolStruct, + context: NoticePipelineContext, + attributes: TypeSafeMap, + processor: NoticeProcessor, + e: Throwable + ) { + context.collect( + ParseErrorPacket( + data, + IllegalStateException( + "Exception in $processor while processing packet ${packetToString(data)}.", + e, + ), + ) + ) + } - override suspend fun process(bot: QQAndroidBot, data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult { + override suspend fun process( + data: ProtocolStruct, + attributes: TypeSafeMap + ): Collection { traceLogging.info { "process: data=$data" } - val context = createContext(bot, attributes) + val context = createContext(attributes) val diff = if (traceLogging.isEnabled) CollectionDiff() else null diff?.save(context.collected.data) @@ -259,18 +172,15 @@ internal open class NoticeProcessorPipelineImpl protected constructor() : Notice return context.collected.data } - protected open fun createContext( - bot: QQAndroidBot, - attributes: TypeSafeMap - ): NoticePipelineContext = ContextImpl(bot, attributes) + override fun createContext(attributes: TypeSafeMap): NoticePipelineContext = ContextImpl(attributes) protected open fun packetToString(data: Any?): String = data.toDebugString("mirai.network.notice.pipeline.log.full") companion object { - fun create(vararg processors: NoticeProcessor): NoticeProcessorPipelineImpl = - NoticeProcessorPipelineImpl().apply { + fun create(bot: QQAndroidBot, vararg processors: NoticeProcessor): NoticeProcessorPipelineImpl = + NoticeProcessorPipelineImpl(bot, defaultTraceLogging).apply { for (processor in processors) { registerProcessor(processor) } @@ -285,9 +195,7 @@ internal open class NoticeProcessorPipelineImpl protected constructor() : Notice /** * A processor handling some specific type of message. */ -internal interface NoticeProcessor { - suspend fun process(context: NoticePipelineContext, data: Any?) -} +internal interface NoticeProcessor : Processor internal abstract class AnyNoticeProcessor : SimpleNoticeProcessor(type()) diff --git a/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt b/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt new file mode 100644 index 000000000..83183ed05 --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt @@ -0,0 +1,203 @@ +/* + * Copyright 2019-2022 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/dev/LICENSE + */ + +package net.mamoe.mirai.internal.pipeline + +import net.mamoe.mirai.internal.message.contextualBugReportException +import net.mamoe.mirai.internal.network.components.NoticeProcessor +import net.mamoe.mirai.utils.* +import java.io.Closeable +import java.util.* +import java.util.concurrent.ConcurrentLinkedQueue + +internal interface Processor> { + suspend fun process(context: C, data: Any?) +} + +internal interface ProcessorPipeline

, D, R> { + val processors: Collection

+ + fun interface DisposableRegistry : Closeable { + fun dispose() + + override fun close() { + dispose() + } + } + + fun registerProcessor(processor: P): DisposableRegistry + + suspend fun process( + data: D, + attributes: TypeSafeMap = TypeSafeMap.EMPTY + ): Collection + +} + +@JvmInline +internal value class MutableProcessResult( + val data: MutableCollection +) + + +internal interface ProcessorPipelineContext { + val attributes: TypeSafeMap + + val collected: MutableProcessResult + + // DSL to simplify some expressions + operator fun MutableProcessResult.plusAssign(result: R?) { + if (result != null) collect(result) + } + + + /** + * Collect a result. + */ + fun collect(result: R) + + /** + * Collect results. + */ + fun collect(results: Iterable) + + + val isConsumed: Boolean + + /** + * Marks the input as consumed so that there will not be warnings like 'Unknown type xxx'. This will not stop the pipeline. + * + * If this is executed, make sure you provided all information important for debugging. + * + * You need to invoke [markAsConsumed] if your implementation includes some `else` branch which covers all situations, + * and throws a [contextualBugReportException] or logs something. + */ + @ConsumptionMarker + fun Processor<*>.markAsConsumed(marker: Any = this) + + /** + * Marks the input as not consumed, if it was marked by this [NoticeProcessor]. + */ + @ConsumptionMarker + fun Processor<*>.markNotConsumed(marker: Any = this) + + @DslMarker + annotation class ConsumptionMarker // to give an explicit color. + + /** + * Fire the [data] into the processor pipeline, and collect the results to current [collected]. + * + * @param attributes extra attributes + * @return result collected from processors. This would also have been collected to this context (where you call [processAlso]). + */ + suspend fun processAlso(data: D, attributes: TypeSafeMap = TypeSafeMap.EMPTY): Collection +} + +internal abstract class AbstractProcessorPipelineContext( + override val attributes: TypeSafeMap, + private val traceLogging: MiraiLogger, +) : ProcessorPipelineContext { + private val consumers: Stack = Stack() + + override val isConsumed: Boolean get() = consumers.isNotEmpty() + override fun Processor<*>.markAsConsumed(marker: Any) { + traceLogging.info { "markAsConsumed: marker=$marker" } + consumers.push(marker) + } + + override fun Processor<*>.markNotConsumed(marker: Any) { + if (consumers.peek() === marker) { + consumers.pop() + traceLogging.info { "markNotConsumed: Y, marker=$marker" } + } else { + traceLogging.info { "markNotConsumed: N, marker=$marker" } + } + } + + override val collected: MutableProcessResult = MutableProcessResult(ConcurrentLinkedQueue()) + + override fun collect(result: R) { + collected.data.add(result) + traceLogging.info { "collect: $result" } + } + + override fun collect(results: Iterable) { + this.collected.data.addAll(results) + traceLogging.info { "collect: $results" } + } +} + +internal abstract class AbstractProcessorPipeline

, C : ProcessorPipelineContext, D, R> +protected constructor( + val traceLogging: MiraiLogger, +) : ProcessorPipeline { + /** + * Must be ordered + */ + override val processors = ConcurrentLinkedQueue

() + + override fun registerProcessor(processor: P): ProcessorPipeline.DisposableRegistry { + processors.add(processor) + return ProcessorPipeline.DisposableRegistry { + processors.remove(processor) + } + } + + protected abstract fun createContext(attributes: TypeSafeMap): C + + abstract inner class BaseContextImpl( + attributes: TypeSafeMap, + ) : AbstractProcessorPipelineContext(attributes, traceLogging) { + override suspend fun processAlso(data: D, attributes: TypeSafeMap): Collection { + traceLogging.info { "processAlso: data=$data" } + return process(data, this.attributes + attributes).also { + this.collected.data += it + traceLogging.info { "processAlso: result=$it" } + } + } + } + + protected open fun handleExceptionInProcess( + data: D, + context: C, + attributes: TypeSafeMap, + processor: P, + e: Throwable + ) { + throw e + } + + override suspend fun process(data: D, attributes: TypeSafeMap): Collection { + traceLogging.info { "process: data=$data" } + val context = createContext(attributes) + + val diff = if (traceLogging.isEnabled) CollectionDiff() else null + diff?.save(context.collected.data) + + for (processor in processors) { + + val result = kotlin.runCatching { + processor.process(context, data) + }.onFailure { e -> + handleExceptionInProcess(data, context, attributes, processor, e) + } + + diff?.run { + val diffPackets = subtractAndSave(context.collected.data) + + traceLogging.info { + "Finished ${ + processor.toString().replace("net.mamoe.mirai.internal.network.notice.", "") + }, success=${result.isSuccess}, consumed=${context.isConsumed}, diff=$diffPackets" + } + } + } + return context.collected.data + } +} diff --git a/mirai-core/src/commonTest/kotlin/notice/processors/AbstractNoticeProcessorTest.kt b/mirai-core/src/commonTest/kotlin/notice/processors/AbstractNoticeProcessorTest.kt index 4d7b3c0b0..2bb2b9d0f 100644 --- a/mirai-core/src/commonTest/kotlin/notice/processors/AbstractNoticeProcessorTest.kt +++ b/mirai-core/src/commonTest/kotlin/notice/processors/AbstractNoticeProcessorTest.kt @@ -24,6 +24,7 @@ import net.mamoe.mirai.internal.contact.info.FriendInfoImpl import net.mamoe.mirai.internal.contact.info.GroupInfoImpl import net.mamoe.mirai.internal.contact.info.MemberInfoImpl import net.mamoe.mirai.internal.contact.info.StrangerInfoImpl +import net.mamoe.mirai.internal.network.Packet import net.mamoe.mirai.internal.network.components.* import net.mamoe.mirai.internal.network.components.NoticeProcessorPipeline.Companion.noticeProcessorPipeline import net.mamoe.mirai.internal.network.framework.AbstractNettyNHTest @@ -58,11 +59,11 @@ internal abstract class AbstractNoticeProcessorTest : AbstractNettyNHTest(), Gro attributes: TypeSafeMap = TypeSafeMap(), pipeline: NoticeProcessorPipeline = bot.components.noticeProcessorPipeline, block: UseTestContext.() -> ProtocolStruct - ): ProcessResult { + ): Collection { bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED val handler = LoggingPacketHandlerAdapter(PacketLoggingStrategyImpl(bot), bot.logger) val context = UseTestContext(attributes.toMutableTypeSafeMap()) - return pipeline.process(bot, block(context), context.attributes).also { list -> + return pipeline.process(block(context), context.attributes).also { list -> for (packet in list) { handler.handlePacket(IncomingPacket("test", packet)) } @@ -71,16 +72,17 @@ internal abstract class AbstractNoticeProcessorTest : AbstractNettyNHTest(), Gro protected suspend inline fun use( attributes: TypeSafeMap = TypeSafeMap(), - crossinline createContext: NoticeProcessorPipelineImpl.(bot: QQAndroidBot, attributes: TypeSafeMap) -> NoticeProcessorPipelineImpl.ContextImpl, + crossinline createContext: NoticeProcessorPipelineImpl.(attributes: TypeSafeMap) -> NoticeProcessorPipelineImpl.ContextImpl, block: UseTestContext.() -> ProtocolStruct - ): ProcessResult = use(attributes, pipeline = object : NoticeProcessorPipelineImpl() { - init { - bot.components.noticeProcessorPipeline.processors.forEach { registerProcessor(it) } - } + ): Collection = + use(attributes, pipeline = object : NoticeProcessorPipelineImpl(bot) { + init { + bot.components.noticeProcessorPipeline.processors.forEach { registerProcessor(it) } + } - override fun createContext(bot: QQAndroidBot, attributes: TypeSafeMap): NoticePipelineContext = - createContext(this, bot, attributes) - }, block) + override fun createContext(attributes: TypeSafeMap): NoticePipelineContext = + createContext(this, attributes) + }, block) fun setBot(id: Long): QQAndroidBot { bot = createBot(BotAccount(id, "a")) diff --git a/mirai-core/src/commonTest/kotlin/notice/processors/BotInvitedJoinTest.kt b/mirai-core/src/commonTest/kotlin/notice/processors/BotInvitedJoinTest.kt index 21f7da63a..61ead2bde 100644 --- a/mirai-core/src/commonTest/kotlin/notice/processors/BotInvitedJoinTest.kt +++ b/mirai-core/src/commonTest/kotlin/notice/processors/BotInvitedJoinTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 Mamoe Technologies and contributors. + * Copyright 2019-2022 Mamoe Technologies and contributors. * * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. @@ -104,8 +104,8 @@ internal class BotInvitedJoinTest : AbstractNoticeProcessorTest() { suspend fun `invited join, accepted`() { // https://github.com/mamoe/mirai/issues/1213 suspend fun runTest() = use( - createContext = { bot, attributes -> - object : NoticeProcessorPipelineImpl.ContextImpl(bot, attributes) { + createContext = { attributes -> + object : NoticeProcessorPipelineImpl.ContextImpl(attributes) { override suspend fun QQAndroidBot.addNewGroupByCode(code: Long): GroupImpl { assertEquals(2230203, code) return bot.addGroup(2230203, 1230001, name = "testtest").apply { @@ -154,8 +154,8 @@ internal class BotInvitedJoinTest : AbstractNoticeProcessorTest() { @Test suspend fun `invitation accepted`() { suspend fun runTest() = - use(createContext = { bot, attributes -> - object : NoticeProcessorPipelineImpl.ContextImpl(bot, attributes) { + use(createContext = { attributes -> + object : NoticeProcessorPipelineImpl.ContextImpl(attributes) { override suspend fun QQAndroidBot.addNewGroupByCode(code: Long): GroupImpl { assertEquals(2230203, code) return bot.addGroup(2230203, 1230002, name = "testtest").apply {