Review BotConfiguration and implement relevant configs, implement alive heartbeat, fix behavior on resume

This commit is contained in:
Him188 2021-04-26 22:44:27 +08:00
parent 6e06406a3a
commit d73f5a2692
11 changed files with 210 additions and 54 deletions

View File

@ -194,9 +194,11 @@ public open class BotConfiguration { // open for Java
public var heartbeatTimeoutMillis: Long = 5.secondsToMillis
/** 心跳失败后的第一次重连前的等待时间. */
@Deprecated("Useless since new network. Please just remove this.", level = DeprecationLevel.WARNING)
public var firstReconnectDelayMillis: Long = 5.secondsToMillis
/** 重连失败后, 继续尝试的每次等待时间 */
@Deprecated("Useless since new network. Please just remove this.", level = DeprecationLevel.WARNING)
public var reconnectPeriodMillis: Long = 5.secondsToMillis
/** 最多尝试多少次重连 */

View File

@ -115,7 +115,7 @@ internal open class QQAndroidBot constructor(
) {
bot.launch(logger.asCoroutineExceptionHandler()) {
BotOnlineEvent(bot).broadcast()
if (shouldBroadcastRelogin.compareAndSet(false, true)) {
if (!shouldBroadcastRelogin.compareAndSet(false, true)) {
BotReloginEvent(bot, new.getCause()).broadcast()
}
}
@ -193,7 +193,11 @@ internal open class QQAndroidBot constructor(
)
return SelectorNetworkHandler(
context,
FactoryKeepAliveNetworkHandlerSelector(NettyNetworkHandlerFactory, context)
FactoryKeepAliveNetworkHandlerSelector(
configuration.reconnectionRetryTimes.coerceIn(1, Int.MAX_VALUE),
NettyNetworkHandlerFactory,
context
)
) // We can move the factory to configuration but this is not necessary for now.
}

View File

@ -12,24 +12,37 @@ package net.mamoe.mirai.internal.network.components
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.protocol.packet.login.Heartbeat
import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect
internal interface HeartbeatProcessor {
@Throws(Exception::class)
suspend fun doHeartbeatNow(networkHandler: NetworkHandler)
suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler)
@Throws(Exception::class)
suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler)
companion object : ComponentKey<HeartbeatProcessor>
}
internal class HeartbeatProcessorImpl : HeartbeatProcessor {
@Throws(Exception::class)
override suspend fun doHeartbeatNow(networkHandler: NetworkHandler) {
override suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler) {
StatSvc.SimpleGet(networkHandler.context.bot.client).sendAndExpect(
networkHandler,
timeoutMillis = networkHandler.context[SsoProcessorContext].configuration.heartbeatTimeoutMillis,
retry = 2
)
}
@Throws(Exception::class)
override suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler) {
Heartbeat.Alive(networkHandler.context.bot.client).sendAndExpect(
networkHandler,
timeoutMillis = networkHandler.context[SsoProcessorContext].configuration.heartbeatTimeoutMillis,
retry = 2
)
}
}

View File

@ -111,7 +111,7 @@ internal interface NetworkHandler : CoroutineScope {
}
/**
* Suspends the coroutine until [sendAndExpect] can be executed without suspension.
* Suspends the coroutine until [sendAndExpect] can be executed without suspension or state is [State.CLOSED].
*
* In other words, if this functions returns, it indicates that [state] is [State.LOADING] or [State.OK]
*

View File

@ -17,10 +17,24 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
/**
* [AbstractKeepAliveNetworkHandlerSelector] implementation delegating [createInstance] to [factory]
*/
internal class FactoryKeepAliveNetworkHandlerSelector<H : NetworkHandler>(
private val factory: NetworkHandlerFactory<H>,
private val context: NetworkHandlerContext,
) : AbstractKeepAliveNetworkHandlerSelector<H>() {
internal class FactoryKeepAliveNetworkHandlerSelector<H : NetworkHandler> : AbstractKeepAliveNetworkHandlerSelector<H> {
private val factory: NetworkHandlerFactory<H>
private val context: NetworkHandlerContext
constructor(factory: NetworkHandlerFactory<H>, context: NetworkHandlerContext) : super() {
this.factory = factory
this.context = context
}
constructor(
maxAttempts: Int,
factory: NetworkHandlerFactory<H>,
context: NetworkHandlerContext
) : super(maxAttempts) {
this.factory = factory
this.context = context
}
override fun createInstance(): H =
factory.create(
context,

View File

@ -228,7 +228,7 @@ internal open class NettyNetworkHandler(
if (error is StateSwitchingException && error.new is StateConnecting) {
return@invokeOnCompletion // already been switching to CONNECTING
}
setState {
setState(null) { // ignore replication check
StateConnecting(
collectiveExceptions.apply { collect(error) },
wait = true
@ -302,30 +302,50 @@ internal open class NettyNetworkHandler(
private val heartbeatProcessor = context[HeartbeatProcessor]
private val heartbeat = async(CoroutineName("Heartbeat Scheduler")) {
while (isActive) {
try {
delay(context[SsoProcessorContext].configuration.heartbeatPeriodMillis)
} catch (e: CancellationException) {
return@async // considered normally cancel
}
@Suppress("DeferredIsResult")
private inline fun launchHeartbeatJob(
name: String,
crossinline timeout: () -> Long,
crossinline action: suspend () -> Unit
): Deferred<Unit> {
return async(CoroutineName("$name Scheduler")) {
while (isActive) {
try {
delay(timeout())
} catch (e: CancellationException) {
return@async // considered normally cancel
}
try {
heartbeatProcessor.doHeartbeatNow(this@NettyNetworkHandler)
} catch (e: Throwable) {
setState {
StateConnecting(ExceptionCollector(IllegalStateException("Exception in Heartbeat job", e)))
try {
action()
heartbeatProcessor.doAliveHeartbeatNow(this@NettyNetworkHandler)
} catch (e: Throwable) {
setState {
StateConnecting(ExceptionCollector(IllegalStateException("Exception in $name job", e)))
}
}
}
}.apply {
invokeOnCompletion { e ->
if (e != null) {
logger.info { "$name failed: $e." }
}
}
}
}.apply {
invokeOnCompletion { e ->
if (e != null) {
logger.info { "Heartbeat failed: $e." }
}
}
}
private val heartbeat = launchHeartbeatJob(
"AliveHeartbeat",
{ context[SsoProcessorContext].configuration.heartbeatTimeoutMillis },
{ heartbeatProcessor.doAliveHeartbeatNow(this@NettyNetworkHandler) }
)
private val statHeartbeat = launchHeartbeatJob(
"StatHeartbeat",
{ context[SsoProcessorContext].configuration.statHeartbeatPeriodMillis },
{ heartbeatProcessor.doStatHeartbeatNow(this@NettyNetworkHandler) }
)
// we can also move them as observers if needed.
private val keyRefresh = launch(CoroutineName("Key refresh")) {

View File

@ -87,9 +87,14 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
}
})
set(HeartbeatProcessor, object : HeartbeatProcessor {
override suspend fun doHeartbeatNow(networkHandler: NetworkHandler) {
override suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler) {
nhEvents.add(NHEvent.DoHeartbeatNow)
networkLogger.debug { "HeartbeatProcessor.doHeartbeatNow" }
networkLogger.debug { "HeartbeatProcessor.doAliveHeartbeatNow" }
}
override suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler) {
nhEvents.add(NHEvent.DoHeartbeatNow)
networkLogger.debug { "HeartbeatProcessor.doStatHeartbeatNow" }
}
})
set(KeyRefreshProcessor, object : KeyRefreshProcessor {

View File

@ -11,6 +11,7 @@ package net.mamoe.mirai.internal.network.impl.netty
import io.netty.channel.Channel
import io.netty.channel.embedded.EmbeddedChannel
import kotlinx.coroutines.CompletableDeferred
import net.mamoe.mirai.internal.network.framework.AbstractRealNetworkHandlerTest
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
@ -30,6 +31,10 @@ internal open class TestNettyNH(
setState { StateConnecting(ExceptionCollector(exception), false) }
}
fun setStateOK(channel: Channel, exception: Throwable? = null) {
setState { StateOK(channel, CompletableDeferred(Unit)) }
}
fun setStateLoading(channel: Channel) {
setState { StateLoading(channel) }
}

View File

@ -45,7 +45,7 @@ internal class NettyEndlessReconnectionTest : AbstractNettyNHTest() {
val state = network::_state.javaGetter!!.apply { isAccessible = true }
.invoke(network) as NetworkHandlerSupport.BaseStateImpl
assertTrue { state.getCause()!!.suppressed.size <= 1 } // might be zero if just created since at this time network is still running.
assertTrue(state.toString()) { state.getCause()!!.suppressed.size <= 1 } // might be zero if just created since at this time network is still running.
// size <= 1 means duplicates are dropped.

View File

@ -9,12 +9,16 @@
package net.mamoe.mirai.internal.network.impl.netty
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.delay
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.events.BotOfflineEvent
import net.mamoe.mirai.event.events.BotOnlineEvent
import net.mamoe.mirai.event.events.BotReloginEvent
import net.mamoe.mirai.internal.network.components.SsoProcessor
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.INITIALIZED
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.OK
@ -79,25 +83,109 @@ internal class NettyHandlerEventTest : AbstractNettyNHTest() {
}
private fun noEventOn(setState: () -> Unit) = runBlockingUnit {
@Test
fun `from OK TO CONNECTING`() = runBlockingUnit {
defaultComponents[SsoProcessor] = object : SsoProcessor by defaultComponents[SsoProcessor] {
override suspend fun login(handler: NetworkHandler) = awaitCancellation() // never ends
}
assertState(INITIALIZED)
network.setStateOK(channel)
delay(2.seconds) // ignore events
assertEventBroadcasts<Event>(1) {
network.setStateConnecting()
delay(2.seconds)
}.let { event ->
assertEquals(BotOfflineEvent.Dropped::class, event[0]::class)
}
}
@Test
fun `from CONNECTING TO OK the first time`() = runBlockingUnit {
val ok = CompletableDeferred<Unit>()
defaultComponents[SsoProcessor] = object : SsoProcessor by defaultComponents[SsoProcessor] {
override suspend fun login(handler: NetworkHandler) = ok.join()
}
assertState(INITIALIZED)
network.setStateConnecting()
assertEventBroadcasts<Event>(1) {
ok.complete(Unit)
network.resumeConnection()
delay(2000)
}.let { event ->
assertEquals(BotOnlineEvent::class, event[0]::class)
}
}
@Test
fun `from CONNECTING TO OK the second time`() = runBlockingUnit {
var ok = CompletableDeferred<Unit>()
defaultComponents[SsoProcessor] = object : SsoProcessor by defaultComponents[SsoProcessor] {
override suspend fun login(handler: NetworkHandler) = ok.join()
}
assertState(INITIALIZED)
network.setStateConnecting()
ok.complete(Unit)
network.resumeConnection()
assertState(OK)
ok = CompletableDeferred()
network.setStateConnecting()
delay(2000)
assertEventBroadcasts<Event>(2) {
ok.complete(Unit)
network.resumeConnection()
delay(2000)
}.let { event ->
assertEquals(BotOnlineEvent::class, event[0]::class)
assertEquals(BotReloginEvent::class, event[1]::class)
}
}
@Test
fun testPreconditions() = runBlockingUnit {
assertEventBroadcasts<Event>(1) { BotOfflineEvent.Active(bot, null).broadcast() }
}
@Test
fun `BotOffline from OK TO CLOSED`() = runBlockingUnit {
bot.login()
assertState(OK)
delay(3.seconds) // `login` launches a job which broadcasts the event
assertEventBroadcasts<Event>(1) {
network.close(null)
delay(3.seconds)
}.let { event ->
assertEquals(BotOfflineEvent.Active::class, event[0]::class)
}
}
@Test
fun `BotOffline from CONNECTING TO CLOSED`() = runBlockingUnit {
network.setStateConnecting()
delay(2.seconds) // `login` launches a job which broadcasts the event
assertEventBroadcasts<Event>(1) {
network.setStateClosed()
network.resumeConnection()
delay(2.seconds)
}.let { event ->
assertEquals(BotOfflineEvent.Active::class, event[0]::class)
}
}
@Test
fun `no event from INITIALIZED TO OK`() = runBlockingUnit {
assertState(INITIALIZED)
bot.login()
bot.components[SsoProcessor].firstLoginSucceed = true
assertState(OK)
network.setStateConnecting()
delay(3.seconds) // `login` launches a job which broadcasts the event
delay(2.seconds) // `login` launches a job which broadcasts the event
assertEventBroadcasts<Event>(0) {
setState()
delay(3.seconds)
network.resumeConnection()
delay(2.seconds)
}
}
@Test
fun `no event from CONNECTING TO CLOSED`() = noEventOn { network.setStateConnecting() }
@Test
fun `no event from CLOSED TO CLOSED`() = noEventOn { network.setStateClosed() }
@Test
fun `no event from INITIALIZED TO CLOSED`() = noEventOn { }
}

View File

@ -12,6 +12,7 @@ package net.mamoe.mirai.internal.test
import kotlinx.coroutines.ExperimentalCoroutinesApi
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.GlobalEventChannel
import net.mamoe.mirai.utils.cast
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
@ -25,26 +26,30 @@ internal inline fun <reified T : Event, R> assertEventBroadcasts(times: Int = 1,
}
@OptIn(ExperimentalCoroutinesApi::class)
internal inline fun <reified T : Event> assertEventBroadcasts(times: Int = 1, block: () -> Unit) {
internal inline fun <reified T : Event> assertEventBroadcasts(times: Int = 1, block: () -> Unit): List<T> {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
val receivedEvents = ConcurrentLinkedQueue<Event>()
val listener = GlobalEventChannel.subscribeAlways<Event> { event ->
receivedEvents.add(event)
}
try {
return block()
block()
} finally {
val actual = receivedEvents.filterIsInstance<T>().count()
listener.complete()
assertEquals(
times,
actual,
"Expected event ${T::class.simpleName} broadcast $times time(s). " +
"But actual count is ${actual}. " +
"\nAll received events: ${receivedEvents.joinToString(", ", "[", "]")}"
)
}
val actual = receivedEvents.filterIsInstance<T>().count()
assertEquals(
times,
actual,
"Expected event ${T::class.simpleName} broadcast $times time(s). " +
"But actual count is ${actual}. " +
"\nAll received events: ${receivedEvents.joinToString(", ", "[", "]")}"
)
return receivedEvents.filterIsInstance<T>().cast()
}
@OptIn(ExperimentalCoroutinesApi::class)