Add MutableProcessResult; remove locks

This commit is contained in:
Him188 2021-08-13 16:51:25 +08:00
parent d15767fa9b
commit e097c5ab9d
2 changed files with 47 additions and 42 deletions

View File

@ -155,16 +155,15 @@ internal open class QQAndroidBot constructor(
val pipelineLogger = networkLogger.subLogger("NoticeProcessor") // shorten name
set(
NoticeProcessorPipeline,
NoticeProcessorPipelineImpl().apply {
registerProcessor(MsgInfoDecoder(pipelineLogger))
registerProcessor(FriendNoticeProcessor(pipelineLogger))
registerProcessor(GroupListNoticeProcessor(pipelineLogger))
registerProcessor(GroupMessageProcessor())
registerProcessor(PrivateMessageNoticeProcessor())
registerProcessor(OtherClientNoticeProcessor())
registerProcessor(UnconsumedNoticesAlerter(pipelineLogger))
},
NoticeProcessorPipelineImpl.create(
MsgInfoDecoder(pipelineLogger),
FriendNoticeProcessor(pipelineLogger),
GroupListNoticeProcessor(pipelineLogger),
GroupMessageProcessor(),
PrivateMessageNoticeProcessor(),
OtherClientNoticeProcessor(),
UnconsumedNoticesAlerter(pipelineLogger),
)
)
set(SsoProcessorContext, SsoProcessorContextImpl(bot))

View File

@ -32,9 +32,7 @@ import net.mamoe.mirai.utils.toDebugString
import net.mamoe.mirai.utils.uncheckedCast
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.reflect.KClass
internal typealias ProcessResult = Collection<Packet>
@ -63,6 +61,11 @@ internal interface NoticeProcessorPipeline {
}
}
@JvmInline
internal value class MutableProcessResult(
val data: MutableCollection<Packet>
)
internal interface PipelineContext {
val bot: QQAndroidBot
@ -92,11 +95,10 @@ internal interface PipelineContext {
annotation class ConsumptionMarker // to give an explicit color.
val collected: Collection<Packet>
val collected: MutableProcessResult
// DSL to simplify some expressions
operator fun Collection<Packet>.plusAssign(packet: Packet) {
require(this === collected) { "`plusAssign` can only be applied to `collected`" }
operator fun MutableProcessResult.plusAssign(packet: Packet) {
collect(packet)
}
@ -126,21 +128,17 @@ internal interface PipelineContext {
internal inline val PipelineContext.context get() = this
internal open class NoticeProcessorPipelineImpl : NoticeProcessorPipeline {
private val processors = ArrayList<NoticeProcessor>()
private val processorsLock = ReentrantReadWriteLock()
internal open class NoticeProcessorPipelineImpl private constructor() : NoticeProcessorPipeline {
private val processors = CopyOnWriteArrayList<NoticeProcessor>()
override fun registerProcessor(processor: NoticeProcessor) {
processorsLock.write {
processors.add(processor)
}
processors.add(processor)
}
inner class ContextImpl(
override val bot: QQAndroidBot, override val attributes: TypeSafeMap,
) : PipelineContext {
private val consumers: Stack<NoticeProcessor> = Stack()
override val isConsumed: Boolean = consumers.isNotEmpty()
@ -154,14 +152,14 @@ internal open class NoticeProcessorPipelineImpl : NoticeProcessorPipeline {
}
}
override val collected = ConcurrentLinkedQueue<Packet>()
override val collected = MutableProcessResult(ConcurrentLinkedQueue())
override fun collect(packet: Packet) {
collected.add(packet)
collected.data.add(packet)
}
override fun collect(packets: Iterable<Packet>) {
this.collected.addAll(packets)
this.collected.data.addAll(packets)
}
override suspend fun fire(data: ProtocolStruct): ProcessResult {
@ -171,30 +169,38 @@ internal open class NoticeProcessorPipelineImpl : NoticeProcessorPipeline {
override suspend fun process(bot: QQAndroidBot, data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult {
processorsLock.read {
val context = ContextImpl(bot, attributes)
for (processor in processors) {
kotlin.runCatching {
processor.process(context, data)
}.onFailure { e ->
context.collect(
ParseErrorPacket(
data,
IllegalStateException(
"Exception in $processor while processing packet ${packetToString(data)}.",
e,
),
val context = ContextImpl(bot, attributes)
for (processor in processors) {
kotlin.runCatching {
processor.process(context, data)
}.onFailure { e ->
context.collect(
ParseErrorPacket(
data,
IllegalStateException(
"Exception in $processor while processing packet ${packetToString(data)}.",
e,
),
)
}
),
)
}
return context.collected
}
return context.collected.data
}
protected open fun packetToString(data: Any?): String =
data.toDebugString("mirai.network.debug.notice.pipeline.log.full")
companion object {
fun createEmpty(): NoticeProcessorPipelineImpl = NoticeProcessorPipelineImpl()
fun create(vararg processors: NoticeProcessor): NoticeProcessorPipelineImpl =
NoticeProcessorPipelineImpl().apply {
for (processor in processors) {
registerProcessor(processor)
}
}
}
}
///////////////////////////////////////////////////////////////////////////