From d6343870b8139c673754c712f233b3aa9ffb46e9 Mon Sep 17 00:00:00 2001
From: Him188 <Him188@mamoe.net>
Date: Wed, 27 Apr 2022 09:50:46 +0100
Subject: [PATCH] MessageProtocol pipeline infrastructure

---
 .../protocol/MessageDecoderPipelineImpl.kt    |  35 +++++
 .../protocol/MessageEncoderPipelineImpl.kt    |  34 ++++
 .../message/protocol/MessageProtocol.kt       | 146 ++++++++++++++++++
 .../components/NoticeProcessorPipeline.kt     |  56 +------
 .../kotlin/pipeline/ProcessorPipeline.kt      |  22 +--
 5 files changed, 228 insertions(+), 65 deletions(-)
 create mode 100644 mirai-core/src/commonMain/kotlin/message/protocol/MessageDecoderPipelineImpl.kt
 create mode 100644 mirai-core/src/commonMain/kotlin/message/protocol/MessageEncoderPipelineImpl.kt
 create mode 100644 mirai-core/src/commonMain/kotlin/message/protocol/MessageProtocol.kt

diff --git a/mirai-core/src/commonMain/kotlin/message/protocol/MessageDecoderPipelineImpl.kt b/mirai-core/src/commonMain/kotlin/message/protocol/MessageDecoderPipelineImpl.kt
new file mode 100644
index 000000000..c74b956e6
--- /dev/null
+++ b/mirai-core/src/commonMain/kotlin/message/protocol/MessageDecoderPipelineImpl.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2019-2022 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.message.protocol
+
+import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
+import net.mamoe.mirai.internal.pipeline.AbstractProcessorPipeline
+import net.mamoe.mirai.message.data.Message
+import net.mamoe.mirai.utils.MiraiLogger
+import net.mamoe.mirai.utils.TypeSafeMap
+import net.mamoe.mirai.utils.systemProp
+import net.mamoe.mirai.utils.withSwitch
+
+private val defaultTraceLogging: MiraiLogger by lazy {
+    MiraiLogger.Factory.create(MessageDecoderPipelineImpl::class, "MessageDecoderPipeline")
+        .withSwitch(systemProp("mirai.message.decoder.pipeline.log.full", false))
+}
+
+
+internal open class MessageDecoderPipelineImpl :
+    AbstractProcessorPipeline<MessageDecoderProcessor, MessageDecoderContext, ImMsgBody.Elem, Message>(
+        defaultTraceLogging
+    ),
+    MessageDecoderPipeline {
+
+    inner class MessageDecoderContextImpl(attributes: TypeSafeMap) : MessageDecoderContext, BaseContextImpl(attributes)
+
+    override fun createContext(attributes: TypeSafeMap): MessageDecoderContext = MessageDecoderContextImpl(attributes)
+}
\ No newline at end of file
diff --git a/mirai-core/src/commonMain/kotlin/message/protocol/MessageEncoderPipelineImpl.kt b/mirai-core/src/commonMain/kotlin/message/protocol/MessageEncoderPipelineImpl.kt
new file mode 100644
index 000000000..cc5aa5d3c
--- /dev/null
+++ b/mirai-core/src/commonMain/kotlin/message/protocol/MessageEncoderPipelineImpl.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2019-2022 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.message.protocol
+
+import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
+import net.mamoe.mirai.internal.pipeline.AbstractProcessorPipeline
+import net.mamoe.mirai.message.data.SingleMessage
+import net.mamoe.mirai.utils.MiraiLogger
+import net.mamoe.mirai.utils.TypeSafeMap
+import net.mamoe.mirai.utils.systemProp
+import net.mamoe.mirai.utils.withSwitch
+
+private val defaultTraceLogging: MiraiLogger by lazy {
+    MiraiLogger.Factory.create(MessageEncoderPipelineImpl::class, "MessageEncoderPipeline")
+        .withSwitch(systemProp("mirai.message.encoder.pipeline.log.full", false))
+}
+
+internal open class MessageEncoderPipelineImpl :
+    AbstractProcessorPipeline<MessageEncoderProcessor<*>, MessageEncoderContext, SingleMessage, ImMsgBody.Elem>(
+        defaultTraceLogging
+    ),
+    MessageEncoderPipeline {
+
+    inner class MessageEncoderContextImpl(attributes: TypeSafeMap) : MessageEncoderContext, BaseContextImpl(attributes)
+
+    override fun createContext(attributes: TypeSafeMap): MessageEncoderContext = MessageEncoderContextImpl(attributes)
+}
\ No newline at end of file
diff --git a/mirai-core/src/commonMain/kotlin/message/protocol/MessageProtocol.kt b/mirai-core/src/commonMain/kotlin/message/protocol/MessageProtocol.kt
new file mode 100644
index 000000000..1966aaa6b
--- /dev/null
+++ b/mirai-core/src/commonMain/kotlin/message/protocol/MessageProtocol.kt
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2019-2022 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.message.protocol
+
+import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
+import net.mamoe.mirai.internal.pipeline.Processor
+import net.mamoe.mirai.internal.pipeline.ProcessorPipeline
+import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext
+import net.mamoe.mirai.message.data.Message
+import net.mamoe.mirai.message.data.SingleMessage
+import net.mamoe.mirai.utils.uncheckedCast
+import java.util.*
+import kotlin.reflect.KClass
+
+internal abstract class ProcessorCollector {
+    inline fun <reified T : SingleMessage> add(encoder: MessageEncoder<T>) = add(encoder, T::class)
+
+
+    abstract fun <T : SingleMessage> add(encoder: MessageEncoder<T>, elementType: KClass<T>)
+
+    abstract fun add(decoder: MessageDecoder)
+}
+
+internal abstract class MessageProtocol {
+    fun collectProcessors(processorCollector: ProcessorCollector) {
+        processorCollector.collectProcessorsImpl()
+    }
+
+    protected abstract fun ProcessorCollector.collectProcessorsImpl()
+}
+
+internal object MessageProtocols {
+    val instances: List<MessageProtocol> = initialize()
+
+    private fun initialize(): List<MessageProtocol> {
+        val encoderPipeline = MessageEncoderPipelineImpl()
+        val decoderPipeline = MessageDecoderPipelineImpl()
+
+        val instances = ServiceLoader.load(MessageProtocol::class.java).iterator().asSequence().toList()
+        for (instance in instances) {
+            instance.collectProcessors(object : ProcessorCollector() {
+                override fun <T : SingleMessage> add(encoder: MessageEncoder<T>, elementType: KClass<T>) {
+                    encoderPipeline.registerProcessor(MessageEncoderProcessor(encoder, elementType))
+                }
+
+                override fun add(decoder: MessageDecoder) {
+                    decoderPipeline.registerProcessor(MessageDecoderProcessor(decoder))
+                }
+
+            })
+        }
+
+        return instances
+    }
+
+}
+
+///////////////////////////////////////////////////////////////////////////
+// decoders
+///////////////////////////////////////////////////////////////////////////
+
+internal interface MessageDecoderContext : ProcessorPipelineContext<ImMsgBody.Elem, Message> {
+
+}
+
+internal interface MessageDecoder {
+    suspend fun MessageDecoderContext.process(data: ImMsgBody.Elem)
+}
+
+/**
+ * Adapter for [MessageDecoder] to be used as [Processor].
+ */
+internal class MessageDecoderProcessor(
+    private val decoder: MessageDecoder
+) : Processor<MessageDecoderContext, ImMsgBody.Elem> {
+    override suspend fun process(context: MessageDecoderContext, data: ImMsgBody.Elem) {
+        decoder.run { context.process(data) }
+    }
+}
+
+internal interface MessageDecoderPipeline : ProcessorPipeline<MessageDecoderProcessor, ImMsgBody.Elem, Message>
+
+///////////////////////////////////////////////////////////////////////////
+// encoders
+///////////////////////////////////////////////////////////////////////////
+
+internal interface MessageEncoderContext : ProcessorPipelineContext<SingleMessage, ImMsgBody.Elem> {
+
+}
+
+
+internal interface MessageEncoder<T : SingleMessage> {
+    suspend fun MessageEncoderContext.process(data: T)
+}
+
+/**
+ * Adapter for [MessageEncoder] to be used as [Processor].
+ */
+internal class MessageEncoderProcessor<T : SingleMessage>(
+    private val encoder: MessageEncoder<T>,
+    private val elementType: KClass<T>,
+) : Processor<MessageEncoderContext, SingleMessage> {
+    override suspend fun process(context: MessageEncoderContext, data: SingleMessage) {
+        if (elementType.isInstance(data)) {
+            encoder.run { context.process(data.uncheckedCast()) }
+        }
+    }
+}
+
+internal interface MessageEncoderPipeline : ProcessorPipeline<MessageEncoderProcessor<*>, SingleMessage, ImMsgBody.Elem>
+
+///////////////////////////////////////////////////////////////////////////
+// refiners
+///////////////////////////////////////////////////////////////////////////
+
+
+//internal interface MessageRefiner : Processor<MessageRefinerContext>
+//
+//internal interface MessageRefinerContext : ProcessorPipelineContext<SingleMessage, Message?> {
+//    /**
+//     * Refine if possible (without suspension), returns self otherwise.
+//     * @since 2.6
+//     */ // see #1157
+//    fun tryRefine(
+//        bot: Bot,
+//        context: MessageChain,
+//        refineContext: RefineContext = EmptyRefineContext,
+//    ): Message? = this
+//
+//    /**
+//     * This message [RefinableMessage] will be replaced by return value of [refineLight]
+//     */
+//    suspend fun refine(
+//        bot: Bot,
+//        context: MessageChain,
+//        refineContext: RefineContext = EmptyRefineContext,
+//    ): Message? = tryRefine(bot, context, refineContext)
+//}
+//
diff --git a/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt b/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt
index 98af33013..8bf421af0 100644
--- a/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt
+++ b/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt
@@ -34,7 +34,6 @@ import net.mamoe.mirai.internal.pipeline.ProcessorPipeline
 import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext
 import net.mamoe.mirai.internal.utils.io.ProtocolStruct
 import net.mamoe.mirai.utils.*
-import java.util.concurrent.ConcurrentLinkedQueue
 import kotlin.reflect.KClass
 
 /**
@@ -84,18 +83,6 @@ internal open class NoticeProcessorPipelineImpl protected constructor(
     traceLogging: MiraiLogger = defaultTraceLogging,
 ) : NoticeProcessorPipeline,
     AbstractProcessorPipeline<NoticeProcessor, NoticePipelineContext, ProtocolStruct, Packet>(traceLogging) {
-    /**
-     * Must be ordered
-     */
-    override val processors = ConcurrentLinkedQueue<NoticeProcessor>()
-
-    override fun registerProcessor(processor: NoticeProcessor): ProcessorPipeline.DisposableRegistry {
-        processors.add(processor)
-        return ProcessorPipeline.DisposableRegistry {
-            processors.remove(processor)
-        }
-    }
-
 
     open inner class ContextImpl(
         attributes: TypeSafeMap,
@@ -133,45 +120,6 @@ internal open class NoticeProcessorPipelineImpl protected constructor(
         )
     }
 
-    override suspend fun process(
-        data: ProtocolStruct,
-        attributes: TypeSafeMap
-    ): Collection<Packet> {
-        traceLogging.info { "process: data=$data" }
-        val context = createContext(attributes)
-
-        val diff = if (traceLogging.isEnabled) CollectionDiff<Packet>() else null
-        diff?.save(context.collected.data)
-
-        for (processor in processors) {
-
-            val result = kotlin.runCatching {
-                processor.process(context, data)
-            }.onFailure { e ->
-                context.collect(
-                    ParseErrorPacket(
-                        data,
-                        IllegalStateException(
-                            "Exception in $processor while processing packet ${packetToString(data)}.",
-                            e,
-                        ),
-                    ),
-                )
-            }
-
-            diff?.run {
-                val diffPackets = subtractAndSave(context.collected.data)
-
-                traceLogging.info {
-                    "Finished ${
-                        processor.toString().replace("net.mamoe.mirai.internal.network.notice.", "")
-                    }, success=${result.isSuccess}, consumed=${context.isConsumed}, diff=$diffPackets"
-                }
-            }
-        }
-        return context.collected.data
-    }
-
     override fun createContext(attributes: TypeSafeMap): NoticePipelineContext = ContextImpl(attributes)
 
     protected open fun packetToString(data: Any?): String =
@@ -195,7 +143,7 @@ internal open class NoticeProcessorPipelineImpl protected constructor(
 /**
  * A processor handling some specific type of message.
  */
-internal interface NoticeProcessor : Processor<NoticePipelineContext>
+internal interface NoticeProcessor : Processor<NoticePipelineContext, ProtocolStruct>
 
 internal abstract class AnyNoticeProcessor : SimpleNoticeProcessor<ProtocolStruct>(type())
 
@@ -203,7 +151,7 @@ internal abstract class SimpleNoticeProcessor<in T : ProtocolStruct>(
     private val type: KClass<T>,
 ) : NoticeProcessor {
 
-    final override suspend fun process(context: NoticePipelineContext, data: Any?) {
+    final override suspend fun process(context: NoticePipelineContext, data: ProtocolStruct) {
         if (type.isInstance(data)) {
             context.processImpl(data.uncheckedCast())
         }
diff --git a/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt b/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt
index 83183ed05..4997a40da 100644
--- a/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt
+++ b/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt
@@ -16,11 +16,11 @@ import java.io.Closeable
 import java.util.*
 import java.util.concurrent.ConcurrentLinkedQueue
 
-internal interface Processor<C : ProcessorPipelineContext<*, *>> {
-    suspend fun process(context: C, data: Any?)
+internal interface Processor<C : ProcessorPipelineContext<D, *>, D> {
+    suspend fun process(context: C, data: D)
 }
 
-internal interface ProcessorPipeline<P : Processor<*>, D, R> {
+internal interface ProcessorPipeline<P : Processor<out ProcessorPipelineContext<D, *>, D>, D, R> {
     val processors: Collection<P>
 
     fun interface DisposableRegistry : Closeable {
@@ -79,13 +79,13 @@ internal interface ProcessorPipelineContext<D, R> {
      * and throws a [contextualBugReportException] or logs something.
      */
     @ConsumptionMarker
-    fun Processor<*>.markAsConsumed(marker: Any = this)
+    fun Processor<*, D>.markAsConsumed(marker: Any = this)
 
     /**
      * Marks the input as not consumed, if it was marked by this [NoticeProcessor].
      */
     @ConsumptionMarker
-    fun Processor<*>.markNotConsumed(marker: Any = this)
+    fun Processor<*, D>.markNotConsumed(marker: Any = this)
 
     @DslMarker
     annotation class ConsumptionMarker // to give an explicit color.
@@ -106,12 +106,12 @@ internal abstract class AbstractProcessorPipelineContext<D, R>(
     private val consumers: Stack<Any> = Stack()
 
     override val isConsumed: Boolean get() = consumers.isNotEmpty()
-    override fun Processor<*>.markAsConsumed(marker: Any) {
+    override fun Processor<*, D>.markAsConsumed(marker: Any) {
         traceLogging.info { "markAsConsumed: marker=$marker" }
         consumers.push(marker)
     }
 
-    override fun Processor<*>.markNotConsumed(marker: Any) {
+    override fun Processor<*, D>.markNotConsumed(marker: Any) {
         if (consumers.peek() === marker) {
             consumers.pop()
             traceLogging.info { "markNotConsumed: Y, marker=$marker" }
@@ -133,10 +133,12 @@ internal abstract class AbstractProcessorPipelineContext<D, R>(
     }
 }
 
-internal abstract class AbstractProcessorPipeline<P : Processor<C>, C : ProcessorPipelineContext<D, R>, D, R>
+internal abstract class AbstractProcessorPipeline<P : Processor<C, D>, C : ProcessorPipelineContext<D, R>, D, R>
 protected constructor(
     val traceLogging: MiraiLogger,
 ) : ProcessorPipeline<P, D, R> {
+    constructor() : this(SilentLogger)
+
     /**
      * Must be ordered
      */
@@ -169,9 +171,7 @@ protected constructor(
         attributes: TypeSafeMap,
         processor: P,
         e: Throwable
-    ) {
-        throw e
-    }
+    ): Unit = throw e
 
     override suspend fun process(data: D, attributes: TypeSafeMap): Collection<R> {
         traceLogging.info { "process: data=$data" }