From e9d9d56489f9fafe2a86882426c9f03c8a54e9e8 Mon Sep 17 00:00:00 2001 From: Karlatemp Date: Fri, 11 Jun 2021 21:19:35 +0800 Subject: [PATCH] Avoid network congestion --- .../network/handler/NetworkHandlerSupport.kt | 16 ++++++++-------- .../network/impl/netty/NettyNetworkHandler.kt | 19 ++++++++++--------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt index d55de8416..abb44cc04 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt @@ -92,11 +92,11 @@ internal abstract class NetworkHandlerSupport( final override suspend fun sendAndExpect(packet: OutgoingPacket, timeout: Long, attempts: Int): Packet? { require(attempts >= 1) { "attempts must be at least 1." } val listener = PacketListener(packet.commandName, packet.sequenceId) + packetListeners.add(listener) withExceptionCollector { - repeat(attempts) { - context[PacketLoggingStrategy].logSent(logger, packet) - try { - packetListeners.add(listener) + try { + repeat(attempts) { + context[PacketLoggingStrategy].logSent(logger, packet) sendPacketImpl(packet) try { return withTimeout(timeout) { @@ -105,12 +105,12 @@ internal abstract class NetworkHandlerSupport( } catch (e: TimeoutCancellationException) { collectException(e) } - } finally { - listener.result.completeExceptionally(getLast() ?: IllegalStateException("No response")) - packetListeners.remove(listener) } + throwLast() + } finally { + packetListeners.remove(listener) + listener.result.completeExceptionally(getLast() ?: IllegalStateException("No response")) } - throwLast() } } diff --git a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt index 350dafd6c..b97222209 100644 --- a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt +++ b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt @@ -20,8 +20,6 @@ import io.netty.handler.codec.MessageToByteEncoder import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.trySendBlocking -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.consumeAsFlow import net.mamoe.mirai.internal.network.components.* import net.mamoe.mirai.internal.network.handler.NetworkHandler.State import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext @@ -185,13 +183,16 @@ internal open class NettyNetworkHandler( } init { - launch(CoroutineName("PacketDecodePipeline processor")) { - // 'single thread' processor - channel.consumeAsFlow().collect { raw -> - val result = packetCodec.processBody(context.bot, raw) - if (result == null) { - collectUnknownPacket(raw) - } else collectReceived(result) + repeat(4) { processorId -> + launch(CoroutineName("PacketDecodePipeline processor #$processorId")) { + while (isActive) { + val raw = channel.receiveCatching().getOrNull() ?: return@launch + packetLogger.debug { "Packet Handling Processor #$processorId: receive packet ${raw.commandName}" } + val result = packetCodec.processBody(context.bot, raw) + if (result == null) { + collectUnknownPacket(raw) + } else collectReceived(result) + } } } }