Redesign connection maintenance mechanism:

Reconnection is directed by `BotOfflineEvent`.

### Event Broadcasting

- PacketFactory `MessageSvcPushForceOffline` closes network with `ForceOfflineException`.
 - `network.close(cause)` passes the cause to the state observer from `QQAndroidBot.stateObserverChain`, which broadcasts as follows:
   - Nothing if `cause` is `BotClosedByEvent` (the network was closed in response to a `BotOfflineEvent` that has already been broadcast, e.g. by the user)
   - `BotOfflineEvent.Dropped` if `cause` is a recoverable `NetworkException` (such exceptions come from the Netty handlers)
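   - `BotOfflineEvent.Force` if `cause` is a `ForceOfflineException`, which comes from the factory mentioned above (this case is checked before `Dropped`, because `ForceOfflineException` is itself a recoverable `NetworkException`)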
   - `BotOfflineEvent.Active` otherwise (any other unexpected exception is considered an error)

### Deciding whether to reconnect

Users can listen for `BotOfflineEvent` and change `BotOfflineEvent.reconnect` to decide whether to reconnect. Default values are determined by `BotConfiguration`.

### Event listening

- Component `BotOfflineEventMonitor` listens for `BotOfflineEvent`, logs the event for users, and launches a coroutine that calls `network.resumeConnection` to notify the selector to create a new instance.
- `BotOnlineWatchdog` in `SelectorNetworkHandler` is removed.
- The selector now handles `maxAttempts` correctly: `awaitResumeInstance` does not throw an exception until the maximum number of attempts has been reached.
- On every attempt, new exceptions are logged to provide debugging information, and further duplicates are ignored.
This commit is contained in:
Him188 2021-06-16 16:58:20 +08:00
parent 45ba713f73
commit a38f24cbe5
11 changed files with 130 additions and 107 deletions

View File

@ -43,7 +43,7 @@ public sealed class BotOfflineEvent : BotEvent, AbstractEvent() {
*
* 在调用 [Bot.close] , 如果 Bot 连接正常, 将会广播 [Active].
*
* 注意, 2.7 以前主动广播这个事件也可以让 [Bot] 离线, 而在 2.7-M1 起不会.
* 主动广播这个事件也可以让 [Bot] 离线, 但不建议这么做. 建议调用 [Bot.close].
*/
public data class Active(
public override val bot: Bot,

View File

@ -12,7 +12,7 @@ package net.mamoe.mirai.utils
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
public class ExceptionCollector {
public open class ExceptionCollector {
public constructor()
public constructor(initial: Throwable?) {
@ -25,17 +25,25 @@ public class ExceptionCollector {
}
}
/**
 * Hook invoked by [collect] for every throwable that passed the duplicate filter,
 * before it is chained with the previously collected one.
 *
 * The default implementation does nothing; subclasses may override it, e.g. to log [throwable].
 */
protected open fun beforeCollect(throwable: Throwable) {
}
@Volatile
private var last: Throwable? = null
private val hashCodes = mutableSetOf<Long>()
/**
* @return `true` if [e] is new.
*/
@Synchronized
public fun collect(e: Throwable?) {
if (e == null) return
if (!hashCodes.add(hash(e))) return // filter out duplications
public fun collect(e: Throwable?): Boolean {
if (e == null) return false
if (!hashCodes.add(hash(e))) return false // filter out duplications
// we can also check suppressed exceptions of [e] but actual influence would be slight.
beforeCollect(e)
this.last?.let { e.addSuppressed(it) }
this.last = e
return true
}
private fun hash(e: Throwable): Long {
@ -55,8 +63,9 @@ public class ExceptionCollector {
/**
* Alias to [collect] to be used inside [withExceptionCollector]
* @return `true` if [e] is new.
*/
public fun collectException(e: Throwable?): Unit = collect(e)
public fun collectException(e: Throwable?): Boolean = collect(e)
public fun getLast(): Throwable? = last

View File

@ -38,12 +38,10 @@ import net.mamoe.mirai.internal.network.handler.selector.SelectorNetworkHandler
import net.mamoe.mirai.internal.network.handler.state.StateChangedObserver
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.handler.state.safe
import net.mamoe.mirai.internal.network.impl.netty.ForceOfflineException
import net.mamoe.mirai.internal.network.impl.netty.NettyNetworkHandlerFactory
import net.mamoe.mirai.internal.utils.subLogger
import net.mamoe.mirai.utils.BotConfiguration
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.lateinitMutableProperty
import net.mamoe.mirai.utils.warning
import net.mamoe.mirai.utils.*
import kotlin.contracts.contract
internal fun Bot.asQQAndroidBot(): QQAndroidBot {
@ -99,16 +97,22 @@ internal open class QQAndroidBot constructor(
}
},
StateChangedObserver(State.OK, State.CLOSED) { new ->
// logging performed by BotOfflineEventMonitor
val cause = new.getCause()
if (cause is NetworkException && cause.recoverable) {
eventDispatcher.broadcastAsync(BotOfflineEvent.Dropped(bot, new.getCause()))
logger.warning { "Connection lost. Attempting to reconnect..." }
} else {
eventDispatcher.broadcastAsync(BotOfflineEvent.Active(bot, new.getCause()))
when {
cause is ForceOfflineException -> {
eventDispatcher.broadcastAsync(BotOfflineEvent.Force(bot, cause.title, cause.message))
}
cause is NetworkException && cause.recoverable -> {
eventDispatcher.broadcastAsync(BotOfflineEvent.Dropped(bot, cause))
}
cause is BotClosedByEvent -> {
}
else -> {
// any other unexpected exceptions considered as an error
eventDispatcher.broadcastAsync(BotOfflineEvent.Active(bot, cause))
}
}
},
StateChangedObserver(to = State.OK) { new ->
components[BotOfflineEventMonitor].attachJob(bot, new)
},
).safe(logger.subLogger("StateObserver"))
}
@ -120,6 +124,8 @@ internal open class QQAndroidBot constructor(
private val defaultBotLevelComponents: ComponentStorage by lateinitMutableProperty {
createBotLevelComponents().apply {
set(StateObserver, stateObserverChain())
}.also { components ->
components[BotOfflineEventMonitor].attachJob(bot, this)
}
}
@ -180,7 +186,6 @@ internal open class QQAndroidBot constructor(
override fun createNetworkHandler(): NetworkHandler {
return SelectorNetworkHandler(
bot,
KeepAliveNetworkHandlerSelector(
maxAttempts = configuration.reconnectionRetryTimes.coerceIn(1, Int.MAX_VALUE)
) {

View File

@ -21,9 +21,11 @@ import net.mamoe.mirai.internal.asQQAndroidBot
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.selector.NetworkException
import net.mamoe.mirai.utils.castOrNull
import net.mamoe.mirai.utils.info
import net.mamoe.mirai.utils.millisToHumanReadableString
import net.mamoe.mirai.utils.warning
import kotlin.system.measureTimeMillis
/**
@ -38,11 +40,13 @@ internal interface BotOfflineEventMonitor {
fun attachJob(bot: AbstractBot, scope: CoroutineScope)
}
private data class BotClosedByEvent(val event: BotOfflineEvent) : RuntimeException("Bot is closed by event '$event'.")
/**
 * Close cause that `closeNetwork` in `BotOfflineEventMonitorImpl` can pass to `network.close`
 * when the network is closed in reaction to a [BotOfflineEvent].
 *
 * The state observer in `QQAndroidBot` broadcasts nothing for this cause.
 *
 * Passes `false` to the [NetworkException] constructor — presumably the `recoverable` flag;
 * TODO confirm against the declaration of [NetworkException].
 *
 * @property event the event in reaction to which the network was closed
 * @property message description of the cause; by default it embeds [event]
 */
internal data class BotClosedByEvent(
val event: BotOfflineEvent,
override val message: String? = "Bot is closed by event '$event'."
) : NetworkException(false)
internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor {
override fun attachJob(bot: AbstractBot, scope: CoroutineScope) {
return // leave it until 2.7-RC
bot.eventChannel.parentScope(scope).subscribeAlways(
::onEvent,
priority = EventPriority.MONITOR,
@ -56,7 +60,7 @@ internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor {
fun closeNetwork() {
if (network.state == State.CLOSED) return // avoid recursive calls.
network.close(BotClosedByEvent(event))
network.close(if (event is BotOfflineEvent.CauseAware) event.cause else BotClosedByEvent(event))
}
when (event) {
@ -72,7 +76,7 @@ internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor {
closeNetwork()
}
is BotOfflineEvent.Force -> {
bot.logger.info { "Connection occupied by another android device: ${event.message}" }
bot.logger.warning { "Connection occupied by another android device: ${event.message}" }
closeNetwork()
}
is BotOfflineEvent.MsfOffline,
@ -80,17 +84,22 @@ internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor {
is BotOfflineEvent.RequireReconnect,
-> {
val causeMessage = event.castOrNull<BotOfflineEvent.CauseAware>()?.cause?.toString() ?: event.toString()
bot.logger.info { "Connection lost, retrying login ($causeMessage)." }
bot.logger.warning { "Connection lost, reconnecting... ($causeMessage)" }
closeNetwork()
}
}
if (event.reconnect) {
launchRecovery(bot)
}
}
private fun launchRecovery(bot: AbstractBot) {
bot.launch {
val success: Boolean
val time = measureTimeMillis {
success = kotlin.runCatching {
bot.login() // selector will create new NH to replace the old, closed one, with some further comprehensive considerations. For example, limitation for attempts.
bot.network.resumeConnection()
}.isSuccess
}
@ -99,6 +108,5 @@ internal class BotOfflineEventMonitorImpl : BotOfflineEventMonitor {
}
}
}
}
}

View File

@ -10,12 +10,15 @@
package net.mamoe.mirai.internal.network.handler.selector
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.delay
import kotlinx.coroutines.yield
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.utils.ExceptionCollector
import net.mamoe.mirai.utils.systemProp
import net.mamoe.mirai.utils.toLongUnsigned
import net.mamoe.mirai.utils.unwrapCancellationException
/**
* A lazy stateful implementation of [NetworkHandlerSelector].
@ -57,9 +60,21 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandl
private inner class AwaitResumeInstance {
private var attempted: Int = 0
private val exceptionCollector: ExceptionCollector = ExceptionCollector()
private var lastNetwork: H? = null
/**
 * Collector for failures seen across attempts.
 *
 * Overrides [ExceptionCollector.beforeCollect] so that each throwable reaching the hook is logged
 * as a warning on the logger of [lastNetwork]; nothing is logged while [lastNetwork] is `null`.
 */
private val exceptionCollector: ExceptionCollector = object : ExceptionCollector() {
override fun beforeCollect(throwable: Throwable) {
// `lastNetwork` is nullable, hence the safe calls.
lastNetwork?.logger?.warning(throwable)
}
}
tailrec suspend fun run(): H {
/**
 * Entry point: delegates to [runImpl] and returns the instance it produces.
 *
 * After [runImpl] returns normally, the exception collector is disposed and [lastNetwork] is cleared.
 * This cleanup lives in `also`, so it is not executed when [runImpl] throws.
 */
suspend fun run(): H {
return runImpl().also {
exceptionCollector.dispose()
lastNetwork = null // help gc
}
}
private tailrec suspend fun runImpl(): H {
if (attempted >= maxAttempts) {
throw IllegalStateException(
"Failed to resume instance. Maximum attempts reached.",
@ -68,34 +83,49 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandl
}
yield() // Avoid endless recursion.
val current = getCurrentInstanceOrNull()
lastNetwork = current
/**
 * Calls `resumeConnection()` on the receiver and suppresses any [Exception] it throws.
 *
 * NOTE(review): `catch (ignored: Exception)` also catches `CancellationException`;
 * confirm that swallowing cancellation at this point is intended.
 */
suspend fun H.resumeInstanceCatchingException() {
try {
resumeConnection() // once finished, it should has been LOADING or OK
} catch (ignored: Exception) {
// exception will be collected by `exceptionCollector.collectException(current.getLastFailure())`
// so that duplicated exceptions are ignored in logging
}
}
return if (current != null) {
when (val thisState = current.state) {
NetworkHandler.State.CLOSED -> {
if (this@AbstractKeepAliveNetworkHandlerSelector.current.compareAndSet(current, null)) {
// invalidate the instance and try again.
exceptionCollector.collectException(current.getLastFailure())
val lastFailure = current.getLastFailure()?.unwrapCancellationException()
exceptionCollector.collectException(lastFailure)
}
if (attempted > 1) {
delay(3000) // make it slower to avoid massive reconnection on network failure.
}
attempted += 1
run() // will create new instance.
runImpl() // will create new instance (see the `else` branch).
}
NetworkHandler.State.CONNECTING,
NetworkHandler.State.INITIALIZED -> {
current.resumeConnection() // once finished, it should has been LOADING or OK
current.resumeInstanceCatchingException()
check(current.state != thisState) { "Internal error: State is still $thisState after successful resumeConnection." } // this should not happen.
return run() // does not count for an attempt.
return runImpl() // does not count for an attempt.
}
NetworkHandler.State.LOADING -> {
return current
}
NetworkHandler.State.OK -> {
current.resumeConnection()
current.resumeInstanceCatchingException()
return current
}
}
} else {
refreshInstance()
run() // directly retry, does not count for attempts.
runImpl() // directly retry, does not count for attempts.
}
}
}

View File

@ -9,18 +9,16 @@
package net.mamoe.mirai.internal.network.handler.selector
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ReceiveChannel
import net.mamoe.mirai.Bot
import kotlinx.coroutines.isActive
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.awaitState
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.utils.addNameHierarchically
import net.mamoe.mirai.utils.childScope
import net.mamoe.mirai.utils.findCauseOrSelf
import net.mamoe.mirai.utils.hierarchicalName
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.cancellation.CancellationException
@ -39,14 +37,7 @@ import kotlin.coroutines.cancellation.CancellationException
* @see NetworkHandlerSelector
*/
internal class SelectorNetworkHandler(
private val bot: Bot,
private val selector: NetworkHandlerSelector<*>,
/**
* If `true`, a watcher job will be started to call [resumeConnection] when network is closed by [NetworkException] and [NetworkException.recoverable] is `true`.
*
* This is required for automatic reconnecting after network failure or system hibernation, since [NetworkHandler] is lazy and will reconnect iff [resumeConnection] is called.
*/
allowActiveMaintenance: Boolean = true,
val selector: NetworkHandlerSelector<*>,
) : NetworkHandler {
@Volatile
private var lastCancellationCause: Throwable? = null
@ -67,35 +58,6 @@ internal class SelectorNetworkHandler(
return selector.awaitResumeInstance()
}
init {
if (allowActiveMaintenance) {
scope.launch(scope.hierarchicalName("BotOnlineWatchdog ${bot.id}")) {
fun isActive(): Boolean {
return isActive && bot.isActive
}
while (isActive()) {
val instance = selector.getCurrentInstanceOrCreate()
awaitState(State.CLOSED) // suspend until next CLOSED
if (!isActive()) return@launch
if (selector.getCurrentInstanceOrNull() != instance) continue // instance already changed by other threads.
delay(3000) // make it slower to avoid massive reconnection on network failure.
if (!isActive()) return@launch
val failure = getLastFailure()
if (failure?.findCauseOrSelf { it is NetworkException && it.recoverable } != null) {
try {
resumeConnection() // notify selector to actively resume now.
} catch (ignored: Exception) {
}
}
}
}
}
}
override val state: State
get() = selector.getCurrentInstanceOrCreate().state

View File

@ -38,7 +38,7 @@ internal open class NettyNetworkHandler(
) : NetworkHandlerSupport(context) {
override fun close(cause: Throwable?) {
if (state == State.CLOSED) return // already
setState { StateClosed(CancellationException("Closed manually.", cause)) }
setState { StateClosed(cause ?: CancellationException("Closed normally.")) }
super.close(cause)
// wrap an exception, more stacktrace information
}
@ -71,7 +71,7 @@ internal open class NettyNetworkHandler(
}
protected open fun handlePipelineException(ctx: ChannelHandlerContext, error: Throwable) {
context.bot.logger.error(error)
// context.bot.logger.error(error)
synchronized(this) {
setState { StateClosed(NettyChannelException(cause = error)) }
if (_state !is StateConnecting) {
@ -146,13 +146,19 @@ internal open class NettyNetworkHandler(
.addLast(object : ChannelInboundHandlerAdapter() {
override fun channelInactive(ctx: ChannelHandlerContext?) {
eventLoopGroup.shutdownGracefully()
contextResult.cancel()
}
})
}
})
.connect(address)
.awaitKt()
.runCatching {
awaitKt()
}.onFailure {
eventLoopGroup.shutdownGracefully()
contextResult.cancel()
}.getOrThrow()
contextResult.complete(future.channel())

View File

@ -11,7 +11,13 @@ package net.mamoe.mirai.internal.network.impl.netty
import net.mamoe.mirai.internal.network.handler.selector.NetworkException
internal data class ServerClosedException(
internal sealed class ServerClosedException(
override val message: String? = null,
override val cause: Throwable? = null
override val cause: Throwable? = null,
) : NetworkException(true)
/**
 * Close cause created by `MessageSvcPushForceOffline` when a `MessageSvc.PushForceOffline` packet is handled.
 *
 * It is passed to `network.close`; the state observer in `QQAndroidBot` turns it into
 * `BotOfflineEvent.Force(bot, cause.title, cause.message)`.
 *
 * @property title title taken from the force-offline packet
 * @property message description of the cause, also forwarded to [ServerClosedException]
 * @param cause optional underlying cause, forwarded to [ServerClosedException]
 */
internal class ForceOfflineException(
val title: String,
override val message: String,
cause: Throwable? = null,
) : ServerClosedException(message, cause)

View File

@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 Mamoe Technologies and contributors.
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
@ -10,13 +10,14 @@
package net.mamoe.mirai.internal.network.protocol.data.jce
import kotlinx.serialization.Serializable
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.utils.io.JceStruct
import net.mamoe.mirai.internal.utils.io.serialization.tars.TarsId
@Serializable
internal class RequestPushForceOffline(
@TarsId(0) @JvmField val uin: Long,
@TarsId(1) @JvmField val title: String? = "",
@TarsId(2) @JvmField val tips: String? = "",
@TarsId(1) @JvmField val title: String = "",
@TarsId(2) @JvmField val tips: String = "",
@TarsId(3) @JvmField val sameDevice: Byte? = null
) : JceStruct
) : JceStruct, Packet

View File

@ -10,30 +10,26 @@
package net.mamoe.mirai.internal.network.protocol.packet.chat.receive
import kotlinx.io.core.ByteReadPacket
import net.mamoe.mirai.event.events.BotOfflineEvent
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.components.AccountSecretsManager
import net.mamoe.mirai.internal.network.impl.netty.ServerClosedException
import net.mamoe.mirai.internal.network.impl.netty.ForceOfflineException
import net.mamoe.mirai.internal.network.protocol.data.jce.RequestPushForceOffline
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketFactory
import net.mamoe.mirai.internal.utils.io.serialization.readUniPacket
import net.mamoe.mirai.utils.warning
/**
* 被挤下线
*/
internal object MessageSvcPushForceOffline :
OutgoingPacketFactory<BotOfflineEvent.Force>("MessageSvc.PushForceOffline") {
override suspend fun ByteReadPacket.decode(bot: QQAndroidBot): BotOfflineEvent.Force {
val struct = this.readUniPacket(RequestPushForceOffline.serializer())
@Suppress("INVISIBLE_MEMBER")
return BotOfflineEvent.Force(bot, title = struct.title ?: "", message = struct.tips ?: "")
OutgoingPacketFactory<RequestPushForceOffline>("MessageSvc.PushForceOffline") {
override suspend fun ByteReadPacket.decode(bot: QQAndroidBot): RequestPushForceOffline {
return readUniPacket(RequestPushForceOffline.serializer())
}
override suspend fun QQAndroidBot.handle(packet: BotOfflineEvent.Force) {
override suspend fun QQAndroidBot.handle(packet: RequestPushForceOffline) {
components[AccountSecretsManager].invalidate() // otherwise you receive `MessageSvc.PushForceOffline` again just after logging in.
bot.logger.warning { "Received a ${packet.title}: ${packet.message}. " }
network.close(ServerClosedException("Closed by MessageSvc.PushForceOffline: $packet"))
network.close(ForceOfflineException(packet.title, "Closed by MessageSvc.PushForceOffline: $packet"))
}
}

View File

@ -40,7 +40,7 @@ internal class SelectorNetworkHandlerTest : AbstractRealNetworkHandlerTest<Selec
override val factory: NetworkHandlerFactory<SelectorNetworkHandler> =
object : NetworkHandlerFactory<SelectorNetworkHandler> {
override fun create(context: NetworkHandlerContext, address: SocketAddress): SelectorNetworkHandler {
return SelectorNetworkHandler(context.bot, selector)
return SelectorNetworkHandler(selector)
}
}