From 88b66d7863bb189002ac740df8f72f8e6373474e Mon Sep 17 00:00:00 2001
From: Karlatemp <karlatemp@vip.qq.com>
Date: Wed, 1 Sep 2021 14:00:27 +0800
Subject: [PATCH] Decode packets in netty event loop group (#1500)

* Decode packets in netty event loop group

* Update mirai-core/src/commonTest/kotlin/network/framework/AbstractNettyNHTest.kt

Co-authored-by: Him188 <Him188@mamoe.net>

Co-authored-by: Him188 <Him188@mamoe.net>
---
 .../kotlin/network/impl/netty/NettyNetworkHandler.kt | 12 ++++++++----
 .../kotlin/network/framework/AbstractNettyNHTest.kt  |  5 ++++-
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt
index 0be48b9d7..0fd22c82e 100644
--- a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt
+++ b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt
@@ -113,12 +113,18 @@ internal open class NettyNetworkHandler(
             .addLast(RawIncomingPacketCollector(decodePipeline))
     }
 
+    protected open fun createDummyDecodePipeline() = PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)
+
     // can be overridden for tests
-    protected open suspend fun createConnection(decodePipeline: PacketDecodePipeline): NettyChannel {
+    protected open suspend fun createConnection(): NettyChannel {
         packetLogger.debug { "Connecting to $address" }
 
         val contextResult = CompletableDeferred<NettyChannel>()
         val eventLoopGroup = NioEventLoopGroup()
+        val decodePipeline = PacketDecodePipeline(
+            this@NettyNetworkHandler.coroutineContext
+                .plus(eventLoopGroup.asCoroutineDispatcher())
+        )
 
         val future = Bootstrap().group(eventLoopGroup)
             .channel(NioSocketChannel::class.java)
@@ -159,8 +165,6 @@ internal open class NettyNetworkHandler(
         return contextResult.await()
     }
 
-    protected val decodePipeline = PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)
-
     protected inner class PacketDecodePipeline(parentContext: CoroutineContext) :
         CoroutineScope by parentContext.childScope() {
         private val packetCodec: PacketCodec by lazy { context[PacketCodec] }
@@ -241,7 +245,7 @@ internal open class NettyNetworkHandler(
         private val collectiveExceptions: ExceptionCollector,
     ) : NettyState(State.CONNECTING) {
         private val connection = async {
-            createConnection(decodePipeline)
+            createConnection()
         }
 
         @Suppress("JoinDeclarationAndAssignment")
diff --git a/mirai-core/src/commonTest/kotlin/network/framework/AbstractNettyNHTest.kt b/mirai-core/src/commonTest/kotlin/network/framework/AbstractNettyNHTest.kt
index ff96b66f6..21ad528d5 100644
--- a/mirai-core/src/commonTest/kotlin/network/framework/AbstractNettyNHTest.kt
+++ b/mirai-core/src/commonTest/kotlin/network/framework/AbstractNettyNHTest.kt
@@ -33,7 +33,10 @@ internal abstract class TestNettyNH(
     address: SocketAddress,
 ) : NettyNetworkHandler(context, address), ITestNetworkHandler {
 
-    abstract override suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel
+    protected abstract suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel
+    final override suspend fun createConnection(): Channel {
+        return createConnection(createDummyDecodePipeline())
+    }
 
     override fun setStateClosed(exception: Throwable?): NetworkHandlerSupport.BaseStateImpl? {
         return setState { StateClosed(exception) }