Fix state resume

This commit is contained in:
Him188 2021-04-25 15:47:29 +08:00
parent 95d634233c
commit 971685a2b0
8 changed files with 52 additions and 22 deletions

View File

@ -184,4 +184,14 @@ public fun Throwable.getRootCause(maxDepth: Int = 20): Throwable {
if (depth++ >= maxDepth) break
}
return rootCause ?: this
}
public fun Throwable.causes(maxDepth: Int = 20): Sequence<Throwable> = sequence {
var depth = 0
var rootCause: Throwable? = this@causes
while (rootCause?.cause != null) {
yield(rootCause.cause!!)
rootCause = rootCause.cause
if (depth++ >= maxDepth) break
}
}

View File

@ -25,10 +25,7 @@ import net.mamoe.mirai.internal.network.components.SsoProcessor
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.impl.netty.asCoroutineExceptionHandler
import net.mamoe.mirai.supervisorJob
import net.mamoe.mirai.utils.BotConfiguration
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.childScopeContext
import net.mamoe.mirai.utils.info
import net.mamoe.mirai.utils.*
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
@ -121,8 +118,9 @@ internal abstract class AbstractBot constructor(
if (!components[SsoProcessor].firstLoginSucceed) {
this.close() // failed to do first login.
}
throw e
throw e.causes().find { it !is CancellationException } ?: e // emit internal errors
}
logger.info { "Bot login successful." }
}
protected abstract fun createNetworkHandler(): NetworkHandler

View File

@ -24,7 +24,6 @@ import net.mamoe.mirai.internal.network.protocol.data.proto.MsgSvc
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetMsg
import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.info
@ -62,7 +61,8 @@ internal class BotInitProcessorImpl(
check(bot.isActive) { "bot is dead therefore network can't init." }
context[ContactUpdater].closeAllContacts(CancellationException("re-init"))
val registerResp = registerClientOnline()
val registerResp =
context[SsoProcessor].registerResp ?: error("Internal error: registerResp is not yet available.")
bot.launch(CoroutineName("Awaiting ConfigPushSvc.PushReq")) {
context[ConfigPushSyncer].awaitSync()
@ -78,10 +78,6 @@ internal class BotInitProcessorImpl(
bot.components[SsoProcessor].firstLoginSucceed = true
}
private suspend fun registerClientOnline(): StatSvc.Register.Response {
return StatSvc.Register.online(context[SsoProcessor].client).sendAndExpect(bot)
}
private suspend fun syncMessageSvc() {
logger.info { "Syncing friend message history..." }
withTimeoutOrNull(30000) {

View File

@ -46,6 +46,7 @@ internal interface SsoProcessor {
val ssoSession: SsoSession
var firstLoginSucceed: Boolean
val registerResp: StatSvc.Register.Response?
/**
* The observers to launch jobs for states.
@ -80,8 +81,12 @@ internal class SsoProcessorImpl(
// public
///////////////////////////////////////////////////////////////////////////
@Volatile
override var firstLoginSucceed: Boolean = false
@Volatile
override var registerResp: StatSvc.Register.Response? = null
@Volatile
override var client = createClient(ssoContext.bot)
@ -115,7 +120,12 @@ internal class SsoProcessorImpl(
SlowLoginImpl(handler).doLogin()
}
ssoContext.accountSecretsManager.saveSecrets(ssoContext.account, AccountSecretsImpl(client))
ssoContext.bot.logger.info { "Login successful." }
registerClientOnline(handler)
ssoContext.bot.logger.info { "SSO login successful." }
}
private suspend fun registerClientOnline(handler: NetworkHandler): StatSvc.Register.Response {
return StatSvc.Register.online(client).sendAndExpect(handler).also { registerResp = it }
}
override suspend fun logout(handler: NetworkHandler) {

View File

@ -179,13 +179,18 @@ internal abstract class NetworkHandlerSupport(
*/
override val onStateChanged: SelectClause1<NetworkHandler.State> get() = _stateChangedDeferred.onAwait
protected data class StateSwitchingException(
val old: BaseStateImpl,
val new: BaseStateImpl,
) : CancellationException("State is switched from $old to $new")
/**
* Calculate [new state][new] and set it as the current.
* Calculate [new state][new] and set it as the current, returning the new state, or `null` if state has concurrently been set to CLOSED.
*
* You may need to call [BaseStateImpl.resumeConnection] to activate the new state, as states are lazy.
*/
protected inline fun <S : BaseStateImpl> setState(crossinline new: () -> S): S = synchronized(this) {
if (_state.correspondingState == NetworkHandler.State.CLOSED) error("Cannot change state while it has already been CLOSED.")
protected fun <S : BaseStateImpl> setState(new: () -> S): S? = synchronized(this) {
if (_state.correspondingState == NetworkHandler.State.CLOSED) return null // error("Cannot change state while it has already been CLOSED.")
val stateObserver = context.getOrNull(StateObserver)
@ -208,7 +213,7 @@ internal abstract class NetworkHandlerSupport(
// 2. Update state to [state]. This affects selectors.
_state = impl // switch state first. selector may be busy selecting.
// 3. Cleanup, cancel old states.
old.cancel(CancellationException("State is switched from $old to $impl"))
old.cancel(StateSwitchingException(old, impl))
return impl
}

View File

@ -55,10 +55,7 @@ internal class LoggingStateObserver(
logger.debug { "State resumed: ${state.correspondingState}." }
},
onFailure = {
logger.debug(
{ "State resumed: ${state.correspondingState} ${result.exceptionOrNull()}" },
result.exceptionOrNull()
)
logger.debug { "State resumed: ${state.correspondingState} ${result.exceptionOrNull()}" }
}
)
}

View File

@ -27,6 +27,7 @@ import net.mamoe.mirai.internal.network.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.utils.ExceptionCollector
@ -181,7 +182,8 @@ internal open class NettyNetworkHandler(
override suspend fun resumeConnection0() {
setState { StateConnecting(ExceptionCollector()) }
.resumeConnection()
?.resumeConnection()
?: this@NettyNetworkHandler.resumeConnection() // concurrently closed by other thread.
}
override fun toString(): String = "StateInitialized"
@ -217,6 +219,9 @@ internal open class NettyNetworkHandler(
}.apply {
invokeOnCompletion { error ->
if (error != null) {
if (error is StateSwitchingException && error.new is StateConnecting) {
return@invokeOnCompletion // already been switching to CONNECTING
}
setState {
StateConnecting(
collectiveExceptions.apply { collect(error) },
@ -239,7 +244,8 @@ internal open class NettyNetworkHandler(
connectResult.await() // propagates exceptions
val connection = connection.await()
setState { StateLoading(connection) }
.resumeConnection()
?.resumeConnection()
?: this@NettyNetworkHandler.resumeConnection() // concurrently closed by other thread.
}
override fun toString(): String = "StateConnecting"
@ -294,6 +300,12 @@ internal open class NettyNetworkHandler(
}
}
}
}.apply {
invokeOnCompletion { e ->
if (e != null) {
logger.debug { "x" }
}
}
}
private val configPush = launch(CoroutineName("ConfigPush sync")) {

View File

@ -25,6 +25,7 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContextImpl
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.internal.test.AbstractTest
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.debug
@ -72,6 +73,7 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
override val client: QQAndroidClient get() = bot.client
override val ssoSession: SsoSession get() = bot.client
override var firstLoginSucceed: Boolean = false
override var registerResp: StatSvc.Register.Response? = null
override fun createObserverChain(): StateObserver = get(StateObserver)
override suspend fun login(handler: NetworkHandler) {
nhEvents.add(NHEvent.Login)