Improve SelectorNetworkHandler lifecycle: do not tolerant any exception thrown by states

This commit is contained in:
Him188 2021-05-31 01:32:42 +08:00
parent 0505dc41fa
commit 266d73f8a1
10 changed files with 150 additions and 71 deletions

View File

@ -17,6 +17,7 @@ import net.mamoe.mirai.internal.network.components.PacketCodec
import net.mamoe.mirai.internal.network.components.PacketHandler
import net.mamoe.mirai.internal.network.components.PacketLoggingStrategy
import net.mamoe.mirai.internal.network.components.RawIncomingPacket
import net.mamoe.mirai.internal.network.handler.selector.NetworkHandlerSelector
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.packet.IncomingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
@ -136,6 +137,15 @@ internal abstract class NetworkHandlerSupport(
* CoroutineScope is cancelled when switched to another state.
*
* State can only be changed inside [setState].
*
* **IMPORTANT notes to lifecycle:**
*
* Normally if the state is set to [NetworkHandler.State.CLOSED] by [setState], [selector][NetworkHandlerSelector] may reinitialize an instance.
*
* Any exception caught by the scope (supervisor job) is considered as _fatal failure_ that will set state to CLOSE and **propagate the exception to user of [selector][NetworkHandlerSelector]**.
*
*
* You must catch all the exceptions and change states by [setState] manually.
*/
abstract inner class BaseStateImpl(
val correspondingState: NetworkHandler.State,

View File

@ -15,7 +15,6 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
import net.mamoe.mirai.utils.systemProp
import net.mamoe.mirai.utils.toLongUnsigned
import org.jetbrains.annotations.TestOnly
/**
* A lazy stateful implementation of [NetworkHandlerSelector].
@ -38,7 +37,7 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandl
private val current = atomic<H?>(null)
@TestOnly
@net.mamoe.mirai.utils.TestOnly
internal fun setCurrent(h: H) {
current.value = h
}

View File

@ -34,9 +34,12 @@ internal class SelectorNetworkHandler(
override val context: NetworkHandlerContext, // impl notes: may consider to move into function member.
private val selector: NetworkHandlerSelector<*>,
) : NetworkHandler {
@Volatile
private var lastCancellationCause: Throwable? = null
private val scope = CoroutineScope(SupervisorJob(context.bot.coroutineContext[Job]))
private suspend inline fun instance(): NetworkHandler {
if (!scope.isActive) scope.coroutineContext.job.join()
if (!scope.isActive) throw lastCancellationCause ?: error("SelectorNetworkHandler is already closed")
return selector.awaitResumeInstance()
}
@ -56,8 +59,12 @@ internal class SelectorNetworkHandler(
override suspend fun sendWithoutExpect(packet: OutgoingPacket) = instance().sendWithoutExpect(packet)
override fun close(cause: Throwable?) {
synchronized(scope) {
if (scope.isActive) scope.cancel()
else return
if (scope.isActive) {
lastCancellationCause = cause
scope.cancel()
} else {
return
}
}
selector.getResumedInstance()?.close(cause)
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 Mamoe Technologies and contributors.
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
@ -27,4 +27,8 @@ internal object MessageSvcPushForceOffline :
@Suppress("INVISIBLE_MEMBER")
return BotOfflineEvent.Force(bot, title = struct.title ?: "", message = struct.tips ?: "")
}
override suspend fun QQAndroidBot.handle(packet: BotOfflineEvent.Force) {
network.close(IllegalStateException("Closed by MessageSvc.PushForceOffline: $packet"))
}
}

View File

@ -116,6 +116,15 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
* [additionalComponents] overrides [defaultComponents] and [QQAndroidBot.components]
*/
open fun createHandler(additionalComponents: ComponentStorage? = null): H {
return factory.create(
createContext(additionalComponents),
address
)
}
val address = InetSocketAddress.createUnresolved("localhost", 123)
open fun createContext(additionalComponents: ComponentStorage? = null): NetworkHandlerContextImpl {
// StateObserver
val components = additionalComponents + defaultComponents + bot.createDefaultComponents()
val observerComponents = if (
@ -127,13 +136,11 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
set(StateObserver, bot.run { components.stateObserverChain() })
}
} else null
return factory.create(
NetworkHandlerContextImpl(
bot,
networkLogger,
observerComponents + components
),
InetSocketAddress.createUnresolved("localhost", 123)
return NetworkHandlerContextImpl(
bot,
networkLogger,
observerComponents + components
)
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
@file:OptIn(TestOnly::class)
package net.mamoe.mirai.internal.network.handler
import io.netty.channel.Channel
import kotlinx.coroutines.CancellationException
import net.mamoe.mirai.internal.network.impl.netty.AbstractNettyNHTest
import net.mamoe.mirai.internal.network.impl.netty.TestNettyNH
import net.mamoe.mirai.internal.test.runBlockingUnit
import net.mamoe.mirai.utils.TestOnly
import net.mamoe.mirai.utils.getRootCause
import org.junit.jupiter.api.assertThrows
import java.net.SocketAddress
import kotlin.test.Test
import kotlin.test.assertIs
internal class KeepAliveNetworkHandlerSelectorRealTest : AbstractNettyNHTest() {
internal class FakeFailOnCreatingConnection : AbstractNettyNHTest() {
private class MyException : Exception()
override val factory: NetworkHandlerFactory<TestNettyNH> = object : NetworkHandlerFactory<TestNettyNH> {
override fun create(context: NetworkHandlerContext, address: SocketAddress): TestNettyNH {
return object : TestNettyNH(bot, context, address) {
override suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel =
throw MyException()
}
}
}
@Test
fun `should not tolerant any exception thrown by states`() = runBlockingUnit {
// selector should not tolerant any exception during state initialization, or in the Jobs launched in states.
val selector = TestSelector(3) { createHandler() }
assertThrows<CancellationException> { selector.awaitResumeInstance() }.run {
assertIs<MyException>(getRootCause())
}
}
}
}

View File

@ -7,17 +7,20 @@
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
@file:OptIn(TestOnly::class)
package net.mamoe.mirai.internal.network.handler
import net.mamoe.mirai.internal.network.framework.AbstractMockNetworkHandlerTest
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.selector.AbstractKeepAliveNetworkHandlerSelector
import net.mamoe.mirai.internal.test.runBlockingUnit
import net.mamoe.mirai.utils.TestOnly
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.*
import kotlin.time.Duration
private class TestSelector :
internal class TestSelector :
AbstractKeepAliveNetworkHandlerSelector<NetworkHandler> {
val createInstance0: () -> NetworkHandler

View File

@ -0,0 +1,49 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler
import io.netty.channel.Channel
import net.mamoe.mirai.internal.network.framework.AbstractRealNetworkHandlerTest
import net.mamoe.mirai.internal.network.handler.selector.SelectorNetworkHandler
import net.mamoe.mirai.internal.network.impl.netty.AbstractNettyNHTest
import net.mamoe.mirai.internal.network.impl.netty.TestNettyNH
import net.mamoe.mirai.internal.test.runBlockingUnit
import net.mamoe.mirai.utils.cast
import org.junit.jupiter.api.Test
import java.net.SocketAddress
import kotlin.test.assertFails
internal class SelectorNetworkHandlerTest : AbstractRealNetworkHandlerTest<SelectorNetworkHandler>() {
val channel = AbstractNettyNHTest.NettyNHTestChannel()
private val selector = TestSelector {
object : TestNettyNH(bot, createContext(), address) {
override suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel {
return channel
}
}
}
override val factory: NetworkHandlerFactory<SelectorNetworkHandler> =
object : NetworkHandlerFactory<SelectorNetworkHandler> {
override fun create(context: NetworkHandlerContext, address: SocketAddress): SelectorNetworkHandler {
return SelectorNetworkHandler(context, selector)
}
}
override val network: SelectorNetworkHandler get() = bot.network.cast()
@Test
fun `stop on manual close`() = runBlockingUnit {
network.resumeConnection()
network.close(IllegalStateException("Closed by test"))
assertFails { network.resumeConnection() }
}
}

View File

@ -24,6 +24,9 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
import net.mamoe.mirai.utils.ExceptionCollector
import java.net.SocketAddress
/**
* You may need to override [createConnection]
*/
internal open class TestNettyNH(
override val bot: QQAndroidBot,
context: NetworkHandlerContext,
@ -50,9 +53,10 @@ internal open class TestNettyNH(
}
internal abstract class AbstractNettyNHTest : AbstractRealNetworkHandlerTest<TestNettyNH>() {
var fakeServer: (NettyNHTestChannel.(msg: Any?) -> Unit)? = null
internal inner class NettyNHTestChannel : EmbeddedChannel() {
class NettyNHTestChannel(
var fakeServer: (NettyNHTestChannel.(msg: Any?) -> Unit)? = null
) : EmbeddedChannel() {
public /*internal*/ override fun doRegister() {
super.doRegister() // Set channel state to ACTIVE
// Drop old handlers

View File

@ -1,55 +0,0 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.impl.netty
import io.netty.channel.Channel
import kotlinx.coroutines.delay
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.test.runBlockingUnit
import java.net.SocketAddress
import kotlin.reflect.jvm.javaGetter
import kotlin.test.Test
import kotlin.test.assertTrue
/**
* When offline, handler will try endlessly to re-establish a connection. Exceptions are preserved as suppressed exceptions however, duplicates must be dropped to save memory.
*/
internal class NettyEndlessReconnectionTest : AbstractNettyNHTest() {
override val factory: NetworkHandlerFactory<TestNettyNH> = object : NetworkHandlerFactory<TestNettyNH> {
override fun create(context: NetworkHandlerContext, address: SocketAddress): TestNettyNH {
return object : TestNettyNH(bot, context, address) {
override suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel =
error("fail")
}
}
}
@Test
fun `massive reconnection`() = runBlockingUnit {
val r = NettyNetworkHandler.Companion.RECONNECT_DELAY
NettyNetworkHandler.Companion.RECONNECT_DELAY = 0
network.setStateConnecting() // will connect endlessly and create a massive amount of exceptions
delay(10000) // if exceptions are ignored by ExceptionCollector, memory usage will not exceed limitation.
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
val state = network::_state.javaGetter!!.apply { isAccessible = true }
.invoke(network) as NetworkHandlerSupport.BaseStateImpl
assertTrue(state.toString()) { state.getCause()!!.suppressed.size <= 1 } // might be zero if just created since at this time network is still running.
// size <= 1 means duplicates are dropped.
network.close(null)
NettyNetworkHandler.Companion.RECONNECT_DELAY = r
}
}