diff --git a/mirai-core/src/commonMain/kotlin/message/protocol/MessageDecoderPipelineImpl.kt b/mirai-core/src/commonMain/kotlin/message/protocol/MessageDecoderPipelineImpl.kt new file mode 100644 index 000000000..c74b956e6 --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/message/protocol/MessageDecoderPipelineImpl.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2019-2022 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/dev/LICENSE + */ + +package net.mamoe.mirai.internal.message.protocol + +import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody +import net.mamoe.mirai.internal.pipeline.AbstractProcessorPipeline +import net.mamoe.mirai.message.data.Message +import net.mamoe.mirai.utils.MiraiLogger +import net.mamoe.mirai.utils.TypeSafeMap +import net.mamoe.mirai.utils.systemProp +import net.mamoe.mirai.utils.withSwitch + +private val defaultTraceLogging: MiraiLogger by lazy { + MiraiLogger.Factory.create(MessageDecoderPipelineImpl::class, "MessageDecoderPipeline") + .withSwitch(systemProp("mirai.message.decoder.pipeline.log.full", false)) +} + + +internal open class MessageDecoderPipelineImpl : + AbstractProcessorPipeline( + defaultTraceLogging + ), + MessageDecoderPipeline { + + inner class MessageDecoderContextImpl(attributes: TypeSafeMap) : MessageDecoderContext, BaseContextImpl(attributes) + + override fun createContext(attributes: TypeSafeMap): MessageDecoderContext = MessageDecoderContextImpl(attributes) +} \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/message/protocol/MessageEncoderPipelineImpl.kt b/mirai-core/src/commonMain/kotlin/message/protocol/MessageEncoderPipelineImpl.kt new file mode 100644 index 000000000..cc5aa5d3c --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/message/protocol/MessageEncoderPipelineImpl.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2019-2022 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/dev/LICENSE + */ + +package net.mamoe.mirai.internal.message.protocol + +import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody +import net.mamoe.mirai.internal.pipeline.AbstractProcessorPipeline +import net.mamoe.mirai.message.data.SingleMessage +import net.mamoe.mirai.utils.MiraiLogger +import net.mamoe.mirai.utils.TypeSafeMap +import net.mamoe.mirai.utils.systemProp +import net.mamoe.mirai.utils.withSwitch + +private val defaultTraceLogging: MiraiLogger by lazy { + MiraiLogger.Factory.create(MessageEncoderPipelineImpl::class, "MessageEncoderPipeline") + .withSwitch(systemProp("mirai.message.encoder.pipeline.log.full", false)) +} + +internal open class MessageEncoderPipelineImpl : + AbstractProcessorPipeline, MessageEncoderContext, SingleMessage, ImMsgBody.Elem>( + defaultTraceLogging + ), + MessageEncoderPipeline { + + inner class MessageEncoderContextImpl(attributes: TypeSafeMap) : MessageEncoderContext, BaseContextImpl(attributes) + + override fun createContext(attributes: TypeSafeMap): MessageEncoderContext = MessageEncoderContextImpl(attributes) +} \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/message/protocol/MessageProtocol.kt b/mirai-core/src/commonMain/kotlin/message/protocol/MessageProtocol.kt new file mode 100644 index 000000000..1966aaa6b --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/message/protocol/MessageProtocol.kt @@ -0,0 +1,146 @@ +/* + * Copyright 2019-2022 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/dev/LICENSE + */ + +package net.mamoe.mirai.internal.message.protocol + +import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody +import net.mamoe.mirai.internal.pipeline.Processor +import net.mamoe.mirai.internal.pipeline.ProcessorPipeline +import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext +import net.mamoe.mirai.message.data.Message +import net.mamoe.mirai.message.data.SingleMessage +import net.mamoe.mirai.utils.uncheckedCast +import java.util.* +import kotlin.reflect.KClass + +internal abstract class ProcessorCollector { + inline fun add(encoder: MessageEncoder) = add(encoder, T::class) + + + abstract fun add(encoder: MessageEncoder, elementType: KClass) + + abstract fun add(decoder: MessageDecoder) +} + +internal abstract class MessageProtocol { + fun collectProcessors(processorCollector: ProcessorCollector) { + processorCollector.collectProcessorsImpl() + } + + protected abstract fun ProcessorCollector.collectProcessorsImpl() +} + +internal object MessageProtocols { + val instances: List = initialize() + + private fun initialize(): List { + val encoderPipeline = MessageEncoderPipelineImpl() + val decoderPipeline = MessageDecoderPipelineImpl() + + val instances = ServiceLoader.load(MessageProtocol::class.java).iterator().asSequence().toList() + for (instance in instances) { + instance.collectProcessors(object : ProcessorCollector() { + override fun add(encoder: MessageEncoder, elementType: KClass) { + encoderPipeline.registerProcessor(MessageEncoderProcessor(encoder, elementType)) + } + + override fun add(decoder: MessageDecoder) { + decoderPipeline.registerProcessor(MessageDecoderProcessor(decoder)) + } + + }) + } + + return instances + } + +} + +/////////////////////////////////////////////////////////////////////////// +// decoders +/////////////////////////////////////////////////////////////////////////// + +internal interface MessageDecoderContext : ProcessorPipelineContext { + +} + +internal interface MessageDecoder { + suspend fun MessageDecoderContext.process(data: ImMsgBody.Elem) +} + +/** + * Adapter for [MessageDecoder] to be used as [Processor]. + */ +internal class MessageDecoderProcessor( + private val decoder: MessageDecoder +) : Processor { + override suspend fun process(context: MessageDecoderContext, data: ImMsgBody.Elem) { + decoder.run { context.process(data) } + } +} + +internal interface MessageDecoderPipeline : ProcessorPipeline + +/////////////////////////////////////////////////////////////////////////// +// encoders +/////////////////////////////////////////////////////////////////////////// + +internal interface MessageEncoderContext : ProcessorPipelineContext { + +} + + +internal interface MessageEncoder { + suspend fun MessageEncoderContext.process(data: T) +} + +/** + * Adapter for [MessageEncoder] to be used as [Processor]. + */ +internal class MessageEncoderProcessor( + private val encoder: MessageEncoder, + private val elementType: KClass, +) : Processor { + override suspend fun process(context: MessageEncoderContext, data: SingleMessage) { + if (elementType.isInstance(data)) { + encoder.run { context.process(data.uncheckedCast()) } + } + } +} + +internal interface MessageEncoderPipeline : ProcessorPipeline, SingleMessage, ImMsgBody.Elem> + +/////////////////////////////////////////////////////////////////////////// +// refiners +/////////////////////////////////////////////////////////////////////////// + + +//internal interface MessageRefiner : Processor +// +//internal interface MessageRefinerContext : ProcessorPipelineContext { +// /** +// * Refine if possible (without suspension), returns self otherwise. +// * @since 2.6 +// */ // see #1157 +// fun tryRefine( +// bot: Bot, +// context: MessageChain, +// refineContext: RefineContext = EmptyRefineContext, +// ): Message? = this +// +// /** +// * This message [RefinableMessage] will be replaced by return value of [refineLight] +// */ +// suspend fun refine( +// bot: Bot, +// context: MessageChain, +// refineContext: RefineContext = EmptyRefineContext, +// ): Message? = tryRefine(bot, context, refineContext) +//} +// diff --git a/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt b/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt index 98af33013..8bf421af0 100644 --- a/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt +++ b/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt @@ -34,7 +34,6 @@ import net.mamoe.mirai.internal.pipeline.ProcessorPipeline import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext import net.mamoe.mirai.internal.utils.io.ProtocolStruct import net.mamoe.mirai.utils.* -import java.util.concurrent.ConcurrentLinkedQueue import kotlin.reflect.KClass /** @@ -84,18 +83,6 @@ internal open class NoticeProcessorPipelineImpl protected constructor( traceLogging: MiraiLogger = defaultTraceLogging, ) : NoticeProcessorPipeline, AbstractProcessorPipeline(traceLogging) { - /** - * Must be ordered - */ - override val processors = ConcurrentLinkedQueue() - - override fun registerProcessor(processor: NoticeProcessor): ProcessorPipeline.DisposableRegistry { - processors.add(processor) - return ProcessorPipeline.DisposableRegistry { - processors.remove(processor) - } - } - open inner class ContextImpl( attributes: TypeSafeMap, @@ -133,45 +120,6 @@ internal open class NoticeProcessorPipelineImpl protected constructor( ) } - override suspend fun process( - data: ProtocolStruct, - attributes: TypeSafeMap - ): Collection { - traceLogging.info { "process: data=$data" } - val context = createContext(attributes) - - val diff = if (traceLogging.isEnabled) CollectionDiff() else null - diff?.save(context.collected.data) - - for (processor in processors) { - - val result = kotlin.runCatching { - processor.process(context, data) - }.onFailure { e -> - context.collect( - ParseErrorPacket( - data, - IllegalStateException( - "Exception in $processor while processing packet ${packetToString(data)}.", - e, - ), - ), - ) - } - - diff?.run { - val diffPackets = subtractAndSave(context.collected.data) - - traceLogging.info { - "Finished ${ - processor.toString().replace("net.mamoe.mirai.internal.network.notice.", "") - }, success=${result.isSuccess}, consumed=${context.isConsumed}, diff=$diffPackets" - } - } - } - return context.collected.data - } - override fun createContext(attributes: TypeSafeMap): NoticePipelineContext = ContextImpl(attributes) protected open fun packetToString(data: Any?): String = @@ -195,7 +143,7 @@ internal open class NoticeProcessorPipelineImpl protected constructor( /** * A processor handling some specific type of message. */ -internal interface NoticeProcessor : Processor +internal interface NoticeProcessor : Processor internal abstract class AnyNoticeProcessor : SimpleNoticeProcessor(type()) @@ -203,7 +151,7 @@ internal abstract class SimpleNoticeProcessor( private val type: KClass, ) : NoticeProcessor { - final override suspend fun process(context: NoticePipelineContext, data: Any?) { + final override suspend fun process(context: NoticePipelineContext, data: ProtocolStruct) { if (type.isInstance(data)) { context.processImpl(data.uncheckedCast()) } diff --git a/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt b/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt index 83183ed05..4997a40da 100644 --- a/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt +++ b/mirai-core/src/commonMain/kotlin/pipeline/ProcessorPipeline.kt @@ -16,11 +16,11 @@ import java.io.Closeable import java.util.* import java.util.concurrent.ConcurrentLinkedQueue -internal interface Processor> { - suspend fun process(context: C, data: Any?) +internal interface Processor, D> { + suspend fun process(context: C, data: D) } -internal interface ProcessorPipeline

, D, R> { +internal interface ProcessorPipeline

, D>, D, R> { val processors: Collection

fun interface DisposableRegistry : Closeable { @@ -79,13 +79,13 @@ internal interface ProcessorPipelineContext { * and throws a [contextualBugReportException] or logs something. */ @ConsumptionMarker - fun Processor<*>.markAsConsumed(marker: Any = this) + fun Processor<*, D>.markAsConsumed(marker: Any = this) /** * Marks the input as not consumed, if it was marked by this [NoticeProcessor]. */ @ConsumptionMarker - fun Processor<*>.markNotConsumed(marker: Any = this) + fun Processor<*, D>.markNotConsumed(marker: Any = this) @DslMarker annotation class ConsumptionMarker // to give an explicit color. @@ -106,12 +106,12 @@ internal abstract class AbstractProcessorPipelineContext( private val consumers: Stack = Stack() override val isConsumed: Boolean get() = consumers.isNotEmpty() - override fun Processor<*>.markAsConsumed(marker: Any) { + override fun Processor<*, D>.markAsConsumed(marker: Any) { traceLogging.info { "markAsConsumed: marker=$marker" } consumers.push(marker) } - override fun Processor<*>.markNotConsumed(marker: Any) { + override fun Processor<*, D>.markNotConsumed(marker: Any) { if (consumers.peek() === marker) { consumers.pop() traceLogging.info { "markNotConsumed: Y, marker=$marker" } @@ -133,10 +133,12 @@ internal abstract class AbstractProcessorPipelineContext( } } -internal abstract class AbstractProcessorPipeline

, C : ProcessorPipelineContext, D, R> +internal abstract class AbstractProcessorPipeline

, C : ProcessorPipelineContext, D, R> protected constructor( val traceLogging: MiraiLogger, ) : ProcessorPipeline { + constructor() : this(SilentLogger) + /** * Must be ordered */ @@ -169,9 +171,7 @@ protected constructor( attributes: TypeSafeMap, processor: P, e: Throwable - ) { - throw e - } + ): Unit = throw e override suspend fun process(data: D, attributes: TypeSafeMap): Collection { traceLogging.info { "process: data=$data" }