Avoid network congestion

This commit is contained in:
Karlatemp 2021-06-11 21:19:35 +08:00
parent 05a8419fb7
commit e9d9d56489
No known key found for this signature in database
GPG Key ID: 21FBDDF664FF06F8
2 changed files with 18 additions and 17 deletions

View File

@ -92,11 +92,11 @@ internal abstract class NetworkHandlerSupport(
final override suspend fun sendAndExpect(packet: OutgoingPacket, timeout: Long, attempts: Int): Packet? {
require(attempts >= 1) { "attempts must be at least 1." }
val listener = PacketListener(packet.commandName, packet.sequenceId)
packetListeners.add(listener)
withExceptionCollector {
repeat(attempts) {
context[PacketLoggingStrategy].logSent(logger, packet)
try {
packetListeners.add(listener)
try {
repeat(attempts) {
context[PacketLoggingStrategy].logSent(logger, packet)
sendPacketImpl(packet)
try {
return withTimeout(timeout) {
@ -105,12 +105,12 @@ internal abstract class NetworkHandlerSupport(
} catch (e: TimeoutCancellationException) {
collectException(e)
}
} finally {
listener.result.completeExceptionally(getLast() ?: IllegalStateException("No response"))
packetListeners.remove(listener)
}
throwLast()
} finally {
packetListeners.remove(listener)
listener.result.completeExceptionally(getLast() ?: IllegalStateException("No response"))
}
throwLast()
}
}

View File

@ -20,8 +20,6 @@ import io.netty.handler.codec.MessageToByteEncoder
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import net.mamoe.mirai.internal.network.components.*
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
@ -185,13 +183,16 @@ internal open class NettyNetworkHandler(
}
init {
launch(CoroutineName("PacketDecodePipeline processor")) {
// 'single thread' processor
channel.consumeAsFlow().collect { raw ->
val result = packetCodec.processBody(context.bot, raw)
if (result == null) {
collectUnknownPacket(raw)
} else collectReceived(result)
repeat(4) { processorId ->
launch(CoroutineName("PacketDecodePipeline processor #$processorId")) {
while (isActive) {
val raw = channel.receiveCatching().getOrNull() ?: return@launch
packetLogger.debug { "Packet Handling Processor #$processorId: receive packet ${raw.commandName}" }
val result = packetCodec.processBody(context.bot, raw)
if (result == null) {
collectUnknownPacket(raw)
} else collectReceived(result)
}
}
}
}