Generalize NoticeProcessorPipeline as ProcessorPipeline

This commit is contained in:
Him188 2022-04-26 15:18:11 +01:00
parent 0bb86a3b11
commit 0b52c2bb5a
6 changed files with 297 additions and 163 deletions

View File

@ -190,6 +190,7 @@ internal open class QQAndroidBot constructor(
set(
NoticeProcessorPipeline,
NoticeProcessorPipelineImpl.create(
bot,
MsgInfoDecoder(pipelineLogger.subLogger("MsgInfoDecoder")),
GroupNotificationDecoder(),

View File

@ -0,0 +1,20 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.message.protocol
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
internal interface OutgoingMessageContext {
fun collect(elem: ImMsgBody.Elem)
fun processAlso()
}

View File

@ -1,16 +1,15 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.network.components
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.message.contextualBugReportException
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.ParseErrorPacket
import net.mamoe.mirai.internal.network.component.ComponentKey
@ -29,40 +28,19 @@ import net.mamoe.mirai.internal.network.protocol.data.proto.Structmsg
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetMsg
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.OnlinePushPbPushTransMsg
import net.mamoe.mirai.internal.network.toPacket
import net.mamoe.mirai.internal.pipeline.AbstractProcessorPipeline
import net.mamoe.mirai.internal.pipeline.Processor
import net.mamoe.mirai.internal.pipeline.ProcessorPipeline
import net.mamoe.mirai.internal.pipeline.ProcessorPipelineContext
import net.mamoe.mirai.internal.utils.io.ProtocolStruct
import net.mamoe.mirai.utils.*
import java.io.Closeable
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KClass
internal typealias ProcessResult = Collection<Packet>
/**
* Centralized processor pipeline for [MessageSvcPbGetMsg] and [OnlinePushPbPushTransMsg]
*/
internal interface NoticeProcessorPipeline {
val processors: Collection<NoticeProcessor>
fun interface DisposableRegistry : Closeable {
fun dispose()
override fun close() {
dispose()
}
}
fun registerProcessor(processor: NoticeProcessor): DisposableRegistry
/**
* Process [data] into [Packet]s. Exceptions are wrapped into [ParseErrorPacket]
*/
suspend fun process(
bot: QQAndroidBot,
data: ProtocolStruct,
attributes: TypeSafeMap = TypeSafeMap.EMPTY
): ProcessResult
internal interface NoticeProcessorPipeline : ProcessorPipeline<NoticeProcessor, ProtocolStruct, Packet> {
companion object : ComponentKey<NoticeProcessorPipeline> {
val ComponentStorage.noticeProcessorPipeline get() = get(NoticeProcessorPipeline)
@ -71,71 +49,15 @@ internal interface NoticeProcessorPipeline {
data: ProtocolStruct,
attributes: TypeSafeMap = TypeSafeMap.EMPTY,
): Packet {
return components.noticeProcessorPipeline.process(this, data, attributes).toPacket()
return components.noticeProcessorPipeline.process(data, attributes).toPacket()
}
}
}
@JvmInline
internal value class MutableProcessResult(
val data: MutableCollection<Packet>
)
internal interface NoticePipelineContext : BotAware, NewContactSupport {
internal interface NoticePipelineContext : BotAware, NewContactSupport,
ProcessorPipelineContext<ProtocolStruct, Packet> {
override val bot: QQAndroidBot
val attributes: TypeSafeMap
val isConsumed: Boolean
/**
* Marks the input as consumed so that there will not be warnings like 'Unknown type xxx'. This will not stop the pipeline.
*
* If this is executed, make sure you provided all information important for debugging.
*
* You need to invoke [markAsConsumed] if your implementation includes some `else` branch which covers all situations,
* and throws a [contextualBugReportException] or logs something.
*/
@ConsumptionMarker
fun NoticeProcessor.markAsConsumed(marker: Any = this)
/**
* Marks the input as not consumed, if it was marked by this [NoticeProcessor].
*/
@ConsumptionMarker
fun NoticeProcessor.markNotConsumed(marker: Any = this)
@DslMarker
annotation class ConsumptionMarker // to give an explicit color.
val collected: MutableProcessResult
// DSL to simplify some expressions
operator fun MutableProcessResult.plusAssign(packet: Packet?) {
if (packet != null) collect(packet)
}
/**
* Collect a result.
*/
fun collect(packet: Packet)
/**
* Collect results.
*/
fun collect(packets: Iterable<Packet>)
/**
* Fire the [data] into the processor pipeline, and collect the results to current [collected].
*
* @param attributes extra attributes
* @return result collected from processors. This would also have been collected to this context (where you call [processAlso]).
*/
suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap = TypeSafeMap.EMPTY): ProcessResult
companion object {
val KEY_FROM_SYNC = TypeKey<Boolean>("fromSync")
val KEY_MSG_INFO = TypeKey<MsgInfo>("msgInfo")
@ -150,82 +72,73 @@ internal interface NoticePipelineContext : BotAware, NewContactSupport {
}
}
internal abstract class AbstractNoticePipelineContext(
override val bot: QQAndroidBot, override val attributes: TypeSafeMap,
) : NoticePipelineContext {
private val consumers: Stack<Any> = Stack()
override val isConsumed: Boolean get() = consumers.isNotEmpty()
override fun NoticeProcessor.markAsConsumed(marker: Any) {
traceLogging.info { "markAsConsumed: marker=$marker" }
consumers.push(marker)
}
override fun NoticeProcessor.markNotConsumed(marker: Any) {
if (consumers.peek() === marker) {
consumers.pop()
traceLogging.info { "markNotConsumed: Y, marker=$marker" }
} else {
traceLogging.info { "markNotConsumed: N, marker=$marker" }
}
}
override val collected = MutableProcessResult(ConcurrentLinkedQueue())
override fun collect(packet: Packet) {
collected.data.add(packet)
traceLogging.info { "collect: $packet" }
}
override fun collect(packets: Iterable<Packet>) {
this.collected.data.addAll(packets)
traceLogging.info {
val list = packets.toList()
"collect: [${list.size}] ${list.joinToString()}"
}
}
abstract override suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult
}
internal inline val NoticePipelineContext.context get() = this
private val traceLogging: MiraiLogger by lazy {
private val defaultTraceLogging: MiraiLogger by lazy {
MiraiLogger.Factory.create(NoticeProcessorPipelineImpl::class, "NoticeProcessorPipeline")
.withSwitch(systemProp("mirai.network.notice.pipeline.log.full", false))
}
internal open class NoticeProcessorPipelineImpl protected constructor() : NoticeProcessorPipeline {
internal open class NoticeProcessorPipelineImpl protected constructor(
private val bot: QQAndroidBot,
traceLogging: MiraiLogger = defaultTraceLogging,
) : NoticeProcessorPipeline,
AbstractProcessorPipeline<NoticeProcessor, NoticePipelineContext, ProtocolStruct, Packet>(traceLogging) {
/**
* Must be ordered
*/
override val processors = ConcurrentLinkedQueue<NoticeProcessor>()
override fun registerProcessor(processor: NoticeProcessor): NoticeProcessorPipeline.DisposableRegistry {
override fun registerProcessor(processor: NoticeProcessor): ProcessorPipeline.DisposableRegistry {
processors.add(processor)
return NoticeProcessorPipeline.DisposableRegistry {
return ProcessorPipeline.DisposableRegistry {
processors.remove(processor)
}
}
open inner class ContextImpl(
bot: QQAndroidBot, attributes: TypeSafeMap,
) : AbstractNoticePipelineContext(bot, attributes) {
override suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult {
attributes: TypeSafeMap,
) : BaseContextImpl(attributes), NoticePipelineContext {
override val bot: QQAndroidBot
get() = this@NoticeProcessorPipelineImpl.bot
override suspend fun processAlso(
data: ProtocolStruct,
attributes: TypeSafeMap
): Collection<Packet> {
traceLogging.info { "processAlso: data=$data" }
return process(bot, data, this.attributes + attributes).also {
return process(data, this.attributes + attributes).also {
this.collected.data += it
traceLogging.info { "processAlso: result=$it" }
}
}
}
override fun handleExceptionInProcess(
data: ProtocolStruct,
context: NoticePipelineContext,
attributes: TypeSafeMap,
processor: NoticeProcessor,
e: Throwable
) {
context.collect(
ParseErrorPacket(
data,
IllegalStateException(
"Exception in $processor while processing packet ${packetToString(data)}.",
e,
),
)
)
}
override suspend fun process(bot: QQAndroidBot, data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult {
override suspend fun process(
data: ProtocolStruct,
attributes: TypeSafeMap
): Collection<Packet> {
traceLogging.info { "process: data=$data" }
val context = createContext(bot, attributes)
val context = createContext(attributes)
val diff = if (traceLogging.isEnabled) CollectionDiff<Packet>() else null
diff?.save(context.collected.data)
@ -259,18 +172,15 @@ internal open class NoticeProcessorPipelineImpl protected constructor() : Notice
return context.collected.data
}
protected open fun createContext(
bot: QQAndroidBot,
attributes: TypeSafeMap
): NoticePipelineContext = ContextImpl(bot, attributes)
override fun createContext(attributes: TypeSafeMap): NoticePipelineContext = ContextImpl(attributes)
protected open fun packetToString(data: Any?): String =
data.toDebugString("mirai.network.notice.pipeline.log.full")
companion object {
fun create(vararg processors: NoticeProcessor): NoticeProcessorPipelineImpl =
NoticeProcessorPipelineImpl().apply {
fun create(bot: QQAndroidBot, vararg processors: NoticeProcessor): NoticeProcessorPipelineImpl =
NoticeProcessorPipelineImpl(bot, defaultTraceLogging).apply {
for (processor in processors) {
registerProcessor(processor)
}
@ -285,9 +195,7 @@ internal open class NoticeProcessorPipelineImpl protected constructor() : Notice
/**
* A processor handling some specific type of message.
*/
internal interface NoticeProcessor {
suspend fun process(context: NoticePipelineContext, data: Any?)
}
internal interface NoticeProcessor : Processor<NoticePipelineContext>
internal abstract class AnyNoticeProcessor : SimpleNoticeProcessor<ProtocolStruct>(type())

View File

@ -0,0 +1,203 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.pipeline
import net.mamoe.mirai.internal.message.contextualBugReportException
import net.mamoe.mirai.internal.network.components.NoticeProcessor
import net.mamoe.mirai.utils.*
import java.io.Closeable
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
internal interface Processor<C : ProcessorPipelineContext<*, *>> {
suspend fun process(context: C, data: Any?)
}
internal interface ProcessorPipeline<P : Processor<*>, D, R> {
val processors: Collection<P>
fun interface DisposableRegistry : Closeable {
fun dispose()
override fun close() {
dispose()
}
}
fun registerProcessor(processor: P): DisposableRegistry
suspend fun process(
data: D,
attributes: TypeSafeMap = TypeSafeMap.EMPTY
): Collection<R>
}
@JvmInline
internal value class MutableProcessResult<R>(
val data: MutableCollection<R>
)
internal interface ProcessorPipelineContext<D, R> {
val attributes: TypeSafeMap
val collected: MutableProcessResult<R>
// DSL to simplify some expressions
operator fun MutableProcessResult<R>.plusAssign(result: R?) {
if (result != null) collect(result)
}
/**
* Collect a result.
*/
fun collect(result: R)
/**
* Collect results.
*/
fun collect(results: Iterable<R>)
val isConsumed: Boolean
/**
* Marks the input as consumed so that there will not be warnings like 'Unknown type xxx'. This will not stop the pipeline.
*
* If this is executed, make sure you provided all information important for debugging.
*
* You need to invoke [markAsConsumed] if your implementation includes some `else` branch which covers all situations,
* and throws a [contextualBugReportException] or logs something.
*/
@ConsumptionMarker
fun Processor<*>.markAsConsumed(marker: Any = this)
/**
* Marks the input as not consumed, if it was marked by this [NoticeProcessor].
*/
@ConsumptionMarker
fun Processor<*>.markNotConsumed(marker: Any = this)
@DslMarker
annotation class ConsumptionMarker // to give an explicit color.
/**
* Fire the [data] into the processor pipeline, and collect the results to current [collected].
*
* @param attributes extra attributes
* @return result collected from processors. This would also have been collected to this context (where you call [processAlso]).
*/
suspend fun processAlso(data: D, attributes: TypeSafeMap = TypeSafeMap.EMPTY): Collection<R>
}
internal abstract class AbstractProcessorPipelineContext<D, R>(
override val attributes: TypeSafeMap,
private val traceLogging: MiraiLogger,
) : ProcessorPipelineContext<D, R> {
private val consumers: Stack<Any> = Stack()
override val isConsumed: Boolean get() = consumers.isNotEmpty()
override fun Processor<*>.markAsConsumed(marker: Any) {
traceLogging.info { "markAsConsumed: marker=$marker" }
consumers.push(marker)
}
override fun Processor<*>.markNotConsumed(marker: Any) {
if (consumers.peek() === marker) {
consumers.pop()
traceLogging.info { "markNotConsumed: Y, marker=$marker" }
} else {
traceLogging.info { "markNotConsumed: N, marker=$marker" }
}
}
override val collected: MutableProcessResult<R> = MutableProcessResult(ConcurrentLinkedQueue())
override fun collect(result: R) {
collected.data.add(result)
traceLogging.info { "collect: $result" }
}
override fun collect(results: Iterable<R>) {
this.collected.data.addAll(results)
traceLogging.info { "collect: $results" }
}
}
internal abstract class AbstractProcessorPipeline<P : Processor<C>, C : ProcessorPipelineContext<D, R>, D, R>
protected constructor(
val traceLogging: MiraiLogger,
) : ProcessorPipeline<P, D, R> {
/**
* Must be ordered
*/
override val processors = ConcurrentLinkedQueue<P>()
override fun registerProcessor(processor: P): ProcessorPipeline.DisposableRegistry {
processors.add(processor)
return ProcessorPipeline.DisposableRegistry {
processors.remove(processor)
}
}
protected abstract fun createContext(attributes: TypeSafeMap): C
abstract inner class BaseContextImpl(
attributes: TypeSafeMap,
) : AbstractProcessorPipelineContext<D, R>(attributes, traceLogging) {
override suspend fun processAlso(data: D, attributes: TypeSafeMap): Collection<R> {
traceLogging.info { "processAlso: data=$data" }
return process(data, this.attributes + attributes).also {
this.collected.data += it
traceLogging.info { "processAlso: result=$it" }
}
}
}
protected open fun handleExceptionInProcess(
data: D,
context: C,
attributes: TypeSafeMap,
processor: P,
e: Throwable
) {
throw e
}
override suspend fun process(data: D, attributes: TypeSafeMap): Collection<R> {
traceLogging.info { "process: data=$data" }
val context = createContext(attributes)
val diff = if (traceLogging.isEnabled) CollectionDiff<R>() else null
diff?.save(context.collected.data)
for (processor in processors) {
val result = kotlin.runCatching {
processor.process(context, data)
}.onFailure { e ->
handleExceptionInProcess(data, context, attributes, processor, e)
}
diff?.run {
val diffPackets = subtractAndSave(context.collected.data)
traceLogging.info {
"Finished ${
processor.toString().replace("net.mamoe.mirai.internal.network.notice.", "")
}, success=${result.isSuccess}, consumed=${context.isConsumed}, diff=$diffPackets"
}
}
}
return context.collected.data
}
}

View File

@ -24,6 +24,7 @@ import net.mamoe.mirai.internal.contact.info.FriendInfoImpl
import net.mamoe.mirai.internal.contact.info.GroupInfoImpl
import net.mamoe.mirai.internal.contact.info.MemberInfoImpl
import net.mamoe.mirai.internal.contact.info.StrangerInfoImpl
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.components.*
import net.mamoe.mirai.internal.network.components.NoticeProcessorPipeline.Companion.noticeProcessorPipeline
import net.mamoe.mirai.internal.network.framework.AbstractNettyNHTest
@ -58,11 +59,11 @@ internal abstract class AbstractNoticeProcessorTest : AbstractNettyNHTest(), Gro
attributes: TypeSafeMap = TypeSafeMap(),
pipeline: NoticeProcessorPipeline = bot.components.noticeProcessorPipeline,
block: UseTestContext.() -> ProtocolStruct
): ProcessResult {
): Collection<Packet> {
bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED
val handler = LoggingPacketHandlerAdapter(PacketLoggingStrategyImpl(bot), bot.logger)
val context = UseTestContext(attributes.toMutableTypeSafeMap())
return pipeline.process(bot, block(context), context.attributes).also { list ->
return pipeline.process(block(context), context.attributes).also { list ->
for (packet in list) {
handler.handlePacket(IncomingPacket("test", packet))
}
@ -71,16 +72,17 @@ internal abstract class AbstractNoticeProcessorTest : AbstractNettyNHTest(), Gro
protected suspend inline fun use(
attributes: TypeSafeMap = TypeSafeMap(),
crossinline createContext: NoticeProcessorPipelineImpl.(bot: QQAndroidBot, attributes: TypeSafeMap) -> NoticeProcessorPipelineImpl.ContextImpl,
crossinline createContext: NoticeProcessorPipelineImpl.(attributes: TypeSafeMap) -> NoticeProcessorPipelineImpl.ContextImpl,
block: UseTestContext.() -> ProtocolStruct
): ProcessResult = use(attributes, pipeline = object : NoticeProcessorPipelineImpl() {
init {
bot.components.noticeProcessorPipeline.processors.forEach { registerProcessor(it) }
}
): Collection<Packet> =
use(attributes, pipeline = object : NoticeProcessorPipelineImpl(bot) {
init {
bot.components.noticeProcessorPipeline.processors.forEach { registerProcessor(it) }
}
override fun createContext(bot: QQAndroidBot, attributes: TypeSafeMap): NoticePipelineContext =
createContext(this, bot, attributes)
}, block)
override fun createContext(attributes: TypeSafeMap): NoticePipelineContext =
createContext(this, attributes)
}, block)
fun setBot(id: Long): QQAndroidBot {
bot = createBot(BotAccount(id, "a"))

View File

@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
@ -104,8 +104,8 @@ internal class BotInvitedJoinTest : AbstractNoticeProcessorTest() {
suspend fun `invited join, accepted`() {
// https://github.com/mamoe/mirai/issues/1213
suspend fun runTest() = use(
createContext = { bot, attributes ->
object : NoticeProcessorPipelineImpl.ContextImpl(bot, attributes) {
createContext = { attributes ->
object : NoticeProcessorPipelineImpl.ContextImpl(attributes) {
override suspend fun QQAndroidBot.addNewGroupByCode(code: Long): GroupImpl {
assertEquals(2230203, code)
return bot.addGroup(2230203, 1230001, name = "testtest").apply {
@ -154,8 +154,8 @@ internal class BotInvitedJoinTest : AbstractNoticeProcessorTest() {
@Test
suspend fun `invitation accepted`() {
suspend fun runTest() =
use(createContext = { bot, attributes ->
object : NoticeProcessorPipelineImpl.ContextImpl(bot, attributes) {
use(createContext = { attributes ->
object : NoticeProcessorPipelineImpl.ContextImpl(attributes) {
override suspend fun QQAndroidBot.addNewGroupByCode(code: Long): GroupImpl {
assertEquals(2230203, code)
return bot.addGroup(2230203, 1230002, name = "testtest").apply {