Extract HeartbeatScheduler and add HeartbeatFailedException and NetworkException

This commit is contained in:
Him188 2021-06-05 13:44:52 +08:00
parent 0f7fad1fda
commit b7527a1b56
5 changed files with 137 additions and 45 deletions

View File

@ -127,6 +127,7 @@ internal open class QQAndroidBot constructor(
set(SsoProcessorContext, SsoProcessorContextImpl(bot))
set(SsoProcessor, SsoProcessorImpl(get(SsoProcessorContext)))
set(HeartbeatProcessor, HeartbeatProcessorImpl())
set(HeartbeatScheduler, TimeBasedHeartbeatSchedulerImpl(networkLogger.subLogger("HeartbeatScheduler")))
set(KeyRefreshProcessor, KeyRefreshProcessorImpl(networkLogger.subLogger("KeyRefreshProcessor")))
set(ConfigPushProcessor, ConfigPushProcessorImpl(networkLogger.subLogger("ConfigPushProcessor")))
set(BotOfflineEventMonitor, BotOfflineEventMonitorImpl())

View File

@ -0,0 +1,94 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.components
import kotlinx.coroutines.*
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.component.ComponentStorage
import net.mamoe.mirai.internal.network.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.info
internal typealias HeartbeatFailureHandler = (name: String, e: Throwable) -> Unit
/**
* Schedules [HeartbeatProcessor]
*/
internal interface HeartbeatScheduler {
fun launchJobsIn(
network: NetworkHandlerSupport,
scope: CoroutineScope,
onHeartFailure: HeartbeatFailureHandler
): List<Job>
companion object : ComponentKey<HeartbeatScheduler>
}
internal class TimeBasedHeartbeatSchedulerImpl(
private val logger: MiraiLogger,
) : HeartbeatScheduler {
override fun launchJobsIn(
network: NetworkHandlerSupport,
scope: CoroutineScope,
onHeartFailure: HeartbeatFailureHandler
): List<Job> {
val context: ComponentStorage = network.context
val heartbeatProcessor = context[HeartbeatProcessor]
val list = mutableListOf<Job>()
list += launchHeartbeatJobAsync(
scope = scope,
name = "StatHeartbeat",
timeout = { context[SsoProcessorContext].configuration.statHeartbeatPeriodMillis },
action = { heartbeatProcessor.doStatHeartbeatNow(network) },
onHeartFailure = onHeartFailure
)
list += launchHeartbeatJobAsync(
scope = scope,
name = "AliveHeartbeat",
timeout = { context[SsoProcessorContext].configuration.heartbeatPeriodMillis },
action = { heartbeatProcessor.doAliveHeartbeatNow(network) },
onHeartFailure = onHeartFailure
)
return list
}
private fun launchHeartbeatJobAsync(
scope: CoroutineScope,
name: String,
timeout: () -> Long,
action: suspend () -> Unit,
onHeartFailure: HeartbeatFailureHandler,
): Deferred<Unit> {
return scope.async(CoroutineName("$name Scheduler")) {
while (isActive) {
try {
delay(timeout())
} catch (e: CancellationException) {
return@async // considered normally cancel
}
try {
action()
} catch (e: Throwable) {
onHeartFailure(name, e)
}
}
}.apply {
invokeOnCompletion { e ->
if (e is CancellationException) return@invokeOnCompletion // normally closed
if (e != null) logger.info { "$name failed: $e." }
}
}
}
}

View File

@ -0,0 +1,17 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler.selector
internal abstract class NetworkException(
/**
* If true, the selector may recover the network handler by some means.
*/
val recoverable: Boolean,
) : Exception()

View File

@ -0,0 +1,17 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.impl.netty
import net.mamoe.mirai.internal.network.handler.selector.NetworkException
internal class HeartbeatFailedException(
override val message: String?,
override val cause: Throwable? = null
) : NetworkException(true)

View File

@ -23,14 +23,15 @@ import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import net.mamoe.mirai.internal.network.components.*
import net.mamoe.mirai.internal.network.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.ExceptionCollector
import net.mamoe.mirai.utils.childScope
import net.mamoe.mirai.utils.debug
import net.mamoe.mirai.utils.systemProp
import java.io.EOFException
import java.net.SocketAddress
import kotlin.coroutines.CoroutineContext
@ -339,47 +340,10 @@ internal open class NettyNetworkHandler(
}
}
private val heartbeatProcessor = context[HeartbeatProcessor]
@Suppress("DeferredIsResult")
private inline fun launchHeartbeatJob(
name: String,
crossinline timeout: () -> Long,
crossinline action: suspend () -> Unit
): Deferred<Unit> {
return async(CoroutineName("$name Scheduler")) {
while (isActive) {
try {
delay(timeout())
} catch (e: CancellationException) {
return@async // considered normally cancel
}
try {
action()
} catch (e: Throwable) {
setState { StateClosed(IllegalStateException("Exception in $name job", e)) }
}
}
}.apply {
invokeOnCompletion { e ->
if (e is CancellationException) return@invokeOnCompletion // normally closed
if (e != null) logger.info { "$name failed: $e." }
}
private val heartbeatJobs =
context[HeartbeatScheduler].launchJobsIn(this@NettyNetworkHandler, this) { name, e ->
setState { StateClosed(HeartbeatFailedException("Exception in $name job", e)) }
}
}
private val heartbeat = launchHeartbeatJob(
"AliveHeartbeat",
{ context[SsoProcessorContext].configuration.heartbeatPeriodMillis },
{ heartbeatProcessor.doAliveHeartbeatNow(this@NettyNetworkHandler) }
)
private val statHeartbeat = launchHeartbeatJob(
"StatHeartbeat",
{ context[SsoProcessorContext].configuration.statHeartbeatPeriodMillis },
{ heartbeatProcessor.doStatHeartbeatNow(this@NettyNetworkHandler) }
)
// we can also move them as observers if needed.
@ -394,8 +358,7 @@ internal open class NettyNetworkHandler(
override suspend fun resumeConnection0() {
joinCompleted(coroutineContext.job)
joinCompleted(heartbeat)
joinCompleted(statHeartbeat)
for (job in heartbeatJobs) joinCompleted(job)
joinCompleted(configPush)
joinCompleted(keyRefresh)
} // noop