mirror of
https://github.com/mamoe/mirai.git
synced 2025-01-27 17:00:14 +08:00
Fix network scopes and exception handling. Support recovering after system hibernation and network failure.
This commit is contained in:
parent
a781941ede
commit
137e1a1235
@ -98,3 +98,19 @@ public fun CoroutineScope.hierarchicalName(
|
||||
name: String
|
||||
): CoroutineName = this.coroutineContext.hierarchicalName(name)
|
||||
|
||||
public inline fun <R> runUnwrapCancellationException(block: () -> R): R {
|
||||
try {
|
||||
return block()
|
||||
} catch (e: CancellationException) {
|
||||
// e is like `Exception in thread "main" kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=JobImpl{Cancelled}@f252f300`
|
||||
// and this is useless.
|
||||
if (e.suppressedExceptions.isNotEmpty()) throw e // preserve details.
|
||||
throw e.findCause { it !is CancellationException } ?: e
|
||||
}
|
||||
}
|
||||
|
||||
public fun Throwable.unwrapCancellationException(): Throwable {
|
||||
if (this !is CancellationException) return this
|
||||
if (suppressedExceptions.isNotEmpty()) return this
|
||||
return this.findCause { it !is CancellationException } ?: this
|
||||
}
|
@ -187,6 +187,10 @@ public fun Throwable.getRootCause(maxDepth: Int = 20): Throwable {
|
||||
return rootCause ?: this
|
||||
}
|
||||
|
||||
/**
|
||||
* Use [findCause] instead for better performance.
|
||||
*/
|
||||
@TestOnly
|
||||
public fun Throwable.causes(maxDepth: Int = 20): Sequence<Throwable> = sequence {
|
||||
var depth = 0
|
||||
var rootCause: Throwable? = this@causes
|
||||
@ -197,6 +201,20 @@ public fun Throwable.causes(maxDepth: Int = 20): Sequence<Throwable> = sequence
|
||||
}
|
||||
}
|
||||
|
||||
public inline fun Throwable.findCause(maxDepth: Int = 20, filter: (Throwable) -> Boolean): Throwable? {
|
||||
var depth = 0
|
||||
var rootCause: Throwable? = this
|
||||
while (true) {
|
||||
val current = rootCause?.cause ?: return null
|
||||
if (filter(current)) return current
|
||||
rootCause = rootCause.cause
|
||||
if (depth++ >= maxDepth) return null
|
||||
}
|
||||
}
|
||||
|
||||
public inline fun Throwable.findCauseOrSelf(maxDepth: Int = 20, filter: (Throwable) -> Boolean): Throwable =
|
||||
findCause(maxDepth, filter) ?: this
|
||||
|
||||
public fun String.capitalize(): String {
|
||||
return replaceFirstChar { if (it.isLowerCase()) it.titlecase(Locale.ROOT) else it.toString() }
|
||||
}
|
@ -25,7 +25,10 @@ import net.mamoe.mirai.internal.network.components.SsoProcessor
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandler
|
||||
import net.mamoe.mirai.internal.network.impl.netty.asCoroutineExceptionHandler
|
||||
import net.mamoe.mirai.supervisorJob
|
||||
import net.mamoe.mirai.utils.*
|
||||
import net.mamoe.mirai.utils.BotConfiguration
|
||||
import net.mamoe.mirai.utils.MiraiLogger
|
||||
import net.mamoe.mirai.utils.childScopeContext
|
||||
import net.mamoe.mirai.utils.info
|
||||
import kotlin.collections.set
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
@ -118,7 +121,7 @@ internal abstract class AbstractBot constructor(
|
||||
if (!components[SsoProcessor].firstLoginSucceed) {
|
||||
this.close() // failed to do first login.
|
||||
}
|
||||
throw e.causes().find { it !is CancellationException } ?: e // emit internal errors
|
||||
throw e
|
||||
}
|
||||
logger.info { "Bot login successful." }
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import net.mamoe.mirai.internal.asQQAndroidBot
|
||||
import net.mamoe.mirai.internal.network.component.ComponentKey
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandler
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
|
||||
import net.mamoe.mirai.internal.network.handler.awaitState
|
||||
import net.mamoe.mirai.utils.castOrNull
|
||||
import net.mamoe.mirai.utils.info
|
||||
import net.mamoe.mirai.utils.millisToHumanReadableString
|
||||
@ -43,6 +42,7 @@ private data class BotClosedByEvent(val event: BotOfflineEvent) : RuntimeExcepti
|
||||
|
||||
internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor {
|
||||
override fun attachJob(bot: AbstractBot, scope: CoroutineScope) {
|
||||
return
|
||||
bot.eventChannel.parentScope(scope).subscribeAlways(
|
||||
::onEvent,
|
||||
priority = EventPriority.MONITOR,
|
||||
@ -56,9 +56,6 @@ internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor {
|
||||
|
||||
fun closeNetwork() {
|
||||
if (network.state == State.CLOSED) return // avoid recursive calls.
|
||||
launch {
|
||||
network.awaitState(State.CLOSED)
|
||||
}
|
||||
network.close(BotClosedByEvent(event))
|
||||
}
|
||||
|
||||
@ -84,6 +81,7 @@ internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor {
|
||||
-> {
|
||||
val causeMessage = event.castOrNull<BotOfflineEvent.CauseAware>()?.cause?.toString() ?: event.toString()
|
||||
bot.logger.info { "Connection lost, retrying login ($causeMessage)." }
|
||||
closeNetwork()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,6 @@ internal class NetworkHandlerContextImpl(
|
||||
storage: ComponentStorage // should be the same as bot.components
|
||||
) : NetworkHandlerContext, ComponentStorage by storage {
|
||||
override fun toString(): String {
|
||||
return "NetworkHandlerContextImpl(storage=$)"
|
||||
return "NetworkHandlerContextImpl"
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
package net.mamoe.mirai.internal.network.handler
|
||||
|
||||
import kotlinx.atomicfu.locks.SynchronizedObject
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.ReceiveChannel
|
||||
@ -24,6 +25,7 @@ import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
|
||||
import net.mamoe.mirai.utils.*
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
@ -31,7 +33,7 @@ import kotlin.reflect.KClass
|
||||
*/
|
||||
internal abstract class NetworkHandlerSupport(
|
||||
override val context: NetworkHandlerContext,
|
||||
final override val coroutineContext: CoroutineContext = SupervisorJob(),
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||
) : NetworkHandler, CoroutineScope by coroutineContext.childScope(SupervisorJob()) {
|
||||
|
||||
protected abstract fun initialState(): BaseStateImpl
|
||||
@ -151,7 +153,9 @@ internal abstract class NetworkHandlerSupport(
|
||||
*/
|
||||
abstract inner class BaseStateImpl(
|
||||
val correspondingState: NetworkHandler.State,
|
||||
) : CoroutineScope by CoroutineScope(coroutineContext + Job(coroutineContext.job)) {
|
||||
) : CoroutineScope {
|
||||
final override val coroutineContext: CoroutineContext =
|
||||
this@NetworkHandlerSupport.coroutineContext + Job(this@NetworkHandlerSupport.coroutineContext.job)
|
||||
|
||||
open fun getCause(): Throwable? = null
|
||||
|
||||
@ -184,15 +188,19 @@ internal abstract class NetworkHandlerSupport(
|
||||
private set
|
||||
|
||||
final override val state: NetworkHandler.State get() = _state.correspondingState
|
||||
|
||||
override fun getLastFailure(): Throwable? = _state.getCause()
|
||||
|
||||
private val _stateChannel = Channel<NetworkHandler.State>(0)
|
||||
final override val stateChannel: ReceiveChannel<NetworkHandler.State> get() = _stateChannel
|
||||
|
||||
private val setStateLock = SynchronizedObject()
|
||||
|
||||
protected data class StateSwitchingException(
|
||||
val old: BaseStateImpl,
|
||||
val new: BaseStateImpl,
|
||||
) : CancellationException("State is switched from $old to $new")
|
||||
|
||||
|
||||
/**
|
||||
* Attempts to change state.
|
||||
*
|
||||
@ -209,7 +217,7 @@ internal abstract class NetworkHandlerSupport(
|
||||
*/
|
||||
protected inline fun <reified S : BaseStateImpl> BaseStateImpl.setState(
|
||||
noinline new: () -> S
|
||||
): S? = synchronized(this@NetworkHandlerSupport) {
|
||||
): S? = synchronized(setStateLock) {
|
||||
if (_state === this) {
|
||||
this@NetworkHandlerSupport.setState(new)
|
||||
} else {
|
||||
@ -235,9 +243,9 @@ internal abstract class NetworkHandlerSupport(
|
||||
*/
|
||||
//
|
||||
@TestOnly
|
||||
internal fun <S : BaseStateImpl> setStateImpl(newType: KClass<S>?, new: () -> S): S? = synchronized(this) {
|
||||
internal fun <S : BaseStateImpl> setStateImpl(newType: KClass<S>?, new: () -> S): S? = synchronized(setStateLock) {
|
||||
val old = _state
|
||||
if (newType != null && old::class == newType) return@synchronized null // already set to expected state by another thread. Avoid replications.
|
||||
if (newType != null && old::class == newType) return null // already set to expected state by another thread. Avoid replications.
|
||||
if (old.correspondingState == NetworkHandler.State.CLOSED) return null // CLOSED is final.
|
||||
|
||||
val stateObserver = context.getOrNull(StateObserver)
|
||||
|
@ -13,6 +13,7 @@ import kotlinx.atomicfu.atomic
|
||||
import kotlinx.coroutines.yield
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandler
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
|
||||
import net.mamoe.mirai.utils.ExceptionCollector
|
||||
import net.mamoe.mirai.utils.systemProp
|
||||
import net.mamoe.mirai.utils.toLongUnsigned
|
||||
|
||||
@ -23,8 +24,8 @@ import net.mamoe.mirai.utils.toLongUnsigned
|
||||
* - Re-initialize [NetworkHandler] instances if the old one is dead.
|
||||
* - Suspends requests when connection is not available.
|
||||
*
|
||||
* No connection is created until first invocation of [getResumedInstance],
|
||||
* and new connections are created only when calling [getResumedInstance] if the old connection was dead.
|
||||
* No connection is created until first invocation of [getCurrentInstanceOrNull],
|
||||
* and new connections are created only when calling [getCurrentInstanceOrNull] if the old connection was dead.
|
||||
*/
|
||||
// may be replaced with a better name.
|
||||
internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandler>(
|
||||
@ -44,56 +45,71 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandl
|
||||
|
||||
protected abstract fun createInstance(): H
|
||||
|
||||
final override fun getResumedInstance(): H? = current.value
|
||||
final override fun getCurrentInstanceOrNull(): H? = current.value
|
||||
|
||||
final override tailrec fun tryResumeInstanceOrCreate(): H {
|
||||
getResumedInstance()?.let { return it }
|
||||
final override tailrec fun getCurrentInstanceOrCreate(): H {
|
||||
getCurrentInstanceOrNull()?.let { return it }
|
||||
refreshInstance()
|
||||
return tryResumeInstanceOrCreate()
|
||||
return getCurrentInstanceOrCreate()
|
||||
}
|
||||
|
||||
final override suspend fun awaitResumeInstance(): H = awaitResumeInstanceImpl(0)
|
||||
final override suspend fun awaitResumeInstance(): H = AwaitResumeInstance().run()
|
||||
|
||||
private tailrec suspend fun awaitResumeInstanceImpl(attempted: Int): H {
|
||||
if (attempted >= maxAttempts) error("Failed to resume instance. Maximum attempts reached.")
|
||||
yield()
|
||||
val current = getResumedInstance()
|
||||
return if (current != null) {
|
||||
when (val thisState = current.state) {
|
||||
NetworkHandler.State.CLOSED -> {
|
||||
this.current.compareAndSet(current, null) // invalidate the instance and try again.
|
||||
awaitResumeInstanceImpl(attempted + 1) // will create new instance.
|
||||
}
|
||||
NetworkHandler.State.CONNECTING,
|
||||
NetworkHandler.State.INITIALIZED -> {
|
||||
current.resumeConnection() // once finished, it should has been LOADING or OK
|
||||
check(current.state != thisState) { "Internal error: State is still $thisState after successful resumeConnection." } // this should not happen.
|
||||
return awaitResumeInstanceImpl(attempted) // does not count for an attempt.
|
||||
}
|
||||
NetworkHandler.State.LOADING -> {
|
||||
return current
|
||||
}
|
||||
NetworkHandler.State.OK -> {
|
||||
current.resumeConnection()
|
||||
return current
|
||||
}
|
||||
private inner class AwaitResumeInstance {
|
||||
private var attempted: Int = 0
|
||||
private val exceptionCollector: ExceptionCollector = ExceptionCollector()
|
||||
|
||||
tailrec suspend fun run(): H {
|
||||
if (attempted >= maxAttempts) {
|
||||
throw IllegalStateException(
|
||||
"Failed to resume instance. Maximum attempts reached.",
|
||||
exceptionCollector.getLast()
|
||||
)
|
||||
}
|
||||
yield() // Avoid endless recursion.
|
||||
val current = getCurrentInstanceOrNull()
|
||||
return if (current != null) {
|
||||
when (val thisState = current.state) {
|
||||
NetworkHandler.State.CLOSED -> {
|
||||
if (this@AbstractKeepAliveNetworkHandlerSelector.current.compareAndSet(current, null)) {
|
||||
// invalidate the instance and try again.
|
||||
|
||||
exceptionCollector.collectException(current.getLastFailure())
|
||||
}
|
||||
attempted += 1
|
||||
run() // will create new instance.
|
||||
}
|
||||
NetworkHandler.State.CONNECTING,
|
||||
NetworkHandler.State.INITIALIZED -> {
|
||||
current.resumeConnection() // once finished, it should has been LOADING or OK
|
||||
check(current.state != thisState) { "Internal error: State is still $thisState after successful resumeConnection." } // this should not happen.
|
||||
return run() // does not count for an attempt.
|
||||
}
|
||||
NetworkHandler.State.LOADING -> {
|
||||
return current
|
||||
}
|
||||
NetworkHandler.State.OK -> {
|
||||
current.resumeConnection()
|
||||
return current
|
||||
}
|
||||
}
|
||||
} else {
|
||||
refreshInstance()
|
||||
run() // directly retry, does not count for attempts.
|
||||
}
|
||||
} else {
|
||||
refreshInstance()
|
||||
awaitResumeInstanceImpl(attempted) // directly retry, does not count for attempts.
|
||||
}
|
||||
}
|
||||
|
||||
protected open fun refreshInstance() {
|
||||
synchronized(this) { // avoid concurrent `createInstance()`
|
||||
if (getResumedInstance() == null) this.current.compareAndSet(null, createInstance())
|
||||
if (getCurrentInstanceOrNull() == null) this.current.compareAndSet(null, createInstance())
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
@JvmField
|
||||
var DEFAULT_MAX_ATTEMPTS =
|
||||
systemProp("mirai.network.handler.selector.max.attempts", 3)
|
||||
systemProp("mirai.network.handler.selector.max.attempts", Long.MAX_VALUE)
|
||||
.coerceIn(1..Int.MAX_VALUE.toLongUnsigned()).toInt()
|
||||
}
|
||||
}
|
@ -38,6 +38,7 @@ internal class FactoryKeepAliveNetworkHandlerSelector<H : NetworkHandler> : Abst
|
||||
override fun createInstance(): H =
|
||||
factory.create(
|
||||
context,
|
||||
context[ServerList].pollCurrent()?.toSocketAddress() ?: throw NoServerAvailableException()
|
||||
// context[ServerList].pollCurrent()?.toSocketAddress() ?: throw NoServerAvailableException()
|
||||
context[ServerList].pollAny().toSocketAddress()
|
||||
)
|
||||
}
|
@ -13,7 +13,9 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandler
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
|
||||
|
||||
/**
|
||||
* A director([selector][SelectorNetworkHandler.selector]) of [NetworkHandler].
|
||||
* A **lazy** director([selector][SelectorNetworkHandler.selector]) of [NetworkHandler].
|
||||
*
|
||||
* *lazy* means that no action is taken at any time until member functions are invoked.
|
||||
*
|
||||
* It can produce [H] instances (maybe by calling [NetworkHandlerFactory]), to be used by [SelectorNetworkHandler]
|
||||
*
|
||||
@ -21,20 +23,21 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
|
||||
*/
|
||||
internal interface NetworkHandlerSelector<H : NetworkHandler> {
|
||||
/**
|
||||
* Returns an instance immediately without suspension, or `null` if instance not ready.
|
||||
* Returns an instance immediately without suspension, or `null` if instance not ready. Returned [H] can be in any states.
|
||||
*
|
||||
* This function should not throw any exception.
|
||||
* @see awaitResumeInstance
|
||||
*/
|
||||
fun getResumedInstance(): H?
|
||||
fun getCurrentInstanceOrNull(): H?
|
||||
|
||||
/**
|
||||
* Returns the currently alive [NetworkHandler] or creates a new one.
|
||||
* Returns the current [NetworkHandler] or creates a new one if it is `null`. Returned [H] can be in any states.
|
||||
*/
|
||||
fun tryResumeInstanceOrCreate(): H
|
||||
fun getCurrentInstanceOrCreate(): H
|
||||
|
||||
/**
|
||||
* Returns an alive [NetworkHandler], or suspends the coroutine until the connection has been made again.
|
||||
* Returned [H] can be in [NetworkHandler.State.OK] only (but it may happen that the state changed just after returning from this function).
|
||||
*
|
||||
* This function may throw exceptions, which would be propagated to the original caller of [SelectorNetworkHandler.resumeConnection].
|
||||
*/
|
||||
|
@ -14,7 +14,10 @@ import kotlinx.coroutines.channels.ReceiveChannel
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandler
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
|
||||
import net.mamoe.mirai.internal.network.handler.awaitState
|
||||
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
|
||||
import net.mamoe.mirai.utils.findCauseOrSelf
|
||||
import net.mamoe.mirai.utils.hierarchicalName
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
/**
|
||||
@ -34,6 +37,12 @@ import kotlin.coroutines.CoroutineContext
|
||||
internal class SelectorNetworkHandler(
|
||||
override val context: NetworkHandlerContext, // impl notes: may consider to move into function member.
|
||||
private val selector: NetworkHandlerSelector<*>,
|
||||
/**
|
||||
* If `true`, a watcher job will be started to call [resumeConnection] when network is closed by [NetworkException] and [NetworkException.recoverable] is `true`.
|
||||
*
|
||||
* This is required for automatic reconnecting after network failure or system hibernation, since [NetworkHandler] is lazy and will reconnect iff [resumeConnection] is called.
|
||||
*/
|
||||
allowActiveMaintenance: Boolean = true,
|
||||
) : NetworkHandler {
|
||||
@Volatile
|
||||
private var lastCancellationCause: Throwable? = null
|
||||
@ -44,10 +53,39 @@ internal class SelectorNetworkHandler(
|
||||
return selector.awaitResumeInstance()
|
||||
}
|
||||
|
||||
init {
|
||||
if (allowActiveMaintenance) {
|
||||
val bot = context.bot
|
||||
scope.launch(scope.hierarchicalName("BotOnlineWatchdog ${bot.id}")) {
|
||||
while (isActive) {
|
||||
val instance = selector.getCurrentInstanceOrCreate()
|
||||
|
||||
awaitState(State.CLOSED) // suspend until next CLOSED
|
||||
|
||||
if (!bot.isActive || !isActive) return@launch
|
||||
if (selector.getCurrentInstanceOrNull() != instance) continue // instance already changed by other threads.
|
||||
|
||||
delay(3000) // make it slower to avoid massive reconnection on network failure.
|
||||
|
||||
val failure = getLastFailure()
|
||||
if (failure?.findCauseOrSelf { it is NetworkException && it.recoverable } != null) {
|
||||
try {
|
||||
resumeConnection() // notify selector to actively resume now.
|
||||
} catch (ignored: Exception) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override val state: State
|
||||
get() = selector.tryResumeInstanceOrCreate().state
|
||||
get() = selector.getCurrentInstanceOrCreate().state
|
||||
|
||||
override fun getLastFailure(): Throwable? = selector.getCurrentInstanceOrCreate().getLastFailure()
|
||||
|
||||
override val stateChannel: ReceiveChannel<State>
|
||||
get() = selector.tryResumeInstanceOrCreate().stateChannel
|
||||
get() = selector.getCurrentInstanceOrCreate().stateChannel
|
||||
|
||||
override suspend fun resumeConnection() {
|
||||
instance() // the selector will resume connection for us.
|
||||
@ -66,12 +104,12 @@ internal class SelectorNetworkHandler(
|
||||
return
|
||||
}
|
||||
}
|
||||
selector.getResumedInstance()?.close(cause)
|
||||
selector.getCurrentInstanceOrNull()?.close(cause)
|
||||
}
|
||||
|
||||
override val coroutineContext: CoroutineContext
|
||||
get() = selector.getResumedInstance()?.coroutineContext ?: scope.coroutineContext // merely use fallback
|
||||
get() = selector.getCurrentInstanceOrNull()?.coroutineContext ?: scope.coroutineContext // merely use fallback
|
||||
|
||||
override fun toString(): String = "SelectorNetworkHandler(currentInstance=${selector.getResumedInstance()})"
|
||||
override fun toString(): String = "SelectorNetworkHandler(currentInstance=${selector.getCurrentInstanceOrNull()})"
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,17 @@
|
||||
/*
|
||||
* Copyright 2019-2021 Mamoe Technologies and contributors.
|
||||
*
|
||||
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
|
||||
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
|
||||
*
|
||||
* https://github.com/mamoe/mirai/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package net.mamoe.mirai.internal.network.impl.netty
|
||||
|
||||
import net.mamoe.mirai.internal.network.handler.selector.NetworkException
|
||||
|
||||
internal data class NettyChannelException(
|
||||
override val message: String? = null,
|
||||
override val cause: Throwable? = null
|
||||
) : NetworkException(true)
|
@ -28,10 +28,7 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
|
||||
import net.mamoe.mirai.internal.network.handler.state.StateObserver
|
||||
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
|
||||
import net.mamoe.mirai.utils.ExceptionCollector
|
||||
import net.mamoe.mirai.utils.childScope
|
||||
import net.mamoe.mirai.utils.debug
|
||||
import net.mamoe.mirai.utils.systemProp
|
||||
import net.mamoe.mirai.utils.*
|
||||
import java.io.EOFException
|
||||
import java.net.SocketAddress
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
@ -77,6 +74,7 @@ internal open class NettyNetworkHandler(
|
||||
protected open fun handlePipelineException(ctx: ChannelHandlerContext, error: Throwable) {
|
||||
context.bot.logger.error(error)
|
||||
synchronized(this) {
|
||||
setState { StateClosed(NettyChannelException(cause = error)) }
|
||||
if (_state !is StateConnecting) {
|
||||
setState { StateConnecting(ExceptionCollector(error)) }
|
||||
} else {
|
||||
@ -134,6 +132,8 @@ internal open class NettyNetworkHandler(
|
||||
|
||||
// can be overridden for tests
|
||||
protected open suspend fun createConnection(decodePipeline: PacketDecodePipeline): NettyChannel {
|
||||
packetLogger.debug { "Connecting to $address" }
|
||||
|
||||
val contextResult = CompletableDeferred<NettyChannel>()
|
||||
val eventLoopGroup = NioEventLoopGroup()
|
||||
|
||||
@ -142,6 +142,7 @@ internal open class NettyNetworkHandler(
|
||||
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||
.handler(object : ChannelInitializer<SocketChannel>() {
|
||||
override fun initChannel(ch: SocketChannel) {
|
||||
setupChannelPipeline(ch.pipeline(), decodePipeline)
|
||||
ch.pipeline()
|
||||
.addLast(object : ChannelInboundHandlerAdapter() {
|
||||
override fun channelInactive(ctx: ChannelHandlerContext?) {
|
||||
@ -149,7 +150,6 @@ internal open class NettyNetworkHandler(
|
||||
}
|
||||
})
|
||||
|
||||
setupChannelPipeline(ch.pipeline(), decodePipeline)
|
||||
}
|
||||
})
|
||||
.connect(address)
|
||||
@ -164,7 +164,7 @@ internal open class NettyNetworkHandler(
|
||||
|
||||
future.channel().closeFuture().addListener {
|
||||
if (_state.correspondingState == State.CLOSED) return@addListener
|
||||
setState { StateConnecting(ExceptionCollector(it.cause())) }
|
||||
setState { StateClosed(it.cause()) }
|
||||
}
|
||||
|
||||
return contextResult.await()
|
||||
@ -207,6 +207,14 @@ internal open class NettyNetworkHandler(
|
||||
protected abstract inner class NettyState(
|
||||
correspondingState: State
|
||||
) : BaseStateImpl(correspondingState) {
|
||||
init {
|
||||
coroutineContext.job.invokeOnCompletion { e ->
|
||||
if (correspondingState != State.CLOSED) {
|
||||
if (e != null) setState { StateClosed(e.unwrapCancellationException()) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return `true` if packet has been sent, `false` if state is not ready for send.
|
||||
* @throws IllegalStateException if is [StateClosed].
|
||||
@ -276,13 +284,13 @@ internal open class NettyNetworkHandler(
|
||||
|
||||
override fun getCause(): Throwable? = collectiveExceptions.getLast()
|
||||
|
||||
override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean {
|
||||
override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean = runUnwrapCancellationException {
|
||||
connection.await() // split line number
|
||||
.writeAndFlushOrCloseAsync(packet)
|
||||
return true
|
||||
}
|
||||
|
||||
override suspend fun resumeConnection0() {
|
||||
override suspend fun resumeConnection0() = runUnwrapCancellationException {
|
||||
connectResult.await() // propagates exceptions
|
||||
val connection = connection.await()
|
||||
this.setState { StateLoading(connection) }
|
||||
@ -318,7 +326,7 @@ internal open class NettyNetworkHandler(
|
||||
context[ConfigPushProcessor].syncConfigPush(this@NettyNetworkHandler)
|
||||
}
|
||||
|
||||
override suspend fun resumeConnection0() {
|
||||
override suspend fun resumeConnection0(): Unit = runUnwrapCancellationException {
|
||||
(coroutineContext.job as CompletableJob).run {
|
||||
complete()
|
||||
join()
|
||||
@ -356,7 +364,7 @@ internal open class NettyNetworkHandler(
|
||||
return true
|
||||
}
|
||||
|
||||
override suspend fun resumeConnection0() {
|
||||
override suspend fun resumeConnection0(): Unit = runUnwrapCancellationException {
|
||||
joinCompleted(coroutineContext.job)
|
||||
for (job in heartbeatJobs) joinCompleted(job)
|
||||
joinCompleted(configPush)
|
||||
|
@ -50,7 +50,7 @@ internal class KeepAliveNetworkHandlerSelectorTest : AbstractMockNetworkHandlerT
|
||||
}
|
||||
}
|
||||
runBlockingUnit(timeout = Duration.seconds(1)) { selector.awaitResumeInstance() }
|
||||
assertNotNull(selector.getResumedInstance())
|
||||
assertNotNull(selector.getCurrentInstanceOrNull())
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -60,7 +60,7 @@ internal class KeepAliveNetworkHandlerSelectorTest : AbstractMockNetworkHandlerT
|
||||
}
|
||||
val handler = createNetworkHandler()
|
||||
selector.setCurrent(handler)
|
||||
assertSame(handler, selector.getResumedInstance())
|
||||
assertSame(handler, selector.getCurrentInstanceOrNull())
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -70,7 +70,7 @@ internal class KeepAliveNetworkHandlerSelectorTest : AbstractMockNetworkHandlerT
|
||||
}
|
||||
val handler = createNetworkHandler()
|
||||
selector.setCurrent(handler)
|
||||
assertSame(handler, selector.getResumedInstance())
|
||||
assertSame(handler, selector.getCurrentInstanceOrNull())
|
||||
handler.setState(State.CLOSED)
|
||||
runBlockingUnit(timeout = Duration.seconds(3)) { selector.awaitResumeInstance() }
|
||||
assertEquals(1, selector.createInstanceCount.get())
|
||||
|
@ -10,9 +10,15 @@
|
||||
package net.mamoe.mirai.internal.network.handler
|
||||
|
||||
import io.netty.channel.Channel
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import net.mamoe.mirai.internal.network.components.EventDispatcher
|
||||
import net.mamoe.mirai.internal.network.components.HeartbeatFailureHandler
|
||||
import net.mamoe.mirai.internal.network.components.HeartbeatScheduler
|
||||
import net.mamoe.mirai.internal.network.framework.AbstractRealNetworkHandlerTest
|
||||
import net.mamoe.mirai.internal.network.handler.selector.SelectorNetworkHandler
|
||||
import net.mamoe.mirai.internal.network.impl.netty.AbstractNettyNHTest
|
||||
import net.mamoe.mirai.internal.network.impl.netty.HeartbeatFailedException
|
||||
import net.mamoe.mirai.internal.network.impl.netty.TestNettyNH
|
||||
import net.mamoe.mirai.internal.test.runBlockingUnit
|
||||
import net.mamoe.mirai.utils.cast
|
||||
@ -46,4 +52,35 @@ internal class SelectorNetworkHandlerTest : AbstractRealNetworkHandlerTest<Selec
|
||||
network.close(IllegalStateException("Closed by test"))
|
||||
assertFails { network.resumeConnection() }
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Emulates system hibernation and network failure.
|
||||
* @see HeartbeatFailedException
|
||||
*/
|
||||
@Test
|
||||
fun `can recover on heartbeat failure`() = runBlockingUnit {
|
||||
val heartbeatScheduler = object : HeartbeatScheduler {
|
||||
lateinit var onHeartFailure: HeartbeatFailureHandler
|
||||
override fun launchJobsIn(
|
||||
network: NetworkHandlerSupport,
|
||||
scope: CoroutineScope,
|
||||
onHeartFailure: HeartbeatFailureHandler
|
||||
): List<Job> {
|
||||
this.onHeartFailure = onHeartFailure
|
||||
return listOf(Job())
|
||||
}
|
||||
}
|
||||
defaultComponents[HeartbeatScheduler] = heartbeatScheduler
|
||||
|
||||
bot.login()
|
||||
bot.network.context[EventDispatcher].joinBroadcast()
|
||||
assertState(NetworkHandler.State.OK)
|
||||
|
||||
heartbeatScheduler.onHeartFailure("Test", HeartbeatFailedException("test", null))
|
||||
assertState(NetworkHandler.State.CLOSED)
|
||||
|
||||
bot.network.resumeConnection() // in real, this is called by BotOnlineWatchdog in SelectorNetworkHandler
|
||||
assertState(NetworkHandler.State.OK)
|
||||
}
|
||||
}
|
@ -15,7 +15,7 @@ import net.mamoe.mirai.event.events.BotEvent
|
||||
import net.mamoe.mirai.event.events.BotOfflineEvent
|
||||
import net.mamoe.mirai.event.events.BotReloginEvent
|
||||
import net.mamoe.mirai.event.nextEvent
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandler
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.OK
|
||||
import net.mamoe.mirai.internal.test.assertEventBroadcasts
|
||||
import net.mamoe.mirai.internal.test.runBlockingUnit
|
||||
import net.mamoe.mirai.utils.firstIsInstanceOrNull
|
||||
@ -62,6 +62,6 @@ internal class NettyBotNormalLoginTest : AbstractNettyNHTest() {
|
||||
throw events.firstIsInstanceOrNull<BotOfflineEvent.Dropped>()!!.cause!!
|
||||
}
|
||||
}
|
||||
assertState(NetworkHandler.State.OK)
|
||||
assertState(OK)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user