Add state observer

This commit is contained in:
Him188 2021-04-17 00:38:46 +08:00
parent 8786f12d5d
commit 1fef89cf1c
7 changed files with 243 additions and 14 deletions

View File

@ -19,6 +19,9 @@ import net.mamoe.mirai.internal.contact.OtherClientImpl
import net.mamoe.mirai.internal.contact.checkIsGroupImpl import net.mamoe.mirai.internal.contact.checkIsGroupImpl
import net.mamoe.mirai.internal.network.* import net.mamoe.mirai.internal.network.*
import net.mamoe.mirai.internal.network.handler.* import net.mamoe.mirai.internal.network.handler.*
import net.mamoe.mirai.internal.network.handler.impl.LoggingStateObserver
import net.mamoe.mirai.internal.network.handler.impl.SafeStateObserver
import net.mamoe.mirai.internal.network.handler.impl.StateObserver
import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandlerFactory import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandlerFactory
import net.mamoe.mirai.internal.network.net.protocol.SsoProcessor import net.mamoe.mirai.internal.network.net.protocol.SsoProcessor
import net.mamoe.mirai.internal.network.net.protocol.SsoProcessorContextImpl import net.mamoe.mirai.internal.network.net.protocol.SsoProcessorContextImpl
@ -49,10 +52,22 @@ internal fun QQAndroidBot.createOtherClient(
return OtherClientImpl(this, coroutineContext, info) return OtherClientImpl(this, coroutineContext, info)
} }
internal class BotDebugConfiguration(
var stateObserver: StateObserver? = when {
systemProp("mirai.debug.network.state.observer.logging", false) ->
SafeStateObserver(
LoggingStateObserver(MiraiLogger.create("States")),
MiraiLogger.create("StateObserver errors")
)
else -> null
}
)
@Suppress("INVISIBLE_MEMBER", "BooleanLiteralArgument", "OverridingDeprecatedMember") @Suppress("INVISIBLE_MEMBER", "BooleanLiteralArgument", "OverridingDeprecatedMember")
internal class QQAndroidBot constructor( internal class QQAndroidBot constructor(
internal val account: BotAccount, internal val account: BotAccount,
configuration: BotConfiguration configuration: BotConfiguration,
private val debugConfiguration: BotDebugConfiguration = BotDebugConfiguration(),
) : AbstractBot(configuration, account.id) { ) : AbstractBot(configuration, account.id) {
override val bot: QQAndroidBot get() = this override val bot: QQAndroidBot get() = this
@ -74,7 +89,12 @@ internal class QQAndroidBot constructor(
} }
override fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler { override fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler {
val context = NetworkHandlerContextImpl(this, ssoProcessor, configuration.networkLoggerSupplier(this)) val context = NetworkHandlerContextImpl(
this,
ssoProcessor,
configuration.networkLoggerSupplier(this),
debugConfiguration.stateObserver
)
return SelectorNetworkHandler( return SelectorNetworkHandler(
context, context,
FactoryKeepAliveNetworkHandlerSelector(NettyNetworkHandlerFactory, serverListNew, context) FactoryKeepAliveNetworkHandlerSelector(NettyNetworkHandlerFactory, serverListNew, context)

View File

@ -13,6 +13,7 @@ import net.mamoe.mirai.Bot
import net.mamoe.mirai.internal.QQAndroidBot import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.Packet import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.impl.StateObserver
import net.mamoe.mirai.internal.network.net.protocol.SsoProcessor import net.mamoe.mirai.internal.network.net.protocol.SsoProcessor
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType
@ -31,12 +32,15 @@ internal interface NetworkHandlerContext {
val logger: MiraiLogger val logger: MiraiLogger
val ssoProcessor: SsoProcessor val ssoProcessor: SsoProcessor
val stateObserver: StateObserver?
} }
internal class NetworkHandlerContextImpl( internal class NetworkHandlerContextImpl(
override val bot: QQAndroidBot, override val bot: QQAndroidBot,
override val ssoProcessor: SsoProcessor, override val ssoProcessor: SsoProcessor,
override val logger: MiraiLogger, override val logger: MiraiLogger,
override val stateObserver: StateObserver?,
) : NetworkHandlerContext ) : NetworkHandlerContext
/** /**

View File

@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
private val PACKET_DEBUG = systemProp("mirai.debug.packet.logger", true) private val PACKET_DEBUG = systemProp("mirai.debug.network.packet.logger", true)
internal abstract class NetworkHandlerSupport( internal abstract class NetworkHandlerSupport(
override val context: NetworkHandlerContext, override val context: NetworkHandlerContext,
@ -121,7 +121,7 @@ internal abstract class NetworkHandlerSupport(
* *
* State can only be changed inside [setState]. * State can only be changed inside [setState].
*/ */
protected abstract inner class BaseStateImpl( abstract inner class BaseStateImpl(
val correspondingState: NetworkHandler.State, val correspondingState: NetworkHandler.State,
) : CoroutineScope by CoroutineScope(coroutineContext + SupervisorJob(coroutineContext.job)) { ) : CoroutineScope by CoroutineScope(coroutineContext + SupervisorJob(coroutineContext.job)) {
@ -129,7 +129,19 @@ internal abstract class NetworkHandlerSupport(
* May throw any exception that caused the state to fail. * May throw any exception that caused the state to fail.
*/ */
@Throws(Exception::class) @Throws(Exception::class)
abstract suspend fun resumeConnection() suspend fun resumeConnection() {
val observer = context.stateObserver
if (observer != null) {
observer.beforeStateResume(this@NetworkHandlerSupport, _state)
val result = kotlin.runCatching { resumeConnection0() }
observer.afterStateResume(this@NetworkHandlerSupport, _state, result)
result.getOrThrow()
} else {
resumeConnection0()
}
}
protected abstract suspend fun resumeConnection0()
} }
/** /**
@ -143,16 +155,26 @@ internal abstract class NetworkHandlerSupport(
/** /**
* You may need to call [BaseStateImpl.resumeConnection] since state is lazy. * You may need to call [BaseStateImpl.resumeConnection] since state is lazy.
*
* Do not check for instances of [BaseStateImpl]. Instances may be decorated by [StateObserver] for extended functionality.
*/ */
protected inline fun <S : BaseStateImpl> setState(crossinline new: () -> S): S = synchronized(this) { protected inline fun <S : BaseStateImpl> setState(crossinline new: () -> S): S = synchronized(this) {
// we can add hooks here for debug. // we can add hooks here for debug.
val impl = new() val impl = try {
new() // inline only once
} catch (e: Throwable) {
context.stateObserver?.exceptionOnCreatingNewState(this, _state, e)
throw e
}
val old = _state val old = _state
check(old !== impl) { "Old and new states cannot be the same." } check(old !== impl) { "Old and new states cannot be the same." }
old.cancel() old.cancel()
_state = impl _state = impl
context.stateObserver?.stateChanged(this, old, impl)
return impl return impl
} }

View File

@ -0,0 +1,159 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler.impl
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.error
/**
* Stateless observer of state changes.
*
* @see SafeStateObserver
* @see LoggingStateObserver
*/
internal interface StateObserver {
fun stateChanged(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
}
fun exceptionOnCreatingNewState(
networkHandler: NetworkHandlerSupport,
previousState: NetworkHandlerSupport.BaseStateImpl,
exception: Throwable,
) {
}
fun beforeStateResume(
networkHandler: NetworkHandler,
state: NetworkHandlerSupport.BaseStateImpl,
) {
}
fun afterStateResume(
networkHandler: NetworkHandler,
state: NetworkHandlerSupport.BaseStateImpl,
result: Result<Unit>,
) {
}
}
internal class LoggingStateObserver(
val logger: MiraiLogger
) : StateObserver {
override fun stateChanged(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
logger.error { "State changed: ${previous.correspondingState} -> ${new.correspondingState}" }
}
override fun exceptionOnCreatingNewState(
networkHandler: NetworkHandlerSupport,
previousState: NetworkHandlerSupport.BaseStateImpl,
exception: Throwable
) {
logger.error({ "State changed: ${previousState.correspondingState} -> $exception" }, exception)
}
override fun afterStateResume(
networkHandler: NetworkHandler,
state: NetworkHandlerSupport.BaseStateImpl,
result: Result<Unit>
) {
result.fold(
onSuccess = {
logger.error { "State resumed: ${state.correspondingState}." }
},
onFailure = {
logger.error(
{ "State resumed: ${state.correspondingState} ${result.exceptionOrNull()}" },
result.exceptionOrNull()
)
}
)
}
}
internal class ExceptionInStateObserverException(
override val cause: Throwable
) : RuntimeException()
/**
* Catches exception then log by [logger]
*/
internal class SafeStateObserver(
val delegate: StateObserver,
val logger: MiraiLogger,
) : StateObserver {
override fun stateChanged(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
try {
delegate.stateChanged(networkHandler, previous, new)
} catch (e: Throwable) {
logger.error(
{ "Internal error: exception in StateObserver $delegate" },
ExceptionInStateObserverException(e)
)
}
}
override fun exceptionOnCreatingNewState(
networkHandler: NetworkHandlerSupport,
previousState: NetworkHandlerSupport.BaseStateImpl,
exception: Throwable
) {
try {
delegate.exceptionOnCreatingNewState(networkHandler, previousState, exception)
} catch (e: Throwable) {
logger.error(
{ "Internal error: exception in StateObserver $delegate" },
ExceptionInStateObserverException(e)
)
}
}
override fun beforeStateResume(networkHandler: NetworkHandler, state: NetworkHandlerSupport.BaseStateImpl) {
try {
delegate.beforeStateResume(networkHandler, state)
} catch (e: Throwable) {
logger.error(
{ "Internal error: exception in StateObserver $delegate" },
ExceptionInStateObserverException(e)
)
}
}
override fun afterStateResume(
networkHandler: NetworkHandler,
state: NetworkHandlerSupport.BaseStateImpl,
result: Result<Unit>
) {
try {
delegate.afterStateResume(networkHandler, state, result)
} catch (e: Throwable) {
logger.error(
{ "Internal error: exception in StateObserver $delegate" },
ExceptionInStateObserverException(e)
)
}
}
}

View File

@ -141,7 +141,7 @@ internal class NettyNetworkHandler(
error("Cannot send packet when connection is not set. (resumeConnection not called.)") error("Cannot send packet when connection is not set. (resumeConnection not called.)")
} }
override suspend fun resumeConnection() { override suspend fun resumeConnection0() {
setState { StateConnecting(PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)) } setState { StateConnecting(PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)) }
.resumeConnection() .resumeConnection()
} }
@ -173,7 +173,7 @@ internal class NettyNetworkHandler(
connection.await().writeAndFlush(packet) connection.await().writeAndFlush(packet)
} }
override suspend fun resumeConnection() { override suspend fun resumeConnection0() {
connectResult.await() // propagates exceptions connectResult.await() // propagates exceptions
} }
} }
@ -185,7 +185,7 @@ internal class NettyNetworkHandler(
connection.writeAndFlush(packet) connection.writeAndFlush(packet)
} }
override suspend fun resumeConnection() {} // noop override suspend fun resumeConnection0() {} // noop
} }
private inner class StateConnectionLost(private val cause: Throwable) : private inner class StateConnectionLost(private val cause: Throwable) :
@ -194,7 +194,7 @@ internal class NettyNetworkHandler(
throw IllegalStateException("Connection is lost so cannot send packet. Call resumeConnection first.", cause) throw IllegalStateException("Connection is lost so cannot send packet. Call resumeConnection first.", cause)
} }
override suspend fun resumeConnection() { override suspend fun resumeConnection0() {
setState { StateConnecting(PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)) } setState { StateConnecting(PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)) }
.resumeConnection() // the user wil .resumeConnection() // the user wil
} // noop } // noop
@ -208,7 +208,7 @@ internal class NettyNetworkHandler(
} }
override suspend fun sendPacketImpl(packet: OutgoingPacket) = error("NetworkHandler is already closed.") override suspend fun sendPacketImpl(packet: OutgoingPacket) = error("NetworkHandler is already closed.")
override suspend fun resumeConnection() { override suspend fun resumeConnection0() {
exception?.let { throw it } exception?.let { throw it }
} // noop } // noop
} }

View File

@ -19,9 +19,26 @@ internal val MockAccount = BotAccount(1, "pwd")
internal val MockConfiguration = BotConfiguration { internal val MockConfiguration = BotConfiguration {
} }
internal class MockBotBuilder(
val conf: BotConfiguration = BotConfiguration(),
val debugConf: BotDebugConfiguration = BotDebugConfiguration()
) {
fun conf(action: BotConfiguration.() -> Unit): MockBotBuilder {
conf.apply(action)
return this
}
fun debugConf(action: BotDebugConfiguration.() -> Unit): MockBotBuilder {
debugConf.apply(action)
return this
}
}
@Suppress("TestFunctionName") @Suppress("TestFunctionName")
internal fun MockBot(conf: BotConfiguration.() -> Unit) = internal fun MockBot(conf: MockBotBuilder.() -> Unit) =
QQAndroidBot(MockAccount, MockConfiguration.copy().apply(conf)) MockBotBuilder(MockConfiguration.copy()).apply(conf).run {
QQAndroidBot(MockAccount, this.conf, debugConf)
}
@Suppress("TestFunctionName") @Suppress("TestFunctionName")
internal fun MockBot() = internal fun MockBot() =

View File

@ -12,7 +12,10 @@ package net.mamoe.mirai.internal.network.handler
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableDeferred
import net.mamoe.mirai.internal.MockBot import net.mamoe.mirai.internal.MockBot
import net.mamoe.mirai.internal.QQAndroidBot import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.handler.impl.LoggingStateObserver
import net.mamoe.mirai.internal.network.handler.impl.NetworkHandlerSupport import net.mamoe.mirai.internal.network.handler.impl.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.impl.SafeStateObserver
import net.mamoe.mirai.internal.network.handler.impl.StateObserver
import net.mamoe.mirai.internal.network.net.protocol.SsoProcessor import net.mamoe.mirai.internal.network.net.protocol.SsoProcessor
import net.mamoe.mirai.internal.network.net.protocol.SsoProcessorContextImpl import net.mamoe.mirai.internal.network.net.protocol.SsoProcessorContextImpl
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
@ -25,6 +28,10 @@ internal class TestNetworkHandlerContext(
override val bot: QQAndroidBot = MockBot(), override val bot: QQAndroidBot = MockBot(),
override val logger: MiraiLogger = MiraiLogger.create("Test"), override val logger: MiraiLogger = MiraiLogger.create("Test"),
override val ssoProcessor: SsoProcessor = SsoProcessor(SsoProcessorContextImpl(bot)), override val ssoProcessor: SsoProcessor = SsoProcessor(SsoProcessorContextImpl(bot)),
override val stateObserver: StateObserver? = SafeStateObserver(
LoggingStateObserver(MiraiLogger.create("States")),
MiraiLogger.create("StateObserver errors")
),
) : NetworkHandlerContext ) : NetworkHandlerContext
internal open class TestNetworkHandler( internal open class TestNetworkHandler(
@ -38,7 +45,7 @@ internal open class TestNetworkHandler(
val resumeCount = AtomicInteger(0) val resumeCount = AtomicInteger(0)
val onResume get() = resumeDeferred.onJoin val onResume get() = resumeDeferred.onJoin
override suspend fun resumeConnection() { override suspend fun resumeConnection0() {
resumeCount.incrementAndGet() resumeCount.incrementAndGet()
resumeDeferred.complete(Unit) resumeDeferred.complete(Unit)
} }