Decode packets in netty event loop group (#1500)

* Decode packets in netty event loop group

* Update mirai-core/src/commonTest/kotlin/network/framework/AbstractNettyNHTest.kt

Co-authored-by: Him188 <Him188@mamoe.net>

Co-authored-by: Him188 <Him188@mamoe.net>
This commit is contained in:
Karlatemp 2021-09-01 14:00:27 +08:00 committed by GitHub
parent 4d76aa82b0
commit 88b66d7863
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 5 deletions

View File

@ -113,12 +113,18 @@ internal open class NettyNetworkHandler(
.addLast(RawIncomingPacketCollector(decodePipeline))
}
protected open fun createDummyDecodePipeline() = PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)
// can be overridden for tests
protected open suspend fun createConnection(decodePipeline: PacketDecodePipeline): NettyChannel {
protected open suspend fun createConnection(): NettyChannel {
packetLogger.debug { "Connecting to $address" }
val contextResult = CompletableDeferred<NettyChannel>()
val eventLoopGroup = NioEventLoopGroup()
val decodePipeline = PacketDecodePipeline(
this@NettyNetworkHandler.coroutineContext
.plus(eventLoopGroup.asCoroutineDispatcher())
)
val future = Bootstrap().group(eventLoopGroup)
.channel(NioSocketChannel::class.java)
@ -159,8 +165,6 @@ internal open class NettyNetworkHandler(
return contextResult.await()
}
protected val decodePipeline = PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)
protected inner class PacketDecodePipeline(parentContext: CoroutineContext) :
CoroutineScope by parentContext.childScope() {
private val packetCodec: PacketCodec by lazy { context[PacketCodec] }
@ -241,7 +245,7 @@ internal open class NettyNetworkHandler(
private val collectiveExceptions: ExceptionCollector,
) : NettyState(State.CONNECTING) {
private val connection = async {
createConnection(decodePipeline)
createConnection()
}
@Suppress("JoinDeclarationAndAssignment")

View File

@ -33,7 +33,10 @@ internal abstract class TestNettyNH(
address: SocketAddress,
) : NettyNetworkHandler(context, address), ITestNetworkHandler {
abstract override suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel
protected abstract suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel
final override suspend fun createConnection(): Channel {
return createConnection(createDummyDecodePipeline())
}
override fun setStateClosed(exception: Throwable?): NetworkHandlerSupport.BaseStateImpl? {
return setState { StateClosed(exception) }