MessageProtocol pipeline infrastructure

This commit is contained in:
Him188 2022-04-27 09:50:46 +01:00
parent a8c231485c
commit d6343870b8
5 changed files with 228 additions and 65 deletions

View File

@ -0,0 +1,35 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.internal.pipeline.AbstractProcessorPipeline
import net.mamoe.mirai.message.data.Message
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.TypeSafeMap
import net.mamoe.mirai.utils.systemProp
import net.mamoe.mirai.utils.withSwitch
private val defaultTraceLogging: MiraiLogger by lazy {
MiraiLogger.Factory.create(MessageDecoderPipelineImpl::class, "MessageDecoderPipeline")
.withSwitch(systemProp("mirai.message.decoder.pipeline.log.full", false))
}
internal open class MessageDecoderPipelineImpl :
AbstractProcessorPipeline<MessageDecoderProcessor, MessageDecoderContext, ImMsgBody.Elem, Message>(
defaultTraceLogging
),
MessageDecoderPipeline {
inner class MessageDecoderContextImpl(attributes: TypeSafeMap) : MessageDecoderContext, BaseContextImpl(attributes)
override fun createContext(attributes: TypeSafeMap): MessageDecoderContext = MessageDecoderContextImpl(attributes)
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.internal.pipeline.AbstractProcessorPipeline
import net.mamoe.mirai.message.data.SingleMessage
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.TypeSafeMap
import net.mamoe.mirai.utils.systemProp
import net.mamoe.mirai.utils.withSwitch
private val defaultTraceLogging: MiraiLogger by lazy {
MiraiLogger.Factory.create(MessageEncoderPipelineImpl::class, "MessageEncoderPipeline")
.withSwitch(systemProp("mirai.message.encoder.pipeline.log.full", false))
}
internal open class MessageEncoderPipelineImpl :
AbstractProcessorPipeline<MessageEncoderProcessor<*>, MessageEncoderContext, SingleMessage, ImMsgBody.Elem>(
defaultTraceLogging
),
MessageEncoderPipeline {
inner class MessageEncoderContextImpl(attributes: TypeSafeMap) : MessageEncoderContext, BaseContextImpl(attributes)
override fun createContext(attributes: TypeSafeMap): MessageEncoderContext = MessageEncoderContextImpl(attributes)
}

View File

@ -0,0 +1,146 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.internal.pipeline.Processor
import net.mamoe.mirai.internal.pipeline.ProcessorPipeline
import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext
import net.mamoe.mirai.message.data.Message
import net.mamoe.mirai.message.data.SingleMessage
import net.mamoe.mirai.utils.uncheckedCast
import java.util.*
import kotlin.reflect.KClass
internal abstract class ProcessorCollector {
inline fun <reified T : SingleMessage> add(encoder: MessageEncoder<T>) = add(encoder, T::class)
abstract fun <T : SingleMessage> add(encoder: MessageEncoder<T>, elementType: KClass<T>)
abstract fun add(decoder: MessageDecoder)
}
internal abstract class MessageProtocol {
fun collectProcessors(processorCollector: ProcessorCollector) {
processorCollector.collectProcessorsImpl()
}
protected abstract fun ProcessorCollector.collectProcessorsImpl()
}
internal object MessageProtocols {
val instances: List<MessageProtocol> = initialize()
private fun initialize(): List<MessageProtocol> {
val encoderPipeline = MessageEncoderPipelineImpl()
val decoderPipeline = MessageDecoderPipelineImpl()
val instances = ServiceLoader.load(MessageProtocol::class.java).iterator().asSequence().toList()
for (instance in instances) {
instance.collectProcessors(object : ProcessorCollector() {
override fun <T : SingleMessage> add(encoder: MessageEncoder<T>, elementType: KClass<T>) {
encoderPipeline.registerProcessor(MessageEncoderProcessor(encoder, elementType))
}
override fun add(decoder: MessageDecoder) {
decoderPipeline.registerProcessor(MessageDecoderProcessor(decoder))
}
})
}
return instances
}
}
///////////////////////////////////////////////////////////////////////////
// decoders
///////////////////////////////////////////////////////////////////////////
internal interface MessageDecoderContext : ProcessorPipelineContext<ImMsgBody.Elem, Message> {
}
internal interface MessageDecoder {
suspend fun MessageDecoderContext.process(data: ImMsgBody.Elem)
}
/**
* Adapter for [MessageDecoder] to be used as [Processor].
*/
internal class MessageDecoderProcessor(
private val decoder: MessageDecoder
) : Processor<MessageDecoderContext, ImMsgBody.Elem> {
override suspend fun process(context: MessageDecoderContext, data: ImMsgBody.Elem) {
decoder.run { context.process(data) }
}
}
internal interface MessageDecoderPipeline : ProcessorPipeline<MessageDecoderProcessor, ImMsgBody.Elem, Message>
///////////////////////////////////////////////////////////////////////////
// encoders
///////////////////////////////////////////////////////////////////////////
internal interface MessageEncoderContext : ProcessorPipelineContext<SingleMessage, ImMsgBody.Elem> {
}
internal interface MessageEncoder<T : SingleMessage> {
suspend fun MessageEncoderContext.process(data: T)
}
/**
* Adapter for [MessageEncoder] to be used as [Processor].
*/
internal class MessageEncoderProcessor<T : SingleMessage>(
private val encoder: MessageEncoder<T>,
private val elementType: KClass<T>,
) : Processor<MessageEncoderContext, SingleMessage> {
override suspend fun process(context: MessageEncoderContext, data: SingleMessage) {
if (elementType.isInstance(data)) {
encoder.run { context.process(data.uncheckedCast()) }
}
}
}
internal interface MessageEncoderPipeline : ProcessorPipeline<MessageEncoderProcessor<*>, SingleMessage, ImMsgBody.Elem>
///////////////////////////////////////////////////////////////////////////
// refiners
///////////////////////////////////////////////////////////////////////////
//internal interface MessageRefiner : Processor<MessageRefinerContext>
//
//internal interface MessageRefinerContext : ProcessorPipelineContext<SingleMessage, Message?> {
// /**
// * Refine if possible (without suspension), returns self otherwise.
// * @since 2.6
// */ // see #1157
// fun tryRefine(
// bot: Bot,
// context: MessageChain,
// refineContext: RefineContext = EmptyRefineContext,
// ): Message? = this
//
// /**
// * This message [RefinableMessage] will be replaced by return value of [refineLight]
// */
// suspend fun refine(
// bot: Bot,
// context: MessageChain,
// refineContext: RefineContext = EmptyRefineContext,
// ): Message? = tryRefine(bot, context, refineContext)
//}
//

View File

@ -34,7 +34,6 @@ import net.mamoe.mirai.internal.pipeline.ProcessorPipeline
import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext
import net.mamoe.mirai.internal.utils.io.ProtocolStruct import net.mamoe.mirai.internal.utils.io.ProtocolStruct
import net.mamoe.mirai.utils.* import net.mamoe.mirai.utils.*
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KClass import kotlin.reflect.KClass
/** /**
@ -84,18 +83,6 @@ internal open class NoticeProcessorPipelineImpl protected constructor(
traceLogging: MiraiLogger = defaultTraceLogging, traceLogging: MiraiLogger = defaultTraceLogging,
) : NoticeProcessorPipeline, ) : NoticeProcessorPipeline,
AbstractProcessorPipeline<NoticeProcessor, NoticePipelineContext, ProtocolStruct, Packet>(traceLogging) { AbstractProcessorPipeline<NoticeProcessor, NoticePipelineContext, ProtocolStruct, Packet>(traceLogging) {
/**
* Must be ordered
*/
override val processors = ConcurrentLinkedQueue<NoticeProcessor>()
override fun registerProcessor(processor: NoticeProcessor): ProcessorPipeline.DisposableRegistry {
processors.add(processor)
return ProcessorPipeline.DisposableRegistry {
processors.remove(processor)
}
}
open inner class ContextImpl( open inner class ContextImpl(
attributes: TypeSafeMap, attributes: TypeSafeMap,
@ -133,45 +120,6 @@ internal open class NoticeProcessorPipelineImpl protected constructor(
) )
} }
override suspend fun process(
data: ProtocolStruct,
attributes: TypeSafeMap
): Collection<Packet> {
traceLogging.info { "process: data=$data" }
val context = createContext(attributes)
val diff = if (traceLogging.isEnabled) CollectionDiff<Packet>() else null
diff?.save(context.collected.data)
for (processor in processors) {
val result = kotlin.runCatching {
processor.process(context, data)
}.onFailure { e ->
context.collect(
ParseErrorPacket(
data,
IllegalStateException(
"Exception in $processor while processing packet ${packetToString(data)}.",
e,
),
),
)
}
diff?.run {
val diffPackets = subtractAndSave(context.collected.data)
traceLogging.info {
"Finished ${
processor.toString().replace("net.mamoe.mirai.internal.network.notice.", "")
}, success=${result.isSuccess}, consumed=${context.isConsumed}, diff=$diffPackets"
}
}
}
return context.collected.data
}
override fun createContext(attributes: TypeSafeMap): NoticePipelineContext = ContextImpl(attributes) override fun createContext(attributes: TypeSafeMap): NoticePipelineContext = ContextImpl(attributes)
protected open fun packetToString(data: Any?): String = protected open fun packetToString(data: Any?): String =
@ -195,7 +143,7 @@ internal open class NoticeProcessorPipelineImpl protected constructor(
/** /**
* A processor handling some specific type of message. * A processor handling some specific type of message.
*/ */
internal interface NoticeProcessor : Processor<NoticePipelineContext> internal interface NoticeProcessor : Processor<NoticePipelineContext, ProtocolStruct>
internal abstract class AnyNoticeProcessor : SimpleNoticeProcessor<ProtocolStruct>(type()) internal abstract class AnyNoticeProcessor : SimpleNoticeProcessor<ProtocolStruct>(type())
@ -203,7 +151,7 @@ internal abstract class SimpleNoticeProcessor<in T : ProtocolStruct>(
private val type: KClass<T>, private val type: KClass<T>,
) : NoticeProcessor { ) : NoticeProcessor {
final override suspend fun process(context: NoticePipelineContext, data: Any?) { final override suspend fun process(context: NoticePipelineContext, data: ProtocolStruct) {
if (type.isInstance(data)) { if (type.isInstance(data)) {
context.processImpl(data.uncheckedCast()) context.processImpl(data.uncheckedCast())
} }

View File

@ -16,11 +16,11 @@ import java.io.Closeable
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
internal interface Processor<C : ProcessorPipelineContext<*, *>> { internal interface Processor<C : ProcessorPipelineContext<D, *>, D> {
suspend fun process(context: C, data: Any?) suspend fun process(context: C, data: D)
} }
internal interface ProcessorPipeline<P : Processor<*>, D, R> { internal interface ProcessorPipeline<P : Processor<out ProcessorPipelineContext<D, *>, D>, D, R> {
val processors: Collection<P> val processors: Collection<P>
fun interface DisposableRegistry : Closeable { fun interface DisposableRegistry : Closeable {
@ -79,13 +79,13 @@ internal interface ProcessorPipelineContext<D, R> {
* and throws a [contextualBugReportException] or logs something. * and throws a [contextualBugReportException] or logs something.
*/ */
@ConsumptionMarker @ConsumptionMarker
fun Processor<*>.markAsConsumed(marker: Any = this) fun Processor<*, D>.markAsConsumed(marker: Any = this)
/** /**
* Marks the input as not consumed, if it was marked by this [NoticeProcessor]. * Marks the input as not consumed, if it was marked by this [NoticeProcessor].
*/ */
@ConsumptionMarker @ConsumptionMarker
fun Processor<*>.markNotConsumed(marker: Any = this) fun Processor<*, D>.markNotConsumed(marker: Any = this)
@DslMarker @DslMarker
annotation class ConsumptionMarker // to give an explicit color. annotation class ConsumptionMarker // to give an explicit color.
@ -106,12 +106,12 @@ internal abstract class AbstractProcessorPipelineContext<D, R>(
private val consumers: Stack<Any> = Stack() private val consumers: Stack<Any> = Stack()
override val isConsumed: Boolean get() = consumers.isNotEmpty() override val isConsumed: Boolean get() = consumers.isNotEmpty()
override fun Processor<*>.markAsConsumed(marker: Any) { override fun Processor<*, D>.markAsConsumed(marker: Any) {
traceLogging.info { "markAsConsumed: marker=$marker" } traceLogging.info { "markAsConsumed: marker=$marker" }
consumers.push(marker) consumers.push(marker)
} }
override fun Processor<*>.markNotConsumed(marker: Any) { override fun Processor<*, D>.markNotConsumed(marker: Any) {
if (consumers.peek() === marker) { if (consumers.peek() === marker) {
consumers.pop() consumers.pop()
traceLogging.info { "markNotConsumed: Y, marker=$marker" } traceLogging.info { "markNotConsumed: Y, marker=$marker" }
@ -133,10 +133,12 @@ internal abstract class AbstractProcessorPipelineContext<D, R>(
} }
} }
internal abstract class AbstractProcessorPipeline<P : Processor<C>, C : ProcessorPipelineContext<D, R>, D, R> internal abstract class AbstractProcessorPipeline<P : Processor<C, D>, C : ProcessorPipelineContext<D, R>, D, R>
protected constructor( protected constructor(
val traceLogging: MiraiLogger, val traceLogging: MiraiLogger,
) : ProcessorPipeline<P, D, R> { ) : ProcessorPipeline<P, D, R> {
constructor() : this(SilentLogger)
/** /**
* Must be ordered * Must be ordered
*/ */
@ -169,9 +171,7 @@ protected constructor(
attributes: TypeSafeMap, attributes: TypeSafeMap,
processor: P, processor: P,
e: Throwable e: Throwable
) { ): Unit = throw e
throw e
}
override suspend fun process(data: D, attributes: TypeSafeMap): Collection<R> { override suspend fun process(data: D, attributes: TypeSafeMap): Collection<R> {
traceLogging.info { "process: data=$data" } traceLogging.info { "process: data=$data" }