diff --git a/mirai-core-utils/src/commonMain/kotlin/CoroutineUtils.kt b/mirai-core-utils/src/commonMain/kotlin/CoroutineUtils.kt index c02eaaf3e..9d900d897 100644 --- a/mirai-core-utils/src/commonMain/kotlin/CoroutineUtils.kt +++ b/mirai-core-utils/src/commonMain/kotlin/CoroutineUtils.kt @@ -98,3 +98,19 @@ public fun CoroutineScope.hierarchicalName( name: String ): CoroutineName = this.coroutineContext.hierarchicalName(name) +public inline fun runUnwrapCancellationException(block: () -> R): R { + try { + return block() + } catch (e: CancellationException) { + // e is like `Exception in thread "main" kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=JobImpl{Cancelled}@f252f300` + // and this is useless. + if (e.suppressedExceptions.isNotEmpty()) throw e // preserve details. + throw e.findCause { it !is CancellationException } ?: e + } +} + +public fun Throwable.unwrapCancellationException(): Throwable { + if (this !is CancellationException) return this + if (suppressedExceptions.isNotEmpty()) return this + return this.findCause { it !is CancellationException } ?: this +} \ No newline at end of file diff --git a/mirai-core-utils/src/commonMain/kotlin/StandardUtils.kt b/mirai-core-utils/src/commonMain/kotlin/StandardUtils.kt index 4804a6cbf..97ddf8754 100644 --- a/mirai-core-utils/src/commonMain/kotlin/StandardUtils.kt +++ b/mirai-core-utils/src/commonMain/kotlin/StandardUtils.kt @@ -187,6 +187,10 @@ public fun Throwable.getRootCause(maxDepth: Int = 20): Throwable { return rootCause ?: this } +/** + * Use [findCause] instead for better performance. + */ +@TestOnly public fun Throwable.causes(maxDepth: Int = 20): Sequence = sequence { var depth = 0 var rootCause: Throwable? = this@causes @@ -197,6 +201,20 @@ public fun Throwable.causes(maxDepth: Int = 20): Sequence = sequence } } +public inline fun Throwable.findCause(maxDepth: Int = 20, filter: (Throwable) -> Boolean): Throwable? { + var depth = 0 + var rootCause: Throwable? = this + while (true) { + val current = rootCause?.cause ?: return null + if (filter(current)) return current + rootCause = rootCause.cause + if (depth++ >= maxDepth) return null + } +} + +public inline fun Throwable.findCauseOrSelf(maxDepth: Int = 20, filter: (Throwable) -> Boolean): Throwable = + findCause(maxDepth, filter) ?: this + public fun String.capitalize(): String { return replaceFirstChar { if (it.isLowerCase()) it.titlecase(Locale.ROOT) else it.toString() } } \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/AbstractBot.kt b/mirai-core/src/commonMain/kotlin/AbstractBot.kt index 9d30b0613..8ef964f10 100644 --- a/mirai-core/src/commonMain/kotlin/AbstractBot.kt +++ b/mirai-core/src/commonMain/kotlin/AbstractBot.kt @@ -25,7 +25,10 @@ import net.mamoe.mirai.internal.network.components.SsoProcessor import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.impl.netty.asCoroutineExceptionHandler import net.mamoe.mirai.supervisorJob -import net.mamoe.mirai.utils.* +import net.mamoe.mirai.utils.BotConfiguration +import net.mamoe.mirai.utils.MiraiLogger +import net.mamoe.mirai.utils.childScopeContext +import net.mamoe.mirai.utils.info import kotlin.collections.set import kotlin.coroutines.CoroutineContext @@ -118,7 +121,7 @@ internal abstract class AbstractBot constructor( if (!components[SsoProcessor].firstLoginSucceed) { this.close() // failed to do first login. } - throw e.causes().find { it !is CancellationException } ?: e // emit internal errors + throw e } logger.info { "Bot login successful." } } diff --git a/mirai-core/src/commonMain/kotlin/network/components/BotOfflineEventMonitor.kt b/mirai-core/src/commonMain/kotlin/network/components/BotOfflineEventMonitor.kt index 0422cd64b..44f4aac42 100644 --- a/mirai-core/src/commonMain/kotlin/network/components/BotOfflineEventMonitor.kt +++ b/mirai-core/src/commonMain/kotlin/network/components/BotOfflineEventMonitor.kt @@ -21,7 +21,6 @@ import net.mamoe.mirai.internal.asQQAndroidBot import net.mamoe.mirai.internal.network.component.ComponentKey import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.handler.NetworkHandler.State -import net.mamoe.mirai.internal.network.handler.awaitState import net.mamoe.mirai.utils.castOrNull import net.mamoe.mirai.utils.info import net.mamoe.mirai.utils.millisToHumanReadableString @@ -43,6 +42,7 @@ private data class BotClosedByEvent(val event: BotOfflineEvent) : RuntimeExcepti internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor { override fun attachJob(bot: AbstractBot, scope: CoroutineScope) { + return bot.eventChannel.parentScope(scope).subscribeAlways( ::onEvent, priority = EventPriority.MONITOR, @@ -56,9 +56,6 @@ internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor { fun closeNetwork() { if (network.state == State.CLOSED) return // avoid recursive calls. - launch { - network.awaitState(State.CLOSED) - } network.close(BotClosedByEvent(event)) } @@ -84,6 +81,7 @@ internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor { -> { val causeMessage = event.castOrNull()?.cause?.toString() ?: event.toString() bot.logger.info { "Connection lost, retrying login ($causeMessage)." } + closeNetwork() } } diff --git a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerContext.kt b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerContext.kt index d27f0220f..b51606f3b 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerContext.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerContext.kt @@ -30,6 +30,6 @@ internal class NetworkHandlerContextImpl( storage: ComponentStorage // should be the same as bot.components ) : NetworkHandlerContext, ComponentStorage by storage { override fun toString(): String { - return "NetworkHandlerContextImpl(storage=$)" + return "NetworkHandlerContextImpl" } } diff --git a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt index ed9a11aec..101b29953 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt @@ -9,6 +9,7 @@ package net.mamoe.mirai.internal.network.handler +import kotlinx.atomicfu.locks.SynchronizedObject import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel @@ -24,6 +25,7 @@ import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket import net.mamoe.mirai.utils.* import java.util.concurrent.ConcurrentLinkedQueue import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KClass /** @@ -31,7 +33,7 @@ import kotlin.reflect.KClass */ internal abstract class NetworkHandlerSupport( override val context: NetworkHandlerContext, - final override val coroutineContext: CoroutineContext = SupervisorJob(), + coroutineContext: CoroutineContext = EmptyCoroutineContext, ) : NetworkHandler, CoroutineScope by coroutineContext.childScope(SupervisorJob()) { protected abstract fun initialState(): BaseStateImpl @@ -151,7 +153,9 @@ internal abstract class NetworkHandlerSupport( */ abstract inner class BaseStateImpl( val correspondingState: NetworkHandler.State, - ) : CoroutineScope by CoroutineScope(coroutineContext + Job(coroutineContext.job)) { + ) : CoroutineScope { + final override val coroutineContext: CoroutineContext = + this@NetworkHandlerSupport.coroutineContext + Job(this@NetworkHandlerSupport.coroutineContext.job) open fun getCause(): Throwable? = null @@ -184,15 +188,19 @@ internal abstract class NetworkHandlerSupport( private set final override val state: NetworkHandler.State get() = _state.correspondingState + + override fun getLastFailure(): Throwable? = _state.getCause() + private val _stateChannel = Channel(0) final override val stateChannel: ReceiveChannel get() = _stateChannel + private val setStateLock = SynchronizedObject() + protected data class StateSwitchingException( val old: BaseStateImpl, val new: BaseStateImpl, ) : CancellationException("State is switched from $old to $new") - /** * Attempts to change state. * @@ -209,7 +217,7 @@ internal abstract class NetworkHandlerSupport( */ protected inline fun BaseStateImpl.setState( noinline new: () -> S - ): S? = synchronized(this@NetworkHandlerSupport) { + ): S? = synchronized(setStateLock) { if (_state === this) { this@NetworkHandlerSupport.setState(new) } else { @@ -235,9 +243,9 @@ internal abstract class NetworkHandlerSupport( */ // @TestOnly - internal fun setStateImpl(newType: KClass?, new: () -> S): S? = synchronized(this) { + internal fun setStateImpl(newType: KClass?, new: () -> S): S? = synchronized(setStateLock) { val old = _state - if (newType != null && old::class == newType) return@synchronized null // already set to expected state by another thread. Avoid replications. + if (newType != null && old::class == newType) return null // already set to expected state by another thread. Avoid replications. if (old.correspondingState == NetworkHandler.State.CLOSED) return null // CLOSED is final. val stateObserver = context.getOrNull(StateObserver) diff --git a/mirai-core/src/commonMain/kotlin/network/handler/selector/AbstractKeepAliveNetworkHandlerSelector.kt b/mirai-core/src/commonMain/kotlin/network/handler/selector/AbstractKeepAliveNetworkHandlerSelector.kt index fc5082c8b..9c2bd6184 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/selector/AbstractKeepAliveNetworkHandlerSelector.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/selector/AbstractKeepAliveNetworkHandlerSelector.kt @@ -13,6 +13,7 @@ import kotlinx.atomicfu.atomic import kotlinx.coroutines.yield import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory +import net.mamoe.mirai.utils.ExceptionCollector import net.mamoe.mirai.utils.systemProp import net.mamoe.mirai.utils.toLongUnsigned @@ -23,8 +24,8 @@ import net.mamoe.mirai.utils.toLongUnsigned * - Re-initialize [NetworkHandler] instances if the old one is dead. * - Suspends requests when connection is not available. * - * No connection is created until first invocation of [getResumedInstance], - * and new connections are created only when calling [getResumedInstance] if the old connection was dead. + * No connection is created until first invocation of [getCurrentInstanceOrNull], + * and new connections are created only when calling [getCurrentInstanceOrNull] if the old connection was dead. */ // may be replaced with a better name. internal abstract class AbstractKeepAliveNetworkHandlerSelector( @@ -44,56 +45,71 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector= maxAttempts) error("Failed to resume instance. Maximum attempts reached.") - yield() - val current = getResumedInstance() - return if (current != null) { - when (val thisState = current.state) { - NetworkHandler.State.CLOSED -> { - this.current.compareAndSet(current, null) // invalidate the instance and try again. - awaitResumeInstanceImpl(attempted + 1) // will create new instance. - } - NetworkHandler.State.CONNECTING, - NetworkHandler.State.INITIALIZED -> { - current.resumeConnection() // once finished, it should has been LOADING or OK - check(current.state != thisState) { "Internal error: State is still $thisState after successful resumeConnection." } // this should not happen. - return awaitResumeInstanceImpl(attempted) // does not count for an attempt. - } - NetworkHandler.State.LOADING -> { - return current - } - NetworkHandler.State.OK -> { - current.resumeConnection() - return current - } + private inner class AwaitResumeInstance { + private var attempted: Int = 0 + private val exceptionCollector: ExceptionCollector = ExceptionCollector() + + tailrec suspend fun run(): H { + if (attempted >= maxAttempts) { + throw IllegalStateException( + "Failed to resume instance. Maximum attempts reached.", + exceptionCollector.getLast() + ) + } + yield() // Avoid endless recursion. + val current = getCurrentInstanceOrNull() + return if (current != null) { + when (val thisState = current.state) { + NetworkHandler.State.CLOSED -> { + if (this@AbstractKeepAliveNetworkHandlerSelector.current.compareAndSet(current, null)) { + // invalidate the instance and try again. + + exceptionCollector.collectException(current.getLastFailure()) + } + attempted += 1 + run() // will create new instance. + } + NetworkHandler.State.CONNECTING, + NetworkHandler.State.INITIALIZED -> { + current.resumeConnection() // once finished, it should has been LOADING or OK + check(current.state != thisState) { "Internal error: State is still $thisState after successful resumeConnection." } // this should not happen. + return run() // does not count for an attempt. + } + NetworkHandler.State.LOADING -> { + return current + } + NetworkHandler.State.OK -> { + current.resumeConnection() + return current + } + } + } else { + refreshInstance() + run() // directly retry, does not count for attempts. } - } else { - refreshInstance() - awaitResumeInstanceImpl(attempted) // directly retry, does not count for attempts. } } protected open fun refreshInstance() { synchronized(this) { // avoid concurrent `createInstance()` - if (getResumedInstance() == null) this.current.compareAndSet(null, createInstance()) + if (getCurrentInstanceOrNull() == null) this.current.compareAndSet(null, createInstance()) } } companion object { @JvmField var DEFAULT_MAX_ATTEMPTS = - systemProp("mirai.network.handler.selector.max.attempts", 3) + systemProp("mirai.network.handler.selector.max.attempts", Long.MAX_VALUE) .coerceIn(1..Int.MAX_VALUE.toLongUnsigned()).toInt() } } \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/selector/FactoryKeepAliveNetworkHandlerSelector.kt b/mirai-core/src/commonMain/kotlin/network/handler/selector/FactoryKeepAliveNetworkHandlerSelector.kt index 253df870b..5d7bcb8fe 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/selector/FactoryKeepAliveNetworkHandlerSelector.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/selector/FactoryKeepAliveNetworkHandlerSelector.kt @@ -38,6 +38,7 @@ internal class FactoryKeepAliveNetworkHandlerSelector : Abst override fun createInstance(): H = factory.create( context, - context[ServerList].pollCurrent()?.toSocketAddress() ?: throw NoServerAvailableException() +// context[ServerList].pollCurrent()?.toSocketAddress() ?: throw NoServerAvailableException() + context[ServerList].pollAny().toSocketAddress() ) } \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/selector/NetworkHandlerSelector.kt b/mirai-core/src/commonMain/kotlin/network/handler/selector/NetworkHandlerSelector.kt index e1e2cef93..90d0537c8 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/selector/NetworkHandlerSelector.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/selector/NetworkHandlerSelector.kt @@ -13,7 +13,9 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory /** - * A director([selector][SelectorNetworkHandler.selector]) of [NetworkHandler]. + * A **lazy** director([selector][SelectorNetworkHandler.selector]) of [NetworkHandler]. + * + * *lazy* means that no action is taken at any time until member functions are invoked. * * It can produce [H] instances (maybe by calling [NetworkHandlerFactory]), to be used by [SelectorNetworkHandler] * @@ -21,20 +23,21 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory */ internal interface NetworkHandlerSelector { /** - * Returns an instance immediately without suspension, or `null` if instance not ready. + * Returns an instance immediately without suspension, or `null` if instance not ready. Returned [H] can be in any states. * * This function should not throw any exception. * @see awaitResumeInstance */ - fun getResumedInstance(): H? + fun getCurrentInstanceOrNull(): H? /** - * Returns the currently alive [NetworkHandler] or creates a new one. + * Returns the current [NetworkHandler] or creates a new one if it is `null`. Returned [H] can be in any states. */ - fun tryResumeInstanceOrCreate(): H + fun getCurrentInstanceOrCreate(): H /** * Returns an alive [NetworkHandler], or suspends the coroutine until the connection has been made again. + * Returned [H] can be in [NetworkHandler.State.OK] only (but it may happen that the state changed just after returning from this function). * * This function may throw exceptions, which would be propagated to the original caller of [SelectorNetworkHandler.resumeConnection]. */ diff --git a/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt index 9cb05bd02..05eb4b596 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt @@ -14,7 +14,10 @@ import kotlinx.coroutines.channels.ReceiveChannel import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.handler.NetworkHandler.State import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext +import net.mamoe.mirai.internal.network.handler.awaitState import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket +import net.mamoe.mirai.utils.findCauseOrSelf +import net.mamoe.mirai.utils.hierarchicalName import kotlin.coroutines.CoroutineContext /** @@ -34,6 +37,12 @@ import kotlin.coroutines.CoroutineContext internal class SelectorNetworkHandler( override val context: NetworkHandlerContext, // impl notes: may consider to move into function member. private val selector: NetworkHandlerSelector<*>, + /** + * If `true`, a watcher job will be started to call [resumeConnection] when network is closed by [NetworkException] and [NetworkException.recoverable] is `true`. + * + * This is required for automatic reconnecting after network failure or system hibernation, since [NetworkHandler] is lazy and will reconnect iff [resumeConnection] is called. + */ + allowActiveMaintenance: Boolean = true, ) : NetworkHandler { @Volatile private var lastCancellationCause: Throwable? = null @@ -44,10 +53,39 @@ internal class SelectorNetworkHandler( return selector.awaitResumeInstance() } + init { + if (allowActiveMaintenance) { + val bot = context.bot + scope.launch(scope.hierarchicalName("BotOnlineWatchdog ${bot.id}")) { + while (isActive) { + val instance = selector.getCurrentInstanceOrCreate() + + awaitState(State.CLOSED) // suspend until next CLOSED + + if (!bot.isActive || !isActive) return@launch + if (selector.getCurrentInstanceOrNull() != instance) continue // instance already changed by other threads. + + delay(3000) // make it slower to avoid massive reconnection on network failure. + + val failure = getLastFailure() + if (failure?.findCauseOrSelf { it is NetworkException && it.recoverable } != null) { + try { + resumeConnection() // notify selector to actively resume now. + } catch (ignored: Exception) { + } + } + } + } + } + } + override val state: State - get() = selector.tryResumeInstanceOrCreate().state + get() = selector.getCurrentInstanceOrCreate().state + + override fun getLastFailure(): Throwable? = selector.getCurrentInstanceOrCreate().getLastFailure() + override val stateChannel: ReceiveChannel - get() = selector.tryResumeInstanceOrCreate().stateChannel + get() = selector.getCurrentInstanceOrCreate().stateChannel override suspend fun resumeConnection() { instance() // the selector will resume connection for us. @@ -66,12 +104,12 @@ internal class SelectorNetworkHandler( return } } - selector.getResumedInstance()?.close(cause) + selector.getCurrentInstanceOrNull()?.close(cause) } override val coroutineContext: CoroutineContext - get() = selector.getResumedInstance()?.coroutineContext ?: scope.coroutineContext // merely use fallback + get() = selector.getCurrentInstanceOrNull()?.coroutineContext ?: scope.coroutineContext // merely use fallback - override fun toString(): String = "SelectorNetworkHandler(currentInstance=${selector.getResumedInstance()})" + override fun toString(): String = "SelectorNetworkHandler(currentInstance=${selector.getCurrentInstanceOrNull()})" } diff --git a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyChannelException.kt b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyChannelException.kt new file mode 100644 index 000000000..493b5fafc --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyChannelException.kt @@ -0,0 +1,17 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.impl.netty + +import net.mamoe.mirai.internal.network.handler.selector.NetworkException + +internal data class NettyChannelException( + override val message: String? = null, + override val cause: Throwable? = null +) : NetworkException(true) \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt index d0b0a8658..d0ba85c64 100644 --- a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt +++ b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt @@ -28,10 +28,7 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport import net.mamoe.mirai.internal.network.handler.state.StateObserver import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket -import net.mamoe.mirai.utils.ExceptionCollector -import net.mamoe.mirai.utils.childScope -import net.mamoe.mirai.utils.debug -import net.mamoe.mirai.utils.systemProp +import net.mamoe.mirai.utils.* import java.io.EOFException import java.net.SocketAddress import kotlin.coroutines.CoroutineContext @@ -77,6 +74,7 @@ internal open class NettyNetworkHandler( protected open fun handlePipelineException(ctx: ChannelHandlerContext, error: Throwable) { context.bot.logger.error(error) synchronized(this) { + setState { StateClosed(NettyChannelException(cause = error)) } if (_state !is StateConnecting) { setState { StateConnecting(ExceptionCollector(error)) } } else { @@ -134,6 +132,8 @@ internal open class NettyNetworkHandler( // can be overridden for tests protected open suspend fun createConnection(decodePipeline: PacketDecodePipeline): NettyChannel { + packetLogger.debug { "Connecting to $address" } + val contextResult = CompletableDeferred() val eventLoopGroup = NioEventLoopGroup() @@ -142,6 +142,7 @@ internal open class NettyNetworkHandler( .option(ChannelOption.SO_KEEPALIVE, true) .handler(object : ChannelInitializer() { override fun initChannel(ch: SocketChannel) { + setupChannelPipeline(ch.pipeline(), decodePipeline) ch.pipeline() .addLast(object : ChannelInboundHandlerAdapter() { override fun channelInactive(ctx: ChannelHandlerContext?) { @@ -149,7 +150,6 @@ internal open class NettyNetworkHandler( } }) - setupChannelPipeline(ch.pipeline(), decodePipeline) } }) .connect(address) @@ -164,7 +164,7 @@ internal open class NettyNetworkHandler( future.channel().closeFuture().addListener { if (_state.correspondingState == State.CLOSED) return@addListener - setState { StateConnecting(ExceptionCollector(it.cause())) } + setState { StateClosed(it.cause()) } } return contextResult.await() @@ -207,6 +207,14 @@ internal open class NettyNetworkHandler( protected abstract inner class NettyState( correspondingState: State ) : BaseStateImpl(correspondingState) { + init { + coroutineContext.job.invokeOnCompletion { e -> + if (correspondingState != State.CLOSED) { + if (e != null) setState { StateClosed(e.unwrapCancellationException()) } + } + } + } + /** * @return `true` if packet has been sent, `false` if state is not ready for send. * @throws IllegalStateException if is [StateClosed]. @@ -276,13 +284,13 @@ internal open class NettyNetworkHandler( override fun getCause(): Throwable? = collectiveExceptions.getLast() - override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean { + override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean = runUnwrapCancellationException { connection.await() // split line number .writeAndFlushOrCloseAsync(packet) return true } - override suspend fun resumeConnection0() { + override suspend fun resumeConnection0() = runUnwrapCancellationException { connectResult.await() // propagates exceptions val connection = connection.await() this.setState { StateLoading(connection) } @@ -318,7 +326,7 @@ internal open class NettyNetworkHandler( context[ConfigPushProcessor].syncConfigPush(this@NettyNetworkHandler) } - override suspend fun resumeConnection0() { + override suspend fun resumeConnection0(): Unit = runUnwrapCancellationException { (coroutineContext.job as CompletableJob).run { complete() join() @@ -356,7 +364,7 @@ internal open class NettyNetworkHandler( return true } - override suspend fun resumeConnection0() { + override suspend fun resumeConnection0(): Unit = runUnwrapCancellationException { joinCompleted(coroutineContext.job) for (job in heartbeatJobs) joinCompleted(job) joinCompleted(configPush) diff --git a/mirai-core/src/commonTest/kotlin/network/handler/KeepAliveNetworkHandlerSelectorTest.kt b/mirai-core/src/commonTest/kotlin/network/handler/KeepAliveNetworkHandlerSelectorTest.kt index ed386f354..fc7347029 100644 --- a/mirai-core/src/commonTest/kotlin/network/handler/KeepAliveNetworkHandlerSelectorTest.kt +++ b/mirai-core/src/commonTest/kotlin/network/handler/KeepAliveNetworkHandlerSelectorTest.kt @@ -50,7 +50,7 @@ internal class KeepAliveNetworkHandlerSelectorTest : AbstractMockNetworkHandlerT } } runBlockingUnit(timeout = Duration.seconds(1)) { selector.awaitResumeInstance() } - assertNotNull(selector.getResumedInstance()) + assertNotNull(selector.getCurrentInstanceOrNull()) } @Test @@ -60,7 +60,7 @@ internal class KeepAliveNetworkHandlerSelectorTest : AbstractMockNetworkHandlerT } val handler = createNetworkHandler() selector.setCurrent(handler) - assertSame(handler, selector.getResumedInstance()) + assertSame(handler, selector.getCurrentInstanceOrNull()) } @Test @@ -70,7 +70,7 @@ internal class KeepAliveNetworkHandlerSelectorTest : AbstractMockNetworkHandlerT } val handler = createNetworkHandler() selector.setCurrent(handler) - assertSame(handler, selector.getResumedInstance()) + assertSame(handler, selector.getCurrentInstanceOrNull()) handler.setState(State.CLOSED) runBlockingUnit(timeout = Duration.seconds(3)) { selector.awaitResumeInstance() } assertEquals(1, selector.createInstanceCount.get()) diff --git a/mirai-core/src/commonTest/kotlin/network/handler/SelectorNetworkHandlerTest.kt b/mirai-core/src/commonTest/kotlin/network/handler/SelectorNetworkHandlerTest.kt index e2ef37171..2a4cd90d1 100644 --- a/mirai-core/src/commonTest/kotlin/network/handler/SelectorNetworkHandlerTest.kt +++ b/mirai-core/src/commonTest/kotlin/network/handler/SelectorNetworkHandlerTest.kt @@ -10,9 +10,15 @@ package net.mamoe.mirai.internal.network.handler import io.netty.channel.Channel +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import net.mamoe.mirai.internal.network.components.EventDispatcher +import net.mamoe.mirai.internal.network.components.HeartbeatFailureHandler +import net.mamoe.mirai.internal.network.components.HeartbeatScheduler import net.mamoe.mirai.internal.network.framework.AbstractRealNetworkHandlerTest import net.mamoe.mirai.internal.network.handler.selector.SelectorNetworkHandler import net.mamoe.mirai.internal.network.impl.netty.AbstractNettyNHTest +import net.mamoe.mirai.internal.network.impl.netty.HeartbeatFailedException import net.mamoe.mirai.internal.network.impl.netty.TestNettyNH import net.mamoe.mirai.internal.test.runBlockingUnit import net.mamoe.mirai.utils.cast @@ -46,4 +52,35 @@ internal class SelectorNetworkHandlerTest : AbstractRealNetworkHandlerTest { + this.onHeartFailure = onHeartFailure + return listOf(Job()) + } + } + defaultComponents[HeartbeatScheduler] = heartbeatScheduler + + bot.login() + bot.network.context[EventDispatcher].joinBroadcast() + assertState(NetworkHandler.State.OK) + + heartbeatScheduler.onHeartFailure("Test", HeartbeatFailedException("test", null)) + assertState(NetworkHandler.State.CLOSED) + + bot.network.resumeConnection() // in real, this is called by BotOnlineWatchdog in SelectorNetworkHandler + assertState(NetworkHandler.State.OK) + } } \ No newline at end of file diff --git a/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyBotNormalLoginTest.kt b/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyBotNormalLoginTest.kt index afea6b423..5fc76b112 100644 --- a/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyBotNormalLoginTest.kt +++ b/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyBotNormalLoginTest.kt @@ -15,7 +15,7 @@ import net.mamoe.mirai.event.events.BotEvent import net.mamoe.mirai.event.events.BotOfflineEvent import net.mamoe.mirai.event.events.BotReloginEvent import net.mamoe.mirai.event.nextEvent -import net.mamoe.mirai.internal.network.handler.NetworkHandler +import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.OK import net.mamoe.mirai.internal.test.assertEventBroadcasts import net.mamoe.mirai.internal.test.runBlockingUnit import net.mamoe.mirai.utils.firstIsInstanceOrNull @@ -62,6 +62,6 @@ internal class NettyBotNormalLoginTest : AbstractNettyNHTest() { throw events.firstIsInstanceOrNull()!!.cause!! } } - assertState(NetworkHandler.State.OK) + assertState(OK) } }