Remove rubbish; fix build; add todos

This commit is contained in:
Him188 2021-04-14 23:23:24 +08:00
parent b844efb072
commit d01f71ff8b
16 changed files with 327 additions and 1483 deletions

View File

@ -19,6 +19,7 @@ import kotlinx.io.core.*
import java.io.File
import kotlin.text.Charsets
public val DECRYPTER_16_ZERO: ByteArray = ByteArray(16)
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
public inline fun <R> ByteReadPacket.useBytes(

View File

@ -166,4 +166,15 @@ public inline fun <E> MutableList<E>.replaceAllKotlin(operator: (E) -> E) {
}
public fun systemProp(name: String, default: Boolean): Boolean =
System.getProperty(name, default.toString())?.toBoolean() ?: default
System.getProperty(name, default.toString())?.toBoolean() ?: default
public fun Throwable.getRootCause(maxDepth: Int = 20): Throwable {
var depth = 0
var rootCause: Throwable? = this
while (rootCause?.cause != null) {
rootCause = rootCause.cause
if (depth++ >= maxDepth) break
}
return rootCause ?: this
}

View File

@ -26,35 +26,35 @@ import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.EventPriority.MONITOR
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.event.events.BotOfflineEvent
import net.mamoe.mirai.event.events.BotReloginEvent
import net.mamoe.mirai.internal.network.BotNetworkHandler
import net.mamoe.mirai.internal.network.DefaultServerList
import net.mamoe.mirai.internal.network.closeAndJoin
import net.mamoe.mirai.network.ForceOfflineException
import net.mamoe.mirai.network.LoginFailedException
import net.mamoe.mirai.internal.network.net.NetworkHandler
import net.mamoe.mirai.supervisorJob
import net.mamoe.mirai.utils.*
import kotlin.coroutines.CoroutineContext
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
internal abstract class AbstractBot constructor(
final override val configuration: BotConfiguration,
final override val id: Long,
) : Bot, CoroutineScope {
// FASTEST INIT
///////////////////////////////////////////////////////////////////////////
// lifecycle
///////////////////////////////////////////////////////////////////////////
val supervisor = SupervisorJob(configuration.parentCoroutineContext[Job])
// FASTEST INIT
private val supervisor = SupervisorJob(configuration.parentCoroutineContext[Job])
final override val logger: MiraiLogger by lazy { configuration.botLoggerSupplier(this) }
final override val coroutineContext: CoroutineContext = // for id
configuration.parentCoroutineContext
.plus(supervisor)
.plus(configuration.parentCoroutineContext[CoroutineExceptionHandler]
?: CoroutineExceptionHandler { _, e ->
logger.error("An exception was thrown under a coroutine of Bot", e)
}
.plus(
configuration.parentCoroutineContext[CoroutineExceptionHandler]
?: CoroutineExceptionHandler { _, e ->
logger.error("An exception was thrown under a coroutine of Bot", e)
}
)
.plus(CoroutineName("Mirai Bot"))
@ -66,36 +66,24 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
}
}
// region network
internal val serverList: MutableList<Pair<String, Int>> = mutableListOf()
///////////////////////////////////////////////////////////////////////////
// overrides
///////////////////////////////////////////////////////////////////////////
val network: N get() = _network
@Suppress("PropertyName")
internal lateinit var _network: N
suspend fun reinitializeNetwork() {
if (::_network.isInitialized) {
_network.closeAndJoin(null)
}
_network = createNetworkHandler(coroutineContext)
}
internal var _isConnecting: Boolean = false
override val isOnline: Boolean get() = _network.areYouOk()
final override val isOnline: Boolean get() = _network.state == NetworkHandler.State.OK
final override val eventChannel: EventChannel<BotEvent> =
GlobalEventChannel.filterIsInstance<BotEvent>().filter { it.bot === this@AbstractBot }
val otherClientsLock = Mutex() // lock sync
override val otherClients: ContactList<OtherClient> = ContactList()
/**
* Close server connection, resend login packet, BUT DOESN'T [BotNetworkHandler.init]
*/
@ThisApiMustBeUsedInWithConnectionLockBlock
@Throws(LoginFailedException::class) // only
protected abstract suspend fun relogin(cause: Throwable?)
///////////////////////////////////////////////////////////////////////////
// sync (// TODO: 2021/4/14 extract sync logic
///////////////////////////////////////////////////////////////////////////
val otherClientsLock = Mutex() // lock sync
// TODO: 2021/4/14 extract offlineListener
@OptIn(ExperimentalTime::class)
@Suppress("unused")
@ -108,11 +96,11 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
if (
!event.bot.isActive // bot closed
|| !::_network.isInitialized // bot 还未登录就被 close
|| _isConnecting // bot 还在登入
// || _isConnecting // bot 还在登入 // TODO: 2021/4/14 处理还在登入?
) {
// Close network to avoid endless reconnection while network is ok
// https://github.com/mamoe/mirai/issues/894
kotlin.runCatching { network.close(event.castOrNull<BotOfflineEvent.CauseAware>()?.cause) }
kotlin.runCatching { network.close() }
return@subscribeAlways
}
/*
@ -125,7 +113,7 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
val cause = event.cause
val msg = if (cause == null) "" else " with exception: $cause"
bot.logger.info("Bot is closed manually $msg", cause)
network.cancel(CancellationException("Bot offline manually $msg", cause))
network.close()
}
is BotOfflineEvent.Force -> {
bot.logger.info { "Connection occupied by another android device: ${event.message}" }
@ -135,7 +123,7 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
bot.logger.info { "Reconnecting..." }
// delay(3000)
} else {
network.cancel(ForceOfflineException("Connection occupied by another android device: ${event.message}"))
network.close()
}
}
is BotOfflineEvent.MsfOffline,
@ -148,7 +136,7 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
}
if (event.reconnect) {
if (!_network.isActive) {
if (_network.state != NetworkHandler.State.OK) {
// normally closed
return@subscribeAlways
}
@ -158,7 +146,7 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
bot.asQQAndroidBot().client.run {
if (serverList.isEmpty()) {
bot.asQQAndroidBot().bdhSyncer.loadServerListFromCache()
bot.bdhSyncer.loadServerListFromCache()
if (serverList.isEmpty()) {
serverList.addAll(DefaultServerList)
} else Unit
@ -167,7 +155,9 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
bot.launch {
val success: Boolean
val time = measureTime { success = Reconnect().reconnect(event) }
val time = measureTime {
success = TODO("relogin")
}
if (success) {
logger.info { "Reconnected successfully in ${time.toHumanReadableString()}" }
@ -176,141 +166,18 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
}
}
private inner class Reconnect {
suspend fun reconnect(event: BotOfflineEvent): Boolean {
retryCatchingExceptions<Unit>(
configuration.reconnectionRetryTimes,
except = LoginFailedException::class
) { tryCount, _ ->
if (tryCount != 0) {
delay(configuration.reconnectPeriodMillis)
}
///////////////////////////////////////////////////////////////////////////
// network
///////////////////////////////////////////////////////////////////////////
internal val serverList: MutableList<Pair<String, Int>> = mutableListOf()
// Close network to avoid endless reconnection while network is ok
// https://github.com/mamoe/mirai/issues/894
kotlin.runCatching { network.close(event.castOrNull<BotOfflineEvent.CauseAware>()?.cause) }
// TODO: 2021/4/14 handle serverList
login()
_network.postInitActions()
// network.withConnectionLock {
// /**
// * [AbstractBot.relogin] only, no [BotNetworkHandler.init]
// */
// @OptIn(ThisApiMustBeUsedInWithConnectionLockBlock::class)
// relogin((event as? BotOfflineEvent.Dropped)?.cause)
// }
launch {
BotReloginEvent(bot, (event as? BotOfflineEvent.CauseAware)?.cause).broadcast()
}
return true
}.getOrElse { exception ->
if (exception is LoginFailedException && !exception.killBot) {
logger.info { "Cannot reconnect." }
logger.error(exception)
// logger.info { "Retrying in 3s..." }
// delay(3000)
return false
}
logger.info { "Cannot reconnect." }
bot.cancel(CancellationException("Cannot reconnect.", exception))
return false
}
val network: NetworkHandler get() = _network
return false
}
}
/**
* 仅用在 [login]
*/
private inner class Login {
private suspend fun doRelogin() {
while (true) {
reinitializeNetwork()
try {
_isConnecting = true
@OptIn(ThisApiMustBeUsedInWithConnectionLockBlock::class)
relogin(null)
return
} catch (e: Exception) {
if (e is LoginFailedException) {
if (e.killBot) throw e
} else {
network.logger.error(e)
}
logger.warning { "Login failed. Retrying in 3s... (rootCause=${e.rootCause})" }
_network.closeAndJoin(e)
delay(3000)
continue
} finally {
_isConnecting = false
}
// unreachable here
}
}
private suspend fun doInit() {
retryCatchingExceptions(5) { count, lastException ->
if (count != 0) {
if (!isActive) {
logger.error("Cannot init due to fatal error")
throw lastException ?: error("<No lastException>")
}
logger.warning { "Init failed. Retrying in 3s... (rootCause=${lastException?.rootCause})" }
delay(3000)
}
_network.init()
}.getOrElse {
logger.error { "Cannot init. some features may be affected" }
throw it // abort
}
}
@ThisApiMustBeUsedInWithConnectionLockBlock
private suspend fun reinitializeNetworkHandler(cause: Throwable?) {
// logger.info("Initializing BotNetworkHandler")
if (::_network.isInitialized) {
_network.cancel(CancellationException("manual re-login", cause = cause))
BotReloginEvent(this@AbstractBot, cause).broadcast()
doRelogin()
return
}
doRelogin()
doInit()
}
suspend fun doLogin() {
logger.info { "Logging in..." }
if (::_network.isInitialized) {
network.withConnectionLock {
@OptIn(ThisApiMustBeUsedInWithConnectionLockBlock::class)
reinitializeNetworkHandler(null)
}
} else {
@OptIn(ThisApiMustBeUsedInWithConnectionLockBlock::class)
reinitializeNetworkHandler(null)
}
// https://github.com/mamoe/mirai/issues/1019
kotlin.runCatching {
bot.nick
}.onFailure {
bot.asQQAndroidBot().nick = MiraiImpl.queryProfile(bot, bot.id).nickname
if (bot.nick.isBlank()) {
logger.warning { "Unable to fetch nickname of bot." }
}
}
logger.info { "Login successful" }
}
}
@Suppress("PropertyName")
internal lateinit var _network: NetworkHandler
/**
@ -319,10 +186,10 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
*/
final override suspend fun login() {
if (!isActive) error("Bot is already closed and cannot relogin. Please create a new Bot instance then do login.")
Login().doLogin()
network
}
protected abstract fun createNetworkHandler(coroutineContext: CoroutineContext): N
protected abstract fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler
// endregion
@ -332,7 +199,7 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
logger.info { "Bot cancelled" + throwable?.message?.let { ": $it" }.orEmpty() }
kotlin.runCatching {
network.close(throwable)
network.close()
}
offlineListener.cancel(CancellationException("Bot cancelled", throwable))
@ -353,20 +220,7 @@ internal abstract class AbstractBot<N : BotNetworkHandler> constructor(
return
}
if (::_network.isInitialized) {
if (this.network.areYouOk()) {
// send log out
kotlin.runCatching { runBlocking { sendLogout() } } // just ignore errors
GlobalScope.launch {
runCatching { BotOfflineEvent.Active(this@AbstractBot, cause).broadcast() }.exceptionOrNull()
?.let { logger.error(it) }
}
}
this.network.close(cause)
}
this.network.close()
if (supervisorJob.isActive) {
if (cause == null) {
@ -389,7 +243,4 @@ private val Throwable.rootCause: Throwable
if (depth++ == 20) break
}
return rootCause ?: this
}
@RequiresOptIn(level = RequiresOptIn.Level.ERROR)
internal annotation class ThisApiMustBeUsedInWithConnectionLockBlock
}

View File

@ -162,7 +162,7 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
}
override suspend fun refreshKeys(bot: Bot) {
bot.asQQAndroidBot().network.refreshKeys()
// TODO: 2021/4/14 MiraiImpl.refreshKeys
}
override suspend fun rejectNewFriendRequest(event: NewFriendRequestEvent, blackList: Boolean) {

View File

@ -24,7 +24,9 @@ import net.mamoe.mirai.internal.message.ForwardMessageInternal
import net.mamoe.mirai.internal.message.LongMessageInternal
import net.mamoe.mirai.internal.network.*
import net.mamoe.mirai.internal.network.handler.BdhSessionSyncer
import net.mamoe.mirai.internal.network.handler.QQAndroidBotNetworkHandler
import net.mamoe.mirai.internal.network.net.NetworkHandler
import net.mamoe.mirai.internal.network.net.NetworkHandlerContextImpl
import net.mamoe.mirai.internal.network.net.impl.netty.NettyNetworkHandler
import net.mamoe.mirai.internal.network.net.protocol.SsoContext
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType
@ -36,9 +38,9 @@ import net.mamoe.mirai.internal.utils.friendCacheFile
import net.mamoe.mirai.internal.utils.io.serialization.toByteArray
import net.mamoe.mirai.message.data.ForwardMessage
import net.mamoe.mirai.message.data.RichMessage
import net.mamoe.mirai.network.LoginFailedException
import net.mamoe.mirai.utils.*
import java.io.File
import java.net.InetSocketAddress
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
@ -60,7 +62,7 @@ internal fun QQAndroidBot.createOtherClient(
internal class QQAndroidBot constructor(
internal val account: BotAccount,
configuration: BotConfiguration
) : AbstractBot<QQAndroidBotNetworkHandler>(configuration, account.id), SsoContext {
) : AbstractBot(configuration, account.id), SsoContext {
val bdhSyncer: BdhSessionSyncer = BdhSessionSyncer(this)
///////////////////////////////////////////////////////////////////////////
@ -86,7 +88,7 @@ internal class QQAndroidBot constructor(
)
)
network.logger.info { "Saved account secrets to local cache for fast login." }
network.context.logger.info { "Saved account secrets to local cache for fast login." }
}
init {
@ -147,7 +149,7 @@ internal class QQAndroidBot constructor(
configuration.friendCacheFile().run {
createFileIfNotExists()
writeText(JsonForCache.encodeToString(FriendListCache.serializer(), friendListCache))
bot.network.logger.info { "Saved ${friendListCache.list.size} friends to local cache." }
bot.network.context.logger.info { "Saved ${friendListCache.list.size} friends to local cache." }
}
}
@ -160,29 +162,17 @@ internal class QQAndroidBot constructor(
override val groups: ContactList<Group> = ContactList()
/**
* Final process for 'login'
*/
@ThisApiMustBeUsedInWithConnectionLockBlock
@Throws(LoginFailedException::class) // only
override suspend fun relogin(cause: Throwable?) {
bdhSyncer.loadFromCache()
client.useNextServers { host, port ->
// net error in login
// network is dead therefore can't send any packet
reinitializeNetwork()
network.closeEverythingAndRelogin(host, port, cause, 0)
}
}
// TODO: 2021/4/14 bdhSyncer.loadFromCache() when login
override suspend fun sendLogout() {
network.run {
StatSvc.Register.offline(client).sendWithoutExpect()
}
network.sendWithoutExpect(StatSvc.Register.offline(client))
}
override fun createNetworkHandler(coroutineContext: CoroutineContext): QQAndroidBotNetworkHandler {
return QQAndroidBotNetworkHandler(coroutineContext, this)
override fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler {
return NettyNetworkHandler(
NetworkHandlerContextImpl(this, this),
InetSocketAddress("123", 1) // TODO: 2021/4/14 address
) // TODO: 2021/4/14
}
@JvmField

View File

@ -21,12 +21,15 @@ import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.events.*
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.contact.info.MemberInfoImpl
import net.mamoe.mirai.internal.message.*
import net.mamoe.mirai.internal.message.OfflineGroupImage
import net.mamoe.mirai.internal.network.BdhSession
import net.mamoe.mirai.internal.network.handler.QQAndroidBotNetworkHandler
import net.mamoe.mirai.internal.network.highway.*
import net.mamoe.mirai.internal.network.highway.ChannelKind
import net.mamoe.mirai.internal.network.highway.Highway
import net.mamoe.mirai.internal.network.highway.ResourceKind.GROUP_IMAGE
import net.mamoe.mirai.internal.network.highway.ResourceKind.GROUP_VOICE
import net.mamoe.mirai.internal.network.highway.postPtt
import net.mamoe.mirai.internal.network.highway.tryServersUpload
import net.mamoe.mirai.internal.network.net.NetworkHandler
import net.mamoe.mirai.internal.network.protocol.data.proto.Cmd0x388
import net.mamoe.mirai.internal.network.protocol.packet.EMPTY_BYTE_ARRAY
import net.mamoe.mirai.internal.network.protocol.packet.chat.TroopEssenceMsgManager
@ -146,7 +149,7 @@ internal class GroupImpl(
if (BeforeImageUploadEvent(this, resource).broadcast().isCancelled) {
throw EventCancelledException("cancelled by BeforeImageUploadEvent.ToGroup")
}
bot.network.run<QQAndroidBotNetworkHandler, Image> {
bot.network.run<NetworkHandler, Image> {
val response: ImgStore.GroupPicUp.Response = ImgStore.GroupPicUp(
bot.client,
uin = bot.id,
@ -204,7 +207,7 @@ internal class GroupImpl(
.toByteArray(Cmd0x388.ReqBody.serializer()),
)
}.recoverCatchingSuppressed {
when (val resp = PttStore.GroupPttUp(bot.client, bot.id, id, resource).sendAndExpect()) {
when (val resp = PttStore.GroupPttUp(bot.client, bot.id, id, resource).sendAndExpect<Any>()) {
is PttStore.GroupPttUp.Response.RequireUpload -> {
tryServersUpload(
bot,

View File

@ -1,122 +0,0 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
@file:Suppress("EXPERIMENTAL_API_USAGE")
package net.mamoe.mirai.internal.network
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.Bot
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.utils.MiraiInternalApi
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.WeakRefProperty
/**
* Mirai 的网络处理器, 它承担所有数据包([Packet])的处理任务.
* [BotNetworkHandler] 是线程安全的.
*
* NetworkHandler 实现接口 [CoroutineScope]
* [BotNetworkHandler] 自己就是作用域.
* 所有 [BotNetworkHandler] 的协程均启动在此作用域下.
*
* [BotNetworkHandler] 的协程包含:
* - UDP 包接收: [PlatformDatagramChannel.read]
* - 心跳 Job
* - Key 刷新
* - 所有数据包处理和发送
*
* [BotNetworkHandler.close] 时将会 [取消][Job.cancel] 所有此作用域下的协程
*
* @suppress 此为**内部 API**, 可能在任意时刻被改动, 且不会给出任何警告.
*/
@Suppress("PropertyName")
internal abstract class BotNetworkHandler : CoroutineScope {
/**
* 所属 [Bot]. 为弱引用
*/
@WeakRefProperty
abstract val bot: QQAndroidBot
/**
* 监管 child [Job]s
*/
abstract val supervisor: CompletableJob
/**
* logger
*/
abstract val logger: MiraiLogger
/**
* 依次尝试登录到可用的服务器. 在任一服务器登录完成后返回.
*
* - 会断开连接并重新登录.
* - 不会停止网络层的 [Job].
* - 重新登录时不会再次拉取联系人列表.
* - 挂起直到登录成功.
*
* 不要使用这个 API. 请使用 [Bot.login]
*
* @throws LoginFailedException 登录失败时
* @throws WrongPasswordException 密码错误时
*/
@Suppress("SpellCheckingInspection")
@MiraiInternalApi
abstract suspend fun closeEverythingAndRelogin(host: String, port: Int, cause: Throwable? = null, step: Int)
abstract suspend fun postInitActions()
/**
* 初始化获取好友列表等值.
*
* 不要使用这个 API. 它会在登录完成后被自动调用.
*/
@MiraiInternalApi
open suspend fun init() {
}
/**
* [Bot] 正常运作时, 这个函数将一直挂起协程到 [Bot] [Bot.close]
*/
abstract suspend fun join()
// cool name
abstract fun areYouOk(): Boolean
private val connectionLock: Mutex = Mutex()
internal suspend inline fun withConnectionLock(block: BotNetworkHandler.() -> Unit) {
connectionLock.withLock { if (areYouOk()) return else block() }
}
/**
* 关闭网络接口, 停止所有有关协程和任务
*
* @param cause 关闭的原因. null 时视为正常关闭, null 时视为异常关闭.
*/
open fun close(cause: Throwable? = null) {
if (supervisor.isActive) {
if (cause != null) {
supervisor.cancel(CancellationException("NetworkHandler closed", cause))
} else {
supervisor.cancel(CancellationException("NetworkHandler closed"))
}
}
}
}
internal suspend fun BotNetworkHandler.closeAndJoin(cause: Throwable? = null) {
this.close(cause)
this.supervisor.join()
}

View File

@ -1,856 +0,0 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.io.core.ByteReadPacket
import kotlinx.io.core.buildPacket
import kotlinx.io.core.readBytes
import net.mamoe.mirai.Mirai
import net.mamoe.mirai.contact.deviceName
import net.mamoe.mirai.contact.platform
import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.events.*
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.contact.logMessageReceived
import net.mamoe.mirai.internal.contact.replaceMagicCodes
import net.mamoe.mirai.internal.createOtherClient
import net.mamoe.mirai.internal.network.*
import net.mamoe.mirai.internal.network.protocol.data.proto.MsgSvc
import net.mamoe.mirai.internal.network.protocol.packet.*
import net.mamoe.mirai.internal.network.protocol.packet.KnownPacketFactories.PacketFactoryIllegalStateException
import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetMsg
import net.mamoe.mirai.internal.network.protocol.packet.login.ConfigPushSvc
import net.mamoe.mirai.internal.network.protocol.packet.login.Heartbeat
import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.internal.network.protocol.packet.login.WtLogin
import net.mamoe.mirai.internal.network.protocol.packet.login.wtlogin.*
import net.mamoe.mirai.internal.utils.NoRouteToHostException
import net.mamoe.mirai.internal.utils.PlatformSocket
import net.mamoe.mirai.internal.utils.SocketException
import net.mamoe.mirai.internal.utils.UnknownHostException
import net.mamoe.mirai.network.*
import net.mamoe.mirai.utils.*
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
@Suppress("MemberVisibilityCanBePrivate")
internal class QQAndroidBotNetworkHandler(coroutineContext: CoroutineContext, bot: QQAndroidBot) : BotNetworkHandler() {
override val bot: QQAndroidBot by bot.unsafeWeakRef()
override val supervisor: CompletableJob = SupervisorJob(coroutineContext[Job])
override val logger: MiraiLogger get() = bot.configuration.networkLoggerSupplier(bot)
override val coroutineContext: CoroutineContext = coroutineContext + CoroutineExceptionHandler { _, throwable ->
logger.error("Exception in NetworkHandler", throwable)
} + supervisor
private lateinit var channel: PlatformSocket
private var _packetReceiverJob: Job? = null
private var heartbeatJob: Job? = null
private var statHeartbeatJob: Job? = null
private val packetReceiveLock: Mutex = Mutex()
override fun areYouOk(): Boolean {
return kotlin.runCatching {
this.isActive && ::channel.isInitialized && channel.isOpen
&& heartbeatJob?.isActive == true && _packetReceiverJob?.isActive == true
}.getOrElse { false }
}
private suspend fun startPacketReceiverJobOrKill(cancelCause: CancellationException? = null): Job {
_packetReceiverJob?.cancel(cancelCause)
return this.launch(CoroutineName("Incoming Packet Receiver")) {
while (channel.isOpen && isActive) {
val rawInput = try {
channel.read()
} catch (e: CancellationException) {
return@launch
} catch (e: Throwable) {
logger.verbose { "Channel closed." }
if (this@QQAndroidBotNetworkHandler.isActive) {
bot.launch { BotOfflineEvent.Dropped(bot, e).broadcast() }
}
return@launch
}
packetReceiveLock.withLock {
processPacket(rawInput)
}
}
}.also { _packetReceiverJob = it }
}
private fun startStatHeartbeatJobOrKill(cancelCause: CancellationException? = null): Job {
statHeartbeatJob?.cancel(cancelCause)
return this@QQAndroidBotNetworkHandler.launch(CoroutineName("statHeartbeatJob")) statHeartbeatJob@{
while (this.isActive) {
delay(bot.configuration.statHeartbeatPeriodMillis)
val failException = doStatHeartbeat()
if (failException != null) {
delay(bot.configuration.firstReconnectDelayMillis)
bot.launch {
BotOfflineEvent.Dropped(bot, failException).broadcast()
}
return@statHeartbeatJob
}
}
}.also { statHeartbeatJob = it }
}
private fun startHeartbeatJobOrKill(cancelCause: CancellationException? = null): Job {
heartbeatJob?.cancel(cancelCause)
return this@QQAndroidBotNetworkHandler.launch(CoroutineName("Heartbeat")) heartBeatJob@{
while (this.isActive) {
delay(bot.configuration.heartbeatPeriodMillis)
val failException = doHeartBeat()
if (failException != null) {
delay(bot.configuration.firstReconnectDelayMillis)
bot.launch {
BotOfflineEvent.Dropped(bot, failException).broadcast()
}
return@heartBeatJob
}
}
}.also { heartbeatJob = it }
}
// @param step
// 0 -> 初始状态, 其他函数调用应永远传入 0
// 1 -> 代表滑块验证已禁用
override suspend fun closeEverythingAndRelogin(host: String, port: Int, cause: Throwable?, step: Int) {
heartbeatJob?.cancel(CancellationException("relogin", cause))
heartbeatJob?.join()
statHeartbeatJob?.cancel(CancellationException("relogin", cause))
statHeartbeatJob?.join()
_packetReceiverJob?.cancel(CancellationException("relogin", cause))
_packetReceiverJob?.join()
if (::channel.isInitialized) {
// if (channel.isOpen) {
// kotlin.runCatching {
// registerClientOnline(500)
// }.exceptionOrNull() ?: return
// logger.info("Cannot do fast relogin. Trying slow relogin")
// }
channel.close()
}
channel = PlatformSocket()
while (isActive) {
try {
channel.connect(host, port)
break
} catch (e: SocketException) {
if (e is NoRouteToHostException || e.message?.contains("Network is unreachable") == true) {
logger.warning { "No route to host (Mostly due to no Internet connection). Retrying in 3s..." }
delay(3000)
} else {
throw e
}
} catch (e: UnknownHostException) {
if (e is NoRouteToHostException || e.message?.contains("Network is unreachable") == true) {
logger.warning { "No route to host (Mostly due to no Internet connection). Retrying in 3s..." }
delay(3000)
} else {
throw e
}
}
}
logger.info { "Connected to server $host:$port" }
if (bot.client.wLoginSigInfoInitialized) {
// do fast login
} else {
bot.initClient()
}
startPacketReceiverJobOrKill(CancellationException("relogin", cause))
if (bot.client.wLoginSigInfoInitialized) {
// do fast login
kotlin.runCatching {
doFastLogin()
}.onFailure {
bot.initClient()
doSlowLogin(host, port, cause, step)
}
} else {
doSlowLogin(host, port, cause, step)
}
// println("d2key=${bot.client.wLoginSigInfo.d2Key.toUHexString()}")
registerClientOnline()
startStatHeartbeatJobOrKill()
startHeartbeatJobOrKill()
bot.eventChannel.subscribeOnce<BotOnlineEvent>(this.coroutineContext) {
val bot = (bot as QQAndroidBot)
if (bot.firstLoginSucceed && bot.client.wLoginSigInfoInitialized) {
launch {
while (isActive) {
bot.client.wLoginSigInfo.vKey.run {
//由过期时间最短的且不会被skey更换更新的vkey计算重新登录的时间
val delay = (expireTime - creationTime - 5).times(1000)
logger.info { "Scheduled refresh login session in ${delay.millisToHumanReadableString()}." }
delay(delay)
}
runCatching {
doFastLogin()
registerClientOnline()
}.onFailure {
logger.warning("Failed to refresh login session.", it)
}
}
}
launch {
while (isActive) {
bot.client.wLoginSigInfo.sKey.run {
val delay = (expireTime - creationTime - 5).times(1000)
logger.info { "Scheduled key refresh in ${delay.millisToHumanReadableString()}." }
delay(delay)
}
runCatching {
refreshKeys()
}.onFailure {
logger.error("Failed to refresh key.", it)
}
}
}
}
}
}
private val fastLoginOrSendPacketLock = Mutex()
private suspend fun doFastLogin() {
fastLoginOrSendPacketLock.withLock {
val login10 = WtLogin10(bot.client).sendAndExpect(ignoreLock = true)
check(login10 is WtLogin.Login.LoginPacketResponse.Success) { "Fast login failed: $login10" }
}
}
private suspend fun doSlowLogin(host: String, port: Int, cause: Throwable?, step: Int) {
fun LoginSolver?.notnull(): LoginSolver {
checkNotNull(this) {
"No LoginSolver found. Please provide by BotConfiguration.loginSolver. " +
"For example use `BotFactory.newBot(...) { loginSolver = yourLoginSolver}` in Kotlin, " +
"use `BotFactory.newBot(..., new BotConfiguration() {{ setLoginSolver(yourLoginSolver) }})` in Java."
}
return this
}
val isSliderCaptchaSupport = bot.configuration.loginSolver?.isSliderCaptchaSupported ?: false
val allowSlider = isSliderCaptchaSupport
|| bot.configuration.protocol == BotConfiguration.MiraiProtocol.ANDROID_PHONE
|| step == 0
fun loginSolverNotNull() = bot.configuration.loginSolver.notnull()
var response: WtLogin.Login.LoginPacketResponse =
WtLogin9(bot.client, allowSlider).sendAndExpect()
mainloop@ while (true) {
when (response) {
is WtLogin.Login.LoginPacketResponse.UnsafeLogin -> {
loginSolverNotNull().onSolveUnsafeDeviceLoginVerify(bot, response.url)
response = WtLogin9(bot.client, allowSlider).sendAndExpect()
}
is WtLogin.Login.LoginPacketResponse.Captcha -> when (response) {
is WtLogin.Login.LoginPacketResponse.Captcha.Picture -> {
var result = loginSolverNotNull().onSolvePicCaptcha(bot, response.data)
if (result == null || result.length != 4) {
//refresh captcha
result = "ABCD"
}
response = WtLogin2.SubmitPictureCaptcha(bot.client, response.sign, result)
.sendAndExpect()
continue@mainloop
}
is WtLogin.Login.LoginPacketResponse.Captcha.Slider -> {
if (!isSliderCaptchaSupport) {
if (step == 0) {
return closeEverythingAndRelogin(host, port, cause, 1)
}
throw UnsupportedSliderCaptchaException(
buildString {
append("Mirai 无法完成滑块验证.")
if (allowSlider) {
append(" 使用协议 ")
append(bot.configuration.protocol)
append(" 强制要求滑块验证, 请更换协议后重试.")
}
append(" 另请参阅: https://github.com/project-mirai/mirai-login-solver-selenium")
}
)
}
val ticket = try {
loginSolverNotNull().onSolveSliderCaptcha(bot, response.url)
?.takeIf { it.isNotEmpty() }
?: return closeEverythingAndRelogin(host, port, cause, step)
} catch (lfe: LoginFailedException) {
throw lfe
} catch (error: Throwable) {
if (step == 0) {
logger.warning(error)
return closeEverythingAndRelogin(host, port, error, 1)
}
throw error
}
response = WtLogin2.SubmitSliderCaptcha(bot.client, ticket).sendAndExpect()
continue@mainloop
}
}
is WtLogin.Login.LoginPacketResponse.Error -> {
if (response.message.contains("0x9a")) { //Error(title=登录失败, message=请你稍后重试。(0x9a), errorInfo=)
throw RetryLaterException()
}
val msg = response.toString()
throw WrongPasswordException(buildString(capacity = msg.length) {
append(msg)
if (msg.contains("当前上网环境异常")) { // Error(title=禁止登录, message=当前上网环境异常,请更换网络环境或在常用设备上登录或稍后再试。, errorInfo=)
append(", tips=若频繁出现, 请尝试开启设备锁")
}
})
}
is WtLogin.Login.LoginPacketResponse.DeviceLockLogin -> {
response = WtLogin20(
bot.client
).sendAndExpect()
continue@mainloop
}
is WtLogin.Login.LoginPacketResponse.Success -> {
logger.info { "Login successful" }
break@mainloop
}
is WtLogin.Login.LoginPacketResponse.SMSVerifyCodeNeeded -> {
val message = "SMS required: $response, which isn't yet supported"
logger.error(message)
throw UnsupportedSMSLoginException(message)
}
}
}
}
suspend fun refreshKeys() {
WtLogin15(bot.client).sendAndExpect()
}
private suspend fun registerClientOnline(): StatSvc.Register.Response {
// object : OutgoingPacketFactory<Packet?>("push.proxyUnRegister") {
// override suspend fun ByteReadPacket.decode(bot: QQAndroidBot): Packet? {
// return null
// }
// }.buildOutgoingUniPacket(bot.client) {}.sendWithoutExpect()
// kotlin.runCatching {
// StatSvc.Register.offline(bot.client).sendAndExpect()
// }.getOrElse { logger.warning(it) }
return StatSvc.Register.online(bot.client).sendAndExpect().also {
lastRegisterResp = it
}
}
private suspend fun updateOtherClientsList() {
val list = Mirai.getOnlineOtherClientsList(bot)
bot.otherClients.delegate.clear()
bot.otherClients.delegate.addAll(list.map { bot.createOtherClient(it) })
if (bot.otherClients.isEmpty()) {
bot.logger.info { "No OtherClient online." }
} else {
bot.logger.info { "Online OtherClients: " + bot.otherClients.joinToString { "${it.deviceName}(${it.platform?.name ?: "unknown platform"})" } }
}
}
// caches
private val _pendingEnabled = atomic(true)
internal val pendingEnabled get() = _pendingEnabled.value
@JvmField
@Volatile
internal var pendingIncomingPackets: ConcurrentLinkedQueue<KnownPacketFactories.IncomingPacket<*>>? =
ConcurrentLinkedQueue()
private val contactUpdater: ContactUpdater by lazy { ContactUpdaterImpl(bot) }
private lateinit var lastRegisterResp: StatSvc.Register.Response
override suspend fun init(): Unit = coroutineScope {
check(bot.isActive) { "bot is dead therefore network can't init." }
check(this@QQAndroidBotNetworkHandler.isActive) { "network is dead therefore can't init." }
contactUpdater.closeAllContacts(CancellationException("re-init"))
if (!pendingEnabled) {
pendingIncomingPackets = ConcurrentLinkedQueue()
_pendingEnabled.value = true
}
this@QQAndroidBotNetworkHandler.launch(
CoroutineName("Awaiting ConfigPushSvc.PushReq"),
block = ConfigPushSyncer()
)
launch {
syncMessageSvc()
}
launch {
bot.otherClientsLock.withLock {
updateOtherClientsList()
}
}
contactUpdater.loadAll(lastRegisterResp.origin)
bot.firstLoginSucceed = true
postInitActions()
}
@Suppress("FunctionName", "UNUSED_VARIABLE")
private fun BotNetworkHandler.ConfigPushSyncer(): suspend CoroutineScope.() -> Unit = launch@{
logger.info { "Awaiting ConfigPushSvc.PushReq." }
when (val resp: ConfigPushSvc.PushReq.PushReqResponse? = nextEventOrNull(20_000)) {
null -> {
val hasSession = bot.bdhSyncer.hasSession
kotlin.runCatching { bot.bdhSyncer.bdhSession.completeExceptionally(CancellationException("Timeout waiting for ConfigPushSvc.PushReq")) }
if (!hasSession) {
logger.warning { "Missing ConfigPushSvc.PushReq. Switching server..." }
bot.launch { BotOfflineEvent.RequireReconnect(bot).broadcast() }
} else {
logger.warning { "Missing ConfigPushSvc.PushReq. Using the latest response. File uploading may be affected." }
}
}
is ConfigPushSvc.PushReq.PushReqResponse.ConfigPush -> {
logger.info { "ConfigPushSvc.PushReq: Config updated." }
}
is ConfigPushSvc.PushReq.PushReqResponse.ServerListPush -> {
logger.info { "ConfigPushSvc.PushReq: Server updated." }
// handled in ConfigPushSvc
return@launch
}
}
}
override suspend fun postInitActions() {
_pendingEnabled.value = false
pendingIncomingPackets?.forEach {
runCatching {
@Suppress("UNCHECKED_CAST")
KnownPacketFactories.handleIncomingPacket(
it as KnownPacketFactories.IncomingPacket<Packet>,
bot,
it.flag2,
it.consumer.cast() // IDE false positive warning
)
}.getOrElse {
logger.error("Exception on processing pendingIncomingPackets.", it)
}
}
val list = pendingIncomingPackets
pendingIncomingPackets = null // release, help gc
list?.clear() // help gc
runCatching {
BotOnlineEvent(bot).broadcast()
}.getOrElse {
logger.error("Exception on broadcasting BotOnlineEvent.", it)
}
}
init {
@Suppress("RemoveRedundantQualifierName")
val listener = bot.eventChannel
.parentJob(supervisor)
.subscribeAlways<BotReloginEvent>(priority = EventPriority.MONITOR) {
this@QQAndroidBotNetworkHandler.launch { syncMessageSvc() }
}
supervisor.invokeOnCompletion { listener.cancel() }
}
private suspend fun syncMessageSvc() {
logger.info { "Syncing friend message history..." }
withTimeoutOrNull(30000) {
launch(CoroutineName("Syncing friend message history")) { nextEvent<MessageSvcPbGetMsg.GetMsgSuccess> { it.bot == this@QQAndroidBotNetworkHandler.bot } }
MessageSvcPbGetMsg(bot.client, MsgSvc.SyncFlag.START, null).sendAndExpect<Packet>()
} ?: error("timeout syncing friend message history.")
logger.info { "Syncing friend message history: Success." }
}
private suspend fun doStatHeartbeat(): Throwable? {
return retryCatching(2) {
StatSvc.SimpleGet(bot.client)
.sendAndExpect<StatSvc.SimpleGet.Response>(
timeoutMillis = bot.configuration.heartbeatTimeoutMillis,
retry = 2
)
return null
}.exceptionOrNull()
}
private suspend fun doHeartBeat(): Throwable? {
return retryCatching(2) {
Heartbeat.Alive(bot.client)
.sendAndExpect<Heartbeat.Alive.Response>(
timeoutMillis = bot.configuration.heartbeatTimeoutMillis,
retry = 2
)
return null
}.exceptionOrNull()
}
/**
* 缓存超时处理的 [Job]. 超时后将清空缓存, 以免阻碍后续包的处理
*/
@Volatile
private var cachedPacketTimeoutJob: Job? = null
/**
* 缓存的包
*/
private val cachedPacket: AtomicRef<ByteReadPacket?> = atomic(null)
/**
* 缓存的包还差多少长度
*/
@Volatile
private var expectingRemainingLength: Long = 0
/**
* 解析包内容.
*
* @param input 一个完整的包的内容, 去掉开头的 int 包长度
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun parsePacketAsync(input: ByteReadPacket): Job {
return this.launch(
start = CoroutineStart.ATOMIC
) {
input.use {
try {
parsePacket(it)
} catch (e: PacketFactoryIllegalStateException) {
logger.warning { "Network force offline: ${e.message}" }
bot.launch { BotOfflineEvent.PacketFactoryErrorCode(e.code, bot, e).broadcast() }
}
}
}
}
/**
* 解析包内容
* **注意**: 需要函数调用者 close 这个 [input]
*
* @param input 一个完整的包的内容, 去掉开头的 int 包长度
*/
@Throws(ForceOfflineException::class)
suspend fun parsePacket(input: ByteReadPacket) {
if (input.isEmpty) return
generifiedParsePacket<Packet>(input)
}
// with generic type, less mistakes
private suspend fun <P : Packet?> generifiedParsePacket(input: ByteReadPacket) {
KnownPacketFactories.parseIncomingPacket(
bot,
input
) { packetFactory: PacketFactory<P>, packet: P, commandName: String, sequenceId: Int ->
if (packet is MultiPacket<*>) {
packet.forEach {
handlePacket(null, it, commandName, sequenceId)
}
}
handlePacket(packetFactory, packet, commandName, sequenceId)
}
}
/**
* 处理解析完成的包.
*/
suspend fun <P : Packet?> handlePacket(
packetFactory: PacketFactory<P>?,
packet: P,
commandName: String,
sequenceId: Int
) {
// highest priority: pass to listeners (attached by sendAndExpect).
if (packet != null && (bot.logger.isEnabled || logger.isEnabled)) {
when {
packet is ParseErrorPacket -> {
packet.direction.getLogger(bot).error(packet.error)
}
packet is Packet.NoLog -> {
// nothing to do
}
packet is MessageEvent -> packet.logMessageReceived()
packet is Event && packet !is Packet.NoEventLog -> bot.logger.verbose {
"Event: $packet".replaceMagicCodes()
}
else -> logger.verbose { "Recv: $packet".replaceMagicCodes() }
}
}
packetListeners.forEach { listener ->
if (listener.filter(commandName, sequenceId) && packetListeners.remove(listener)) {
listener.complete(packet)
}
}
packetFactory?.run {
when (this) {
is OutgoingPacketFactory<P> -> bot.handle(packet)
is IncomingPacketFactory<P> -> bot.handle(packet, sequenceId)?.sendWithoutExpect()
}
}
if (packet is Event) {
if ((packet as? BroadcastControllable)?.shouldBroadcast != false) {
if (packet is BotEvent) {
withContext(bot.coroutineContext[CoroutineExceptionHandler] ?: CoroutineExceptionHandler { _, t ->
bot.logger.warning(
"""
Event processing: An exception occurred but no CoroutineExceptionHandler found in coroutineContext of bot
""".trimIndent(), t
)
}) {
packet.broadcast()
}
} else {
packet.broadcast()
}
}
if (packet is CancellableEvent && packet.isCancelled) return
}
}
/**
* 处理从服务器接收过来的包. 这些包可能是粘在一起的, 也可能是不完整的. 将会自动处理.
* 处理后的包会调用 [parsePacketAsync]
*/
private fun processPacket(rawInput: ByteReadPacket) {
if (rawInput.remaining == 0L) {
return
}
val cache = cachedPacket.value
if (cache == null) {
kotlin.runCatching {
// 没有缓存
var length: Int = rawInput.readInt() - 4
if (rawInput.remaining == length.toLong()) {
// 捷径: 当包长度正好, 直接传递剩余数据.
cachedPacketTimeoutJob?.cancel()
parsePacketAsync(rawInput)
return
}
// 循环所有完整的包
while (rawInput.remaining >= length) {
parsePacketAsync(rawInput.readPacketExact(length))
if (rawInput.remaining == 0L) {
cachedPacket.value = null // 表示包长度正好
cachedPacketTimeoutJob?.cancel()
rawInput.close()
return
}
length = rawInput.readInt() - 4
}
if (rawInput.remaining != 0L) {
// 剩余的包长度不够, 缓存后接收下一个包
expectingRemainingLength = length - rawInput.remaining
cachedPacket.value = rawInput
} else {
cachedPacket.value = null // 表示包长度正好
cachedPacketTimeoutJob?.cancel()
rawInput.close()
return
}
}.getOrElse {
cachedPacket.value = null
cachedPacketTimeoutJob?.cancel()
}
} else {
// 有缓存
val expectingLength = expectingRemainingLength
if (rawInput.remaining >= expectingLength) {
// 剩余长度够, 连接上去, 处理这个包.
parsePacketAsync(buildPacket {
writePacket(cache)
writePacket(rawInput, expectingLength)
})
cache.close()
cachedPacket.value = null // 缺少的长度已经给上了.
cachedPacketTimeoutJob?.cancel()
if (rawInput.remaining != 0L) {
return processPacket(rawInput) // 继续处理剩下内容
} else {
rawInput.close()
// 处理好了.
return
}
} else {
// 剩余不够, 连接上去
expectingRemainingLength -= rawInput.remaining
// do not inline `packet`. atomicfu unsupported
val packet = buildPacket {
writePacket(cache)
writePacket(rawInput)
}
cachedPacket.value = packet
}
}
cachedPacketTimeoutJob?.cancel()
cachedPacketTimeoutJob = launch {
delay(1000)
val get = cachedPacket.getAndSet(null)
get?.close()
if (cachedPacketTimeoutJob == this.coroutineContext[Job] && get != null) {
logger.warning { "等待另一部分包时超时. 将舍弃已接收的半个包" }
}
}
}
/**
* 发送一个包, 但不期待任何返回.
* 不推荐使用它, 可能产生意外的情况.
*/
suspend fun OutgoingPacket.sendWithoutExpect() {
check(bot.isActive) { "bot is dead therefore can't send ${this.commandName}" }
check(this@QQAndroidBotNetworkHandler.isActive) { "network is dead therefore can't send ${this.commandName}" }
check(channel.isOpen) { "network channel is closed therefore can't send ${this.commandName}" }
logger.verbose { "Send: ${this.commandName}" }
delegate.withUse {
channel.send(delegate)
}
}
suspend inline fun <E : Packet> OutgoingPacketWithRespType<E>.sendAndExpect(
timeoutMillis: Long = 5000,
retry: Int = 2,
ignoreLock: Boolean = false,
): E {
return (this as OutgoingPacket).sendAndExpect(timeoutMillis, retry, ignoreLock)
}
/**
* 发送一个包, 挂起协程直到接收到指定的返回包或超时
*/
@Suppress("UNCHECKED_CAST")
suspend fun <E : Packet> OutgoingPacket.sendAndExpect(
timeoutMillis: Long = 5000,
retry: Int = 2,
ignoreLock: Boolean = false
): E {
return if (!ignoreLock) fastLoginOrSendPacketLock.withLock {
sendAndExpectImpl(timeoutMillis, retry)
} else sendAndExpectImpl(timeoutMillis, retry)
}
private suspend fun <E : Packet> OutgoingPacket.sendAndExpectImpl(timeoutMillis: Long, retry: Int): E {
require(timeoutMillis > 100) { "timeoutMillis must > 100" }
require(retry in 0..10) { "retry must in 0..10" }
if (!bot.isActive) {
throw CancellationException("bot is dead therefore can't send ${this.commandName}")
}
if (!this@QQAndroidBotNetworkHandler.isActive) {
throw CancellationException("network is dead therefore can't send any packet")
}
if (!channel.isOpen) {
throw CancellationException("network channel is closed")
}
val data = this.delegate.withUse { readBytes() }
return retryCatchingExceptions(
retry + 1,
except = CancellationException::class.cast() // explicit cast due for stupid IDE.
// CancellationException means network closed so don't retry
) {
withPacketListener(commandName, sequenceId) { listener ->
@Suppress("UNCHECKED_CAST")
return withTimeout(timeoutMillis) { // may throw CancellationException
channel.send(data, 0, data.size)
logger.verbose { "Send: $commandName" }
listener.await()
} as E
}
}.getOrThrow<E>()
}
private inline fun <R> withPacketListener(commandName: String, sequenceId: Int, block: (PacketListener) -> R): R {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
val handler = PacketListener(commandName = commandName, sequenceId = sequenceId)
packetListeners.add(handler)
try {
return block(handler)
} finally {
kotlin.runCatching { if (handler.isActive) handler.cancel() } // ensure coroutine completion
packetListeners.remove(handler)
}
}
@PublishedApi
internal val packetListeners: ConcurrentLinkedQueue<PacketListener> = ConcurrentLinkedQueue()
@PublishedApi
internal inner class PacketListener(
// callback
val commandName: String,
val sequenceId: Int
) : CompletableDeferred<Packet?> by CompletableDeferred(supervisor) {
fun filter(commandName: String, sequenceId: Int) =
this.commandName == commandName && this.sequenceId == sequenceId
}
init {
this.supervisor.invokeOnCompletion {
close(it)
}
}
override fun close(cause: Throwable?) {
if (::channel.isInitialized) {
channel.close()
}
super.close(cause)
}
override suspend fun join() = supervisor.join()
}

View File

@ -16,12 +16,14 @@ import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.net.NetworkHandler.State
import net.mamoe.mirai.internal.network.net.protocol.SsoController
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType
import net.mamoe.mirai.utils.BotConfiguration
import net.mamoe.mirai.utils.MiraiLogger
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.util.concurrent.CancellationException
import kotlin.coroutines.CoroutineContext
/**
* Immutable context for [NetworkHandler]
@ -58,6 +60,12 @@ internal class NetworkHandlerContextImpl(
internal interface NetworkHandler {
val context: NetworkHandlerContext
val logger get() = context.logger // TODO: 2021/4/14 just for migration
@Deprecated("") // TODO: 2021/4/14 migrate NetworkHandler.coroutineContext
val coroutineContext: CoroutineContext
get() = error("ERROR")
/**
* State of this handler.
*/
@ -115,11 +123,26 @@ internal interface NetworkHandler {
*/
suspend fun sendWithoutExpect(packet: OutgoingPacket)
@Suppress("INAPPLICABLE_JVM_NAME")
@JvmName("sendWithoutExpect1") // TODO: 2021/4/14 just for migration
suspend fun OutgoingPacket.sendWithoutExpect() = sendWithoutExpect(this)
@Suppress("INAPPLICABLE_JVM_NAME")
@JvmName("sendWithoutExpect1") // TODO: 2021/4/14 just for migration
suspend fun <R> OutgoingPacket.sendAndExpect(timeoutMillis: Long = 5000, retry: Int = 2): R = TODO()
@Suppress("INAPPLICABLE_JVM_NAME")
@JvmName("sendWithoutExpect1") // TODO: 2021/4/14 just for migration
suspend fun <R : Packet?> OutgoingPacketWithRespType<R>.sendAndExpect(
timeoutMillis: Long = 5000,
retry: Int = 2
): R = TODO()
/**
* Closes this handler gracefully and suspends the coroutine for its completion.
*/
suspend fun close()
fun close()
}
/**
@ -209,5 +232,7 @@ internal class SelectorNetworkHandler(
instance().sendAndExpect(packet, timeout, attempts)
override suspend fun sendWithoutExpect(packet: OutgoingPacket) = instance().sendWithoutExpect(packet)
override suspend fun close() = instance().close()
override fun close() {
selector.getResumedInstance()?.close()
}
}

View File

@ -39,7 +39,7 @@ internal class NettyNetworkHandler(
context: NetworkHandlerContext,
private val address: SocketAddress,
) : NetworkHandlerSupport(context) {
override suspend fun close() {
override fun close() {
super.close()
setState(StateClosed())
}
@ -88,7 +88,7 @@ internal class NettyNetworkHandler(
}
})
.connect(address).runBIO { await() }
// TODO: 2021/4/14 eventLoopGroup 移动到 bot, 并在 bot.close() 时关闭
// TODO: 2021/4/14 eventLoopGroup 关闭
return contextResult.await()
}
@ -177,6 +177,7 @@ internal class NettyNetworkHandler(
override fun initialState(): BaseStateImpl = StateInitialized()
}
// TODO: 2021/4/14 Add test for toReadPacket
private fun ByteBuf.toReadPacket(): ByteReadPacket {
val buf = this
return buildPacket {

View File

@ -81,7 +81,7 @@ internal abstract class NetworkHandlerSupport(
sendPacketImpl(packet)
}
override suspend fun close() {
override fun close() {
coroutineContext.job.cancel("NetworkHandler closed.")
}

View File

@ -87,10 +87,7 @@ internal object PacketCodec {
check(returnCode == 0) {
if (returnCode <= -10000) {
// https://github.com/mamoe/mirai/issues/470
throw KnownPacketFactories.PacketFactoryIllegalStateException(
returnCode,
"returnCode = $returnCode"
)
error("returnCode = $returnCode")
} else "returnCode = $returnCode"
}

View File

@ -0,0 +1,204 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
@file:Suppress("ClassName")
package net.mamoe.mirai.internal.network.net.protocol
/*
* 垃圾分类
*/
private object `handle packet1` {
// if (packet != null && (bot.logger.isEnabled || logger.isEnabled)) {
// when {
// packet is ParseErrorPacket -> {
// packet.direction.getLogger(bot).error(packet.error)
// }
// packet is Packet.NoLog -> {
// // nothing to do
// }
// packet is MessageEvent -> packet.logMessageReceived()
// packet is Event && packet !is Packet.NoEventLog -> bot.logger.verbose {
// "Event: $packet".replaceMagicCodes()
// }
// else -> logger.verbose { "Recv: $packet".replaceMagicCodes() }
// }
// }
}
private object `handle packet2` {
// if (packet is Event) {
// if ((packet as? BroadcastControllable)?.shouldBroadcast != false) {
// if (packet is BotEvent) {
// withContext(bot.coroutineContext[CoroutineExceptionHandler] ?: CoroutineExceptionHandler { _, t ->
// bot.logger.warning(
// """
// Event processing: An exception occurred but no CoroutineExceptionHandler found in coroutineContext of bot
// """.trimIndent(), t
// )
// }) {
// packet.broadcast()
// }
// } else {
// packet.broadcast()
// }
// }
//
// if (packet is CancellableEvent && packet.isCancelled) return
// }
}
private object `stat heartbeat` {
// private suspend fun doStatHeartbeat(): Throwable? {
// return retryCatching(2) {
// StatSvc.SimpleGet(bot.client)
// .sendAndExpect<StatSvc.SimpleGet.Response>(
// timeoutMillis = bot.configuration.heartbeatTimeoutMillis,
// retry = 2
// )
// return null
// }.exceptionOrNull()
// }
}
private object syncMessageSvc {
// private suspend fun syncMessageSvc() {
// logger.info { "Syncing friend message history..." }
// withTimeoutOrNull(30000) {
// launch(CoroutineName("Syncing friend message history")) { nextEvent<MessageSvcPbGetMsg.GetMsgSuccess> { it.bot == this@QQAndroidBotNetworkHandler.bot } }
// MessageSvcPbGetMsg(bot.client, MsgSvc.SyncFlag.START, null).sendAndExpect<Packet>()
//
// } ?: error("timeout syncing friend message history.")
// logger.info { "Syncing friend message history: Success." }
// }
}
private object `config push syncer` {
// @Suppress("FunctionName", "UNUSED_VARIABLE")
// private fun BotNetworkHandler.ConfigPushSyncer(): suspend CoroutineScope.() -> Unit = launch@{
// logger.info { "Awaiting ConfigPushSvc.PushReq." }
// when (val resp: ConfigPushSvc.PushReq.PushReqResponse? = nextEventOrNull(20_000)) {
// null -> {
// val hasSession = bot.bdhSyncer.hasSession
// kotlin.runCatching { bot.bdhSyncer.bdhSession.completeExceptionally(CancellationException("Timeout waiting for ConfigPushSvc.PushReq")) }
// if (!hasSession) {
// logger.warning { "Missing ConfigPushSvc.PushReq. Switching server..." }
// bot.launch { BotOfflineEvent.RequireReconnect(bot).broadcast() }
// } else {
// logger.warning { "Missing ConfigPushSvc.PushReq. Using the latest response. File uploading may be affected." }
// }
// }
// is ConfigPushSvc.PushReq.PushReqResponse.ConfigPush -> {
// logger.info { "ConfigPushSvc.PushReq: Config updated." }
// }
// is ConfigPushSvc.PushReq.PushReqResponse.ServerListPush -> {
// logger.info { "ConfigPushSvc.PushReq: Server updated." }
// // handled in ConfigPushSvc
// return@launch
// }
// }
// }
}
private object `network init` {
// suspend fun init(): Unit = coroutineScope {
// check(bot.isActive) { "bot is dead therefore network can't init." }
// check(this@QQAndroidBotNetworkHandler.isActive) { "network is dead therefore can't init." }
//
// contactUpdater.closeAllContacts(CancellationException("re-init"))
//
// if (!pendingEnabled) {
// pendingIncomingPackets = ConcurrentLinkedQueue()
// _pendingEnabled.value = true
// }
//
// val registerResp = registerClientOnline()
//
// this@QQAndroidBotNetworkHandler.launch(
// CoroutineName("Awaiting ConfigPushSvc.PushReq"),
// block = ConfigPushSyncer()
// )
//
// launch {
// syncMessageSvc()
// }
//
// launch {
// bot.otherClientsLock.withLock {
// updateOtherClientsList()
// }
// }
//
// contactUpdater.loadAll(registerResp.origin)
//
// bot.firstLoginSucceed = true
// postInitActions()
// }
}
private object `update other client list` {
//
// private suspend fun updateOtherClientsList() {
// val list = Mirai.getOnlineOtherClientsList(bot)
// bot.otherClients.delegate.clear()
// bot.otherClients.delegate.addAll(list.map { bot.createOtherClient(it) })
//
// if (bot.otherClients.isEmpty()) {
// bot.logger.info { "No OtherClient online." }
// } else {
// bot.logger.info { "Online OtherClients: " + bot.otherClients.joinToString { "${it.deviceName}(${it.platform?.name ?: "unknown platform"})" } }
// }
// }
}
private object `skey refresh` {
// suspend fun refreshKeys() {
// WtLogin15(bot.client).sendAndExpect()
// }
/*
val bot = (bot as QQAndroidBot)
if (bot.firstLoginSucceed && bot.client.wLoginSigInfoInitialized) {
launch {
while (isActive) {
bot.client.wLoginSigInfo.vKey.run {
//由过期时间最短的且不会被skey更换更新的vkey计算重新登录的时间
val delay = (expireTime - creationTime).seconds - 5.minutes
logger.info { "Scheduled refresh login session in ${delay.toHumanReadableString()}." }
delay(delay)
}
runCatching {
doFastLogin()
registerClientOnline()
}.onFailure {
logger.warning("Failed to refresh login session.", it)
}
}
}
launch {
while (isActive) {
bot.client.wLoginSigInfo.sKey.run {
val delay = (expireTime - creationTime).seconds - 5.minutes
logger.info { "Scheduled key refresh in ${delay.toHumanReadableString()}." }
delay(delay)
}
runCatching {
refreshKeys()
}.onFailure {
logger.error("Failed to refresh key.", it)
}
}
}
}
*/
}

View File

@ -17,7 +17,7 @@ import kotlinx.io.core.writeFully
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.QQAndroidClient
import net.mamoe.mirai.internal.network.handler.QQAndroidBotNetworkHandler
import net.mamoe.mirai.internal.network.net.NetworkHandler
import net.mamoe.mirai.internal.utils.io.encryptAndWrite
import net.mamoe.mirai.internal.utils.io.writeHex
import net.mamoe.mirai.internal.utils.io.writeIntLVPacket
@ -65,7 +65,7 @@ internal class IncomingPacket constructor(
}
internal suspend inline fun <E : Packet> OutgoingPacketWithRespType<E>.sendAndExpect(
network: QQAndroidBotNetworkHandler,
network: NetworkHandler,
timeoutMillis: Long = 5000,
retry: Int = 2
): E = network.run {
@ -75,7 +75,7 @@ internal suspend inline fun <E : Packet> OutgoingPacketWithRespType<E>.sendAndEx
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@kotlin.internal.LowPriorityInOverloadResolution
internal suspend inline fun <E : Packet> OutgoingPacket.sendAndExpect(
network: QQAndroidBotNetworkHandler,
network: NetworkHandler,
timeoutMillis: Long = 5000,
retry: Int = 2
): E = network.run {

View File

@ -9,7 +9,7 @@
package net.mamoe.mirai.internal.network.protocol.packet
import kotlinx.io.core.*
import kotlinx.io.core.ByteReadPacket
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.Packet
@ -26,10 +26,9 @@ import net.mamoe.mirai.internal.network.protocol.packet.login.Heartbeat
import net.mamoe.mirai.internal.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.internal.network.protocol.packet.login.WtLogin
import net.mamoe.mirai.internal.network.protocol.packet.summarycard.SummaryCard
import net.mamoe.mirai.internal.network.tryDecryptOrNull
import net.mamoe.mirai.internal.utils.crypto.TEA
import net.mamoe.mirai.internal.utils.crypto.adjustToPublicKey
import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.MiraiLoggerWithSwitch
import net.mamoe.mirai.utils.withSwitch
internal sealed class PacketFactory<TPacket : Packet?> {
/**
@ -107,12 +106,7 @@ internal suspend inline fun <P : Packet?> IncomingPacketFactory<P>.decode(
bot: QQAndroidBot,
packet: ByteReadPacket,
sequenceId: Int
): P =
packet.decode(bot, sequenceId)
internal val DECRYPTER_16_ZERO = ByteArray(16)
internal typealias PacketConsumer<T> = suspend (packetFactory: PacketFactory<T>, packet: T, commandName: String, ssoSequenceId: Int) -> Unit
): P = packet.decode(bot, sequenceId)
/**
* 数据包相关的调试输出.
@ -185,263 +179,8 @@ internal object KnownPacketFactories {
// MessageSvcPushReaded 电脑阅读了别人的消息, 告知手机
// OnlinePush.PbC2CMsgSync 电脑发消息给别人, 同步给手机
@Suppress("MemberVisibilityCanBePrivate") // debugging use
fun findPacketFactory(commandName: String): PacketFactory<*>? {
return OutgoingFactories.firstOrNull { it.receivingCommandName == commandName }
?: IncomingFactories.firstOrNull { it.receivingCommandName == commandName }
}
class PacketFactoryIllegalStateException @JvmOverloads constructor(
val code: Int,
override val message: String? = null,
override val cause: Throwable? = null
) : RuntimeException()
// do not inline. Exceptions thrown will not be reported correctly
@Suppress("UNCHECKED_CAST")
suspend fun <T : Packet?> parseIncomingPacket(
bot: QQAndroidBot,
rawInput: ByteReadPacket,
consumer: PacketConsumer<T>
): Unit = with(rawInput) {
// login
val flag1 = readInt()
PacketLogger.verbose { "开始处理一个包" }
val flag2 = readByte().toInt()
val flag3 = readByte().toInt()
check(flag3 == 0) {
"Illegal flag3. Expected 0, whereas got $flag3. flag1=$flag1, flag2=$flag2. " +
"Remaining=${this.readBytes().toUHexString()}"
}
readString(readInt() - 4)// uinAccount
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
ByteArrayPool.useInstance(this.remaining.toInt()) { data ->
val size = this.readAvailable(data)
kotlin.runCatching {
when (flag2) {
2 -> TEA.decrypt(data, DECRYPTER_16_ZERO, size)
1 -> TEA.decrypt(data, bot.client.wLoginSigInfo.d2Key, size)
0 -> data
else -> error("")
}
}.getOrElse {
bot.client.tryDecryptOrNull(data, size) { it }
}?.toReadPacket()?.let { decryptedData ->
when (flag1) {
0x0A -> parseSsoFrame(bot, decryptedData)
0x0B -> parseSsoFrame(bot, decryptedData) // 这里可能是 uni?? 但测试时候发现结构跟 sso 一样.
else -> error("unknown flag1: ${flag1.toByte().toUHexString()}")
}
}?.let {
it as IncomingPacket<T>
if (it.packetFactory is IncomingPacketFactory<T> && it.packetFactory.canBeCached && bot.network.pendingEnabled) {
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
bot.network.pendingIncomingPackets?.add(it.also {
it.consumer = consumer
it.flag2 = flag2
PacketLogger.info { "Cached ${it.commandName} #${it.sequenceId}" }
}) ?: handleIncomingPacket(it, bot, flag2, consumer)
} else {
handleIncomingPacket(it, bot, flag2, consumer)
}
} ?: kotlin.run {
PacketLogger.error { "任何key都无法解密: ${data.take(size).toUHexString()}" }
return
}
}
}
internal suspend fun <T : Packet?> handleIncomingPacket(
it: IncomingPacket<T>,
bot: QQAndroidBot,
flag2: Int,
consumer: PacketConsumer<T>
) {
if (it.packetFactory == null) {
bot.network.logger.debug { "Received unknown commandName: ${it.commandName}" }
PacketLogger.warning { "找不到 PacketFactory" }
PacketLogger.verbose {
"传递给 PacketFactory 的数据 = ${
it.data.useBytes { data, length ->
data.toUHexString(
length = length
)
}
}"
}
return
}
PacketLogger.info { "Handle packet: ${it.commandName}" }
it.data.withUse {
when (flag2) {
0, 1 -> when (it.packetFactory) {
is OutgoingPacketFactory<*> -> consumer(
it.packetFactory as OutgoingPacketFactory<T>,
it.packetFactory.run { decode(bot, it.data) },
it.packetFactory.commandName,
it.sequenceId
)
is IncomingPacketFactory<*> -> consumer(
it.packetFactory as IncomingPacketFactory<T>,
it.packetFactory.run { decode(bot, it.data, it.sequenceId) },
it.packetFactory.receivingCommandName,
it.sequenceId
)
}
2 -> it.data.parseOicqResponse(
bot,
it.packetFactory as OutgoingPacketFactory<T>,
it.sequenceId,
consumer
)
else -> error(
"unknown flag2: $flag2. Body to be parsed for inner packet=${it.data.readBytes().toUHexString()}"
)
}
}
}
class IncomingPacket<T : Packet?>(
val packetFactory: PacketFactory<T>?,
val sequenceId: Int,
val data: ByteReadPacket,
val commandName: String
) {
var flag2: Int = -1
lateinit var consumer: PacketConsumer<T>
}
private fun parseSsoFrame(bot: QQAndroidBot, input: ByteReadPacket): IncomingPacket<*> {
val commandName: String
val ssoSequenceId: Int
val dataCompressed: Int
input.readPacketExact(input.readInt() - 4).withUse {
ssoSequenceId = readInt()
PacketLogger.verbose { "sequenceId = $ssoSequenceId" }
val returnCode = readInt()
check(returnCode == 0) {
if (returnCode <= -10000) {
// https://github.com/mamoe/mirai/issues/470
throw PacketFactoryIllegalStateException(returnCode, "returnCode = $returnCode")
} else "returnCode = $returnCode"
}
if (PacketLogger.isEnabled) {
val extraData = readBytes(readInt() - 4)
PacketLogger.verbose { "(sso/inner)extraData = ${extraData.toUHexString()}" }
} else {
discardExact(readInt() - 4)
}
commandName = readString(readInt() - 4)
bot.client.outgoingPacketSessionId = readBytes(readInt() - 4)
dataCompressed = readInt()
}
val packet = when (dataCompressed) {
0 -> {
val size = input.readInt().toLong() and 0xffffffff
if (size == input.remaining || size == input.remaining + 4) {
input
} else {
buildPacket {
writeInt(size.toInt())
writePacket(input)
}
}
}
1 -> {
input.discardExact(4)
input.useBytes { data, length ->
data.unzip(0, length).let {
val size = it.toInt()
if (size == it.size || size == it.size + 4) {
it.toReadPacket(offset = 4)
} else {
it.toReadPacket()
}
}
}
}
8 -> input
else -> error("unknown dataCompressed flag: $dataCompressed")
}
// body
val packetFactory = findPacketFactory(commandName)
return IncomingPacket(packetFactory, ssoSequenceId, packet, commandName)
}
private suspend fun <T : Packet?> ByteReadPacket.parseOicqResponse(
bot: QQAndroidBot,
packetFactory: OutgoingPacketFactory<T>,
ssoSequenceId: Int,
consumer: PacketConsumer<T>
) {
@Suppress("DuplicatedCode")
check(readByte().toInt() == 2)
this.discardExact(2)
this.discardExact(2)
this.readUShort()
this.readShort()
this.readUInt().toLong()
val encryptionMethod = this.readUShort().toInt()
this.discardExact(1)
val packet = when (encryptionMethod) {
4 -> {
var data =
TEA.decrypt(this, bot.client.ecdh.keyPair.initialShareKey, length = (this.remaining - 1).toInt())
val peerShareKey =
bot.client.ecdh.calculateShareKeyByPeerPublicKey(readUShortLVByteArray().adjustToPublicKey())
data = TEA.decrypt(data, peerShareKey)
packetFactory.decode(bot, data)
}
3 -> {
// session
val data = TEA.decrypt(
this,
bot.client.wLoginSigInfo.wtSessionTicketKey,
length = (this.remaining - 1).toInt()
)
packetFactory.decode(bot, data)
}
0 -> {
val data = if (bot.client.loginState == 0) {
val size = (this.remaining - 1).toInt()
val byteArrayBuffer = this.readBytes(size)
runCatching {
TEA.decrypt(byteArrayBuffer, bot.client.ecdh.keyPair.initialShareKey, size)
}.getOrElse {
TEA.decrypt(byteArrayBuffer, bot.client.randomKey, size)
}.toReadPacket()
} else {
TEA.decrypt(this, bot.client.randomKey, 0, (this.remaining - 1).toInt())
}
packetFactory.decode(bot, data)
}
else -> error("Illegal encryption method. expected 0 or 4, got $encryptionMethod")
}
consumer(packetFactory, packet, packetFactory.commandName, ssoSequenceId)
}
}

View File

@ -131,7 +131,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
if (resp.result != 0) {
bot.network.logger
.warning { "MessageSvcPushNotify: result != 0, result = ${resp.result}, errorMsg=${resp.errmsg}" }
bot.network.launch(CoroutineName("MessageSvcPushNotify.retry")) {
bot.launch(CoroutineName("MessageSvcPushNotify.retry")) {
delay(500 + Random.nextLong(0, 1000))
bot.network.run {
MessageSvcPbGetMsg(bot.client, syncCookie = null).sendWithoutExpect()