From d73f5a26925cfe9f25e53aea0b26c1bc4c3d66a4 Mon Sep 17 00:00:00 2001 From: Him188 <Him188@mamoe.net> Date: Mon, 26 Apr 2021 22:44:27 +0800 Subject: [PATCH] Review BotConfiguration and implement relevant configs, implement alive heartbeat, fix behavior on resume --- .../kotlin/utils/BotConfiguration.kt | 2 + .../src/commonMain/kotlin/QQAndroidBot.kt | 8 +- .../network/components/HeartbeatProcessor.kt | 17 ++- .../kotlin/network/handler/NetworkHandler.kt | 2 +- .../FactoryKeepAliveNetworkHandlerSelector.kt | 22 +++- .../network/impl/netty/NettyNetworkHandler.kt | 58 ++++++--- .../AbstractRealNetworkHandlerTest.kt | 9 +- .../network/impl/netty/AbstractNettyNHTest.kt | 5 + .../netty/NettyEndlessReconnectionTest.kt | 2 +- .../impl/netty/NettyHandlerEventTest.kt | 114 ++++++++++++++++-- .../src/commonTest/kotlin/test/events.kt | 25 ++-- 11 files changed, 210 insertions(+), 54 deletions(-) diff --git a/mirai-core-api/src/commonMain/kotlin/utils/BotConfiguration.kt b/mirai-core-api/src/commonMain/kotlin/utils/BotConfiguration.kt index 344c78b34..f5b1015fb 100644 --- a/mirai-core-api/src/commonMain/kotlin/utils/BotConfiguration.kt +++ b/mirai-core-api/src/commonMain/kotlin/utils/BotConfiguration.kt @@ -194,9 +194,11 @@ public open class BotConfiguration { // open for Java public var heartbeatTimeoutMillis: Long = 5.secondsToMillis /** 心跳失败后的第一次重连前的等待时间. */ + @Deprecated("Useless since new network. Please just remove this.", level = DeprecationLevel.WARNING) public var firstReconnectDelayMillis: Long = 5.secondsToMillis /** 重连失败后, 继续尝试的每次等待时间 */ + @Deprecated("Useless since new network. Please just remove this.", level = DeprecationLevel.WARNING) public var reconnectPeriodMillis: Long = 5.secondsToMillis /** 最多尝试多少次重连 */ diff --git a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt index e9bda5c57..6dd09873b 100644 --- a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt +++ b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt @@ -115,7 +115,7 @@ internal open class QQAndroidBot constructor( ) { bot.launch(logger.asCoroutineExceptionHandler()) { BotOnlineEvent(bot).broadcast() - if (shouldBroadcastRelogin.compareAndSet(false, true)) { + if (!shouldBroadcastRelogin.compareAndSet(false, true)) { BotReloginEvent(bot, new.getCause()).broadcast() } } @@ -193,7 +193,11 @@ internal open class QQAndroidBot constructor( ) return SelectorNetworkHandler( context, - FactoryKeepAliveNetworkHandlerSelector(NettyNetworkHandlerFactory, context) + FactoryKeepAliveNetworkHandlerSelector( + configuration.reconnectionRetryTimes.coerceIn(1, Int.MAX_VALUE), + NettyNetworkHandlerFactory, + context + ) ) // We can move the factory to configuration but this is not necessary for now. } diff --git a/mirai-core/src/commonMain/kotlin/network/components/HeartbeatProcessor.kt b/mirai-core/src/commonMain/kotlin/network/components/HeartbeatProcessor.kt index ebeb86c42..2caf4ec97 100644 --- a/mirai-core/src/commonMain/kotlin/network/components/HeartbeatProcessor.kt +++ b/mirai-core/src/commonMain/kotlin/network/components/HeartbeatProcessor.kt @@ -12,24 +12,37 @@ package net.mamoe.mirai.internal.network.components import net.mamoe.mirai.internal.network.component.ComponentKey import net.mamoe.mirai.internal.network.context.SsoProcessorContext import net.mamoe.mirai.internal.network.handler.NetworkHandler +import net.mamoe.mirai.internal.network.protocol.packet.login.Heartbeat import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect internal interface HeartbeatProcessor { @Throws(Exception::class) - suspend fun doHeartbeatNow(networkHandler: NetworkHandler) + suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler) + + @Throws(Exception::class) + suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler) companion object : ComponentKey<HeartbeatProcessor> } internal class HeartbeatProcessorImpl : HeartbeatProcessor { @Throws(Exception::class) - override suspend fun doHeartbeatNow(networkHandler: NetworkHandler) { + override suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler) { StatSvc.SimpleGet(networkHandler.context.bot.client).sendAndExpect( networkHandler, timeoutMillis = networkHandler.context[SsoProcessorContext].configuration.heartbeatTimeoutMillis, retry = 2 ) } + + @Throws(Exception::class) + override suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler) { + Heartbeat.Alive(networkHandler.context.bot.client).sendAndExpect( + networkHandler, + timeoutMillis = networkHandler.context[SsoProcessorContext].configuration.heartbeatTimeoutMillis, + retry = 2 + ) + } } \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt index e502e5653..9f567fa6a 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt @@ -111,7 +111,7 @@ internal interface NetworkHandler : CoroutineScope { } /** - * Suspends the coroutine until [sendAndExpect] can be executed without suspension. + * Suspends the coroutine until [sendAndExpect] can be executed without suspension or state is [State.CLOSED]. * * In other words, if this functions returns, it indicates that [state] is [State.LOADING] or [State.OK] * diff --git a/mirai-core/src/commonMain/kotlin/network/handler/selector/FactoryKeepAliveNetworkHandlerSelector.kt b/mirai-core/src/commonMain/kotlin/network/handler/selector/FactoryKeepAliveNetworkHandlerSelector.kt index b167dbd05..253df870b 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/selector/FactoryKeepAliveNetworkHandlerSelector.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/selector/FactoryKeepAliveNetworkHandlerSelector.kt @@ -17,10 +17,24 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory /** * [AbstractKeepAliveNetworkHandlerSelector] implementation delegating [createInstance] to [factory] */ -internal class FactoryKeepAliveNetworkHandlerSelector<H : NetworkHandler>( - private val factory: NetworkHandlerFactory<H>, - private val context: NetworkHandlerContext, -) : AbstractKeepAliveNetworkHandlerSelector<H>() { +internal class FactoryKeepAliveNetworkHandlerSelector<H : NetworkHandler> : AbstractKeepAliveNetworkHandlerSelector<H> { + private val factory: NetworkHandlerFactory<H> + private val context: NetworkHandlerContext + + constructor(factory: NetworkHandlerFactory<H>, context: NetworkHandlerContext) : super() { + this.factory = factory + this.context = context + } + + constructor( + maxAttempts: Int, + factory: NetworkHandlerFactory<H>, + context: NetworkHandlerContext + ) : super(maxAttempts) { + this.factory = factory + this.context = context + } + override fun createInstance(): H = factory.create( context, diff --git a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt index db665c357..a48ab3611 100644 --- a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt +++ b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt @@ -228,7 +228,7 @@ internal open class NettyNetworkHandler( if (error is StateSwitchingException && error.new is StateConnecting) { return@invokeOnCompletion // already been switching to CONNECTING } - setState { + setState(null) { // ignore replication check StateConnecting( collectiveExceptions.apply { collect(error) }, wait = true @@ -302,30 +302,50 @@ internal open class NettyNetworkHandler( private val heartbeatProcessor = context[HeartbeatProcessor] - private val heartbeat = async(CoroutineName("Heartbeat Scheduler")) { - while (isActive) { - try { - delay(context[SsoProcessorContext].configuration.heartbeatPeriodMillis) - } catch (e: CancellationException) { - return@async // considered normally cancel - } + @Suppress("DeferredIsResult") + private inline fun launchHeartbeatJob( + name: String, + crossinline timeout: () -> Long, + crossinline action: suspend () -> Unit + ): Deferred<Unit> { + return async(CoroutineName("$name Scheduler")) { + while (isActive) { + try { + delay(timeout()) + } catch (e: CancellationException) { + return@async // considered normally cancel + } - try { - heartbeatProcessor.doHeartbeatNow(this@NettyNetworkHandler) - } catch (e: Throwable) { - setState { - StateConnecting(ExceptionCollector(IllegalStateException("Exception in Heartbeat job", e))) + try { + action() + heartbeatProcessor.doAliveHeartbeatNow(this@NettyNetworkHandler) + } catch (e: Throwable) { + setState { + StateConnecting(ExceptionCollector(IllegalStateException("Exception in $name job", e))) + } + } + } + }.apply { + invokeOnCompletion { e -> + if (e != null) { + logger.info { "$name failed: $e." } } } } - }.apply { - invokeOnCompletion { e -> - if (e != null) { - logger.info { "Heartbeat failed: $e." } - } - } } + private val heartbeat = launchHeartbeatJob( + "AliveHeartbeat", + { context[SsoProcessorContext].configuration.heartbeatTimeoutMillis }, + { heartbeatProcessor.doAliveHeartbeatNow(this@NettyNetworkHandler) } + ) + + private val statHeartbeat = launchHeartbeatJob( + "StatHeartbeat", + { context[SsoProcessorContext].configuration.statHeartbeatPeriodMillis }, + { heartbeatProcessor.doStatHeartbeatNow(this@NettyNetworkHandler) } + ) + // we can also move them as observers if needed. private val keyRefresh = launch(CoroutineName("Key refresh")) { diff --git a/mirai-core/src/commonTest/kotlin/network/framework/AbstractRealNetworkHandlerTest.kt b/mirai-core/src/commonTest/kotlin/network/framework/AbstractRealNetworkHandlerTest.kt index e84b581d7..807804f8d 100644 --- a/mirai-core/src/commonTest/kotlin/network/framework/AbstractRealNetworkHandlerTest.kt +++ b/mirai-core/src/commonTest/kotlin/network/framework/AbstractRealNetworkHandlerTest.kt @@ -87,9 +87,14 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs } }) set(HeartbeatProcessor, object : HeartbeatProcessor { - override suspend fun doHeartbeatNow(networkHandler: NetworkHandler) { + override suspend fun doAliveHeartbeatNow(networkHandler: NetworkHandler) { nhEvents.add(NHEvent.DoHeartbeatNow) - networkLogger.debug { "HeartbeatProcessor.doHeartbeatNow" } + networkLogger.debug { "HeartbeatProcessor.doAliveHeartbeatNow" } + } + + override suspend fun doStatHeartbeatNow(networkHandler: NetworkHandler) { + nhEvents.add(NHEvent.DoHeartbeatNow) + networkLogger.debug { "HeartbeatProcessor.doStatHeartbeatNow" } } }) set(KeyRefreshProcessor, object : KeyRefreshProcessor { diff --git a/mirai-core/src/commonTest/kotlin/network/impl/netty/AbstractNettyNHTest.kt b/mirai-core/src/commonTest/kotlin/network/impl/netty/AbstractNettyNHTest.kt index f8f83ecd2..b8fc2838e 100644 --- a/mirai-core/src/commonTest/kotlin/network/impl/netty/AbstractNettyNHTest.kt +++ b/mirai-core/src/commonTest/kotlin/network/impl/netty/AbstractNettyNHTest.kt @@ -11,6 +11,7 @@ package net.mamoe.mirai.internal.network.impl.netty import io.netty.channel.Channel import io.netty.channel.embedded.EmbeddedChannel +import kotlinx.coroutines.CompletableDeferred import net.mamoe.mirai.internal.network.framework.AbstractRealNetworkHandlerTest import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory @@ -30,6 +31,10 @@ internal open class TestNettyNH( setState { StateConnecting(ExceptionCollector(exception), false) } } + fun setStateOK(channel: Channel, exception: Throwable? = null) { + setState { StateOK(channel, CompletableDeferred(Unit)) } + } + fun setStateLoading(channel: Channel) { setState { StateLoading(channel) } } diff --git a/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyEndlessReconnectionTest.kt b/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyEndlessReconnectionTest.kt index 31d9d6860..45678c050 100644 --- a/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyEndlessReconnectionTest.kt +++ b/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyEndlessReconnectionTest.kt @@ -45,7 +45,7 @@ internal class NettyEndlessReconnectionTest : AbstractNettyNHTest() { val state = network::_state.javaGetter!!.apply { isAccessible = true } .invoke(network) as NetworkHandlerSupport.BaseStateImpl - assertTrue { state.getCause()!!.suppressed.size <= 1 } // might be zero if just created since at this time network is still running. + assertTrue(state.toString()) { state.getCause()!!.suppressed.size <= 1 } // might be zero if just created since at this time network is still running. // size <= 1 means duplicates are dropped. diff --git a/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyHandlerEventTest.kt b/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyHandlerEventTest.kt index 365d0e737..269542a28 100644 --- a/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyHandlerEventTest.kt +++ b/mirai-core/src/commonTest/kotlin/network/impl/netty/NettyHandlerEventTest.kt @@ -9,12 +9,16 @@ package net.mamoe.mirai.internal.network.impl.netty +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.delay import net.mamoe.mirai.event.Event +import net.mamoe.mirai.event.broadcast import net.mamoe.mirai.event.events.BotOfflineEvent import net.mamoe.mirai.event.events.BotOnlineEvent import net.mamoe.mirai.event.events.BotReloginEvent import net.mamoe.mirai.internal.network.components.SsoProcessor +import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.handler.NetworkHandler.State import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.INITIALIZED import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.OK @@ -79,25 +83,109 @@ internal class NettyHandlerEventTest : AbstractNettyNHTest() { } - private fun noEventOn(setState: () -> Unit) = runBlockingUnit { + @Test + fun `from OK TO CONNECTING`() = runBlockingUnit { + defaultComponents[SsoProcessor] = object : SsoProcessor by defaultComponents[SsoProcessor] { + override suspend fun login(handler: NetworkHandler) = awaitCancellation() // never ends + } + assertState(INITIALIZED) + network.setStateOK(channel) + delay(2.seconds) // ignore events + assertEventBroadcasts<Event>(1) { + network.setStateConnecting() + delay(2.seconds) + }.let { event -> + assertEquals(BotOfflineEvent.Dropped::class, event[0]::class) + } + } + + @Test + fun `from CONNECTING TO OK the first time`() = runBlockingUnit { + val ok = CompletableDeferred<Unit>() + defaultComponents[SsoProcessor] = object : SsoProcessor by defaultComponents[SsoProcessor] { + override suspend fun login(handler: NetworkHandler) = ok.join() + } + assertState(INITIALIZED) + network.setStateConnecting() + assertEventBroadcasts<Event>(1) { + ok.complete(Unit) + network.resumeConnection() + delay(2000) + }.let { event -> + assertEquals(BotOnlineEvent::class, event[0]::class) + } + } + + @Test + fun `from CONNECTING TO OK the second time`() = runBlockingUnit { + var ok = CompletableDeferred<Unit>() + defaultComponents[SsoProcessor] = object : SsoProcessor by defaultComponents[SsoProcessor] { + override suspend fun login(handler: NetworkHandler) = ok.join() + } + + assertState(INITIALIZED) + + network.setStateConnecting() + ok.complete(Unit) + network.resumeConnection() + assertState(OK) + + ok = CompletableDeferred() + network.setStateConnecting() + delay(2000) + assertEventBroadcasts<Event>(2) { + ok.complete(Unit) + network.resumeConnection() + delay(2000) + }.let { event -> + assertEquals(BotOnlineEvent::class, event[0]::class) + assertEquals(BotReloginEvent::class, event[1]::class) + } + } + + + @Test + fun testPreconditions() = runBlockingUnit { + assertEventBroadcasts<Event>(1) { BotOfflineEvent.Active(bot, null).broadcast() } + } + + @Test + fun `BotOffline from OK TO CLOSED`() = runBlockingUnit { + bot.login() + assertState(OK) + delay(3.seconds) // `login` launches a job which broadcasts the event + assertEventBroadcasts<Event>(1) { + network.close(null) + delay(3.seconds) + }.let { event -> + assertEquals(BotOfflineEvent.Active::class, event[0]::class) + } + } + + @Test + fun `BotOffline from CONNECTING TO CLOSED`() = runBlockingUnit { + network.setStateConnecting() + delay(2.seconds) // `login` launches a job which broadcasts the event + assertEventBroadcasts<Event>(1) { + network.setStateClosed() + network.resumeConnection() + delay(2.seconds) + }.let { event -> + assertEquals(BotOfflineEvent.Active::class, event[0]::class) + } + } + + @Test + fun `no event from INITIALIZED TO OK`() = runBlockingUnit { assertState(INITIALIZED) bot.login() bot.components[SsoProcessor].firstLoginSucceed = true assertState(OK) network.setStateConnecting() - delay(3.seconds) // `login` launches a job which broadcasts the event + delay(2.seconds) // `login` launches a job which broadcasts the event assertEventBroadcasts<Event>(0) { - setState() - delay(3.seconds) + network.resumeConnection() + delay(2.seconds) } } - - @Test - fun `no event from CONNECTING TO CLOSED`() = noEventOn { network.setStateConnecting() } - - @Test - fun `no event from CLOSED TO CLOSED`() = noEventOn { network.setStateClosed() } - - @Test - fun `no event from INITIALIZED TO CLOSED`() = noEventOn { } } \ No newline at end of file diff --git a/mirai-core/src/commonTest/kotlin/test/events.kt b/mirai-core/src/commonTest/kotlin/test/events.kt index 0a5e89551..cb4a44740 100644 --- a/mirai-core/src/commonTest/kotlin/test/events.kt +++ b/mirai-core/src/commonTest/kotlin/test/events.kt @@ -12,6 +12,7 @@ package net.mamoe.mirai.internal.test import kotlinx.coroutines.ExperimentalCoroutinesApi import net.mamoe.mirai.event.Event import net.mamoe.mirai.event.GlobalEventChannel +import net.mamoe.mirai.utils.cast import java.util.concurrent.ConcurrentLinkedQueue import kotlin.contracts.InvocationKind import kotlin.contracts.contract @@ -25,26 +26,30 @@ internal inline fun <reified T : Event, R> assertEventBroadcasts(times: Int = 1, } @OptIn(ExperimentalCoroutinesApi::class) -internal inline fun <reified T : Event> assertEventBroadcasts(times: Int = 1, block: () -> Unit) { +internal inline fun <reified T : Event> assertEventBroadcasts(times: Int = 1, block: () -> Unit): List<T> { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } val receivedEvents = ConcurrentLinkedQueue<Event>() val listener = GlobalEventChannel.subscribeAlways<Event> { event -> receivedEvents.add(event) } + try { - return block() + block() } finally { - val actual = receivedEvents.filterIsInstance<T>().count() listener.complete() - assertEquals( - times, - actual, - "Expected event ${T::class.simpleName} broadcast $times time(s). " + - "But actual count is ${actual}. " + - "\nAll received events: ${receivedEvents.joinToString(", ", "[", "]")}" - ) } + + val actual = receivedEvents.filterIsInstance<T>().count() + assertEquals( + times, + actual, + "Expected event ${T::class.simpleName} broadcast $times time(s). " + + "But actual count is ${actual}. " + + "\nAll received events: ${receivedEvents.joinToString(", ", "[", "]")}" + ) + + return receivedEvents.filterIsInstance<T>().cast() } @OptIn(ExperimentalCoroutinesApi::class)