1
0
mirror of https://github.com/mamoe/mirai.git synced 2025-03-25 06:50:09 +08:00

Processors

This commit is contained in:
Him188 2021-04-19 23:38:49 +08:00
parent dd1b7404ea
commit 35b80dc700
16 changed files with 355 additions and 137 deletions

View File

@ -9,7 +9,21 @@
package net.mamoe.mirai.utils
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
public class ExceptionCollector {
public constructor()
public constructor(initial: Throwable?) {
collect(initial)
}
public constructor(vararg initials: Throwable?) {
for (initial in initials) {
collect(initial)
}
}
@Volatile
private var last: Throwable? = null
@ -23,6 +37,11 @@ public class ExceptionCollector {
this.last = e
}
public fun collectGet(e: Throwable?): Throwable {
this.collect(e)
return getLast()!!
}
/**
* Alias to [collect] to be used inside [withExceptionCollector]
*/
@ -44,7 +63,16 @@ public class ExceptionCollector {
* Run with a coverage of `throw`. All thrown exceptions will be caught and rethrown with [ExceptionCollector.collectThrow]
*/
public inline fun <R> withExceptionCollector(action: ExceptionCollector.() -> R): R {
ExceptionCollector().run {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return ExceptionCollector().withExceptionCollector(action)
}
/**
* Run with a coverage of `throw`. All thrown exceptions will be caught and rethrown with [ExceptionCollector.collectThrow]
*/
public inline fun <R> ExceptionCollector.withExceptionCollector(action: ExceptionCollector.() -> R): R {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
this.run {
try {
return action()
} catch (e: Throwable) {

View File

@ -161,7 +161,7 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
}
override suspend fun refreshKeys(bot: Bot) {
// TODO: 2021/4/14 MiraiImpl.refreshKeys
// TODO: 2021/4/14 MiraiImpl.refreshKeysNow
}
override suspend fun rejectNewFriendRequest(event: NewFriendRequestEvent, blackList: Boolean) {

View File

@ -19,6 +19,7 @@ import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.component.ComponentStorage
import net.mamoe.mirai.internal.network.component.ConcurrentComponentStorage
import net.mamoe.mirai.internal.network.components.*
import net.mamoe.mirai.internal.network.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.context.SsoProcessorContextImpl
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContextImpl
@ -86,8 +87,11 @@ internal class QQAndroidBot constructor(
internal val components: ConcurrentComponentStorage by lazy {
ConcurrentComponentStorage().apply {
val components = this // avoid mistakes
set(SsoProcessor, SsoProcessorImpl(SsoProcessorContextImpl(bot)))
// put sso processor at the first to make `client` faster.
set(SsoProcessorContext, SsoProcessorContextImpl(bot))
set(SsoProcessor, SsoProcessorImpl(get(SsoProcessorContext)))
set(HeartbeatProcessor, HeartbeatProcessor())
set(KeyRefreshProcessor, KeyRefreshProcessorImpl(networkLogger))
set(ConfigPushProcessor, ConfigPushProcessorImpl(networkLogger))
set(BotInitProcessor, BotInitProcessorImpl(bot, components, bot.logger))
set(ContactCacheService, ContactCacheServiceImpl(bot))
@ -96,7 +100,7 @@ internal class QQAndroidBot constructor(
set(ServerList, ServerListImpl())
set(
PacketHandler, PacketHandlerChain(
LoggingPacketHandler(bot, components, logger),
LoggingPacketHandler(bot, components, networkLogger),
EventBroadcasterPacketHandler(bot, components, logger)
)
)

View File

@ -17,8 +17,7 @@ import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.component.ComponentStorage
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.state.StateChangedObserver
import net.mamoe.mirai.internal.network.handler.state.JobAttachStateObserver
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgSvc
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
@ -36,30 +35,7 @@ internal interface BotInitProcessor {
}
internal fun BotInitProcessor.asObserver(targetState: State = State.LOADING): StateObserver {
return BotInitProcessorAsStateObserverAdapter(this, targetState)
}
private class BotInitProcessorAsStateObserverAdapter(
private val processor: BotInitProcessor,
targetState: State
) : StateChangedObserver(targetState) {
override fun stateChanged0(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
new.launch(CoroutineName("BotInitProcessor.init")) {
try {
processor.init()
} catch (e: Throwable) {
throw IllegalStateException("Exception in BotInitProcessor.init", e)
}
}
}
override fun toString(): String {
return "BotInitProcessorAsStateObserverAdapter"
}
return JobAttachStateObserver("BotInitProcessor.init", targetState) { init() }
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.components
import kotlinx.coroutines.CancellationException
import net.mamoe.mirai.event.nextEventOrNull
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.protocol.packet.login.ConfigPushSvc
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.info
import net.mamoe.mirai.utils.warning
internal interface ConfigPushProcessor {
@Throws(RequireReconnectException::class)
suspend fun syncConfigPush(network: NetworkHandler)
class RequireReconnectException : Exception("ConfigPushProcessor: server requires reconnection")
companion object : ComponentKey<ConfigPushProcessor>
}
internal class ConfigPushProcessorImpl(
private val logger: MiraiLogger,
) : ConfigPushProcessor {
@Throws(ConfigPushProcessor.RequireReconnectException::class)
override suspend fun syncConfigPush(network: NetworkHandler) {
network.ConfigPushSyncer()
}
@Suppress("FunctionName", "UNUSED_VARIABLE")
private suspend fun NetworkHandler.ConfigPushSyncer() {
logger.info { "Awaiting ConfigPushSvc.PushReq." }
when (val resp: ConfigPushSvc.PushReq.PushReqResponse? = nextEventOrNull(30_000)) {
null -> {
val bdhSyncer = context[BdhSessionSyncer]
val hasSession = bdhSyncer.hasSession
kotlin.runCatching { bdhSyncer.bdhSession.completeExceptionally(CancellationException("Timeout waiting for ConfigPushSvc.PushReq")) }
if (!hasSession) {
logger.warning { "Missing ConfigPushSvc.PushReq. Switching server..." }
// bot.launch { BotOfflineEvent.RequireReconnect(bot).broadcast() }
throw ConfigPushProcessor.RequireReconnectException()
} else {
logger.warning { "Missing ConfigPushSvc.PushReq. Using the latest response. File uploading may be affected." }
}
}
is ConfigPushSvc.PushReq.PushReqResponse.ConfigPush -> {
logger.info { "ConfigPushSvc.PushReq: Config updated." }
}
is ConfigPushSvc.PushReq.PushReqResponse.ServerListPush -> {
logger.info { "ConfigPushSvc.PushReq: Server updated." }
// handled in ConfigPushSvc
return
}
}
}
}

View File

@ -126,7 +126,7 @@ internal class ContactUpdaterImpl(
while (true) {
val data = FriendList.GetFriendGroupList(
bot.client, count, 150, 0, 0
).sendAndExpect<FriendList.GetFriendGroupList.Response>(bot, timeoutMillis = 5000, retry = 2)
).sendAndExpect(bot, timeoutMillis = 5000, retry = 2)
total = data.totalFriendCount
@ -183,7 +183,7 @@ internal class ContactUpdaterImpl(
val members = if (cache != null) {
if (cache.isValid(stTroopNum)) {
cache.list.asSequence().also {
bot.network.logger.info { "Loaded ${cache.list.size} members from local cache for group ${groupName} (${groupCode})" }
bot.network.logger.info { "Loaded ${cache.list.size} members from local cache for group $groupName (${groupCode})" }
}
} else refreshGroupMemberList().also { sequence ->
cache.troopMemberNumSeq = dwMemberNumSeq ?: 0
@ -212,7 +212,7 @@ internal class ContactUpdaterImpl(
var currentCount = 0
logger.info { "Start loading stranger list..." }
val response = StrangerList.GetStrangerList(bot.client)
.sendAndExpect<StrangerList.GetStrangerList.Response>(bot, timeoutMillis = 5000, retry = 2)
.sendAndExpect(bot, timeoutMillis = 5000, retry = 2)
if (response.result == 0) {
response.strangerList.forEach {
@ -231,11 +231,11 @@ internal class ContactUpdaterImpl(
if (initGroupOk) {
return
}
TroopManagement.GetTroopConfig(bot.client).sendAndExpect<TroopManagement.GetTroopConfig.Response>(bot)
TroopManagement.GetTroopConfig(bot.client).sendAndExpect(bot)
logger.info { "Start loading group list..." }
val troopListData = FriendList.GetTroopListSimplify(bot.client)
.sendAndExpect<FriendList.GetTroopListSimplify.Response>(bot, retry = 5)
.sendAndExpect(bot, retry = 5)
val semaphore = Semaphore(30)

View File

@ -0,0 +1,30 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.components
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect
internal class HeartbeatProcessor {
@Throws(Exception::class)
suspend fun doHeartbeatNow(networkHandler: NetworkHandler) {
StatSvc.SimpleGet(networkHandler.context.bot.client).sendAndExpect(
networkHandler,
timeoutMillis = networkHandler.context[SsoProcessorContext].configuration.heartbeatTimeoutMillis,
retry = 2
)
}
companion object : ComponentKey<HeartbeatProcessor>
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.components
import kotlinx.coroutines.*
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.protocol.packet.login.wtlogin.WtLogin15
import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect
import net.mamoe.mirai.network.LoginFailedException
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.info
import net.mamoe.mirai.utils.toHumanReadableString
import kotlin.time.minutes
import kotlin.time.seconds
internal interface KeyRefreshProcessor {
suspend fun keyRefreshLoop(handler: NetworkHandler)
@Throws(LoginFailedException::class)
suspend fun refreshKeysNow(handler: NetworkHandler)
companion object : ComponentKey<KeyRefreshProcessor>
}
internal class KeyRefreshProcessorImpl(
private val logger: MiraiLogger,
) : KeyRefreshProcessor {
override suspend fun keyRefreshLoop(handler: NetworkHandler): Unit = coroutineScope {
val client = handler.context[SsoProcessor].client
launch(CoroutineName("Login Session Refresh Scheduler")) {
while (isActive) {
client.wLoginSigInfo.vKey.run {
//由过期时间最短的且不会被skey更换更新的vkey计算重新登录的时间
val delay = (expireTime - creationTime).toInt().seconds - 5.minutes
logger.info { "Scheduled refresh login session in ${delay.toHumanReadableString()}." }
delay(delay)
}
runCatching {
handler.context[SsoProcessor].login(handler)
}.onFailure {
logger.warning("Failed to refresh login session.", it)
}
}
}
launch(CoroutineName("Key Refresh Scheduler")) {
while (isActive) {
client.wLoginSigInfo.sKey.run {
val delay = (expireTime - creationTime).seconds - 5.minutes
logger.info { "Scheduled key refresh in ${delay.toHumanReadableString()}." }
delay(delay)
}
runCatching {
refreshKeysNow(handler)
}.onFailure {
logger.error("Failed to refresh key.", it)
}
}
}
}
override suspend fun refreshKeysNow(handler: NetworkHandler) {
WtLogin15(handler.context[SsoProcessor].client).sendAndExpect(handler)
}
}

View File

@ -87,7 +87,8 @@ internal class PacketCodecImpl : PacketCodec {
2 -> RawIncomingPacket(
raw.commandName,
raw.sequenceId,
raw.body.withUse { parseOicqResponse(client) })
raw.body.withUse { parseOicqResponse(client) }
)
else -> error("Unknown flag2=$flag2")
}
}
@ -170,7 +171,9 @@ internal class PacketCodecImpl : PacketCodec {
private fun ByteReadPacket.parseOicqResponse(
client: SsoSession,
): ByteArray {
check(readByte().toInt() == 2)
readByte().toInt().let {
check(it == 2) { "$it" }
}
this.discardExact(2)
this.discardExact(2)
this.readUShort()

View File

@ -9,8 +9,6 @@
package net.mamoe.mirai.internal.network.components
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.Packet
@ -20,6 +18,7 @@ import net.mamoe.mirai.internal.network.context.AccountSecretsImpl
import net.mamoe.mirai.internal.network.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.context.SsoSession
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.state.StateChangedObserver
import net.mamoe.mirai.internal.network.handler.state.StateObserver
@ -38,6 +37,9 @@ import net.mamoe.mirai.utils.LoginSolver
import net.mamoe.mirai.utils.info
import net.mamoe.mirai.utils.withExceptionCollector
/**
* Handles login, and acts also as a mediator of [BotInitProcessor], []
*/
internal interface SsoProcessor {
val ssoContext: SsoProcessorContext
val client: QQAndroidClient
@ -69,12 +71,17 @@ internal interface SsoProcessor {
internal class SsoProcessorImpl(
override val ssoContext: SsoProcessorContext,
) : SsoProcessor {
///////////////////////////////////////////////////////////////////////////
// public
///////////////////////////////////////////////////////////////////////////
@Volatile
override var client = createClient(ssoContext.bot)
override val ssoSession: SsoSession get() = client
override fun createObserverChain(): StateObserver = StateObserver.chainOfNotNull(
object : StateChangedObserver(NetworkHandler.State.OK) {
object : StateChangedObserver(State.OK) {
override fun stateChanged0(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
@ -116,17 +123,7 @@ internal class SsoProcessorImpl(
}
///////////////////////////////////////////////////////////////////////////
// state observers
///////////////////////////////////////////////////////////////////////////
private fun CoroutineScope.launchHeartbeat() = launch(CoroutineName("")) {
}
private inner class HeartbeatHandler
///////////////////////////////////////////////////////////////////////////
// Login methods
// login
///////////////////////////////////////////////////////////////////////////
// we have exactly two methods----slow and fast.

View File

@ -11,6 +11,7 @@ package net.mamoe.mirai.internal.network.context
import net.mamoe.mirai.internal.BotAccount
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.internal.network.components.AccountSecretsManager
import net.mamoe.mirai.internal.network.components.SsoProcessor
import net.mamoe.mirai.internal.network.components.createAccountsSecretsManager
@ -34,6 +35,8 @@ internal interface SsoProcessorContext {
val accountSecretsManager: AccountSecretsManager
val configuration: BotConfiguration
companion object : ComponentKey<SsoProcessorContext>
}
internal class SsoProcessorContextImpl(

View File

@ -140,7 +140,7 @@ internal abstract class NetworkHandlerSupport(
*/
abstract inner class BaseStateImpl(
val correspondingState: NetworkHandler.State,
) : CoroutineScope by CoroutineScope(coroutineContext + SupervisorJob(coroutineContext.job)) {
) : CoroutineScope by CoroutineScope(coroutineContext + Job(coroutineContext.job)) {
/**
* May throw any exception that caused the state to fail.
@ -177,18 +177,6 @@ internal abstract class NetworkHandlerSupport(
*/
override val onStateChanged: SelectClause1<NetworkHandler.State> get() = _stateChangedDeferred.onAwait
/**
* Can only be used in a job launched within the state scope.
*/
@Suppress("SuspendFunctionOnCoroutineScope")
protected suspend inline fun setStateForJobCompletion(crossinline new: () -> BaseStateImpl) {
val job = currentCoroutineContext()[Job]
this.launch {
job?.join()
setState(new)
}
}
/**
* Calculate [new state][new] and set it as the current.
*

View File

@ -0,0 +1,46 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler.state
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/**
* The [StateObserver] that attaches a job to the [CoroutineScope] of the state.
*/
internal class JobAttachStateObserver(
private val name: String,
targetState: NetworkHandler.State,
private val coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val job: suspend CoroutineScope.(state: NetworkHandlerSupport.BaseStateImpl) -> Unit,
) : StateChangedObserver(targetState) {
override fun stateChanged0(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
new.launch(CoroutineName(name) + coroutineContext) {
try {
job(new)
} catch (e: Throwable) {
throw IllegalStateException("Exception in attached Job '$name'", e)
}
}
}
override fun toString(): String {
return "JobAttachStateObserver(name=$name)"
}
}

View File

@ -15,7 +15,7 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
internal abstract class StateChangedObserver(
val state: State,
) : StateObserver {
abstract fun stateChanged0(
protected abstract fun stateChanged0(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl

View File

@ -22,15 +22,14 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.sendBlocking
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import net.mamoe.mirai.internal.network.components.BotInitProcessor
import net.mamoe.mirai.internal.network.components.PacketCodec
import net.mamoe.mirai.internal.network.components.RawIncomingPacket
import net.mamoe.mirai.internal.network.components.SsoProcessor
import net.mamoe.mirai.internal.network.components.*
import net.mamoe.mirai.internal.network.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.utils.ExceptionCollector
import net.mamoe.mirai.utils.childScope
import net.mamoe.mirai.utils.debug
import java.net.SocketAddress
@ -113,6 +112,11 @@ internal class NettyNetworkHandler(
contextResult.complete(future.channel())
coroutineContext.job.invokeOnCompletion {
future.channel().close()
eventLoopGroup.shutdownGracefully()
}
future.channel().closeFuture().addListener {
setState { StateConnectionLost(it.cause()) }
}
@ -120,6 +124,8 @@ internal class NettyNetworkHandler(
return contextResult.await()
}
private val decodePipeline = PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)
private inner class PacketDecodePipeline(parentContext: CoroutineContext) :
CoroutineScope by parentContext.childScope() {
private val channel: Channel<RawIncomingPacket> = Channel(Channel.BUFFERED)
@ -164,7 +170,7 @@ internal class NettyNetworkHandler(
}
override suspend fun resumeConnection0() {
setState { StateConnecting(PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)) }
setState { StateConnecting(ExceptionCollector()) }
.resumeConnection()
}
@ -178,7 +184,12 @@ internal class NettyNetworkHandler(
* 4. If success, set state to [StateOK]
*/
private inner class StateConnecting(
val decodePipeline: PacketDecodePipeline,
/**
* Collected (suppressed) exceptions that have led this state.
*
* Dropped when state becomes [StateOK].
*/
private val collectiveExceptions: ExceptionCollector
) : NettyState(State.CONNECTING) {
private val connection = async { createConnection(decodePipeline) }
@ -189,7 +200,7 @@ internal class NettyNetworkHandler(
invokeOnCompletion { error ->
if (error != null) setState {
StateClosed(
CancellationException("Connection failure.", error)
CancellationException("Connection failure.", collectiveExceptions.collectGet(error))
)
} // logon failure closes the network handler.
// and this error will also be thrown by `StateConnecting.resumeConnection`
@ -236,24 +247,77 @@ internal class NettyNetworkHandler(
private inner class StateOK(
private val connection: NettyChannel
) : NettyState(State.OK) {
init {
coroutineContext.job.invokeOnCompletion {
connection.close()
}
}
private val heartbeatProcessor = context[HeartbeatProcessor]
private val heartbeat = async(CoroutineName("Heartbeat Scheduler")) {
while (isActive) {
try {
delay(context[SsoProcessorContext].configuration.heartbeatPeriodMillis)
} catch (e: CancellationException) {
return@async // considered normally cancel
}
try {
heartbeatProcessor.doHeartbeatNow(this@NettyNetworkHandler)
} catch (e: Throwable) {
setState {
StateConnecting(ExceptionCollector(IllegalStateException("Exception in Heartbeat job", e)))
}
}
}
}
private val configPush = launch(CoroutineName("ConfigPush sync")) {
try {
context[ConfigPushProcessor].syncConfigPush(this@NettyNetworkHandler)
} catch (e: ConfigPushProcessor.RequireReconnectException) {
setState { StateClosed(e) }
}
}
// we can also move them as observers if needed.
private val keyRefresh = launch(CoroutineName("Key refresh")) {
context[KeyRefreshProcessor].keyRefreshLoop(this@NettyNetworkHandler)
}
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
connection.writeAndFlush(packet)
}
override suspend fun resumeConnection0() {} // noop
override suspend fun resumeConnection0() {
joinCompleted(coroutineContext.job)
joinCompleted(heartbeat)
joinCompleted(configPush)
joinCompleted(keyRefresh)
} // noop
private suspend inline fun joinCompleted(job: Job) {
if (job.isCompleted) job.join()
}
override fun toString(): String = "StateOK"
}
private inner class StateConnectionLost(
private val cause: Throwable
private val cause: Throwable?
) : NettyState(State.CONNECTION_LOST) {
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
throw IllegalStateException("Connection is lost so cannot send packet. Call resumeConnection first.", cause)
throw IllegalStateException(
"Internal error: connection is lost so cannot send packet. Call resumeConnection first.",
cause
)
}
override suspend fun resumeConnection0() {
setState { StateConnecting(PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)) }
.resumeConnection() // the user wil
setState { StateConnecting(ExceptionCollector(cause)) }
.resumeConnection()
} // noop
}

View File

@ -16,66 +16,9 @@ package net.mamoe.mirai.internal.network.net.protocol
*/
private object `stat heartbeat` {
// private suspend fun doStatHeartbeat(): Throwable? {
// return retryCatching(2) {
// StatSvc.SimpleGet(bot.client)
// .sendAndExpect<StatSvc.SimpleGet.Response>(
// timeoutMillis = bot.configuration.heartbeatTimeoutMillis,
// retry = 2
// )
// return null
// }.exceptionOrNull()
// }
}
private object `config push syncer` {
// @Suppress("FunctionName", "UNUSED_VARIABLE")
// private fun BotNetworkHandler.ConfigPushSyncer(): suspend CoroutineScope.() -> Unit = launch@{
// logger.info { "Awaiting ConfigPushSvc.PushReq." }
// when (val resp: ConfigPushSvc.PushReq.PushReqResponse? = nextEventOrNull(20_000)) {
// null -> {
// val hasSession = bot.bdhSyncer.hasSession
// kotlin.runCatching { bot.bdhSyncer.bdhSession.completeExceptionally(CancellationException("Timeout waiting for ConfigPushSvc.PushReq")) }
// if (!hasSession) {
// logger.warning { "Missing ConfigPushSvc.PushReq. Switching server..." }
// bot.launch { BotOfflineEvent.RequireReconnect(bot).broadcast() }
// } else {
// logger.warning { "Missing ConfigPushSvc.PushReq. Using the latest response. File uploading may be affected." }
// }
// }
// is ConfigPushSvc.PushReq.PushReqResponse.ConfigPush -> {
// logger.info { "ConfigPushSvc.PushReq: Config updated." }
// }
// is ConfigPushSvc.PushReq.PushReqResponse.ServerListPush -> {
// logger.info { "ConfigPushSvc.PushReq: Server updated." }
// // handled in ConfigPushSvc
// return@launch
// }
// }
// }
}
private object `update other client list` {
//
// private suspend fun updateOtherClientsList() {
// val list = Mirai.getOnlineOtherClientsList(bot)
// bot.otherClients.delegate.clear()
// bot.otherClients.delegate.addAll(list.map { bot.createOtherClient(it) })
//
// if (bot.otherClients.isEmpty()) {
// bot.logger.info { "No OtherClient online." }
// } else {
// bot.logger.info { "Online OtherClients: " + bot.otherClients.joinToString { "${it.deviceName}(${it.platform?.name ?: "unknown platform"})" } }
// }
// }
}
private object `skey refresh` {
// suspend fun refreshKeys() {
// suspend fun refreshKeysNow() {
// WtLogin15(bot.client).sendAndExpect()
// }
@ -106,7 +49,7 @@ private object `skey refresh` {
delay(delay)
}
runCatching {
refreshKeys()
refreshKeysNow()
}.onFailure {
logger.error("Failed to refresh key.", it)
}