1
0
mirror of https://github.com/mamoe/mirai.git synced 2025-04-25 21:23:55 +08:00

Correct state transition logic:

- Linear lifecycle for NetworkHandler: no chance to change state back to previous ones
- No reconnection attempts in NetworkHandler but in NHSelector
- Ensure clashing checks performed in setState
- Print closure exception in TestNettyNH.setStateOK
This commit is contained in:
Him188 2021-05-09 21:05:32 +08:00
parent d43a77b590
commit f7f4ccf4f5
6 changed files with 69 additions and 35 deletions
mirai-core-utils/src/commonMain/kotlin
mirai-core/src
commonMain/kotlin/network
commonTest/kotlin/network

View File

@ -0,0 +1,19 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.utils
import kotlin.RequiresOptIn.Level.ERROR
import kotlin.annotation.AnnotationTarget.*
@RequiresOptIn("This can only be used in tests.", level = ERROR)
@Target(CLASS, FUNCTION, PROPERTY)
@Retention(AnnotationRetention.BINARY)
public annotation class TestOnly

View File

@ -63,8 +63,9 @@ internal interface NetworkHandler : CoroutineScope {
*
* The state after finish of [State.LOADING] is [State.OK]. This state lasts for the majority of time.
*
* When connection is lost (e.g. due to Internet unavailability), it returns to [State.CONNECTING] and repeatedly attempts to reconnect.
* Immediately after successful recovery, [State.OK] will be set.
* When connection is lost (e.g. due to Internet unavailability), it does NOT return to [State.CONNECTING] but to [State.CLOSED]. No attempts is allowed.
*
* Retrial may only be performed in [SelectorNetworkHandler].
*
* @see state
*/
@ -101,7 +102,7 @@ internal interface NetworkHandler : CoroutineScope {
/**
* The terminal state. Both [resumeConnection] and [sendAndExpect] throw a [IllegalStateException].
*
* **Important nodes**: if [NetworkHandler] is [SelectorNetworkHandler], it might return to a normal state e.g. [INITIALIZED] if new instance of [NetworkHandler] is created.
* **Important nodes**: iff [NetworkHandler] is [SelectorNetworkHandler], it might return to a normal state e.g. [INITIALIZED] if new instance of [NetworkHandler] is created.
* However callers usually do not need to pay extra attention on this behavior. Everything will just work fine if you consider [CLOSED] as a final, non-recoverable state.
*
* At this state [resumeConnection] throws the exception caught from underlying socket implementation (i.e netty).

View File

@ -163,6 +163,8 @@ internal abstract class NetworkHandlerSupport(
/**
* State is *lazy*, initialized only if requested.
*
* You need to call setter inside `synchronized(this) { }`.
*/
@Suppress("PropertyName")
protected var _state: BaseStateImpl by lateinitMutableProperty { initialState() }
@ -185,14 +187,24 @@ internal abstract class NetworkHandlerSupport(
/**
* Attempts to change state. Returns null if new state has same [class][KClass] as current.
* Attempts to change state.
*
* Returns null if new state has same [class][KClass] as current (meaning already set by another thread).
*/
protected inline fun <reified S : BaseStateImpl> setState(noinline new: () -> S): S? = setState(S::class, new)
protected inline fun <reified S : BaseStateImpl> setState(
old: BaseStateImpl, noinline new: () -> S
): S? = synchronized(this) {
if (_state === old) {
setState(new)
/**
* Attempts to change state if current state is [this].
*
* Returns null if new state has same [class][KClass] as current or when current state is already set to another state concurrently by another thread.
*
* This is designed to be used inside [BaseStateImpl].
*/
protected inline fun <reified S : BaseStateImpl> BaseStateImpl.setState(
noinline new: () -> S
): S? = synchronized(this@NetworkHandlerSupport) {
if (_state === this) {
this@NetworkHandlerSupport.setState(new)
} else {
null
}
@ -204,20 +216,27 @@ internal abstract class NetworkHandlerSupport(
*
* You may need to call [BaseStateImpl.resumeConnection] to activate the new state, as states are lazy.
*/
protected fun <S : BaseStateImpl> setState(newType: KClass<S>?, new: () -> S): S? = synchronized(this) {
if (newType != null && _state::class == newType) return@synchronized null // already set to expected state by another thread. Avoid replications.
if (_state.correspondingState == NetworkHandler.State.CLOSED) return null // CLOSED is final.
@JvmName("setState1")
protected fun <S : BaseStateImpl> setState(newType: KClass<S>, new: () -> S): S? =
@OptIn(TestOnly::class)
setStateImpl(newType as KClass<S>?, new)
// newType can be null iff in tests, to ignore checks.
@TestOnly
internal fun <S : BaseStateImpl> setStateImpl(newType: KClass<S>?, new: () -> S): S? = synchronized(this) {
val old = _state
if (newType != null && old::class == newType) return@synchronized null // already set to expected state by another thread. Avoid replications.
if (old.correspondingState == NetworkHandler.State.CLOSED) return null // CLOSED is final.
val stateObserver = context.getOrNull(StateObserver)
val impl = try {
new() // inline only once
} catch (e: Throwable) {
stateObserver?.exceptionOnCreatingNewState(this, _state, e)
stateObserver?.exceptionOnCreatingNewState(this, old, e)
throw e
}
val old = _state
check(old !== impl) { "Old and new states cannot be the same." }

View File

@ -218,7 +218,7 @@ internal open class NettyNetworkHandler(
}
override suspend fun resumeConnection0() {
setState(this) { StateConnecting(ExceptionCollector()) }
this.setState { StateConnecting(ExceptionCollector()) }
?.resumeConnection()
?: this@NettyNetworkHandler.resumeConnection() // concurrently closed by other thread.
}
@ -240,13 +240,8 @@ internal open class NettyNetworkHandler(
* Dropped when state becomes [StateOK].
*/
private val collectiveExceptions: ExceptionCollector,
/**
* If `true`, [delay] 5 seconds before connecting.
*/
wait: Boolean = false
) : NettyState(State.CONNECTING) {
private val connection = async {
if (wait) delay(RECONNECT_DELAY)
createConnection(decodePipeline)
}
@ -263,17 +258,15 @@ internal open class NettyNetworkHandler(
this@NettyNetworkHandler.launch { resumeConnection() }
} else {
if (error is StateSwitchingException && error.new is StateConnecting) {
return@invokeOnCompletion // already been switching to CONNECTING
return@invokeOnCompletion // state already switched, so do not do it again.
}
setState(null) { // ignore replication check
// StateConnecting(
// collectiveExceptions.apply { collect(error) },
// wait = true
// )
setState {
// logon failure closes the network handler.
StateClosed(collectiveExceptions.collectGet(error))
} // logon failure closes the network handler.
// The exception will be ignored unless all further attempts recovering connection have failed.
// This is to reduce useless logs for the user----there is nothing to worry about if we can recover the connection.
}
}
// and this error will also be thrown by `StateConnecting.resumeConnection`
}
}
@ -289,7 +282,7 @@ internal open class NettyNetworkHandler(
override suspend fun resumeConnection0() {
connectResult.await() // propagates exceptions
val connection = connection.await()
setState(this) { StateLoading(connection) }
this.setState { StateLoading(connection) }
?.resumeConnection()
?: this@NettyNetworkHandler.resumeConnection() // concurrently closed by other thread.
}
@ -328,7 +321,7 @@ internal open class NettyNetworkHandler(
join()
}
joinCompleted(configPush) // throw exception
setState(this) { StateOK(connection, configPush) }
setState { StateOK(connection, configPush) }
} // noop
override fun toString(): String = "StateLoading"
@ -363,9 +356,7 @@ internal open class NettyNetworkHandler(
try {
action()
} catch (e: Throwable) {
setState {
StateConnecting(ExceptionCollector(IllegalStateException("Exception in $name job", e)))
}
setState { StateClosed(IllegalStateException("Exception in $name job", e)) }
}
}
}.apply {

View File

@ -25,6 +25,7 @@ import net.mamoe.mirai.internal.network.handler.state.SafeStateObserver
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.TestOnly
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
@ -68,8 +69,10 @@ internal open class TestNetworkHandler(
}
}
@OptIn(TestOnly::class)
fun setState(correspondingState: NetworkHandler.State) {
setState(null) { TestState(correspondingState) }
// `null` means ignore checks. All test states have same type TestState.
setStateImpl(null) { TestState(correspondingState) }
}
private val initialState = TestState(NetworkHandler.State.INITIALIZED)

View File

@ -31,10 +31,11 @@ internal open class TestNettyNH(
}
fun setStateConnecting(exception: Throwable? = null) {
setState { StateConnecting(ExceptionCollector(exception), false) }
setState { StateConnecting(ExceptionCollector(exception)) }
}
fun setStateOK(channel: Channel, exception: Throwable? = null) {
exception?.printStackTrace()
setState { StateOK(channel, CompletableDeferred(Unit)) }
}