Redesign reconnecting logic

This commit is contained in:
Him188 2020-02-14 18:41:24 +08:00
parent c03fb41fd0
commit 1993047221
3 changed files with 128 additions and 84 deletions

View File

@ -20,9 +20,13 @@ import kotlinx.io.core.buildPacket
import kotlinx.io.core.use
import net.mamoe.mirai.data.MultiPacket
import net.mamoe.mirai.data.Packet
import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.BroadcastControllable
import net.mamoe.mirai.event.CancellableEvent
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.events.BotOfflineEvent
import net.mamoe.mirai.network.BotNetworkHandler
import net.mamoe.mirai.network.WrongPasswordException
import net.mamoe.mirai.qqandroid.FriendInfoImpl
import net.mamoe.mirai.qqandroid.GroupImpl
import net.mamoe.mirai.qqandroid.QQAndroidBot
@ -37,7 +41,10 @@ import net.mamoe.mirai.qqandroid.network.protocol.packet.login.Heartbeat
import net.mamoe.mirai.qqandroid.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.qqandroid.network.protocol.packet.login.WtLogin
import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.io.*
import net.mamoe.mirai.utils.io.ByteArrayPool
import net.mamoe.mirai.utils.io.PlatformSocket
import net.mamoe.mirai.utils.io.readPacket
import net.mamoe.mirai.utils.io.useBytes
import kotlin.coroutines.CoroutineContext
import kotlin.jvm.Volatile
import kotlin.time.ExperimentalTime
@ -55,13 +62,42 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
private lateinit var channel: PlatformSocket
override suspend fun login() {
private var _packetReceiverJob: Job? = null
private val packetReceiveLock: Mutex = Mutex()
/**
 * Returns the packet receiver job, launching a new one when needed.
 *
 * The existing job is reused only while it is still active and [channel] is open.
 * Otherwise a new coroutine is launched that keeps reading from [channel] for as long
 * as it stays open and hands every raw packet to [processPacket] while holding
 * [packetReceiveLock]. The newly launched job is remembered in [_packetReceiverJob].
 *
 * A read that fails with [CancellationException] ends the receiver silently; any other
 * failure broadcasts [BotOfflineEvent.Dropped] for this bot before the receiver ends.
 *
 * @return the reused or newly launched receiver [Job]
 */
private fun startPacketReceiverJobOrGet(): Job {
    _packetReceiverJob?.let { existing ->
        if (existing.isActive && channel.isOpen) {
            return existing
        }
    }

    val receiver = this.launch(CoroutineName("Incoming Packet Receiver")) {
        while (channel.isOpen) {
            val rawInput = try {
                channel.read()
            } catch (e: CancellationException) {
                // The receiver was cancelled: stop without signalling a drop.
                return@launch
            } catch (e: Throwable) {
                // Any other read failure is reported as a dropped connection.
                BotOfflineEvent.Dropped(bot).broadcast()
                return@launch
            }

            packetReceiveLock.withLock { processPacket(rawInput) }
        }
    }
    _packetReceiverJob = receiver
    return receiver
}
override suspend fun relogin() {
if (::channel.isInitialized) {
channel.close()
}
channel = PlatformSocket()
// TODO: 2020/2/14 连接多个服务器
channel.connect("113.96.13.208", 8080)
this.launch(CoroutineName("Incoming Packet Receiver")) { processReceive() }
startPacketReceiverJobOrGet()
// logger.info("Trying login")
var response: WtLogin.Login.LoginPacketResponse = WtLogin.Login.SubCommand9(bot.client).sendAndExpect()
@ -94,7 +130,8 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
}
}
is WtLogin.Login.LoginPacketResponse.Error -> error(response.toString())
is WtLogin.Login.LoginPacketResponse.Error ->
throw WrongPasswordException(response.toString())
is WtLogin.Login.LoginPacketResponse.DeviceLockLogin -> {
response = WtLogin.Login.SubCommand20(
@ -112,18 +149,14 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
}
// println("d2key=${bot.client.wLoginSigInfo.d2Key.toUHexString()}")
StatSvc.Register(bot.client).sendAndExpect<StatSvc.Register.Response>(6000) // it's slow
repeat(2) {
StatSvc.Register(bot.client).sendAndExpect<StatSvc.Register.Response>(6000) // it's slow
}
}
@UseExperimental(MiraiExperimentalAPI::class, ExperimentalTime::class)
override suspend fun init(): Unit = coroutineScope {
this@QQAndroidBotNetworkHandler.subscribeAlways<BotOfflineEvent> {
if (this@QQAndroidBotNetworkHandler.bot == this.bot) {
logger.error("被挤下线")
close()
}
}
MessageSvc.PbGetMsg(bot.client, MsgSvc.SyncFlag.START, currentTimeSeconds).sendWithoutExpect()
bot.qqs.delegate.clear()
@ -172,6 +205,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
launch {
try {
bot.groups.delegate.addLast(
@Suppress("DuplicatedCode")
GroupImpl(
bot = bot,
coroutineContext = bot.coroutineContext,
@ -218,7 +252,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
if (failException != null) {
delay(bot.configuration.firstReconnectDelayMillis)
close()
bot.tryReinitializeNetworkHandler(failException)
BotOfflineEvent.Dropped(bot).broadcast()
}
}
}
@ -408,33 +442,6 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
}
/**
 * Reads raw packets from [channel] while it is open and passes each one to
 * [processPacket] under [packetReceiveLock].
 *
 * When a read fails the loop stops. Cancellation ends it silently; every other
 * failure is forwarded to `bot.tryReinitializeNetworkHandler`, after being logged
 * when it is a [ReadPacketInternalException] or an unexpected exception type.
 */
@UseExperimental(ExperimentalCoroutinesApi::class)
private suspend fun processReceive() {
    while (channel.isOpen) {
        val rawInput = try {
            channel.read()
        } catch (failure: Throwable) {
            // Branch order mirrors the precedence of the individual catch clauses.
            when (failure) {
                is ClosedChannelException -> Unit
                is ReadPacketInternalException -> logger.error("Socket channel read failed: ${failure.message}")
                is CancellationException -> return
                else -> logger.error("Caught unexpected exceptions", failure)
            }
            bot.tryReinitializeNetworkHandler(failure)
            return
        }

        packetReceiveLock.withLock {
            processPacket(rawInput)
        }
    }
}
private val packetReceiveLock: Mutex = Mutex()
/**
* 发送一个包, 但不期待任何返回.
*/
@ -517,5 +524,5 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
super.close(cause)
}
/** Suspends the caller until `supervisor.join()` returns. */
override suspend fun awaitDisconnection() = supervisor.join()
/**
 * Suspends until the packet receiver job completes.
 * Returns immediately when no receiver job has been stored in [_packetReceiverJob].
 */
override suspend fun awaitDisconnection() {
    _packetReceiverJob?.join()
}
}

View File

@ -12,10 +12,15 @@
package net.mamoe.mirai
import kotlinx.coroutines.*
import net.mamoe.mirai.event.Listener
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.event.events.BotOfflineEvent
import net.mamoe.mirai.event.events.BotReloginEvent
import net.mamoe.mirai.event.subscribeAlways
import net.mamoe.mirai.network.BotNetworkHandler
import net.mamoe.mirai.network.ForceOfflineException
import net.mamoe.mirai.network.LoginFailedException
import net.mamoe.mirai.network.closeAndJoin
import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.io.logStacktrace
@ -78,60 +83,70 @@ abstract class BotImpl<N : BotNetworkHandler> constructor(
@Suppress("PropertyName")
internal lateinit var _network: N
final override suspend fun login() = reinitializeNetworkHandler(null)
@Suppress("unused")
private val offlineListener: Listener<BotOfflineEvent> = this.subscribeAlways { event ->
when (event) {
is BotOfflineEvent.Dropped -> {
bot.logger.info("Connection dropped or lost by server, retrying login")
// shouldn't be suspend!! This function MUST NOT inherit the context from the caller because the caller(NetworkHandler) is going to close
fun tryReinitializeNetworkHandler(
cause: Throwable?
): Job = launch {
var lastFailedException: Throwable? = null
repeat(configuration.reconnectionRetryTimes) {
try {
reinitializeNetworkHandler(cause)
logger.info("Reconnected successfully")
return@launch
} catch (e: Throwable) {
lastFailedException = e
delay(configuration.reconnectPeriodMillis)
var lastFailedException: Throwable? = null
repeat(configuration.reconnectionRetryTimes) {
try {
network.relogin()
logger.info("Reconnected successfully")
return@subscribeAlways
} catch (e: Throwable) {
lastFailedException = e
delay(configuration.reconnectPeriodMillis)
}
}
if (lastFailedException != null) {
throw lastFailedException!!
}
}
is BotOfflineEvent.Active -> {
val msg = if (event.cause == null) {
""
} else {
" with exception: " + event.cause.message
}
bot.logger.info("Bot is closed manually$msg")
close(CancellationException(event.toString()))
}
is BotOfflineEvent.Force -> {
bot.logger.info("Connection occupied by another android device: ${event.message}")
close(ForceOfflineException(event.toString()))
}
}
if (lastFailedException != null) {
throw lastFailedException!!
}
}
final override suspend fun login() = reinitializeNetworkHandler(null)
private suspend fun reinitializeNetworkHandler(
cause: Throwable?
) {
logger.info("BotAccount: $uin")
logger.info("Initializing BotNetworkHandler")
try {
if (::_network.isInitialized) {
BotOfflineEvent.Active(this, cause).broadcast()
_network.closeAndJoin(cause)
suspend fun doRelogin() {
while (true) {
_network = createNetworkHandler(this.coroutineContext)
try {
_network.relogin()
return
} catch (e: LoginFailedException) {
throw e
} catch (e: Exception) {
network.logger.error(e)
_network.closeAndJoin(e)
}
logger.warning("Login failed. Retrying in 3s...")
delay(3000)
}
} catch (e: Exception) {
logger.error("Cannot close network handler", e)
}
loginLoop@ while (true) {
_network = createNetworkHandler(this.coroutineContext)
try {
_network.login()
break@loginLoop
} catch (e: Exception) {
e.logStacktrace()
_network.closeAndJoin(e)
}
logger.warning("Login failed. Retrying in 3s...")
delay(3000)
}
repeat(1) block@{
suspend fun doInit() {
repeat(2) {
try {
_network.init()
return@block
return
} catch (e: Exception) {
e.logStacktrace()
}
@ -141,6 +156,16 @@ abstract class BotImpl<N : BotNetworkHandler> constructor(
logger.error("cannot init. some features may be affected")
}
logger.info("Initializing BotNetworkHandler")
if (::_network.isInitialized) {
BotReloginEvent(this, cause).broadcast()
doRelogin()
return
}
doRelogin()
doInit()
}
/**
 * Supplies the network handler of type [N] for this bot.
 *
 * The login retry loop in this class calls it once per attempt and stores the result in [_network].
 *
 * @param coroutineContext context for the new handler; the callers in this class pass the bot's own `coroutineContext`
 * @return the handler that the next login attempt will use
 */
protected abstract fun createNetworkHandler(coroutineContext: CoroutineContext): N
@ -153,9 +178,11 @@ abstract class BotImpl<N : BotNetworkHandler> constructor(
if (cause == null) {
network.close()
this.botJob.complete()
offlineListener.complete()
} else {
network.close(cause)
this.botJob.completeExceptionally(cause)
offlineListener.completeExceptionally(cause)
}
}
groups.delegate.clear()

View File

@ -36,6 +36,7 @@ import net.mamoe.mirai.utils.io.PlatformDatagramChannel
*
* [BotNetworkHandler.close] 时将会 [取消][Job.cancel] 所有此作用域下的协程
*/
@MiraiInternalAPI
@Suppress("PropertyName")
abstract class BotNetworkHandler : CoroutineScope {
/**
@ -55,12 +56,20 @@ abstract class BotNetworkHandler : CoroutineScope {
/**
* Tries to log in to the available servers in turn. Returns once login to any server has completed.
* This function suspends until login succeeds.
*
* - Disconnects and logs in again.
* - Does not stop the network layer's [Job].
* - Does not fetch the contact list again when logging in again.
* - Suspends until login succeeds.
*
* Do not use this API. Use [Bot.login] instead.
*
* @throws LoginFailedException when login fails
* @throws WrongPasswordException when the password is wrong
*/
@Suppress("SpellCheckingInspection")
@MiraiInternalAPI
abstract suspend fun login()
abstract suspend fun relogin()
/**
* 初始化获取好友列表等值.
@ -92,6 +101,7 @@ abstract class BotNetworkHandler : CoroutineScope {
}
}
@UseExperimental(MiraiInternalAPI::class)
suspend fun BotNetworkHandler.closeAndJoin(cause: Throwable? = null) {
this.close(cause)
this.supervisor.join()