From 82ad953b2b9e4c482afdf0d96f4d82cdd2153609 Mon Sep 17 00:00:00 2001 From: Him188 Date: Sat, 17 Apr 2021 23:23:48 +0800 Subject: [PATCH] Continuing implement states for NetworkHandler --- .../src/commonMain/kotlin/AbstractBot.kt | 4 +- .../src/commonMain/kotlin/QQAndroidBot.kt | 53 ++++---- .../kotlin/contact/OtherClientImpl.kt | 7 ++ .../kotlin/network/handler/NetworkHandler.kt | 28 ++++- .../network/handler/NetworkHandlerSupport.kt | 34 ++++- .../network/handler/component/ComponentKey.kt | 2 +- .../handler/component/ComponentStorage.kt | 2 + .../component/ConcurrentComponentStorage.kt | 2 +- .../component/MutableComponentStorage.kt | 4 +- .../components/AccountSecretsManager.kt | 2 - .../handler/components/BotInitProcessor.kt | 118 ++++++++++++++++++ .../handler/components/ConfigPushSyncer.kt | 24 ++++ .../handler/components/ContactUpdater.kt | 37 +++--- .../components/NetworkHandlerReference.kt | 21 ++++ .../handler/components/OtherClientUpdater.kt | 58 +++++++++ .../network/handler/components/PacketCodec.kt | 8 +- .../handler/components/PacketHandler.kt | 113 +++++++++++++++++ .../network/handler/components/ServerList.kt | 4 + .../handler/components/SsoProcessor.kt | 38 ++++++ .../handler/impl/netty/NettyNetworkHandler.kt | 64 +++++++--- ...AbstractKeepAliveNetworkHandlerSelector.kt | 14 ++- .../selector/SelectorNetworkHandler.kt | 9 ++ .../handler/state/CombinedStateObserver.kt | 12 +- .../handler/state/LoggingStateObserver.kt | 6 +- .../handler/state/SafeStateObserver.kt | 5 + .../handler/state/StateChangedObserver.kt | 33 +++++ .../network/handler/state/StateObserver.kt | 17 ++- .../net/protocol/implementation notes.kt | 87 ------------- .../protocol/packet/OutgoingPacketAndroid.kt | 21 ++-- .../chat/receive/MessageSvc.PbGetMsg.kt | 3 +- .../receive/MessageSvc.RequestPushStatus.kt | 2 +- .../network/protocol/packet/login/StatSvc.kt | 2 +- .../network/component/ComponentKeyTest.kt | 2 +- ...ractKeepAliveNetworkHandlerSelectorTest.kt | 2 +- .../handler/AbstractNetworkHandlerTest.kt | 40 ++++++ .../network/handler/StateObserverTest.kt | 67 ++++++++++ 36 files changed, 757 insertions(+), 188 deletions(-) create mode 100644 mirai-core/src/commonMain/kotlin/network/handler/components/BotInitProcessor.kt create mode 100644 mirai-core/src/commonMain/kotlin/network/handler/components/ConfigPushSyncer.kt create mode 100644 mirai-core/src/commonMain/kotlin/network/handler/components/NetworkHandlerReference.kt create mode 100644 mirai-core/src/commonMain/kotlin/network/handler/components/OtherClientUpdater.kt create mode 100644 mirai-core/src/commonMain/kotlin/network/handler/components/PacketHandler.kt create mode 100644 mirai-core/src/commonMain/kotlin/network/handler/state/StateChangedObserver.kt create mode 100644 mirai-core/src/commonTest/kotlin/network/handler/AbstractNetworkHandlerTest.kt create mode 100644 mirai-core/src/commonTest/kotlin/network/handler/StateObserverTest.kt diff --git a/mirai-core/src/commonMain/kotlin/AbstractBot.kt b/mirai-core/src/commonMain/kotlin/AbstractBot.kt index da8ba5fc7..afd8c5f60 100644 --- a/mirai-core/src/commonMain/kotlin/AbstractBot.kt +++ b/mirai-core/src/commonMain/kotlin/AbstractBot.kt @@ -171,14 +171,14 @@ internal abstract class AbstractBot constructor( // network /////////////////////////////////////////////////////////////////////////// - val network: NetworkHandler by lazy { createNetworkHandler(coroutineContext) } + val network: NetworkHandler by lazy { createNetworkHandler() } final override suspend fun login() { if (!isActive) error("Bot is already closed and cannot relogin. Please create a new Bot instance then do login.") network.resumeConnection() } - protected abstract fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler + protected abstract fun createNetworkHandler(): NetworkHandler protected abstract suspend fun sendLogout() // endregion diff --git a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt index 8b181661f..d2b3e3486 100644 --- a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt +++ b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt @@ -14,23 +14,21 @@ import kotlinx.coroutines.sync.Mutex import net.mamoe.mirai.Bot import net.mamoe.mirai.Mirai import net.mamoe.mirai.contact.Group -import net.mamoe.mirai.contact.OtherClientInfo -import net.mamoe.mirai.internal.contact.OtherClientImpl import net.mamoe.mirai.internal.contact.checkIsGroupImpl import net.mamoe.mirai.internal.network.Packet import net.mamoe.mirai.internal.network.handler.NetworkHandler +import net.mamoe.mirai.internal.network.handler.component.ComponentStorage import net.mamoe.mirai.internal.network.handler.component.ConcurrentComponentStorage -import net.mamoe.mirai.internal.network.handler.component.set import net.mamoe.mirai.internal.network.handler.components.* import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContextImpl import net.mamoe.mirai.internal.network.handler.context.SsoProcessorContextImpl import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandlerFactory -import net.mamoe.mirai.internal.network.handler.logger import net.mamoe.mirai.internal.network.handler.selector.FactoryKeepAliveNetworkHandlerSelector import net.mamoe.mirai.internal.network.handler.selector.SelectorNetworkHandler import net.mamoe.mirai.internal.network.handler.state.LoggingStateObserver import net.mamoe.mirai.internal.network.handler.state.SafeStateObserver import net.mamoe.mirai.internal.network.handler.state.StateObserver +import net.mamoe.mirai.internal.network.handler.state.safe import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc @@ -38,7 +36,6 @@ import net.mamoe.mirai.utils.BotConfiguration import net.mamoe.mirai.utils.MiraiLogger import net.mamoe.mirai.utils.systemProp import kotlin.contracts.contract -import kotlin.coroutines.CoroutineContext internal fun Bot.asQQAndroidBot(): QQAndroidBot { contract { @@ -48,18 +45,12 @@ internal fun Bot.asQQAndroidBot(): QQAndroidBot { return this as QQAndroidBot } -internal fun QQAndroidBot.createOtherClient( - info: OtherClientInfo, -): OtherClientImpl { - return OtherClientImpl(this, coroutineContext, info) -} - internal class BotDebugConfiguration( var stateObserver: StateObserver? = when { systemProp("mirai.debug.network.state.observer.logging", false) -> SafeStateObserver( LoggingStateObserver(MiraiLogger.create("States")), - MiraiLogger.create("StateObserver errors") + MiraiLogger.create("LoggingStateObserver errors") ) else -> null } @@ -81,19 +72,39 @@ internal class QQAndroidBot constructor( // TODO: 2021/4/14 bdhSyncer.loadFromCache() when login + // IDE error, don't move into lazy + private fun ComponentStorage.stateObserverChain(): StateObserver { + val components = this + return StateObserver.chainOfNotNull( + components[BotInitProcessor].asObserver().safe(networkLogger), + debugConfiguration.stateObserver + ) + } + + + private val networkLogger: MiraiLogger by lazy { configuration.networkLoggerSupplier(this) } internal val components: ConcurrentComponentStorage by lazy { ConcurrentComponentStorage().apply { - set( - SsoProcessor, - SsoProcessorImpl(SsoProcessorContextImpl(bot)) - ) // put sso processor at the first to make `client` faster. + val components = this // avoid mistakes + set(SsoProcessor, SsoProcessorImpl(SsoProcessorContextImpl(bot))) + // put sso processor at the first to make `client` faster. - set(StateObserver, debugConfiguration.stateObserver) + set(BotInitProcessor, BotInitProcessorImpl(bot, components, bot.logger)) set(ContactCacheService, ContactCacheServiceImpl(bot)) - set(ContactUpdater, ContactUpdaterImpl(bot, this)) - set(BdhSessionSyncer, BdhSessionSyncerImpl(configuration, network.logger, this)) + set(ContactUpdater, ContactUpdaterImpl(bot, components, networkLogger)) + set(BdhSessionSyncer, BdhSessionSyncerImpl(configuration, networkLogger, components)) set(ServerList, ServerListImpl()) + set( + PacketHandler, PacketHandlerChain( + LoggingPacketHandler(bot, components, logger), + EventBroadcasterPacketHandler(bot, components, logger) + ) + ) + set(PacketCodec, PacketCodecImpl()) + set(OtherClientUpdater, OtherClientUpdaterImpl(bot, components, bot.logger)) + set(ConfigPushSyncer, ConfigPushSyncerImpl()) + set(StateObserver, stateObserverChain()) // TODO: 2021/4/16 load server list from cache (add a provider) // bot.bdhSyncer.loadServerListFromCache() @@ -107,10 +118,10 @@ internal class QQAndroidBot constructor( network.sendWithoutExpect(StatSvc.Register.offline(client)) } - override fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler { + override fun createNetworkHandler(): NetworkHandler { val context = NetworkHandlerContextImpl( this, - configuration.networkLoggerSupplier(this), + networkLogger, components ) return SelectorNetworkHandler( diff --git a/mirai-core/src/commonMain/kotlin/contact/OtherClientImpl.kt b/mirai-core/src/commonMain/kotlin/contact/OtherClientImpl.kt index 6affaeacb..b6056125f 100644 --- a/mirai-core/src/commonMain/kotlin/contact/OtherClientImpl.kt +++ b/mirai-core/src/commonMain/kotlin/contact/OtherClientImpl.kt @@ -12,6 +12,7 @@ package net.mamoe.mirai.internal.contact import net.mamoe.mirai.Bot import net.mamoe.mirai.contact.OtherClient import net.mamoe.mirai.contact.OtherClientInfo +import net.mamoe.mirai.internal.QQAndroidBot import net.mamoe.mirai.message.MessageReceipt import net.mamoe.mirai.message.data.Image import net.mamoe.mirai.message.data.Message @@ -21,6 +22,12 @@ import kotlin.coroutines.CoroutineContext internal inline val OtherClient.appId: Int get() = info.appId +internal fun QQAndroidBot.createOtherClient( + info: OtherClientInfo, +): OtherClientImpl { + return OtherClientImpl(this, coroutineContext, info) +} + internal class OtherClientImpl( bot: Bot, coroutineContext: CoroutineContext, diff --git a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt index fb6a656ac..579b17cd7 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt @@ -9,9 +9,10 @@ package net.mamoe.mirai.internal.network.handler +import kotlinx.coroutines.selects.SelectClause1 import net.mamoe.mirai.internal.network.Packet import net.mamoe.mirai.internal.network.handler.NetworkHandler.State -import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContext +import net.mamoe.mirai.internal.network.handler.selector.SelectorNetworkHandler import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType import net.mamoe.mirai.utils.MiraiLogger @@ -37,6 +38,11 @@ internal interface NetworkHandler { */ val state: State + /** + * For suspension until a state. e.g login. + */ + val onStateChanged: SelectClause1 + enum class State { /** * Just created and no connection has been made. @@ -54,12 +60,25 @@ internal interface NetworkHandler { CONNECTING, /** - * Everything is working. [resumeConnection] does nothing. [sendAndExpect] does not suspend for connection reasons. + * Loading essential data from server and local cache. Data include contact list. + * + * At this state [resumeConnection] waits for the jobs. [sendAndExpect] works normally. + */ + LOADING, + + /** + * Everything is working. + * + * At this state [resumeConnection] does nothing. [sendAndExpect] works normally. */ OK, /** - * No Internet Connection available or for any other reasons but it is possible to establish a connection again(switching state to [CONNECTING]). + * No Internet Connection available or for any other reasons + * but it is possible to establish a connection again(switching state to [CONNECTING]). + * + * At this state [resumeConnection] turns the handle to [CONNECTING]. + * [sendAndExpect] throws [IllegalStateException] */ CONNECTION_LOST, @@ -67,6 +86,9 @@ internal interface NetworkHandler { * Cannot resume anymore. Both [resumeConnection] and [sendAndExpect] throw a [CancellationException]. * * When a handler reached [CLOSED] state, it is finalized and cannot be restored to any other states. + * + * At this state [resumeConnection] throws the exception caught from underlying socket implementation (i.e netty). + * [sendAndExpect] throws [IllegalStateException] */ CLOSED, } diff --git a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt index bb77159c7..bb92ef41b 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt @@ -10,8 +10,10 @@ package net.mamoe.mirai.internal.network.handler import kotlinx.coroutines.* +import kotlinx.coroutines.selects.SelectClause1 import net.mamoe.mirai.internal.network.Packet import net.mamoe.mirai.internal.network.handler.components.PacketCodec +import net.mamoe.mirai.internal.network.handler.components.PacketHandler import net.mamoe.mirai.internal.network.handler.components.RawIncomingPacket import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContext import net.mamoe.mirai.internal.network.handler.state.StateObserver @@ -32,11 +34,12 @@ internal abstract class NetworkHandlerSupport( protected abstract fun initialState(): BaseStateImpl protected abstract suspend fun sendPacketImpl(packet: OutgoingPacket) + private val packetHandler: PacketHandler by lazy { context[PacketHandler] } + /** * Called when a packet is received. */ - protected fun collectReceived(packet: IncomingPacket) { - logger.verbose({ "Recv: ${packet.commandName} ${packet.data ?: packet.exception}" }, packet.exception) + protected open fun collectReceived(packet: IncomingPacket) { for (listener in packetListeners) { if (!listener.isExpected(packet)) continue if (packetListeners.remove(listener)) { @@ -48,6 +51,13 @@ internal abstract class NetworkHandlerSupport( } } } + launch { + try { + packetHandler.handlePacket(packet) + } catch (e: Throwable) { // do not pass it to CoroutineExceptionHandler for a more controllable behavior. + logger.error(e) + } + } } protected fun collectUnknownPacket(raw: RawIncomingPacket) { @@ -74,7 +84,7 @@ internal abstract class NetworkHandlerSupport( exception = e // show last exception } } finally { - listener.result.complete(null) + listener.result.completeExceptionally(exception ?: IllegalStateException("No response")) packetListeners.remove(listener) } } @@ -161,6 +171,13 @@ internal abstract class NetworkHandlerSupport( final override val state: NetworkHandler.State get() = _state.correspondingState + private var _stateChangedDeferred = CompletableDeferred() + + /** + * For suspension until a state. e.g login. + */ + override val onStateChanged: SelectClause1 get() = _stateChangedDeferred.onAwait + /** * Can only be used in a job launched within the state scope. */ @@ -192,10 +209,17 @@ internal abstract class NetworkHandlerSupport( val old = _state check(old !== impl) { "Old and new states cannot be the same." } - old.cancel(CancellationException("State is switched from $old to $impl")) - _state = impl + + // Order notes: + // 1. Notify observers to attach jobs to [impl] (if so) + _stateChangedDeferred.complete(impl.correspondingState) stateObserver?.stateChanged(this, old, impl) + _stateChangedDeferred = CompletableDeferred() + // 2. Update state to [state]. This affects selectors. + _state = impl // switch state first. selector may be busy selecting. + // 3. Cleanup, cancel old states. + old.cancel(CancellationException("State is switched from $old to $impl")) return impl } diff --git a/mirai-core/src/commonMain/kotlin/network/handler/component/ComponentKey.kt b/mirai-core/src/commonMain/kotlin/network/handler/component/ComponentKey.kt index 36c89d329..1931e0846 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/component/ComponentKey.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/component/ComponentKey.kt @@ -20,7 +20,7 @@ import kotlin.reflect.full.allSupertypes * * @param T is a type hint. */ -internal interface ComponentKey { +internal interface ComponentKey { /** * Get name of `T`. * diff --git a/mirai-core/src/commonMain/kotlin/network/handler/component/ComponentStorage.kt b/mirai-core/src/commonMain/kotlin/network/handler/component/ComponentStorage.kt index 00af02be4..1b8929a75 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/component/ComponentStorage.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/component/ComponentStorage.kt @@ -23,5 +23,7 @@ internal interface ComponentStorage { @Throws(NoSuchComponentException::class) operator fun get(key: ComponentKey): T fun getOrNull(key: ComponentKey): T? + + override fun toString(): String } diff --git a/mirai-core/src/commonMain/kotlin/network/handler/component/ConcurrentComponentStorage.kt b/mirai-core/src/commonMain/kotlin/network/handler/component/ConcurrentComponentStorage.kt index d17012ad6..7110fcaa3 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/component/ConcurrentComponentStorage.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/component/ConcurrentComponentStorage.kt @@ -31,7 +31,7 @@ internal class ConcurrentComponentStorage( return map[key] as T? } - override operator fun set(key: ComponentKey, value: @UnsafeVariance T) { + override operator fun set(key: ComponentKey, value: T) { map[key] = value } diff --git a/mirai-core/src/commonMain/kotlin/network/handler/component/MutableComponentStorage.kt b/mirai-core/src/commonMain/kotlin/network/handler/component/MutableComponentStorage.kt index 52420ade5..2740c5151 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/component/MutableComponentStorage.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/component/MutableComponentStorage.kt @@ -14,11 +14,11 @@ package net.mamoe.mirai.internal.network.handler.component */ internal interface MutableComponentStorage : ComponentStorage { override operator fun get(key: ComponentKey): T - operator fun set(key: ComponentKey, value: @UnsafeVariance T) + operator fun set(key: ComponentKey, value: T) fun remove(key: ComponentKey): T? } -internal operator fun MutableComponentStorage.set(key: ComponentKey, value: @UnsafeVariance T?) { +internal operator fun MutableComponentStorage.set(key: ComponentKey, value: T?) { if (value == null) { remove(key) } else { diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/AccountSecretsManager.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/AccountSecretsManager.kt index d6697a5e8..d5f7354e9 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/components/AccountSecretsManager.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/AccountSecretsManager.kt @@ -76,8 +76,6 @@ internal class FileCacheAccountSecretsManager( ) logger.info { "Saved account secrets to local cache for fast login." } - - TEA.encrypt(file.readBytes(), account.passwordMd5).loadAs(AccountSecretsImpl.serializer()) } override fun getSecrets(account: BotAccount): AccountSecrets? { diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/BotInitProcessor.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/BotInitProcessor.kt new file mode 100644 index 000000000..204fba8b6 --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/BotInitProcessor.kt @@ -0,0 +1,118 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.handler.components + +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.* +import net.mamoe.mirai.event.nextEvent +import net.mamoe.mirai.internal.QQAndroidBot +import net.mamoe.mirai.internal.network.Packet +import net.mamoe.mirai.internal.network.handler.NetworkHandler.State +import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport +import net.mamoe.mirai.internal.network.handler.component.ComponentKey +import net.mamoe.mirai.internal.network.handler.component.ComponentStorage +import net.mamoe.mirai.internal.network.handler.state.StateChangedObserver +import net.mamoe.mirai.internal.network.handler.state.StateObserver +import net.mamoe.mirai.internal.network.protocol.data.proto.MsgSvc +import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket +import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType +import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetMsg +import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc +import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect +import net.mamoe.mirai.utils.MiraiLogger +import net.mamoe.mirai.utils.info + +internal interface BotInitProcessor { + suspend fun init() + + companion object : ComponentKey +} + +internal fun BotInitProcessor.asObserver(targetState: State = State.LOADING): StateObserver { + return BotInitProcessorAsStateObserverAdapter(this, targetState) +} + +private class BotInitProcessorAsStateObserverAdapter( + private val processor: BotInitProcessor, + targetState: State +) : StateChangedObserver(targetState) { + override fun stateChanged0( + networkHandler: NetworkHandlerSupport, + previous: NetworkHandlerSupport.BaseStateImpl, + new: NetworkHandlerSupport.BaseStateImpl + ) { + new.launch(CoroutineName("BotInitProcessor.init")) { + try { + processor.init() + } catch (e: Throwable) { + throw IllegalStateException("Exception in BotInitProcessor.init", e) + } + } + } + + override fun toString(): String { + return "BotInitProcessorAsStateObserverAdapter" + } +} + + +internal class BotInitProcessorImpl( + private val bot: QQAndroidBot, + private val context: ComponentStorage, + private val logger: MiraiLogger, +) : BotInitProcessor { + + private val initialized = atomic(false) + + override tailrec suspend fun init() { + if (initialized.value) return + if (!initialized.compareAndSet(expect = false, update = true)) return init() + + check(bot.isActive) { "bot is dead therefore network can't init." } + context[ContactUpdater].closeAllContacts(CancellationException("re-init")) + + val registerResp = registerClientOnline() + + bot.launch(CoroutineName("Awaiting ConfigPushSvc.PushReq")) { + context[ConfigPushSyncer].awaitSync() + } // TODO: 2021/4/17 should we launch here? + + // do them parallel. + supervisorScope { +// launch { syncMessageSvc() } + launch { context[OtherClientUpdater].update() } + launch { context[ContactUpdater].loadAll(registerResp.origin) } + } + + bot.firstLoginSucceed = true + } + + private suspend fun registerClientOnline(): StatSvc.Register.Response { + return StatSvc.Register.online(context[SsoProcessor].client).sendAndExpect(bot) + } + + private suspend fun syncMessageSvc() { + logger.info { "Syncing friend message history..." } + withTimeoutOrNull(30000) { + launch(CoroutineName("Syncing friend message history")) { + nextEvent { + it.bot == this@BotInitProcessorImpl.bot + } + } + MessageSvcPbGetMsg(bot.client, MsgSvc.SyncFlag.START, null).sendAndExpect() + } ?: error("timeout syncing friend message history.") + logger.info { "Syncing friend message history: Success." } + } + + private suspend inline fun OutgoingPacket.sendAndExpect() = this.sendAndExpect(bot.network) + private suspend inline fun OutgoingPacketWithRespType.sendAndExpect() = + this.sendAndExpect(bot.network) +} + diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/ConfigPushSyncer.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/ConfigPushSyncer.kt new file mode 100644 index 000000000..0f6d04c28 --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/ConfigPushSyncer.kt @@ -0,0 +1,24 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.handler.components + +import net.mamoe.mirai.internal.network.handler.component.ComponentKey + +internal interface ConfigPushSyncer { + suspend fun awaitSync() + + companion object : ComponentKey +} + +internal class ConfigPushSyncerImpl : ConfigPushSyncer { + override suspend fun awaitSync() { + // TODO("Not yet implemented") + } +} \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/ContactUpdater.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/ContactUpdater.kt index 19b5faec1..d685e87ca 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/components/ContactUpdater.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/ContactUpdater.kt @@ -13,7 +13,9 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.cancel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withPermit import net.mamoe.mirai.Mirai import net.mamoe.mirai.data.FriendInfo @@ -38,6 +40,8 @@ import net.mamoe.mirai.internal.network.protocol.data.jce.isValid import net.mamoe.mirai.internal.network.protocol.packet.chat.TroopManagement import net.mamoe.mirai.internal.network.protocol.packet.list.FriendList import net.mamoe.mirai.internal.network.protocol.packet.list.StrangerList +import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect +import net.mamoe.mirai.utils.MiraiLogger import net.mamoe.mirai.utils.info import net.mamoe.mirai.utils.retryCatching import net.mamoe.mirai.utils.verbose @@ -46,7 +50,7 @@ import net.mamoe.mirai.utils.verbose * Uses [ContactCacheService] */ internal interface ContactUpdater { - suspend fun loadAll(registerResp: SvcRespRegister) // TODO: 2021/4/17 call this fun + suspend fun loadAll(registerResp: SvcRespRegister) fun closeAllContacts(e: CancellationException) @@ -55,16 +59,19 @@ internal interface ContactUpdater { internal class ContactUpdaterImpl( val bot: QQAndroidBot, // not good - val components: ComponentStorage + val components: ComponentStorage, + private val logger: MiraiLogger, ) : ContactUpdater { private val cacheService get() = components[ContactCacheService] + private val lock = Mutex() - @Synchronized override suspend fun loadAll(registerResp: SvcRespRegister) { - coroutineScope { - launch { reloadFriendList(registerResp) } - launch { reloadGroupList() } - launch { reloadStrangerList() } + lock.withLock { + coroutineScope { + launch { reloadFriendList(registerResp) } + launch { reloadGroupList() } + launch { reloadStrangerList() } + } } } @@ -94,7 +101,7 @@ internal class ContactUpdaterImpl( /** * Don't use concurrently */ - private suspend fun reloadFriendList(registerResp: SvcRespRegister) = bot.network.run { + private suspend fun reloadFriendList(registerResp: SvcRespRegister) { if (initFriendOk) { return } @@ -119,7 +126,7 @@ internal class ContactUpdaterImpl( while (true) { val data = FriendList.GetFriendGroupList( bot.client, count, 150, 0, 0 - ).sendAndExpect(timeoutMillis = 5000, retry = 2) + ).sendAndExpect(bot, timeoutMillis = 5000, retry = 2) total = data.totalFriendCount @@ -142,7 +149,7 @@ internal class ContactUpdaterImpl( // For sync bot nick FriendList.GetFriendGroupList( bot.client, 0, 1, 0, 0 - ).sendAndExpect() + ).sendAndExpect(bot) list } else { @@ -198,14 +205,14 @@ internal class ContactUpdaterImpl( ) } - private suspend fun reloadStrangerList() = bot.network.run { + private suspend fun reloadStrangerList() { if (initStrangerOk) { return } var currentCount = 0 logger.info { "Start loading stranger list..." } val response = StrangerList.GetStrangerList(bot.client) - .sendAndExpect(timeoutMillis = 5000, retry = 2) + .sendAndExpect(bot, timeoutMillis = 5000, retry = 2) if (response.result == 0) { response.strangerList.forEach { @@ -220,15 +227,15 @@ internal class ContactUpdaterImpl( } - private suspend fun reloadGroupList() = bot.network.run { + private suspend fun reloadGroupList() { if (initGroupOk) { return } - TroopManagement.GetTroopConfig(bot.client).sendAndExpect() + TroopManagement.GetTroopConfig(bot.client).sendAndExpect(bot) logger.info { "Start loading group list..." } val troopListData = FriendList.GetTroopListSimplify(bot.client) - .sendAndExpect(retry = 5) + .sendAndExpect(bot, retry = 5) val semaphore = Semaphore(30) diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/NetworkHandlerReference.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/NetworkHandlerReference.kt new file mode 100644 index 000000000..02c7094b7 --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/NetworkHandlerReference.kt @@ -0,0 +1,21 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.handler.components + +import net.mamoe.mirai.internal.network.handler.NetworkHandler +import net.mamoe.mirai.internal.network.handler.component.ComponentKey + +internal class NetworkHandlerReference( + private val getInstance: () -> NetworkHandler, +) { + fun get(): NetworkHandler = getInstance() + + companion object : ComponentKey +} \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/OtherClientUpdater.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/OtherClientUpdater.kt new file mode 100644 index 000000000..e5bb86bd4 --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/OtherClientUpdater.kt @@ -0,0 +1,58 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.handler.components + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import net.mamoe.mirai.Mirai +import net.mamoe.mirai.contact.ContactList +import net.mamoe.mirai.contact.OtherClient +import net.mamoe.mirai.contact.deviceName +import net.mamoe.mirai.contact.platform +import net.mamoe.mirai.internal.QQAndroidBot +import net.mamoe.mirai.internal.contact.createOtherClient +import net.mamoe.mirai.internal.network.handler.component.ComponentKey +import net.mamoe.mirai.internal.network.handler.component.ComponentStorage +import net.mamoe.mirai.utils.MiraiLogger +import net.mamoe.mirai.utils.info + +internal interface OtherClientUpdater { + + suspend fun update() + + companion object : ComponentKey +} + +internal class OtherClientUpdaterImpl( + private val bot: QQAndroidBot, + private val context: ComponentStorage, + private val logger: MiraiLogger, +) : OtherClientUpdater { + + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") + val otherClientList = ContactList() + + private val lock = Mutex() + + override suspend fun update() = lock.withLock { + val list = Mirai.getOnlineOtherClientsList(bot) + bot.otherClients.delegate.clear() + bot.otherClients.delegate.addAll(list.map { bot.createOtherClient(it) }) + if (bot.otherClients.isEmpty()) { + logger.info { "No OtherClient online." } + } else { + logger.info { + "Online OtherClients: " + + bot.otherClients.joinToString { "${it.deviceName}(${it.platform?.name ?: "unknown platform"})" } + } + } + } + +} \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/PacketCodec.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/PacketCodec.kt index 0cf5332f4..2c4714058 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/components/PacketCodec.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/PacketCodec.kt @@ -37,17 +37,13 @@ internal interface PacketCodec { /** * Process [RawIncomingPacket] using [IncomingPacketFactory.decode]. * - * This function wraps exceptions into [IncomingPacket] + * This function throws **no** exception and wrap them into [IncomingPacket]. */ suspend fun processBody(bot: QQAndroidBot, input: RawIncomingPacket): IncomingPacket? companion object : ComponentKey { - val PACKET_DEBUG = systemProp("mirai.debug.network.packet.logger", true) + val PACKET_DEBUG = systemProp("mirai.debug.network.packet.logger", false) - /** - * 数据包相关的调试输出. - * 它默认是关闭的. - */ internal val PacketLogger: MiraiLoggerWithSwitch by lazy { MiraiLogger.create("Packet").withSwitch(PACKET_DEBUG) } diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/PacketHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/PacketHandler.kt new file mode 100644 index 000000000..2dbb63838 --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/PacketHandler.kt @@ -0,0 +1,113 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.handler.components + +import net.mamoe.mirai.event.BroadcastControllable +import net.mamoe.mirai.event.CancellableEvent +import net.mamoe.mirai.event.Event +import net.mamoe.mirai.event.broadcast +import net.mamoe.mirai.event.events.MessageEvent +import net.mamoe.mirai.internal.QQAndroidBot +import net.mamoe.mirai.internal.contact.logMessageReceived +import net.mamoe.mirai.internal.contact.replaceMagicCodes +import net.mamoe.mirai.internal.network.Packet +import net.mamoe.mirai.internal.network.ParseErrorPacket +import net.mamoe.mirai.internal.network.handler.component.ComponentKey +import net.mamoe.mirai.internal.network.handler.component.ComponentStorage +import net.mamoe.mirai.internal.network.protocol.packet.IncomingPacket +import net.mamoe.mirai.utils.MiraiLogger +import net.mamoe.mirai.utils.verbose + +internal interface PacketHandler { + suspend fun handlePacket(incomingPacket: IncomingPacket) + + companion object : ComponentKey +} + +internal class PacketHandlerChain( + private val instances: Collection, + primaryConstructorMark: Any? +) : PacketHandler { + constructor(vararg instances: PacketHandler?) : this(instances.filterNotNull(), null) + constructor(instances: Iterable) : this(instances.filterNotNull(), null) + + override suspend fun handlePacket(incomingPacket: IncomingPacket) { + for (instance in instances) { + try { + instance.handlePacket(incomingPacket) + } catch (e: Throwable) { + throw ExceptionInPacketHandlerException(instance, e) + } + } + } +} + +internal data class ExceptionInPacketHandlerException( + val packetHandler: PacketHandler, + override val cause: Throwable, +) : IllegalStateException("Exception in PacketHandler '$packetHandler'.") + +internal class LoggingPacketHandler( + val bot: QQAndroidBot, + val context: ComponentStorage, + private val logger: MiraiLogger, +) : PacketHandler { + override suspend fun handlePacket(incomingPacket: IncomingPacket) { + val packet = incomingPacket.data ?: return + if (!bot.logger.isEnabled && !logger.isEnabled) return + when { + packet is ParseErrorPacket -> { + packet.direction.getLogger(bot).error(packet.error) + } + packet is Packet.NoLog -> { + // nothing to do + } + packet is MessageEvent -> packet.logMessageReceived() + packet is Event && packet !is Packet.NoEventLog -> bot.logger.verbose { + "Event: $packet".replaceMagicCodes() + } + else -> logger.verbose { "Recv: ${incomingPacket.commandName} ${incomingPacket.data}".replaceMagicCodes() } + } + } + + override fun toString(): String = "LoggingPacketHandler" +} + +internal class EventBroadcasterPacketHandler( + val bot: QQAndroidBot, + val context: ComponentStorage, + private val logger: MiraiLogger, +) : PacketHandler { + + override suspend fun handlePacket(incomingPacket: IncomingPacket) { + val packet = incomingPacket.data ?: return + when { + packet is CancellableEvent && packet.isCancelled -> return + packet is BroadcastControllable && !packet.shouldBroadcast -> return + packet is Event -> { + try { + packet.broadcast() + } catch (e: Throwable) { + if (logger.isEnabled) { + val msg = optimizeEventToString(packet) + logger.error(IllegalStateException("Exception while broadcasting event '$msg'", e)) + } + } + } + } + } + + private fun optimizeEventToString(event: Event): String { + val qualified = event::class.java.canonicalName ?: return this.toString() + return qualified.substringAfter("net.mamoe.mirai.event.events.") + } + + override fun toString(): String = "LoggingPacketHandler" +} \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/ServerList.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/ServerList.kt index 1666050dd..130bdf4ea 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/components/ServerList.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/ServerList.kt @@ -108,4 +108,8 @@ internal class ServerListImpl( if (current.isEmpty()) refresh() return current.remove() } + + override fun toString(): String { + return "ServerListImpl(current.size=${current.size})" + } } \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/components/SsoProcessor.kt b/mirai-core/src/commonMain/kotlin/network/handler/components/SsoProcessor.kt index 0d7883966..4bc43f1ab 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/components/SsoProcessor.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/components/SsoProcessor.kt @@ -9,15 +9,21 @@ package net.mamoe.mirai.internal.network.handler.components +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.launch import net.mamoe.mirai.internal.QQAndroidBot import net.mamoe.mirai.internal.network.Packet import net.mamoe.mirai.internal.network.QQAndroidClient import net.mamoe.mirai.internal.network.handler.NetworkHandler +import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport import net.mamoe.mirai.internal.network.handler.component.ComponentKey import net.mamoe.mirai.internal.network.handler.context.AccountSecretsImpl import net.mamoe.mirai.internal.network.handler.context.SsoProcessorContext import net.mamoe.mirai.internal.network.handler.context.SsoSession import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandler +import net.mamoe.mirai.internal.network.handler.state.StateChangedObserver +import net.mamoe.mirai.internal.network.handler.state.StateObserver import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType import net.mamoe.mirai.internal.network.protocol.packet.login.WtLogin.Login.LoginPacketResponse import net.mamoe.mirai.internal.network.protocol.packet.login.WtLogin.Login.LoginPacketResponse.Captcha @@ -37,6 +43,13 @@ internal interface SsoProcessor { val client: QQAndroidClient val ssoSession: SsoSession + /** + * The observers to launch jobs for states. + * + * E.g. start heartbeat job for [NetworkHandler.State.OK]. + */ + fun createObserverChain(): StateObserver + /** * Do login. Throws [LoginFailedException] if failed */ @@ -60,6 +73,17 @@ internal class SsoProcessorImpl( override var client = createClient(ssoContext.bot) override val ssoSession: SsoSession get() = client + override fun createObserverChain(): StateObserver = StateObserver.chainOfNotNull( + object : StateChangedObserver(NetworkHandler.State.OK) { + override fun stateChanged0( + networkHandler: NetworkHandlerSupport, + previous: NetworkHandlerSupport.BaseStateImpl, + new: NetworkHandlerSupport.BaseStateImpl + ) { + new.launch { } + } + } + ) /** * Do login. Throws [LoginFailedException] if failed @@ -91,6 +115,20 @@ internal class SsoProcessorImpl( } } + /////////////////////////////////////////////////////////////////////////// + // state observers + /////////////////////////////////////////////////////////////////////////// + + private fun CoroutineScope.launchHeartbeat() = launch(CoroutineName("")) { + + } + + private inner class HeartbeatHandler + + /////////////////////////////////////////////////////////////////////////// + // Login methods + /////////////////////////////////////////////////////////////////////////// + // we have exactly two methods----slow and fast. private abstract inner class LoginStrategy( diff --git a/mirai-core/src/commonMain/kotlin/network/handler/impl/netty/NettyNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/impl/netty/NettyNetworkHandler.kt index 8c75fc6cd..6c9f5cf3a 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/impl/netty/NettyNetworkHandler.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/impl/netty/NettyNetworkHandler.kt @@ -22,13 +22,14 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.sendBlocking import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.consumeAsFlow -import net.mamoe.mirai.internal.network.handler.NetworkHandler +import net.mamoe.mirai.internal.network.handler.NetworkHandler.State import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport -import net.mamoe.mirai.internal.network.handler.components.PacketCodecImpl +import net.mamoe.mirai.internal.network.handler.components.BotInitProcessor +import net.mamoe.mirai.internal.network.handler.components.PacketCodec import net.mamoe.mirai.internal.network.handler.components.RawIncomingPacket import net.mamoe.mirai.internal.network.handler.components.SsoProcessor import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContext -import net.mamoe.mirai.internal.network.handler.logger +import net.mamoe.mirai.internal.network.handler.state.StateObserver import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket import net.mamoe.mirai.utils.childScope import net.mamoe.mirai.utils.debug @@ -61,12 +62,12 @@ internal class NettyNetworkHandler( /////////////////////////////////////////////////////////////////////////// private inner class ByteBufToIncomingPacketDecoder : SimpleChannelInboundHandler(ByteBuf::class.java) { + private val packetCodec: PacketCodec by lazy { context[PacketCodec] } + private val ssoProcessor: SsoProcessor by lazy { context[SsoProcessor] } + override fun channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf) { ctx.fireChannelRead(msg.toReadPacket().use { packet -> - PacketCodecImpl().decodeRaw( - context[SsoProcessor].ssoSession, - packet - ) // TODO: 2021/4/17 components integration + packetCodec.decodeRaw(ssoProcessor.ssoSession, packet) }) } } @@ -81,7 +82,7 @@ internal class NettyNetworkHandler( private inner class OutgoingPacketEncoder : MessageToByteEncoder(OutgoingPacket::class.java) { override fun encode(ctx: ChannelHandlerContext, msg: OutgoingPacket, out: ByteBuf) { - logger.debug { "encode: $msg" } + PacketCodec.PacketLogger.debug { "encode: $msg" } out.writeBytes(msg.delegate) } } @@ -122,12 +123,13 @@ internal class NettyNetworkHandler( private inner class PacketDecodePipeline(parentContext: CoroutineContext) : CoroutineScope by parentContext.childScope() { private val channel: Channel = Channel(Channel.BUFFERED) + private val packetCodec: PacketCodec by lazy { context[PacketCodec] } init { launch(CoroutineName("PacketDecodePipeline processor")) { // 'single thread' processor channel.consumeAsFlow().collect { raw -> - val result = PacketCodecImpl().processBody(context.bot, raw) // TODO: 2021/4/17 components + val result = packetCodec.processBody(context.bot, raw) if (result == null) { collectUnknownPacket(raw) } else collectReceived(result) @@ -145,14 +147,18 @@ internal class NettyNetworkHandler( /** * When state is initialized, it must be set to [_state]. (inside [setState]) + * + * For what jobs each state will do, it is not solely decided by the state itself. [StateObserver]s may also launch jobs into the scope. + * + * @see StateObserver */ private abstract inner class NettyState( - correspondingState: NetworkHandler.State + correspondingState: State ) : BaseStateImpl(correspondingState) { abstract suspend fun sendPacketImpl(packet: OutgoingPacket) } - private inner class StateInitialized : NettyState(NetworkHandler.State.INITIALIZED) { + private inner class StateInitialized : NettyState(State.INITIALIZED) { override suspend fun sendPacketImpl(packet: OutgoingPacket) { error("Cannot send packet when connection is not set. (resumeConnection not called.)") } @@ -173,13 +179,12 @@ internal class NettyNetworkHandler( */ private inner class StateConnecting( val decodePipeline: PacketDecodePipeline, - ) : NettyState(NetworkHandler.State.CONNECTING) { + ) : NettyState(State.CONNECTING) { private val connection = async { createConnection(decodePipeline) } private val connectResult = async { - val connection = connection.await() + connection.join() context[SsoProcessor].login(this@NettyNetworkHandler) - setStateForJobCompletion { StateOK(connection) } }.apply { invokeOnCompletion { error -> if (error != null) setState { @@ -198,14 +203,39 @@ internal class NettyNetworkHandler( override suspend fun resumeConnection0() { connectResult.await() // propagates exceptions + val connection = connection.await() + setState { StateLoading(connection) } + .resumeConnection() } override fun toString(): String = "StateConnecting" } + /** + * @see BotInitProcessor + * @see StateObserver + */ + private inner class StateLoading( + private val connection: NettyChannel + ) : NettyState(State.LOADING) { + override suspend fun sendPacketImpl(packet: OutgoingPacket) { + connection.writeAndFlush(packet) + } + + override suspend fun resumeConnection0() { + (coroutineContext.job as CompletableJob).run { + complete() + join() + } + setState { StateOK(connection) } + } // noop + + override fun toString(): String = "StateLoading" + } + private inner class StateOK( private val connection: NettyChannel - ) : NettyState(NetworkHandler.State.OK) { + ) : NettyState(State.OK) { override suspend fun sendPacketImpl(packet: OutgoingPacket) { connection.writeAndFlush(packet) } @@ -216,7 +246,7 @@ internal class NettyNetworkHandler( private inner class StateConnectionLost( private val cause: Throwable - ) : NettyState(NetworkHandler.State.CONNECTION_LOST) { + ) : NettyState(State.CONNECTION_LOST) { override suspend fun sendPacketImpl(packet: OutgoingPacket) { throw IllegalStateException("Connection is lost so cannot send packet. Call resumeConnection first.", cause) } @@ -229,7 +259,7 @@ internal class NettyNetworkHandler( private inner class StateClosed( val exception: Throwable? - ) : NettyState(NetworkHandler.State.CLOSED) { + ) : NettyState(State.CLOSED) { init { closeSuper(exception) } diff --git a/mirai-core/src/commonMain/kotlin/network/handler/selector/AbstractKeepAliveNetworkHandlerSelector.kt b/mirai-core/src/commonMain/kotlin/network/handler/selector/AbstractKeepAliveNetworkHandlerSelector.kt index 4f872dc8b..972620f29 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/selector/AbstractKeepAliveNetworkHandlerSelector.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/selector/AbstractKeepAliveNetworkHandlerSelector.kt @@ -10,6 +10,7 @@ package net.mamoe.mirai.internal.network.handler.selector import kotlinx.atomicfu.atomic +import kotlinx.coroutines.yield import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory import org.jetbrains.annotations.TestOnly @@ -37,7 +38,8 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector { + current.resumeConnection() + return awaitResumeInstance() + } + NetworkHandler.State.LOADING -> { + return current + } NetworkHandler.State.OK -> { current.resumeConnection() return current diff --git a/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt index 3df20abca..cc827aea9 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt @@ -9,6 +9,11 @@ package net.mamoe.mirai.internal.network.handler.selector +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.selects.SelectClause1 import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.handler.NetworkHandler.State import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContext @@ -26,10 +31,14 @@ internal class SelectorNetworkHandler( override val context: NetworkHandlerContext, // impl notes: may consider to move into function member. private val selector: NetworkHandlerSelector<*>, ) : NetworkHandler { + private val scope = CoroutineScope(SupervisorJob(context.bot.coroutineContext[Job])) private suspend inline fun instance(): NetworkHandler = selector.awaitResumeInstance() override val state: State get() = selector.getResumedInstance()?.state ?: State.INITIALIZED + override val onStateChanged: SelectClause1 + get() = selector.getResumedInstance()?.onStateChanged + ?: scope.async { instance().state }.onAwait override suspend fun resumeConnection() { instance() // the selector will resume connection for us. diff --git a/mirai-core/src/commonMain/kotlin/network/handler/state/CombinedStateObserver.kt b/mirai-core/src/commonMain/kotlin/network/handler/state/CombinedStateObserver.kt index 0afe52161..0587e35bc 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/state/CombinedStateObserver.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/state/CombinedStateObserver.kt @@ -48,7 +48,17 @@ internal class CombinedStateObserver( last.afterStateResume(networkHandler, state, result) } + override fun toString(): String { + return "CombinedStateObserver(first=$first, last=$last)" + } + companion object { - operator fun StateObserver.plus(last: StateObserver): StateObserver = CombinedStateObserver(this, last) + operator fun StateObserver?.plus(last: StateObserver?): StateObserver { + return when { + this == null -> last + last == null -> this + else -> CombinedStateObserver(this, last) + } ?: StateObserver.NOP + } } } \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/state/LoggingStateObserver.kt b/mirai-core/src/commonMain/kotlin/network/handler/state/LoggingStateObserver.kt index 9e6f8375d..585e8b4b3 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/state/LoggingStateObserver.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/state/LoggingStateObserver.kt @@ -18,7 +18,7 @@ internal class LoggingStateObserver( val logger: MiraiLogger ) : StateObserver { override fun toString(): String { - return "LoggingStateObserver(logger=$logger)" + return "LoggingStateObserver" } override fun stateChanged( @@ -37,6 +37,10 @@ internal class LoggingStateObserver( logger.debug({ "State changed: ${previousState.correspondingState} -> $exception" }, exception) } + override fun beforeStateResume(networkHandler: NetworkHandler, state: NetworkHandlerSupport.BaseStateImpl) { + logger.debug { "State resuming: ${state.correspondingState}." } + } + override fun afterStateResume( networkHandler: NetworkHandler, state: NetworkHandlerSupport.BaseStateImpl, diff --git a/mirai-core/src/commonMain/kotlin/network/handler/state/SafeStateObserver.kt b/mirai-core/src/commonMain/kotlin/network/handler/state/SafeStateObserver.kt index 1c66832c5..a5fbdef58 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/state/SafeStateObserver.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/state/SafeStateObserver.kt @@ -14,6 +14,11 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport import net.mamoe.mirai.utils.MiraiLogger import net.mamoe.mirai.utils.error +internal fun StateObserver.safe(logger: MiraiLogger): StateObserver { + if (this is SafeStateObserver) return this + return SafeStateObserver(this, logger) +} + /** * Catches exception then log by [logger] */ diff --git a/mirai-core/src/commonMain/kotlin/network/handler/state/StateChangedObserver.kt b/mirai-core/src/commonMain/kotlin/network/handler/state/StateChangedObserver.kt new file mode 100644 index 000000000..386a74fe4 --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/network/handler/state/StateChangedObserver.kt @@ -0,0 +1,33 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.handler.state + +import net.mamoe.mirai.internal.network.handler.NetworkHandler.State +import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport + +internal abstract class StateChangedObserver( + val state: State, +) : StateObserver { + abstract fun stateChanged0( + networkHandler: NetworkHandlerSupport, + previous: NetworkHandlerSupport.BaseStateImpl, + new: NetworkHandlerSupport.BaseStateImpl + ) + + override fun stateChanged( + networkHandler: NetworkHandlerSupport, + previous: NetworkHandlerSupport.BaseStateImpl, + new: NetworkHandlerSupport.BaseStateImpl + ) { + if (previous.correspondingState != state && new.correspondingState == state) { + stateChanged0(networkHandler, previous, new) + } + } +} \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/state/StateObserver.kt b/mirai-core/src/commonMain/kotlin/network/handler/state/StateObserver.kt index 9fb0c2826..bfb762bc4 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/state/StateObserver.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/state/StateObserver.kt @@ -12,6 +12,7 @@ package net.mamoe.mirai.internal.network.handler.state import net.mamoe.mirai.internal.network.handler.NetworkHandler import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport import net.mamoe.mirai.internal.network.handler.component.ComponentKey +import net.mamoe.mirai.internal.network.handler.state.CombinedStateObserver.Companion.plus /** * Stateless observer of state changes. @@ -50,5 +51,19 @@ internal interface StateObserver { } - companion object : ComponentKey + companion object : ComponentKey { + internal val NOP = object : StateObserver { + override fun toString(): String { + return "StateObserver.NOP" + } + } + + fun chainOfNotNull( + vararg observers: StateObserver?, + ): StateObserver { + return observers.reduceOrNull { acc, stateObserver -> + acc + stateObserver + } ?: NOP + } + } } diff --git a/mirai-core/src/commonMain/kotlin/network/net/protocol/implementation notes.kt b/mirai-core/src/commonMain/kotlin/network/net/protocol/implementation notes.kt index 047661460..d1694b3a9 100644 --- a/mirai-core/src/commonMain/kotlin/network/net/protocol/implementation notes.kt +++ b/mirai-core/src/commonMain/kotlin/network/net/protocol/implementation notes.kt @@ -15,45 +15,6 @@ package net.mamoe.mirai.internal.network.net.protocol * 垃圾分类 */ -private object `handle packet1` { -// if (packet != null && (bot.logger.isEnabled || logger.isEnabled)) { -// when { -// packet is ParseErrorPacket -> { -// packet.direction.getLogger(bot).error(packet.error) -// } -// packet is Packet.NoLog -> { -// // nothing to do -// } -// packet is MessageEvent -> packet.logMessageReceived() -// packet is Event && packet !is Packet.NoEventLog -> bot.logger.verbose { -// "Event: $packet".replaceMagicCodes() -// } -// else -> logger.verbose { "Recv: $packet".replaceMagicCodes() } -// } -// } -} - -private object `handle packet2` { -// if (packet is Event) { -// if ((packet as? BroadcastControllable)?.shouldBroadcast != false) { -// if (packet is BotEvent) { -// withContext(bot.coroutineContext[CoroutineExceptionHandler] ?: CoroutineExceptionHandler { _, t -> -// bot.logger.warning( -// """ -// Event processing: An exception occurred but no CoroutineExceptionHandler found in coroutineContext of bot -// """.trimIndent(), t -// ) -// }) { -// packet.broadcast() -// } -// } else { -// packet.broadcast() -// } -// } -// -// if (packet is CancellableEvent && packet.isCancelled) return -// } -} private object `stat heartbeat` { // private suspend fun doStatHeartbeat(): Throwable? { @@ -69,18 +30,6 @@ private object `stat heartbeat` { } -private object syncMessageSvc { -// private suspend fun syncMessageSvc() { -// logger.info { "Syncing friend message history..." } -// withTimeoutOrNull(30000) { -// launch(CoroutineName("Syncing friend message history")) { nextEvent { it.bot == this@QQAndroidBotNetworkHandler.bot } } -// MessageSvcPbGetMsg(bot.client, MsgSvc.SyncFlag.START, null).sendAndExpect() -// -// } ?: error("timeout syncing friend message history.") -// logger.info { "Syncing friend message history: Success." } -// } -} - private object `config push syncer` { // @Suppress("FunctionName", "UNUSED_VARIABLE") // private fun BotNetworkHandler.ConfigPushSyncer(): suspend CoroutineScope.() -> Unit = launch@{ @@ -108,42 +57,6 @@ private object `config push syncer` { // } } -private object `network init` { -// suspend fun init(): Unit = coroutineScope { -// check(bot.isActive) { "bot is dead therefore network can't init." } -// check(this@QQAndroidBotNetworkHandler.isActive) { "network is dead therefore can't init." } -// -// contactUpdater.closeAllContacts(CancellationException("re-init")) -// -// if (!pendingEnabled) { -// pendingIncomingPackets = ConcurrentLinkedQueue() -// _pendingEnabled.value = true -// } -// -// val registerResp = registerClientOnline() -// -// this@QQAndroidBotNetworkHandler.launch( -// CoroutineName("Awaiting ConfigPushSvc.PushReq"), -// block = ConfigPushSyncer() -// ) -// -// launch { -// syncMessageSvc() -// } -// -// launch { -// bot.otherClientsLock.withLock { -// updateOtherClientsList() -// } -// } -// -// contactUpdater.loadAll(registerResp.origin) -// -// bot.firstLoginSucceed = true -// postInitActions() -// } - -} private object `update other client list` { // diff --git a/mirai-core/src/commonMain/kotlin/network/protocol/packet/OutgoingPacketAndroid.kt b/mirai-core/src/commonMain/kotlin/network/protocol/packet/OutgoingPacketAndroid.kt index f71d3d53d..dc5b5bf31 100644 --- a/mirai-core/src/commonMain/kotlin/network/protocol/packet/OutgoingPacketAndroid.kt +++ b/mirai-core/src/commonMain/kotlin/network/protocol/packet/OutgoingPacketAndroid.kt @@ -65,41 +65,34 @@ internal class IncomingPacket constructor( } } +@Suppress("UNCHECKED_CAST") internal suspend inline fun OutgoingPacketWithRespType.sendAndExpect( network: NetworkHandler, timeoutMillis: Long = 5000, retry: Int = 2 -): E = network.run { - return (this@sendAndExpect as OutgoingPacket).sendAndExpect(timeoutMillis, retry) -} +): E = network.sendAndExpect(this, timeoutMillis, retry) as E -@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") +@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST") @kotlin.internal.LowPriorityInOverloadResolution internal suspend inline fun OutgoingPacket.sendAndExpect( network: NetworkHandler, timeoutMillis: Long = 5000, retry: Int = 2 -): E = network.run { - return this@sendAndExpect.sendAndExpect(timeoutMillis, retry) -} +): E = network.sendAndExpect(this, timeoutMillis, retry) as E internal suspend inline fun OutgoingPacketWithRespType.sendAndExpect( bot: QQAndroidBot, timeoutMillis: Long = 5000, retry: Int = 2 -): E = bot.network.run { - return (this@sendAndExpect as OutgoingPacket).sendAndExpect(timeoutMillis, retry) -} +): E = (this@sendAndExpect as OutgoingPacket).sendAndExpect(bot.network, timeoutMillis, retry) -@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") +@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST") @kotlin.internal.LowPriorityInOverloadResolution internal suspend inline fun OutgoingPacket.sendAndExpect( bot: QQAndroidBot, timeoutMillis: Long = 5000, retry: Int = 2 -): E = bot.network.run { - return this@sendAndExpect.sendAndExpect(timeoutMillis, retry) -} +): E = bot.network.sendAndExpect(this, timeoutMillis, retry) as E @Suppress("DuplicatedCode") diff --git a/mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/receive/MessageSvc.PbGetMsg.kt b/mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/receive/MessageSvc.PbGetMsg.kt index 498e6419c..ad196bb78 100644 --- a/mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/receive/MessageSvc.PbGetMsg.kt +++ b/mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/receive/MessageSvc.PbGetMsg.kt @@ -41,7 +41,6 @@ import net.mamoe.mirai.internal.network.protocol.data.proto.FrdSysMsg import net.mamoe.mirai.internal.network.protocol.data.proto.MsgComm import net.mamoe.mirai.internal.network.protocol.data.proto.MsgSvc import net.mamoe.mirai.internal.network.protocol.data.proto.SubMsgType0x7 -import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketFactory import net.mamoe.mirai.internal.network.protocol.packet.buildOutgoingUniPacket import net.mamoe.mirai.internal.network.protocol.packet.chat.NewContact @@ -70,7 +69,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory NetworkHandler) : } } -internal class AbstractKeepAliveNetworkHandlerSelectorTest { +internal class KeepAliveNetworkHandlerSelectorTest { private fun createHandler() = TestNetworkHandler(TestNetworkHandlerContext()) diff --git a/mirai-core/src/commonTest/kotlin/network/handler/AbstractNetworkHandlerTest.kt b/mirai-core/src/commonTest/kotlin/network/handler/AbstractNetworkHandlerTest.kt new file mode 100644 index 000000000..93e4a5ec5 --- /dev/null +++ b/mirai-core/src/commonTest/kotlin/network/handler/AbstractNetworkHandlerTest.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +@file:Suppress("MemberVisibilityCanBePrivate") + +package net.mamoe.mirai.internal.network.handler + +import net.mamoe.mirai.internal.MockBot +import net.mamoe.mirai.internal.network.handler.component.ConcurrentComponentStorage +import net.mamoe.mirai.internal.network.handler.components.SsoProcessor +import net.mamoe.mirai.internal.network.handler.components.SsoProcessorImpl +import net.mamoe.mirai.internal.network.handler.context.SsoProcessorContextImpl +import net.mamoe.mirai.internal.network.handler.state.LoggingStateObserver +import net.mamoe.mirai.internal.network.handler.state.SafeStateObserver +import net.mamoe.mirai.internal.network.handler.state.StateObserver +import net.mamoe.mirai.utils.MiraiLogger + +internal abstract class AbstractNetworkHandlerTest { + protected open fun createNetworkHandlerContext() = TestNetworkHandlerContext(bot, logger, components) + protected open fun createNetworkHandler() = TestNetworkHandler(createNetworkHandlerContext()) + + protected val bot = MockBot() + protected val logger = MiraiLogger.create("test") + protected val components = ConcurrentComponentStorage().apply { + set(SsoProcessor, SsoProcessorImpl(SsoProcessorContextImpl(bot))) + set( + StateObserver, + SafeStateObserver( + LoggingStateObserver(MiraiLogger.create("States")), + MiraiLogger.create("StateObserver errors") + ) + ) + } +} \ No newline at end of file diff --git a/mirai-core/src/commonTest/kotlin/network/handler/StateObserverTest.kt b/mirai-core/src/commonTest/kotlin/network/handler/StateObserverTest.kt new file mode 100644 index 000000000..62524fad7 --- /dev/null +++ b/mirai-core/src/commonTest/kotlin/network/handler/StateObserverTest.kt @@ -0,0 +1,67 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.handler + +import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.CONNECTING +import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.INITIALIZED +import net.mamoe.mirai.internal.network.handler.state.StateChangedObserver +import net.mamoe.mirai.internal.network.handler.state.StateObserver +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import kotlin.test.assertEquals + +@TestInstance(TestInstance.Lifecycle.PER_METHOD) +internal class StateObserverTest : AbstractNetworkHandlerTest() { + @Test + fun `can trigger observer`() { + val called = ArrayList>() + components[StateObserver] = object : StateObserver { + override fun stateChanged( + networkHandler: NetworkHandlerSupport, + previous: NetworkHandlerSupport.BaseStateImpl, + new: NetworkHandlerSupport.BaseStateImpl + ) { + called.add(previous to new) + } + } + val handler = createNetworkHandler() + assertEquals(0, called.size) + handler.setState(INITIALIZED) + assertEquals(1, called.size) + assertEquals(INITIALIZED, called[0].first.correspondingState) + assertEquals(INITIALIZED, called[0].second.correspondingState) + handler.setState(CONNECTING) + assertEquals(2, called.size) + assertEquals(INITIALIZED, called[1].first.correspondingState) + assertEquals(CONNECTING, called[1].second.correspondingState) + } + + @Test + fun `test StateChangedObserver`() { + val called = ArrayList>() + components[StateObserver] = object : StateChangedObserver(CONNECTING) { + override fun stateChanged0( + networkHandler: NetworkHandlerSupport, + previous: NetworkHandlerSupport.BaseStateImpl, + new: NetworkHandlerSupport.BaseStateImpl + ) { + called.add(previous to new) + } + } + val handler = createNetworkHandler() + assertEquals(0, called.size) + handler.setState(INITIALIZED) + assertEquals(0, called.size) + handler.setState(CONNECTING) + assertEquals(1, called.size) + assertEquals(INITIALIZED, called[0].first.correspondingState) + assertEquals(CONNECTING, called[0].second.correspondingState) + } +} \ No newline at end of file