Continuing implement states for NetworkHandler

This commit is contained in:
Him188 2021-04-17 23:23:48 +08:00
parent 03e12eb287
commit 82ad953b2b
36 changed files with 757 additions and 188 deletions

View File

@ -171,14 +171,14 @@ internal abstract class AbstractBot constructor(
// network
///////////////////////////////////////////////////////////////////////////
val network: NetworkHandler by lazy { createNetworkHandler(coroutineContext) }
val network: NetworkHandler by lazy { createNetworkHandler() }
final override suspend fun login() {
if (!isActive) error("Bot is already closed and cannot relogin. Please create a new Bot instance then do login.")
network.resumeConnection()
}
protected abstract fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler
protected abstract fun createNetworkHandler(): NetworkHandler
protected abstract suspend fun sendLogout()
// endregion

View File

@ -14,23 +14,21 @@ import kotlinx.coroutines.sync.Mutex
import net.mamoe.mirai.Bot
import net.mamoe.mirai.Mirai
import net.mamoe.mirai.contact.Group
import net.mamoe.mirai.contact.OtherClientInfo
import net.mamoe.mirai.internal.contact.OtherClientImpl
import net.mamoe.mirai.internal.contact.checkIsGroupImpl
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.component.ComponentStorage
import net.mamoe.mirai.internal.network.handler.component.ConcurrentComponentStorage
import net.mamoe.mirai.internal.network.handler.component.set
import net.mamoe.mirai.internal.network.handler.components.*
import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContextImpl
import net.mamoe.mirai.internal.network.handler.context.SsoProcessorContextImpl
import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandlerFactory
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.internal.network.handler.selector.FactoryKeepAliveNetworkHandlerSelector
import net.mamoe.mirai.internal.network.handler.selector.SelectorNetworkHandler
import net.mamoe.mirai.internal.network.handler.state.LoggingStateObserver
import net.mamoe.mirai.internal.network.handler.state.SafeStateObserver
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.handler.state.safe
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType
import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc
@ -38,7 +36,6 @@ import net.mamoe.mirai.utils.BotConfiguration
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.systemProp
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
internal fun Bot.asQQAndroidBot(): QQAndroidBot {
contract {
@ -48,18 +45,12 @@ internal fun Bot.asQQAndroidBot(): QQAndroidBot {
return this as QQAndroidBot
}
internal fun QQAndroidBot.createOtherClient(
info: OtherClientInfo,
): OtherClientImpl {
return OtherClientImpl(this, coroutineContext, info)
}
internal class BotDebugConfiguration(
var stateObserver: StateObserver? = when {
systemProp("mirai.debug.network.state.observer.logging", false) ->
SafeStateObserver(
LoggingStateObserver(MiraiLogger.create("States")),
MiraiLogger.create("StateObserver errors")
MiraiLogger.create("LoggingStateObserver errors")
)
else -> null
}
@ -81,19 +72,39 @@ internal class QQAndroidBot constructor(
// TODO: 2021/4/14 bdhSyncer.loadFromCache() when login
// IDE error, don't move into lazy
private fun ComponentStorage.stateObserverChain(): StateObserver {
val components = this
return StateObserver.chainOfNotNull(
components[BotInitProcessor].asObserver().safe(networkLogger),
debugConfiguration.stateObserver
)
}
private val networkLogger: MiraiLogger by lazy { configuration.networkLoggerSupplier(this) }
internal val components: ConcurrentComponentStorage by lazy {
ConcurrentComponentStorage().apply {
set(
SsoProcessor,
SsoProcessorImpl(SsoProcessorContextImpl(bot))
) // put sso processor at the first to make `client` faster.
val components = this // avoid mistakes
set(SsoProcessor, SsoProcessorImpl(SsoProcessorContextImpl(bot)))
// put sso processor at the first to make `client` faster.
set(StateObserver, debugConfiguration.stateObserver)
set(BotInitProcessor, BotInitProcessorImpl(bot, components, bot.logger))
set(ContactCacheService, ContactCacheServiceImpl(bot))
set(ContactUpdater, ContactUpdaterImpl(bot, this))
set(BdhSessionSyncer, BdhSessionSyncerImpl(configuration, network.logger, this))
set(ContactUpdater, ContactUpdaterImpl(bot, components, networkLogger))
set(BdhSessionSyncer, BdhSessionSyncerImpl(configuration, networkLogger, components))
set(ServerList, ServerListImpl())
set(
PacketHandler, PacketHandlerChain(
LoggingPacketHandler(bot, components, logger),
EventBroadcasterPacketHandler(bot, components, logger)
)
)
set(PacketCodec, PacketCodecImpl())
set(OtherClientUpdater, OtherClientUpdaterImpl(bot, components, bot.logger))
set(ConfigPushSyncer, ConfigPushSyncerImpl())
set(StateObserver, stateObserverChain())
// TODO: 2021/4/16 load server list from cache (add a provider)
// bot.bdhSyncer.loadServerListFromCache()
@ -107,10 +118,10 @@ internal class QQAndroidBot constructor(
network.sendWithoutExpect(StatSvc.Register.offline(client))
}
override fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler {
override fun createNetworkHandler(): NetworkHandler {
val context = NetworkHandlerContextImpl(
this,
configuration.networkLoggerSupplier(this),
networkLogger,
components
)
return SelectorNetworkHandler(

View File

@ -12,6 +12,7 @@ package net.mamoe.mirai.internal.contact
import net.mamoe.mirai.Bot
import net.mamoe.mirai.contact.OtherClient
import net.mamoe.mirai.contact.OtherClientInfo
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.message.MessageReceipt
import net.mamoe.mirai.message.data.Image
import net.mamoe.mirai.message.data.Message
@ -21,6 +22,12 @@ import kotlin.coroutines.CoroutineContext
internal inline val OtherClient.appId: Int
get() = info.appId
internal fun QQAndroidBot.createOtherClient(
info: OtherClientInfo,
): OtherClientImpl {
return OtherClientImpl(this, coroutineContext, info)
}
internal class OtherClientImpl(
bot: Bot,
coroutineContext: CoroutineContext,

View File

@ -9,9 +9,10 @@
package net.mamoe.mirai.internal.network.handler
import kotlinx.coroutines.selects.SelectClause1
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.selector.SelectorNetworkHandler
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType
import net.mamoe.mirai.utils.MiraiLogger
@ -37,6 +38,11 @@ internal interface NetworkHandler {
*/
val state: State
/**
* For suspension until a state. e.g login.
*/
val onStateChanged: SelectClause1<State>
enum class State {
/**
* Just created and no connection has been made.
@ -54,12 +60,25 @@ internal interface NetworkHandler {
CONNECTING,
/**
* Everything is working. [resumeConnection] does nothing. [sendAndExpect] does not suspend for connection reasons.
* Loading essential data from server and local cache. Data include contact list.
*
* At this state [resumeConnection] waits for the jobs. [sendAndExpect] works normally.
*/
LOADING,
/**
* Everything is working.
*
* At this state [resumeConnection] does nothing. [sendAndExpect] works normally.
*/
OK,
/**
* No Internet Connection available or for any other reasons but it is possible to establish a connection again(switching state to [CONNECTING]).
* No Internet Connection available or for any other reasons
* but it is possible to establish a connection again(switching state to [CONNECTING]).
*
* At this state [resumeConnection] turns the handle to [CONNECTING].
* [sendAndExpect] throws [IllegalStateException]
*/
CONNECTION_LOST,
@ -67,6 +86,9 @@ internal interface NetworkHandler {
* Cannot resume anymore. Both [resumeConnection] and [sendAndExpect] throw a [CancellationException].
*
* When a handler reached [CLOSED] state, it is finalized and cannot be restored to any other states.
*
* At this state [resumeConnection] throws the exception caught from underlying socket implementation (i.e netty).
* [sendAndExpect] throws [IllegalStateException]
*/
CLOSED,
}

View File

@ -10,8 +10,10 @@
package net.mamoe.mirai.internal.network.handler
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.SelectClause1
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.handler.components.PacketCodec
import net.mamoe.mirai.internal.network.handler.components.PacketHandler
import net.mamoe.mirai.internal.network.handler.components.RawIncomingPacket
import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.state.StateObserver
@ -32,11 +34,12 @@ internal abstract class NetworkHandlerSupport(
protected abstract fun initialState(): BaseStateImpl
protected abstract suspend fun sendPacketImpl(packet: OutgoingPacket)
private val packetHandler: PacketHandler by lazy { context[PacketHandler] }
/**
* Called when a packet is received.
*/
protected fun collectReceived(packet: IncomingPacket) {
logger.verbose({ "Recv: ${packet.commandName} ${packet.data ?: packet.exception}" }, packet.exception)
protected open fun collectReceived(packet: IncomingPacket) {
for (listener in packetListeners) {
if (!listener.isExpected(packet)) continue
if (packetListeners.remove(listener)) {
@ -48,6 +51,13 @@ internal abstract class NetworkHandlerSupport(
}
}
}
launch {
try {
packetHandler.handlePacket(packet)
} catch (e: Throwable) { // do not pass it to CoroutineExceptionHandler for a more controllable behavior.
logger.error(e)
}
}
}
protected fun collectUnknownPacket(raw: RawIncomingPacket) {
@ -74,7 +84,7 @@ internal abstract class NetworkHandlerSupport(
exception = e // show last exception
}
} finally {
listener.result.complete(null)
listener.result.completeExceptionally(exception ?: IllegalStateException("No response"))
packetListeners.remove(listener)
}
}
@ -161,6 +171,13 @@ internal abstract class NetworkHandlerSupport(
final override val state: NetworkHandler.State get() = _state.correspondingState
private var _stateChangedDeferred = CompletableDeferred<NetworkHandler.State>()
/**
* For suspension until a state. e.g login.
*/
override val onStateChanged: SelectClause1<NetworkHandler.State> get() = _stateChangedDeferred.onAwait
/**
* Can only be used in a job launched within the state scope.
*/
@ -192,10 +209,17 @@ internal abstract class NetworkHandlerSupport(
val old = _state
check(old !== impl) { "Old and new states cannot be the same." }
old.cancel(CancellationException("State is switched from $old to $impl"))
_state = impl
// Order notes:
// 1. Notify observers to attach jobs to [impl] (if so)
_stateChangedDeferred.complete(impl.correspondingState)
stateObserver?.stateChanged(this, old, impl)
_stateChangedDeferred = CompletableDeferred()
// 2. Update state to [state]. This affects selectors.
_state = impl // switch state first. selector may be busy selecting.
// 3. Cleanup, cancel old states.
old.cancel(CancellationException("State is switched from $old to $impl"))
return impl
}

View File

@ -20,7 +20,7 @@ import kotlin.reflect.full.allSupertypes
*
* @param T is a type hint.
*/
internal interface ComponentKey<out T : Any> {
internal interface ComponentKey<T : Any> {
/**
* Get name of `T`.
*

View File

@ -23,5 +23,7 @@ internal interface ComponentStorage {
@Throws(NoSuchComponentException::class)
operator fun <T : Any> get(key: ComponentKey<T>): T
fun <T : Any> getOrNull(key: ComponentKey<T>): T?
override fun toString(): String
}

View File

@ -31,7 +31,7 @@ internal class ConcurrentComponentStorage(
return map[key] as T?
}
override operator fun <T : Any> set(key: ComponentKey<T>, value: @UnsafeVariance T) {
override operator fun <T : Any> set(key: ComponentKey<T>, value: T) {
map[key] = value
}

View File

@ -14,11 +14,11 @@ package net.mamoe.mirai.internal.network.handler.component
*/
internal interface MutableComponentStorage : ComponentStorage {
override operator fun <T : Any> get(key: ComponentKey<T>): T
operator fun <T : Any> set(key: ComponentKey<T>, value: @UnsafeVariance T)
operator fun <T : Any> set(key: ComponentKey<T>, value: T)
fun <T : Any> remove(key: ComponentKey<T>): T?
}
internal operator fun <T : Any> MutableComponentStorage.set(key: ComponentKey<T>, value: @UnsafeVariance T?) {
internal operator fun <T : Any> MutableComponentStorage.set(key: ComponentKey<T>, value: T?) {
if (value == null) {
remove(key)
} else {

View File

@ -76,8 +76,6 @@ internal class FileCacheAccountSecretsManager(
)
logger.info { "Saved account secrets to local cache for fast login." }
TEA.encrypt(file.readBytes(), account.passwordMd5).loadAs(AccountSecretsImpl.serializer())
}
override fun getSecrets(account: BotAccount): AccountSecrets? {

View File

@ -0,0 +1,118 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler.components
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import net.mamoe.mirai.event.nextEvent
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.component.ComponentKey
import net.mamoe.mirai.internal.network.handler.component.ComponentStorage
import net.mamoe.mirai.internal.network.handler.state.StateChangedObserver
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgSvc
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetMsg
import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.info
internal interface BotInitProcessor {
suspend fun init()
companion object : ComponentKey<BotInitProcessor>
}
internal fun BotInitProcessor.asObserver(targetState: State = State.LOADING): StateObserver {
return BotInitProcessorAsStateObserverAdapter(this, targetState)
}
private class BotInitProcessorAsStateObserverAdapter(
private val processor: BotInitProcessor,
targetState: State
) : StateChangedObserver(targetState) {
override fun stateChanged0(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
new.launch(CoroutineName("BotInitProcessor.init")) {
try {
processor.init()
} catch (e: Throwable) {
throw IllegalStateException("Exception in BotInitProcessor.init", e)
}
}
}
override fun toString(): String {
return "BotInitProcessorAsStateObserverAdapter"
}
}
internal class BotInitProcessorImpl(
private val bot: QQAndroidBot,
private val context: ComponentStorage,
private val logger: MiraiLogger,
) : BotInitProcessor {
private val initialized = atomic(false)
override tailrec suspend fun init() {
if (initialized.value) return
if (!initialized.compareAndSet(expect = false, update = true)) return init()
check(bot.isActive) { "bot is dead therefore network can't init." }
context[ContactUpdater].closeAllContacts(CancellationException("re-init"))
val registerResp = registerClientOnline()
bot.launch(CoroutineName("Awaiting ConfigPushSvc.PushReq")) {
context[ConfigPushSyncer].awaitSync()
} // TODO: 2021/4/17 should we launch here?
// do them parallel.
supervisorScope {
// launch { syncMessageSvc() }
launch { context[OtherClientUpdater].update() }
launch { context[ContactUpdater].loadAll(registerResp.origin) }
}
bot.firstLoginSucceed = true
}
private suspend fun registerClientOnline(): StatSvc.Register.Response {
return StatSvc.Register.online(context[SsoProcessor].client).sendAndExpect(bot)
}
private suspend fun syncMessageSvc() {
logger.info { "Syncing friend message history..." }
withTimeoutOrNull(30000) {
launch(CoroutineName("Syncing friend message history")) {
nextEvent<MessageSvcPbGetMsg.GetMsgSuccess> {
it.bot == this@BotInitProcessorImpl.bot
}
}
MessageSvcPbGetMsg(bot.client, MsgSvc.SyncFlag.START, null).sendAndExpect()
} ?: error("timeout syncing friend message history.")
logger.info { "Syncing friend message history: Success." }
}
private suspend inline fun <T : Packet> OutgoingPacket.sendAndExpect() = this.sendAndExpect<T>(bot.network)
private suspend inline fun <T : Packet> OutgoingPacketWithRespType<T>.sendAndExpect() =
this.sendAndExpect(bot.network)
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler.components
import net.mamoe.mirai.internal.network.handler.component.ComponentKey
internal interface ConfigPushSyncer {
suspend fun awaitSync()
companion object : ComponentKey<ConfigPushSyncer>
}
internal class ConfigPushSyncerImpl : ConfigPushSyncer {
override suspend fun awaitSync() {
// TODO("Not yet implemented")
}
}

View File

@ -13,7 +13,9 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import net.mamoe.mirai.Mirai
import net.mamoe.mirai.data.FriendInfo
@ -38,6 +40,8 @@ import net.mamoe.mirai.internal.network.protocol.data.jce.isValid
import net.mamoe.mirai.internal.network.protocol.packet.chat.TroopManagement
import net.mamoe.mirai.internal.network.protocol.packet.list.FriendList
import net.mamoe.mirai.internal.network.protocol.packet.list.StrangerList
import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.info
import net.mamoe.mirai.utils.retryCatching
import net.mamoe.mirai.utils.verbose
@ -46,7 +50,7 @@ import net.mamoe.mirai.utils.verbose
* Uses [ContactCacheService]
*/
internal interface ContactUpdater {
suspend fun loadAll(registerResp: SvcRespRegister) // TODO: 2021/4/17 call this fun
suspend fun loadAll(registerResp: SvcRespRegister)
fun closeAllContacts(e: CancellationException)
@ -55,16 +59,19 @@ internal interface ContactUpdater {
internal class ContactUpdaterImpl(
val bot: QQAndroidBot, // not good
val components: ComponentStorage
val components: ComponentStorage,
private val logger: MiraiLogger,
) : ContactUpdater {
private val cacheService get() = components[ContactCacheService]
private val lock = Mutex()
@Synchronized
override suspend fun loadAll(registerResp: SvcRespRegister) {
coroutineScope {
launch { reloadFriendList(registerResp) }
launch { reloadGroupList() }
launch { reloadStrangerList() }
lock.withLock {
coroutineScope {
launch { reloadFriendList(registerResp) }
launch { reloadGroupList() }
launch { reloadStrangerList() }
}
}
}
@ -94,7 +101,7 @@ internal class ContactUpdaterImpl(
/**
* Don't use concurrently
*/
private suspend fun reloadFriendList(registerResp: SvcRespRegister) = bot.network.run {
private suspend fun reloadFriendList(registerResp: SvcRespRegister) {
if (initFriendOk) {
return
}
@ -119,7 +126,7 @@ internal class ContactUpdaterImpl(
while (true) {
val data = FriendList.GetFriendGroupList(
bot.client, count, 150, 0, 0
).sendAndExpect<FriendList.GetFriendGroupList.Response>(timeoutMillis = 5000, retry = 2)
).sendAndExpect<FriendList.GetFriendGroupList.Response>(bot, timeoutMillis = 5000, retry = 2)
total = data.totalFriendCount
@ -142,7 +149,7 @@ internal class ContactUpdaterImpl(
// For sync bot nick
FriendList.GetFriendGroupList(
bot.client, 0, 1, 0, 0
).sendAndExpect<Packet>()
).sendAndExpect<Packet>(bot)
list
} else {
@ -198,14 +205,14 @@ internal class ContactUpdaterImpl(
)
}
private suspend fun reloadStrangerList() = bot.network.run {
private suspend fun reloadStrangerList() {
if (initStrangerOk) {
return
}
var currentCount = 0
logger.info { "Start loading stranger list..." }
val response = StrangerList.GetStrangerList(bot.client)
.sendAndExpect<StrangerList.GetStrangerList.Response>(timeoutMillis = 5000, retry = 2)
.sendAndExpect<StrangerList.GetStrangerList.Response>(bot, timeoutMillis = 5000, retry = 2)
if (response.result == 0) {
response.strangerList.forEach {
@ -220,15 +227,15 @@ internal class ContactUpdaterImpl(
}
private suspend fun reloadGroupList() = bot.network.run {
private suspend fun reloadGroupList() {
if (initGroupOk) {
return
}
TroopManagement.GetTroopConfig(bot.client).sendAndExpect<TroopManagement.GetTroopConfig.Response>()
TroopManagement.GetTroopConfig(bot.client).sendAndExpect<TroopManagement.GetTroopConfig.Response>(bot)
logger.info { "Start loading group list..." }
val troopListData = FriendList.GetTroopListSimplify(bot.client)
.sendAndExpect<FriendList.GetTroopListSimplify.Response>(retry = 5)
.sendAndExpect<FriendList.GetTroopListSimplify.Response>(bot, retry = 5)
val semaphore = Semaphore(30)

View File

@ -0,0 +1,21 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler.components
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.component.ComponentKey
internal class NetworkHandlerReference(
private val getInstance: () -> NetworkHandler,
) {
fun get(): NetworkHandler = getInstance()
companion object : ComponentKey<NetworkHandlerReference>
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler.components
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.Mirai
import net.mamoe.mirai.contact.ContactList
import net.mamoe.mirai.contact.OtherClient
import net.mamoe.mirai.contact.deviceName
import net.mamoe.mirai.contact.platform
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.contact.createOtherClient
import net.mamoe.mirai.internal.network.handler.component.ComponentKey
import net.mamoe.mirai.internal.network.handler.component.ComponentStorage
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.info
internal interface OtherClientUpdater {
suspend fun update()
companion object : ComponentKey<OtherClientUpdater>
}
internal class OtherClientUpdaterImpl(
private val bot: QQAndroidBot,
private val context: ComponentStorage,
private val logger: MiraiLogger,
) : OtherClientUpdater {
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
val otherClientList = ContactList<OtherClient>()
private val lock = Mutex()
override suspend fun update() = lock.withLock {
val list = Mirai.getOnlineOtherClientsList(bot)
bot.otherClients.delegate.clear()
bot.otherClients.delegate.addAll(list.map { bot.createOtherClient(it) })
if (bot.otherClients.isEmpty()) {
logger.info { "No OtherClient online." }
} else {
logger.info {
"Online OtherClients: " +
bot.otherClients.joinToString { "${it.deviceName}(${it.platform?.name ?: "unknown platform"})" }
}
}
}
}

View File

@ -37,17 +37,13 @@ internal interface PacketCodec {
/**
* Process [RawIncomingPacket] using [IncomingPacketFactory.decode].
*
* This function wraps exceptions into [IncomingPacket]
* This function throws **no** exception and wrap them into [IncomingPacket].
*/
suspend fun processBody(bot: QQAndroidBot, input: RawIncomingPacket): IncomingPacket?
companion object : ComponentKey<PacketCodec> {
val PACKET_DEBUG = systemProp("mirai.debug.network.packet.logger", true)
val PACKET_DEBUG = systemProp("mirai.debug.network.packet.logger", false)
/**
* 数据包相关的调试输出.
* 它默认是关闭的.
*/
internal val PacketLogger: MiraiLoggerWithSwitch by lazy {
MiraiLogger.create("Packet").withSwitch(PACKET_DEBUG)
}

View File

@ -0,0 +1,113 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler.components
import net.mamoe.mirai.event.BroadcastControllable
import net.mamoe.mirai.event.CancellableEvent
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.events.MessageEvent
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.contact.logMessageReceived
import net.mamoe.mirai.internal.contact.replaceMagicCodes
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.ParseErrorPacket
import net.mamoe.mirai.internal.network.handler.component.ComponentKey
import net.mamoe.mirai.internal.network.handler.component.ComponentStorage
import net.mamoe.mirai.internal.network.protocol.packet.IncomingPacket
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.verbose
internal interface PacketHandler {
suspend fun handlePacket(incomingPacket: IncomingPacket)
companion object : ComponentKey<PacketHandler>
}
internal class PacketHandlerChain(
private val instances: Collection<PacketHandler>,
primaryConstructorMark: Any?
) : PacketHandler {
constructor(vararg instances: PacketHandler?) : this(instances.filterNotNull(), null)
constructor(instances: Iterable<PacketHandler?>) : this(instances.filterNotNull(), null)
override suspend fun handlePacket(incomingPacket: IncomingPacket) {
for (instance in instances) {
try {
instance.handlePacket(incomingPacket)
} catch (e: Throwable) {
throw ExceptionInPacketHandlerException(instance, e)
}
}
}
}
internal data class ExceptionInPacketHandlerException(
val packetHandler: PacketHandler,
override val cause: Throwable,
) : IllegalStateException("Exception in PacketHandler '$packetHandler'.")
internal class LoggingPacketHandler(
val bot: QQAndroidBot,
val context: ComponentStorage,
private val logger: MiraiLogger,
) : PacketHandler {
override suspend fun handlePacket(incomingPacket: IncomingPacket) {
val packet = incomingPacket.data ?: return
if (!bot.logger.isEnabled && !logger.isEnabled) return
when {
packet is ParseErrorPacket -> {
packet.direction.getLogger(bot).error(packet.error)
}
packet is Packet.NoLog -> {
// nothing to do
}
packet is MessageEvent -> packet.logMessageReceived()
packet is Event && packet !is Packet.NoEventLog -> bot.logger.verbose {
"Event: $packet".replaceMagicCodes()
}
else -> logger.verbose { "Recv: ${incomingPacket.commandName} ${incomingPacket.data}".replaceMagicCodes() }
}
}
override fun toString(): String = "LoggingPacketHandler"
}
internal class EventBroadcasterPacketHandler(
val bot: QQAndroidBot,
val context: ComponentStorage,
private val logger: MiraiLogger,
) : PacketHandler {
override suspend fun handlePacket(incomingPacket: IncomingPacket) {
val packet = incomingPacket.data ?: return
when {
packet is CancellableEvent && packet.isCancelled -> return
packet is BroadcastControllable && !packet.shouldBroadcast -> return
packet is Event -> {
try {
packet.broadcast()
} catch (e: Throwable) {
if (logger.isEnabled) {
val msg = optimizeEventToString(packet)
logger.error(IllegalStateException("Exception while broadcasting event '$msg'", e))
}
}
}
}
}
private fun optimizeEventToString(event: Event): String {
val qualified = event::class.java.canonicalName ?: return this.toString()
return qualified.substringAfter("net.mamoe.mirai.event.events.")
}
override fun toString(): String = "LoggingPacketHandler"
}

View File

@ -108,4 +108,8 @@ internal class ServerListImpl(
if (current.isEmpty()) refresh()
return current.remove()
}
override fun toString(): String {
return "ServerListImpl(current.size=${current.size})"
}
}

View File

@ -9,15 +9,21 @@
package net.mamoe.mirai.internal.network.handler.components
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.QQAndroidClient
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.component.ComponentKey
import net.mamoe.mirai.internal.network.handler.context.AccountSecretsImpl
import net.mamoe.mirai.internal.network.handler.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.handler.context.SsoSession
import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandler
import net.mamoe.mirai.internal.network.handler.state.StateChangedObserver
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType
import net.mamoe.mirai.internal.network.protocol.packet.login.WtLogin.Login.LoginPacketResponse
import net.mamoe.mirai.internal.network.protocol.packet.login.WtLogin.Login.LoginPacketResponse.Captcha
@ -37,6 +43,13 @@ internal interface SsoProcessor {
val client: QQAndroidClient
val ssoSession: SsoSession
/**
* The observers to launch jobs for states.
*
* E.g. start heartbeat job for [NetworkHandler.State.OK].
*/
fun createObserverChain(): StateObserver
/**
* Do login. Throws [LoginFailedException] if failed
*/
@ -60,6 +73,17 @@ internal class SsoProcessorImpl(
override var client = createClient(ssoContext.bot)
override val ssoSession: SsoSession get() = client
override fun createObserverChain(): StateObserver = StateObserver.chainOfNotNull(
object : StateChangedObserver(NetworkHandler.State.OK) {
override fun stateChanged0(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
new.launch { }
}
}
)
/**
* Do login. Throws [LoginFailedException] if failed
@ -91,6 +115,20 @@ internal class SsoProcessorImpl(
}
}
///////////////////////////////////////////////////////////////////////////
// state observers
///////////////////////////////////////////////////////////////////////////
private fun CoroutineScope.launchHeartbeat() = launch(CoroutineName("")) {
}
private inner class HeartbeatHandler
///////////////////////////////////////////////////////////////////////////
// Login methods
///////////////////////////////////////////////////////////////////////////
// we have exactly two methods----slow and fast.
private abstract inner class LoginStrategy(

View File

@ -22,13 +22,14 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.sendBlocking
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.components.PacketCodecImpl
import net.mamoe.mirai.internal.network.handler.components.BotInitProcessor
import net.mamoe.mirai.internal.network.handler.components.PacketCodec
import net.mamoe.mirai.internal.network.handler.components.RawIncomingPacket
import net.mamoe.mirai.internal.network.handler.components.SsoProcessor
import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.utils.childScope
import net.mamoe.mirai.utils.debug
@ -61,12 +62,12 @@ internal class NettyNetworkHandler(
///////////////////////////////////////////////////////////////////////////
private inner class ByteBufToIncomingPacketDecoder : SimpleChannelInboundHandler<ByteBuf>(ByteBuf::class.java) {
private val packetCodec: PacketCodec by lazy { context[PacketCodec] }
private val ssoProcessor: SsoProcessor by lazy { context[SsoProcessor] }
override fun channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf) {
ctx.fireChannelRead(msg.toReadPacket().use { packet ->
PacketCodecImpl().decodeRaw(
context[SsoProcessor].ssoSession,
packet
) // TODO: 2021/4/17 components integration
packetCodec.decodeRaw(ssoProcessor.ssoSession, packet)
})
}
}
@ -81,7 +82,7 @@ internal class NettyNetworkHandler(
private inner class OutgoingPacketEncoder : MessageToByteEncoder<OutgoingPacket>(OutgoingPacket::class.java) {
override fun encode(ctx: ChannelHandlerContext, msg: OutgoingPacket, out: ByteBuf) {
logger.debug { "encode: $msg" }
PacketCodec.PacketLogger.debug { "encode: $msg" }
out.writeBytes(msg.delegate)
}
}
@ -122,12 +123,13 @@ internal class NettyNetworkHandler(
private inner class PacketDecodePipeline(parentContext: CoroutineContext) :
CoroutineScope by parentContext.childScope() {
private val channel: Channel<RawIncomingPacket> = Channel(Channel.BUFFERED)
private val packetCodec: PacketCodec by lazy { context[PacketCodec] }
init {
launch(CoroutineName("PacketDecodePipeline processor")) {
// 'single thread' processor
channel.consumeAsFlow().collect { raw ->
val result = PacketCodecImpl().processBody(context.bot, raw) // TODO: 2021/4/17 components
val result = packetCodec.processBody(context.bot, raw)
if (result == null) {
collectUnknownPacket(raw)
} else collectReceived(result)
@ -145,14 +147,18 @@ internal class NettyNetworkHandler(
/**
* When state is initialized, it must be set to [_state]. (inside [setState])
*
* For what jobs each state will do, it is not solely decided by the state itself. [StateObserver]s may also launch jobs into the scope.
*
* @see StateObserver
*/
private abstract inner class NettyState(
correspondingState: NetworkHandler.State
correspondingState: State
) : BaseStateImpl(correspondingState) {
abstract suspend fun sendPacketImpl(packet: OutgoingPacket)
}
private inner class StateInitialized : NettyState(NetworkHandler.State.INITIALIZED) {
private inner class StateInitialized : NettyState(State.INITIALIZED) {
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
error("Cannot send packet when connection is not set. (resumeConnection not called.)")
}
@ -173,13 +179,12 @@ internal class NettyNetworkHandler(
*/
private inner class StateConnecting(
val decodePipeline: PacketDecodePipeline,
) : NettyState(NetworkHandler.State.CONNECTING) {
) : NettyState(State.CONNECTING) {
private val connection = async { createConnection(decodePipeline) }
private val connectResult = async {
val connection = connection.await()
connection.join()
context[SsoProcessor].login(this@NettyNetworkHandler)
setStateForJobCompletion { StateOK(connection) }
}.apply {
invokeOnCompletion { error ->
if (error != null) setState {
@ -198,14 +203,39 @@ internal class NettyNetworkHandler(
override suspend fun resumeConnection0() {
connectResult.await() // propagates exceptions
val connection = connection.await()
setState { StateLoading(connection) }
.resumeConnection()
}
override fun toString(): String = "StateConnecting"
}
/**
* @see BotInitProcessor
* @see StateObserver
*/
private inner class StateLoading(
private val connection: NettyChannel
) : NettyState(State.LOADING) {
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
connection.writeAndFlush(packet)
}
override suspend fun resumeConnection0() {
(coroutineContext.job as CompletableJob).run {
complete()
join()
}
setState { StateOK(connection) }
} // noop
override fun toString(): String = "StateLoading"
}
private inner class StateOK(
private val connection: NettyChannel
) : NettyState(NetworkHandler.State.OK) {
) : NettyState(State.OK) {
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
connection.writeAndFlush(packet)
}
@ -216,7 +246,7 @@ internal class NettyNetworkHandler(
private inner class StateConnectionLost(
private val cause: Throwable
) : NettyState(NetworkHandler.State.CONNECTION_LOST) {
) : NettyState(State.CONNECTION_LOST) {
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
throw IllegalStateException("Connection is lost so cannot send packet. Call resumeConnection first.", cause)
}
@ -229,7 +259,7 @@ internal class NettyNetworkHandler(
private inner class StateClosed(
val exception: Throwable?
) : NettyState(NetworkHandler.State.CLOSED) {
) : NettyState(State.CLOSED) {
init {
closeSuper(exception)
}

View File

@ -10,6 +10,7 @@
package net.mamoe.mirai.internal.network.handler.selector
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.yield
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
import org.jetbrains.annotations.TestOnly
@ -37,7 +38,8 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandl
final override fun getResumedInstance(): H? = current.value
final override tailrec suspend fun awaitResumeInstance(): H {
final override tailrec suspend fun awaitResumeInstance(): H { // TODO: 2021/4/18 max 5 retry
yield()
val current = getResumedInstance()
return if (current != null) {
when (current.state) {
@ -45,9 +47,15 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandl
this.current.compareAndSet(current, null) // invalidate the instance and try again.
awaitResumeInstance() // will create new instance.
}
NetworkHandler.State.CONNECTING,
NetworkHandler.State.CONNECTION_LOST,
NetworkHandler.State.INITIALIZED,
NetworkHandler.State.CONNECTING,
NetworkHandler.State.INITIALIZED -> {
current.resumeConnection()
return awaitResumeInstance()
}
NetworkHandler.State.LOADING -> {
return current
}
NetworkHandler.State.OK -> {
current.resumeConnection()
return current

View File

@ -9,6 +9,11 @@
package net.mamoe.mirai.internal.network.handler.selector
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.selects.SelectClause1
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.context.NetworkHandlerContext
@ -26,10 +31,14 @@ internal class SelectorNetworkHandler(
override val context: NetworkHandlerContext, // impl notes: may consider to move into function member.
private val selector: NetworkHandlerSelector<*>,
) : NetworkHandler {
private val scope = CoroutineScope(SupervisorJob(context.bot.coroutineContext[Job]))
private suspend inline fun instance(): NetworkHandler = selector.awaitResumeInstance()
override val state: State
get() = selector.getResumedInstance()?.state ?: State.INITIALIZED
override val onStateChanged: SelectClause1<State>
get() = selector.getResumedInstance()?.onStateChanged
?: scope.async { instance().state }.onAwait
override suspend fun resumeConnection() {
instance() // the selector will resume connection for us.

View File

@ -48,7 +48,17 @@ internal class CombinedStateObserver(
last.afterStateResume(networkHandler, state, result)
}
override fun toString(): String {
return "CombinedStateObserver(first=$first, last=$last)"
}
companion object {
operator fun StateObserver.plus(last: StateObserver): StateObserver = CombinedStateObserver(this, last)
operator fun StateObserver?.plus(last: StateObserver?): StateObserver {
return when {
this == null -> last
last == null -> this
else -> CombinedStateObserver(this, last)
} ?: StateObserver.NOP
}
}
}

View File

@ -18,7 +18,7 @@ internal class LoggingStateObserver(
val logger: MiraiLogger
) : StateObserver {
override fun toString(): String {
return "LoggingStateObserver(logger=$logger)"
return "LoggingStateObserver"
}
override fun stateChanged(
@ -37,6 +37,10 @@ internal class LoggingStateObserver(
logger.debug({ "State changed: ${previousState.correspondingState} -> $exception" }, exception)
}
override fun beforeStateResume(networkHandler: NetworkHandler, state: NetworkHandlerSupport.BaseStateImpl) {
logger.debug { "State resuming: ${state.correspondingState}." }
}
override fun afterStateResume(
networkHandler: NetworkHandler,
state: NetworkHandlerSupport.BaseStateImpl,

View File

@ -14,6 +14,11 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.error
internal fun StateObserver.safe(logger: MiraiLogger): StateObserver {
if (this is SafeStateObserver) return this
return SafeStateObserver(this, logger)
}
/**
* Catches exception then log by [logger]
*/

View File

@ -0,0 +1,33 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler.state
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
internal abstract class StateChangedObserver(
val state: State,
) : StateObserver {
abstract fun stateChanged0(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
)
override fun stateChanged(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
if (previous.correspondingState != state && new.correspondingState == state) {
stateChanged0(networkHandler, previous, new)
}
}
}

View File

@ -12,6 +12,7 @@ package net.mamoe.mirai.internal.network.handler.state
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.component.ComponentKey
import net.mamoe.mirai.internal.network.handler.state.CombinedStateObserver.Companion.plus
/**
* Stateless observer of state changes.
@ -50,5 +51,19 @@ internal interface StateObserver {
}
companion object : ComponentKey<StateObserver>
companion object : ComponentKey<StateObserver> {
internal val NOP = object : StateObserver {
override fun toString(): String {
return "StateObserver.NOP"
}
}
fun chainOfNotNull(
vararg observers: StateObserver?,
): StateObserver {
return observers.reduceOrNull { acc, stateObserver ->
acc + stateObserver
} ?: NOP
}
}
}

View File

@ -15,45 +15,6 @@ package net.mamoe.mirai.internal.network.net.protocol
* 垃圾分类
*/
private object `handle packet1` {
// if (packet != null && (bot.logger.isEnabled || logger.isEnabled)) {
// when {
// packet is ParseErrorPacket -> {
// packet.direction.getLogger(bot).error(packet.error)
// }
// packet is Packet.NoLog -> {
// // nothing to do
// }
// packet is MessageEvent -> packet.logMessageReceived()
// packet is Event && packet !is Packet.NoEventLog -> bot.logger.verbose {
// "Event: $packet".replaceMagicCodes()
// }
// else -> logger.verbose { "Recv: $packet".replaceMagicCodes() }
// }
// }
}
private object `handle packet2` {
// if (packet is Event) {
// if ((packet as? BroadcastControllable)?.shouldBroadcast != false) {
// if (packet is BotEvent) {
// withContext(bot.coroutineContext[CoroutineExceptionHandler] ?: CoroutineExceptionHandler { _, t ->
// bot.logger.warning(
// """
// Event processing: An exception occurred but no CoroutineExceptionHandler found in coroutineContext of bot
// """.trimIndent(), t
// )
// }) {
// packet.broadcast()
// }
// } else {
// packet.broadcast()
// }
// }
//
// if (packet is CancellableEvent && packet.isCancelled) return
// }
}
private object `stat heartbeat` {
// private suspend fun doStatHeartbeat(): Throwable? {
@ -69,18 +30,6 @@ private object `stat heartbeat` {
}
private object syncMessageSvc {
// private suspend fun syncMessageSvc() {
// logger.info { "Syncing friend message history..." }
// withTimeoutOrNull(30000) {
// launch(CoroutineName("Syncing friend message history")) { nextEvent<MessageSvcPbGetMsg.GetMsgSuccess> { it.bot == this@QQAndroidBotNetworkHandler.bot } }
// MessageSvcPbGetMsg(bot.client, MsgSvc.SyncFlag.START, null).sendAndExpect<Packet>()
//
// } ?: error("timeout syncing friend message history.")
// logger.info { "Syncing friend message history: Success." }
// }
}
private object `config push syncer` {
// @Suppress("FunctionName", "UNUSED_VARIABLE")
// private fun BotNetworkHandler.ConfigPushSyncer(): suspend CoroutineScope.() -> Unit = launch@{
@ -108,42 +57,6 @@ private object `config push syncer` {
// }
}
private object `network init` {
// suspend fun init(): Unit = coroutineScope {
// check(bot.isActive) { "bot is dead therefore network can't init." }
// check(this@QQAndroidBotNetworkHandler.isActive) { "network is dead therefore can't init." }
//
// contactUpdater.closeAllContacts(CancellationException("re-init"))
//
// if (!pendingEnabled) {
// pendingIncomingPackets = ConcurrentLinkedQueue()
// _pendingEnabled.value = true
// }
//
// val registerResp = registerClientOnline()
//
// this@QQAndroidBotNetworkHandler.launch(
// CoroutineName("Awaiting ConfigPushSvc.PushReq"),
// block = ConfigPushSyncer()
// )
//
// launch {
// syncMessageSvc()
// }
//
// launch {
// bot.otherClientsLock.withLock {
// updateOtherClientsList()
// }
// }
//
// contactUpdater.loadAll(registerResp.origin)
//
// bot.firstLoginSucceed = true
// postInitActions()
// }
}
private object `update other client list` {
//

View File

@ -65,41 +65,34 @@ internal class IncomingPacket constructor(
}
}
@Suppress("UNCHECKED_CAST")
internal suspend inline fun <E : Packet> OutgoingPacketWithRespType<E>.sendAndExpect(
network: NetworkHandler,
timeoutMillis: Long = 5000,
retry: Int = 2
): E = network.run {
return (this@sendAndExpect as OutgoingPacket).sendAndExpect(timeoutMillis, retry)
}
): E = network.sendAndExpect(this, timeoutMillis, retry) as E
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
@kotlin.internal.LowPriorityInOverloadResolution
internal suspend inline fun <E : Packet> OutgoingPacket.sendAndExpect(
network: NetworkHandler,
timeoutMillis: Long = 5000,
retry: Int = 2
): E = network.run {
return this@sendAndExpect.sendAndExpect(timeoutMillis, retry)
}
): E = network.sendAndExpect(this, timeoutMillis, retry) as E
internal suspend inline fun <E : Packet> OutgoingPacketWithRespType<E>.sendAndExpect(
bot: QQAndroidBot,
timeoutMillis: Long = 5000,
retry: Int = 2
): E = bot.network.run {
return (this@sendAndExpect as OutgoingPacket).sendAndExpect(timeoutMillis, retry)
}
): E = (this@sendAndExpect as OutgoingPacket).sendAndExpect(bot.network, timeoutMillis, retry)
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
@kotlin.internal.LowPriorityInOverloadResolution
internal suspend inline fun <E : Packet> OutgoingPacket.sendAndExpect(
bot: QQAndroidBot,
timeoutMillis: Long = 5000,
retry: Int = 2
): E = bot.network.run {
return this@sendAndExpect.sendAndExpect(timeoutMillis, retry)
}
): E = bot.network.sendAndExpect(this, timeoutMillis, retry) as E
@Suppress("DuplicatedCode")

View File

@ -41,7 +41,6 @@ import net.mamoe.mirai.internal.network.protocol.data.proto.FrdSysMsg
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgComm
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgSvc
import net.mamoe.mirai.internal.network.protocol.data.proto.SubMsgType0x7
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketFactory
import net.mamoe.mirai.internal.network.protocol.packet.buildOutgoingUniPacket
import net.mamoe.mirai.internal.network.protocol.packet.chat.NewContact
@ -70,7 +69,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
client: QQAndroidClient,
syncFlag: MsgSvc.SyncFlag = MsgSvc.SyncFlag.START,
syncCookie: ByteArray?, //PbPushMsg.msg.msgHead.msgTime
): OutgoingPacket = buildOutgoingUniPacket(
) = buildOutgoingUniPacket(
client
) {
//println("syncCookie=${client.c2cMessageSync.syncCookie?.toUHexString()}")

View File

@ -22,7 +22,7 @@ import net.mamoe.mirai.event.events.OtherClientOfflineEvent
import net.mamoe.mirai.event.events.OtherClientOnlineEvent
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.contact.appId
import net.mamoe.mirai.internal.createOtherClient
import net.mamoe.mirai.internal.contact.createOtherClient
import net.mamoe.mirai.internal.message.contextualBugReportException
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.handler.logger

View File

@ -25,7 +25,7 @@ import net.mamoe.mirai.event.events.OtherClientOfflineEvent
import net.mamoe.mirai.event.events.OtherClientOnlineEvent
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.contact.appId
import net.mamoe.mirai.internal.createOtherClient
import net.mamoe.mirai.internal.contact.createOtherClient
import net.mamoe.mirai.internal.message.contextualBugReportException
import net.mamoe.mirai.internal.network.*
import net.mamoe.mirai.internal.network.handler.components.ContactCacheService

View File

@ -22,7 +22,7 @@ internal class ComponentKeyTest {
@Test
fun testComponentName() {
assertEquals("TestComponent", TestComponent.componentName(false))
assertEquals(TestComponent::class.qualifiedName!!, TestComponent.smartToString(true))
assertEquals(TestComponent::class.qualifiedName!!, TestComponent.componentName(true))
}
@Test

View File

@ -25,7 +25,7 @@ private class TestSelector(val createInstance0: () -> NetworkHandler) :
}
}
internal class AbstractKeepAliveNetworkHandlerSelectorTest {
internal class KeepAliveNetworkHandlerSelectorTest {
private fun createHandler() = TestNetworkHandler(TestNetworkHandlerContext())

View File

@ -0,0 +1,40 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
@file:Suppress("MemberVisibilityCanBePrivate")
package net.mamoe.mirai.internal.network.handler
import net.mamoe.mirai.internal.MockBot
import net.mamoe.mirai.internal.network.handler.component.ConcurrentComponentStorage
import net.mamoe.mirai.internal.network.handler.components.SsoProcessor
import net.mamoe.mirai.internal.network.handler.components.SsoProcessorImpl
import net.mamoe.mirai.internal.network.handler.context.SsoProcessorContextImpl
import net.mamoe.mirai.internal.network.handler.state.LoggingStateObserver
import net.mamoe.mirai.internal.network.handler.state.SafeStateObserver
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.utils.MiraiLogger
internal abstract class AbstractNetworkHandlerTest {
protected open fun createNetworkHandlerContext() = TestNetworkHandlerContext(bot, logger, components)
protected open fun createNetworkHandler() = TestNetworkHandler(createNetworkHandlerContext())
protected val bot = MockBot()
protected val logger = MiraiLogger.create("test")
protected val components = ConcurrentComponentStorage().apply {
set(SsoProcessor, SsoProcessorImpl(SsoProcessorContextImpl(bot)))
set(
StateObserver,
SafeStateObserver(
LoggingStateObserver(MiraiLogger.create("States")),
MiraiLogger.create("StateObserver errors")
)
)
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.CONNECTING
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.INITIALIZED
import net.mamoe.mirai.internal.network.handler.state.StateChangedObserver
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import kotlin.test.assertEquals
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
internal class StateObserverTest : AbstractNetworkHandlerTest() {
@Test
fun `can trigger observer`() {
val called = ArrayList<Pair<NetworkHandlerSupport.BaseStateImpl, NetworkHandlerSupport.BaseStateImpl>>()
components[StateObserver] = object : StateObserver {
override fun stateChanged(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
called.add(previous to new)
}
}
val handler = createNetworkHandler()
assertEquals(0, called.size)
handler.setState(INITIALIZED)
assertEquals(1, called.size)
assertEquals(INITIALIZED, called[0].first.correspondingState)
assertEquals(INITIALIZED, called[0].second.correspondingState)
handler.setState(CONNECTING)
assertEquals(2, called.size)
assertEquals(INITIALIZED, called[1].first.correspondingState)
assertEquals(CONNECTING, called[1].second.correspondingState)
}
@Test
fun `test StateChangedObserver`() {
val called = ArrayList<Pair<NetworkHandlerSupport.BaseStateImpl, NetworkHandlerSupport.BaseStateImpl>>()
components[StateObserver] = object : StateChangedObserver(CONNECTING) {
override fun stateChanged0(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
called.add(previous to new)
}
}
val handler = createNetworkHandler()
assertEquals(0, called.size)
handler.setState(INITIALIZED)
assertEquals(0, called.size)
handler.setState(CONNECTING)
assertEquals(1, called.size)
assertEquals(INITIALIZED, called[0].first.correspondingState)
assertEquals(CONNECTING, called[0].second.correspondingState)
}
}