From e097c5ab9d8306a842bc684c5fc73faec7b6634b Mon Sep 17 00:00:00 2001 From: Him188 Date: Fri, 13 Aug 2021 16:51:25 +0800 Subject: [PATCH] Add `MutableProcessResult`; remove locks --- .../src/commonMain/kotlin/QQAndroidBot.kt | 19 +++-- .../components/NoticeProcessorPipeline.kt | 70 ++++++++++--------- 2 files changed, 47 insertions(+), 42 deletions(-) diff --git a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt index eeb8f2feb..1292f3213 100644 --- a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt +++ b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt @@ -155,16 +155,15 @@ internal open class QQAndroidBot constructor( val pipelineLogger = networkLogger.subLogger("NoticeProcessor") // shorten name set( NoticeProcessorPipeline, - NoticeProcessorPipelineImpl().apply { - registerProcessor(MsgInfoDecoder(pipelineLogger)) - - registerProcessor(FriendNoticeProcessor(pipelineLogger)) - registerProcessor(GroupListNoticeProcessor(pipelineLogger)) - registerProcessor(GroupMessageProcessor()) - registerProcessor(PrivateMessageNoticeProcessor()) - registerProcessor(OtherClientNoticeProcessor()) - registerProcessor(UnconsumedNoticesAlerter(pipelineLogger)) - }, + NoticeProcessorPipelineImpl.create( + MsgInfoDecoder(pipelineLogger), + FriendNoticeProcessor(pipelineLogger), + GroupListNoticeProcessor(pipelineLogger), + GroupMessageProcessor(), + PrivateMessageNoticeProcessor(), + OtherClientNoticeProcessor(), + UnconsumedNoticesAlerter(pipelineLogger), + ) ) set(SsoProcessorContext, SsoProcessorContextImpl(bot)) diff --git a/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt b/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt index 74f25a952..fc9a78f23 100644 --- a/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt +++ b/mirai-core/src/commonMain/kotlin/network/components/NoticeProcessorPipeline.kt @@ -32,9 +32,7 @@ import net.mamoe.mirai.utils.toDebugString import net.mamoe.mirai.utils.uncheckedCast import java.util.* import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.locks.ReentrantReadWriteLock -import kotlin.concurrent.read -import kotlin.concurrent.write +import java.util.concurrent.CopyOnWriteArrayList import kotlin.reflect.KClass internal typealias ProcessResult = Collection @@ -63,6 +61,11 @@ internal interface NoticeProcessorPipeline { } } +@JvmInline +internal value class MutableProcessResult( + val data: MutableCollection +) + internal interface PipelineContext { val bot: QQAndroidBot @@ -92,11 +95,10 @@ internal interface PipelineContext { annotation class ConsumptionMarker // to give an explicit color. - val collected: Collection + val collected: MutableProcessResult // DSL to simplify some expressions - operator fun Collection.plusAssign(packet: Packet) { - require(this === collected) { "`plusAssign` can only be applied to `collected`" } + operator fun MutableProcessResult.plusAssign(packet: Packet) { collect(packet) } @@ -126,21 +128,17 @@ internal interface PipelineContext { internal inline val PipelineContext.context get() = this -internal open class NoticeProcessorPipelineImpl : NoticeProcessorPipeline { - private val processors = ArrayList() - private val processorsLock = ReentrantReadWriteLock() +internal open class NoticeProcessorPipelineImpl private constructor() : NoticeProcessorPipeline { + private val processors = CopyOnWriteArrayList() override fun registerProcessor(processor: NoticeProcessor) { - processorsLock.write { - processors.add(processor) - } + processors.add(processor) } inner class ContextImpl( override val bot: QQAndroidBot, override val attributes: TypeSafeMap, ) : PipelineContext { - private val consumers: Stack = Stack() override val isConsumed: Boolean = consumers.isNotEmpty() @@ -154,14 +152,14 @@ internal open class NoticeProcessorPipelineImpl : NoticeProcessorPipeline { } } - override val collected = ConcurrentLinkedQueue() + override val collected = MutableProcessResult(ConcurrentLinkedQueue()) override fun collect(packet: Packet) { - collected.add(packet) + collected.data.add(packet) } override fun collect(packets: Iterable) { - this.collected.addAll(packets) + this.collected.data.addAll(packets) } override suspend fun fire(data: ProtocolStruct): ProcessResult { @@ -171,30 +169,38 @@ internal open class NoticeProcessorPipelineImpl : NoticeProcessorPipeline { override suspend fun process(bot: QQAndroidBot, data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult { - processorsLock.read { - val context = ContextImpl(bot, attributes) - for (processor in processors) { - kotlin.runCatching { - processor.process(context, data) - }.onFailure { e -> - context.collect( - ParseErrorPacket( - data, - IllegalStateException( - "Exception in $processor while processing packet ${packetToString(data)}.", - e, - ), + val context = ContextImpl(bot, attributes) + for (processor in processors) { + kotlin.runCatching { + processor.process(context, data) + }.onFailure { e -> + context.collect( + ParseErrorPacket( + data, + IllegalStateException( + "Exception in $processor while processing packet ${packetToString(data)}.", + e, ), - ) - } + ), + ) } - return context.collected } + return context.collected.data } protected open fun packetToString(data: Any?): String = data.toDebugString("mirai.network.debug.notice.pipeline.log.full") + + companion object { + fun createEmpty(): NoticeProcessorPipelineImpl = NoticeProcessorPipelineImpl() + fun create(vararg processors: NoticeProcessor): NoticeProcessorPipelineImpl = + NoticeProcessorPipelineImpl().apply { + for (processor in processors) { + registerProcessor(processor) + } + } + } } ///////////////////////////////////////////////////////////////////////////