Revise exception handling in NetworkHandler, involving:

- HeartbeatProcessor
- HeartbeatFailedException: IOException is not recoverable, since this is not even thrown
This commit is contained in:
Him188 2022-05-13 15:01:45 +01:00
parent 2f40d3f432
commit 1c7e3bc5a1
10 changed files with 71 additions and 40 deletions

View File

@ -1,14 +1,16 @@
/* /*
* Copyright 2019-2021 Mamoe Technologies and contributors. * Copyright 2019-2022 Mamoe Technologies and contributors.
* *
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* *
* https://github.com/mamoe/mirai/blob/master/LICENSE * https://github.com/mamoe/mirai/blob/dev/LICENSE
*/ */
package net.mamoe.mirai.internal.network.components package net.mamoe.mirai.internal.network.components
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.TimeoutCancellationException
import net.mamoe.mirai.internal.network.component.ComponentKey import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.protocol.packet.login.Heartbeat import net.mamoe.mirai.internal.network.protocol.packet.login.Heartbeat
@ -16,21 +18,31 @@ import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect
internal interface HeartbeatProcessor { internal interface HeartbeatProcessor {
/**
@Throws(Exception::class) * @throws TimeoutCancellationException if timed out waiting for response.
* @throws CancellationException if [networkHandler] closed.
* @throws Exception any other exceptions considered critical internal error and will stop SSO (i.e. stop Bot).
*/
suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler) suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler)
@Throws(Exception::class) /**
* @throws TimeoutCancellationException if timed out waiting for response.
* @throws CancellationException if [networkHandler] closed.
* @throws Exception any other exceptions considered critical internal error and will stop SSO (i.e. stop Bot).
*/
suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler) suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler)
@Throws(Exception::class) /**
* @throws TimeoutCancellationException if timed out waiting for response.
* @throws CancellationException if [networkHandler] closed.
* @throws Exception any other exceptions considered critical internal error and will stop SSO (i.e. stop Bot).
*/
suspend fun doRegisterNow(networkHandler: NetworkHandler): StatSvc.Register.Response suspend fun doRegisterNow(networkHandler: NetworkHandler): StatSvc.Register.Response
companion object : ComponentKey<HeartbeatProcessor> companion object : ComponentKey<HeartbeatProcessor>
} }
internal class HeartbeatProcessorImpl : HeartbeatProcessor { internal class HeartbeatProcessorImpl : HeartbeatProcessor {
@Throws(Exception::class)
override suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler) { override suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler) {
StatSvc.SimpleGet(networkHandler.context.bot.client).sendAndExpect( StatSvc.SimpleGet(networkHandler.context.bot.client).sendAndExpect(
networkHandler, networkHandler,
@ -39,7 +51,6 @@ internal class HeartbeatProcessorImpl : HeartbeatProcessor {
) )
} }
@Throws(Exception::class)
override suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler) { override suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler) {
Heartbeat.Alive(networkHandler.context.bot.client).sendAndExpect( Heartbeat.Alive(networkHandler.context.bot.client).sendAndExpect(
networkHandler, networkHandler,
@ -48,7 +59,6 @@ internal class HeartbeatProcessorImpl : HeartbeatProcessor {
) )
} }
@Throws(Exception::class)
override suspend fun doRegisterNow(networkHandler: NetworkHandler): StatSvc.Register.Response { override suspend fun doRegisterNow(networkHandler: NetworkHandler): StatSvc.Register.Response {
return networkHandler.context[SsoProcessor].sendRegister(networkHandler) return networkHandler.context[SsoProcessor].sendRegister(networkHandler)
} }

View File

@ -13,11 +13,15 @@ import kotlinx.coroutines.*
import net.mamoe.mirai.internal.network.component.ComponentKey import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.component.ComponentStorage import net.mamoe.mirai.internal.network.component.ComponentStorage
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.selector.NetworkException
import net.mamoe.mirai.internal.network.handler.selector.PacketTimeoutException import net.mamoe.mirai.internal.network.handler.selector.PacketTimeoutException
import net.mamoe.mirai.utils.BotConfiguration.HeartbeatStrategy.* import net.mamoe.mirai.utils.BotConfiguration.HeartbeatStrategy.*
import net.mamoe.mirai.utils.MiraiLogger import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.info import net.mamoe.mirai.utils.info
/**
* Accepts any kinds of exceptions. A [NetworkException] can control whether this error is recoverable, while any other ones are regarded as unexpected failure.
*/
internal typealias HeartbeatFailureHandler = (name: String, e: Throwable) -> Unit internal typealias HeartbeatFailureHandler = (name: String, e: Throwable) -> Unit
/** /**
@ -127,13 +131,13 @@ internal class TimeBasedHeartbeatSchedulerImpl(
name, name,
PacketTimeoutException( PacketTimeoutException(
"$coroutineName: Timeout receiving action response", "$coroutineName: Timeout receiving action response",
cause cause // cause is TimeoutCancellationException from `action`
) // This is a NetworkException that is recoverable ) // This is a NetworkException that is recoverable
) )
return@async return@async
} }
} catch (e: Throwable) { } catch (e: Throwable) {
// catch other errors // catch other errors in `action`, should not happen
onHeartFailure( onHeartFailure(
name, name,
IllegalStateException("$coroutineName: Internal error: caught unexpected exception", e) IllegalStateException("$coroutineName: Internal error: caught unexpected exception", e)

View File

@ -9,7 +9,9 @@
package net.mamoe.mirai.internal.network.handler package net.mamoe.mirai.internal.network.handler
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.consumeAsFlow
@ -131,6 +133,10 @@ internal interface NetworkHandler : CoroutineScope {
* Coroutine suspension may happen if connection is not yet available however, * Coroutine suspension may happen if connection is not yet available however,
* [IllegalStateException] is thrown if [NetworkHandler] is already in [State.CLOSED] since closure is final. * [IllegalStateException] is thrown if [NetworkHandler] is already in [State.CLOSED] since closure is final.
* *
* @throws TimeoutCancellationException if timeout has been reached.
* @throws CancellationException if the [NetworkHandler] is closed, with the last cause for closure.
* @throws IllegalArgumentException if [timeout] or [attempts] are invalid.
*
* @param attempts ranges `1..INFINITY` * @param attempts ranges `1..INFINITY`
*/ */
suspend fun sendAndExpect(packet: OutgoingPacket, timeout: Long = 5000, attempts: Int = 2): Packet? suspend fun sendAndExpect(packet: OutgoingPacket, timeout: Long = 5000, attempts: Int = 2): Packet?
@ -141,6 +147,9 @@ internal interface NetworkHandler : CoroutineScope {
* Response is still being processed but not passed as a return value of this function, so it does not suspends this function (due to awaiting for the response). * Response is still being processed but not passed as a return value of this function, so it does not suspends this function (due to awaiting for the response).
* However, coroutine is still suspended if connection is not yet available, * However, coroutine is still suspended if connection is not yet available,
* and [IllegalStateException] is thrown if [NetworkHandler] is already in [State.CLOSED] since closure is final. * and [IllegalStateException] is thrown if [NetworkHandler] is already in [State.CLOSED] since closure is final.
* legalStateException] is thrown if [NetworkHandler] is already in [State.CLOSED] since closure is final.
*
* @throws CancellationException if the [NetworkHandler] is closed, with the last cause for closure.
*/ */
suspend fun sendWithoutExpect(packet: OutgoingPacket) suspend fun sendWithoutExpect(packet: OutgoingPacket)

View File

@ -43,6 +43,11 @@ internal abstract class NetworkHandlerSupport(
.plus(CoroutineExceptionHandler.fromMiraiLogger(logger)) .plus(CoroutineExceptionHandler.fromMiraiLogger(logger))
protected abstract fun initialState(): BaseStateImpl protected abstract fun initialState(): BaseStateImpl
/**
* It's not guaranteed whether this function sends the packet in-place or launches a coroutine for it.
* Caller should not rely on this property.
*/
protected abstract suspend fun sendPacketImpl(packet: OutgoingPacket) protected abstract suspend fun sendPacketImpl(packet: OutgoingPacket)
protected fun collectUnknownPacket(raw: RawIncomingPacket) { protected fun collectUnknownPacket(raw: RawIncomingPacket) {

View File

@ -1,22 +1,20 @@
/* /*
* Copyright 2019-2021 Mamoe Technologies and contributors. * Copyright 2019-2022 Mamoe Technologies and contributors.
* *
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* *
* https://github.com/mamoe/mirai/blob/master/LICENSE * https://github.com/mamoe/mirai/blob/dev/LICENSE
*/ */
package net.mamoe.mirai.internal.network.impl.netty package net.mamoe.mirai.internal.network.impl.netty
import net.mamoe.mirai.internal.network.handler.selector.NetworkException import net.mamoe.mirai.internal.network.handler.selector.NetworkException
import net.mamoe.mirai.utils.unwrapCancellationException
import java.io.IOException
internal class HeartbeatFailedException( internal class HeartbeatFailedException(
private val name: String, // kind of HB private val name: String, // kind of HB
override val cause: Throwable, override val cause: Throwable,
recoverable: Boolean = cause.unwrapCancellationException() is IOException || cause is NetworkException && cause.recoverable, recoverable: Boolean = cause is NetworkException && cause.recoverable,
) : NetworkException(recoverable) { ) : NetworkException(recoverable) {
override val message: String = "Exception in $name job" override val message: String = "Exception in $name job"
override fun toString(): String = "HeartbeatFailedException: $name, recoverable=$recoverable, cause=$cause" override fun toString(): String = "HeartbeatFailedException: $name, recoverable=$recoverable, cause=$cause"

View File

@ -49,7 +49,7 @@ internal class IncomingPacket private constructor(
val commandName: String, val commandName: String,
val sequenceId: Int, val sequenceId: Int,
val result: Either<Throwable, Packet?> val result: Either<Throwable, Packet?> // exception will be the same as caught from PacketFactory.decode. So they can be ISE, NPE, etc.
) { ) {
companion object { companion object {
operator fun invoke(commandName: String, sequenceId: Int, data: Packet?) = operator fun invoke(commandName: String, sequenceId: Int, data: Packet?) =

View File

@ -1,10 +1,10 @@
/* /*
* Copyright 2019-2021 Mamoe Technologies and contributors. * Copyright 2019-2022 Mamoe Technologies and contributors.
* *
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* *
* https://github.com/mamoe/mirai/blob/master/LICENSE * https://github.com/mamoe/mirai/blob/dev/LICENSE
*/ */
package net.mamoe.mirai.internal.network.framework package net.mamoe.mirai.internal.network.framework
@ -117,6 +117,7 @@ internal sealed class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abstr
override suspend fun init() { override suspend fun init() {
nhEvents.add(NHEvent.Init) nhEvents.add(NHEvent.Init)
networkLogger.debug { "BotInitProcessor.init" } networkLogger.debug { "BotInitProcessor.init" }
bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED
} }
}) })
set(ServerList, ServerListImpl()) set(ServerList, ServerListImpl())
@ -161,6 +162,7 @@ internal sealed class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abstr
} }
val eventDispatcher get() = bot.components[EventDispatcher] val eventDispatcher get() = bot.components[EventDispatcher]
val firstLoginResult: FirstLoginResult? get() = bot.components[SsoProcessor].firstLoginResult.value
} }
internal fun AbstractRealNetworkHandlerTest<*>.setSsoProcessor(action: suspend SsoProcessor.(handler: NetworkHandler) -> Unit) { internal fun AbstractRealNetworkHandlerTest<*>.setSsoProcessor(action: suspend SsoProcessor.(handler: NetworkHandler) -> Unit) {

View File

@ -18,14 +18,26 @@ import net.mamoe.mirai.internal.network.framework.AbstractNettyNHTestWithSelecto
import net.mamoe.mirai.internal.network.impl.netty.HeartbeatFailedException import net.mamoe.mirai.internal.network.impl.netty.HeartbeatFailedException
import net.mamoe.mirai.internal.network.impl.netty.NettyChannelException import net.mamoe.mirai.internal.network.impl.netty.NettyChannelException
import net.mamoe.mirai.internal.test.runBlockingUnit import net.mamoe.mirai.internal.test.runBlockingUnit
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.io.IOException import org.junit.jupiter.api.TestInfo
import kotlin.test.assertFails import kotlin.test.assertFails
/** /**
* Test whether the selector can recover the connection after first successful login. * Test whether the selector can recover the connection after first successful login.
*/ */
internal class SelectorRecoveryTest : AbstractNettyNHTestWithSelector() { internal class SelectorRecoveryTest : AbstractNettyNHTestWithSelector() {
@BeforeEach
fun beforeTest(info: TestInfo) {
println("=".repeat(30) + "BEGIN: ${info.displayName}" + "=".repeat(30))
}
@AfterEach
fun afterTest(info: TestInfo) {
println("=".repeat(31) + "END: ${info.displayName}" + "=".repeat(31))
}
@Test @Test
fun `stop on manual close`() = runBlockingUnit { fun `stop on manual close`() = runBlockingUnit {
network.resumeConnection() network.resumeConnection()
@ -33,19 +45,6 @@ internal class SelectorRecoveryTest : AbstractNettyNHTestWithSelector() {
assertFails { network.resumeConnection() } assertFails { network.resumeConnection() }
} }
/**
* Emulates system hibernation and network failure.
* @see HeartbeatFailedException
*/
@Test
fun `can recover on heartbeat failure with IOException`() = runBlockingUnit {
// We allow IOException to cause a reconnect.
testRecoverWhenHeartbeatFailWith { IOException("test IO ex") }
// BotOfflineMonitor immediately launches a recovery which is UNDISPATCHED, so connection is immediately recovered.
assertState(NetworkHandler.State.CONNECTING, NetworkHandler.State.LOADING, NetworkHandler.State.OK)
}
/** /**
* Emulates system hibernation and network failure. * Emulates system hibernation and network failure.
* @see HeartbeatFailedException * @see HeartbeatFailedException

View File

@ -13,6 +13,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import net.mamoe.mirai.internal.network.components.BotOfflineEventMonitor import net.mamoe.mirai.internal.network.components.BotOfflineEventMonitor
import net.mamoe.mirai.internal.network.components.BotOfflineEventMonitorImpl import net.mamoe.mirai.internal.network.components.BotOfflineEventMonitorImpl
import net.mamoe.mirai.internal.network.components.FirstLoginResult
import net.mamoe.mirai.internal.network.framework.AbstractNettyNHTest import net.mamoe.mirai.internal.network.framework.AbstractNettyNHTest
import net.mamoe.mirai.internal.network.framework.TestNettyNH import net.mamoe.mirai.internal.network.framework.TestNettyNH
import net.mamoe.mirai.internal.network.framework.setSsoProcessor import net.mamoe.mirai.internal.network.framework.setSsoProcessor
@ -27,6 +28,7 @@ import net.mamoe.mirai.utils.cast
import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.io.IOException import java.io.IOException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
import kotlin.test.assertFalse import kotlin.test.assertFalse
@ -93,8 +95,9 @@ internal class NettyBotNormalLoginTest : AbstractNettyNHTest() {
// } // }
@Test @Test
fun `test resume after MsfOffline received`() = runBlockingUnit { fun `test resume after MsfOffline received after first login`() = runBlockingUnit {
bot.login() bot.login()
assertEquals(FirstLoginResult.PASSED, firstLoginResult)
bot.network.close(StatSvc.ReqMSFOffline.MsfOfflineToken(0, 0, 0)) bot.network.close(StatSvc.ReqMSFOffline.MsfOfflineToken(0, 0, 0))
eventDispatcher.joinBroadcast() eventDispatcher.joinBroadcast()

View File

@ -145,6 +145,7 @@ internal class NettyHandlerEventTest : AbstractNettyNHTest() {
fun `BotOffline from OK TO CLOSED by bot close`() = runBlockingUnit { fun `BotOffline from OK TO CLOSED by bot close`() = runBlockingUnit {
bot.login() bot.login()
assertState(OK) assertState(OK)
assertEquals(FirstLoginResult.PASSED, firstLoginResult)
eventDispatcher.joinBroadcast() // `login` launches a job which broadcasts the event eventDispatcher.joinBroadcast() // `login` launches a job which broadcasts the event
assertEventBroadcasts<Event>(1) { assertEventBroadcasts<Event>(1) {
assertTrue { bot.isActive } assertTrue { bot.isActive }