mirror of
https://github.com/mamoe/mirai.git
synced 2025-01-20 17:54:52 +08:00
Improve logging
This commit is contained in:
parent
5fa54114c1
commit
a7a4879d05
@ -15,7 +15,8 @@ import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import net.mamoe.mirai.event.events.BotEvent
|
||||
import net.mamoe.mirai.internal.event.broadcastInternal
|
||||
import net.mamoe.mirai.event.events.MessageEvent
|
||||
import net.mamoe.mirai.internal.event.callAndRemoveIfRequired
|
||||
import net.mamoe.mirai.internal.network.Packet
|
||||
import net.mamoe.mirai.utils.JavaFriendlyAPI
|
||||
import net.mamoe.mirai.utils.MiraiExperimentalApi
|
||||
@ -143,26 +144,32 @@ public interface CancellableEvent : Event {
|
||||
* @see __broadcastJava Java 使用
|
||||
*/
|
||||
@JvmSynthetic
|
||||
public suspend fun <E : Event> E.broadcast(): E = apply {
|
||||
public suspend fun <E : Event> E.broadcast(): E {
|
||||
check(this is AbstractEvent) {
|
||||
"Events must extend AbstractEvent"
|
||||
}
|
||||
|
||||
if (this is BroadcastControllable && !this.shouldBroadcast) {
|
||||
return@apply
|
||||
return this
|
||||
}
|
||||
this.broadCastLock.withLock {
|
||||
this._intercepted = false
|
||||
if (EventDisabled) return@withLock
|
||||
if (this is Packet.NoEventLog) return@withLock
|
||||
if (this is Packet) return@withLock // all [Packet]s are logged in [LoggingPacketHandler]
|
||||
if (this is Packet.NoLog) return@withLock
|
||||
if (this is MessageEvent) return@withLock // specially handled in [LoggingPacketHandlerAdapter]
|
||||
// if (this is Packet) return@withLock // all [Packet]s are logged in [LoggingPacketHandlerAdapter]
|
||||
|
||||
if (this is BotEvent) {
|
||||
this.bot.logger.verbose { "Event: $this" }
|
||||
} else {
|
||||
MiraiLogger.TopLevel.verbose { "Event: $this" }
|
||||
}
|
||||
this.broadcastInternal() // inline, no extra cost
|
||||
|
||||
callAndRemoveIfRequired(this)
|
||||
}
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,66 +97,53 @@ internal object GlobalEventListeners {
|
||||
}
|
||||
|
||||
|
||||
// inline: NO extra Continuation
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
internal suspend inline fun AbstractEvent.broadcastInternal() {
|
||||
callAndRemoveIfRequired(this@broadcastInternal)
|
||||
}
|
||||
|
||||
internal inline fun <E, T : Iterable<E>> T.forEach0(block: T.(E) -> Unit) {
|
||||
forEach { block(it) }
|
||||
}
|
||||
|
||||
@Suppress("DuplicatedCode")
|
||||
internal suspend inline fun <E : AbstractEvent> callAndRemoveIfRequired(
|
||||
event: E
|
||||
) {
|
||||
internal suspend fun <E : AbstractEvent> callAndRemoveIfRequired(event: E) {
|
||||
for (p in EventPriority.prioritiesExcludedMonitor) {
|
||||
GlobalEventListeners[p].forEach0 { registeredRegistry ->
|
||||
if (event.isIntercepted) {
|
||||
return
|
||||
}
|
||||
if (!registeredRegistry.type.isInstance(event)) return@forEach0
|
||||
val listener = registeredRegistry.listener
|
||||
when (listener.concurrencyKind) {
|
||||
ConcurrencyKind.LOCKED -> {
|
||||
(listener as Handler).lock!!.withLock {
|
||||
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
|
||||
remove(registeredRegistry)
|
||||
}
|
||||
}
|
||||
}
|
||||
ConcurrencyKind.CONCURRENT -> {
|
||||
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
|
||||
remove(registeredRegistry)
|
||||
}
|
||||
}
|
||||
val container = GlobalEventListeners[p]
|
||||
for (registry in container) {
|
||||
if (event.isIntercepted) return
|
||||
if (!registry.type.isInstance(event)) continue
|
||||
val listener = registry.listener
|
||||
process(container, registry, listener, event)
|
||||
}
|
||||
}
|
||||
|
||||
if (event.isIntercepted) return
|
||||
val container = GlobalEventListeners[EventPriority.MONITOR]
|
||||
when (container.size) {
|
||||
0 -> return
|
||||
1 -> {
|
||||
val registry = container.firstOrNull() ?: return
|
||||
if (!registry.type.isInstance(event)) return
|
||||
process(container, registry, registry.listener, event)
|
||||
}
|
||||
else -> supervisorScope {
|
||||
for (registry in GlobalEventListeners[EventPriority.MONITOR]) {
|
||||
if (!registry.type.isInstance(event)) continue
|
||||
launch { process(container, registry, registry.listener, event) }
|
||||
}
|
||||
}
|
||||
}
|
||||
coroutineScope {
|
||||
GlobalEventListeners[EventPriority.MONITOR].forEach0 { registeredRegistry ->
|
||||
if (event.isIntercepted) {
|
||||
return@coroutineScope
|
||||
}
|
||||
if (!registeredRegistry.type.isInstance(event)) return@forEach0
|
||||
val listener = registeredRegistry.listener
|
||||
launch {
|
||||
when (listener.concurrencyKind) {
|
||||
ConcurrencyKind.LOCKED -> {
|
||||
(listener as Handler).lock!!.withLock {
|
||||
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
|
||||
remove(registeredRegistry)
|
||||
}
|
||||
}
|
||||
}
|
||||
ConcurrencyKind.CONCURRENT -> {
|
||||
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
|
||||
remove(registeredRegistry)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun <E : AbstractEvent> process(
|
||||
container: ConcurrentLinkedQueue<ListenerRegistry>,
|
||||
registry: ListenerRegistry,
|
||||
listener: Listener<Event>,
|
||||
event: E,
|
||||
) {
|
||||
when (listener.concurrencyKind) {
|
||||
ConcurrencyKind.LOCKED -> {
|
||||
(listener as Handler).lock!!.withLock {
|
||||
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
|
||||
container.remove(registry)
|
||||
}
|
||||
}
|
||||
}
|
||||
ConcurrencyKind.CONCURRENT -> {
|
||||
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
|
||||
container.remove(registry)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -27,7 +27,7 @@ import java.util.*
|
||||
*
|
||||
* 示例:
|
||||
* ```log
|
||||
* 2020-05-21 19:51:09 V/Bot 1994701021: Send: OidbSvc.0x88d_7
|
||||
* 2020-05-21 19:51:09 V/Bot 123456789: Send: OidbSvc.0x88d_7
|
||||
* ```
|
||||
*
|
||||
* 日期时间格式为 `yyyy-MM-dd HH:mm:ss`,
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2020 Mamoe Technologies and contributors.
|
||||
* Copyright 2019-2021 Mamoe Technologies and contributors.
|
||||
*
|
||||
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
|
||||
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
|
||||
@ -234,7 +234,7 @@ public inline fun MiraiLogger.error(message: () -> String?, e: Throwable?) {
|
||||
*
|
||||
* 示例:
|
||||
* ```log
|
||||
* 2020-05-21 19:51:09 V/Bot 1994701021: Send: OidbSvc.0x88d_7
|
||||
* 2020-05-21 19:51:09 V/Bot 123456789: Send: OidbSvc.0x88d_7
|
||||
* ```
|
||||
*
|
||||
* 日期时间格式为 `yyyy-MM-dd HH:mm:ss`,
|
||||
|
@ -27,7 +27,7 @@ import java.util.*
|
||||
*
|
||||
* 示例:
|
||||
* ```log
|
||||
* 2020-05-21 19:51:09 V/Bot 1994701021: Send: OidbSvc.0x88d_7
|
||||
* 2020-05-21 19:51:09 V/Bot 123456789: Send: OidbSvc.0x88d_7
|
||||
* ```
|
||||
*
|
||||
* 日期时间格式为 `yyyy-MM-dd HH:mm:ss`,
|
||||
|
@ -163,9 +163,10 @@ internal open class QQAndroidBot constructor(
|
||||
set(ContactUpdater, ContactUpdaterImpl(bot, components, networkLogger))
|
||||
set(BdhSessionSyncer, BdhSessionSyncerImpl(configuration, networkLogger, components))
|
||||
set(ServerList, ServerListImpl())
|
||||
set(PacketLoggingStrategy, PacketLoggingStrategyImpl(bot))
|
||||
set(
|
||||
PacketHandler, PacketHandlerChain(
|
||||
LoggingPacketHandler(bot, networkLogger),
|
||||
LoggingPacketHandlerAdapter(networkLogger, get(PacketLoggingStrategy)),
|
||||
EventBroadcasterPacketHandler(networkLogger),
|
||||
CallPacketFactoryPacketHandler(bot)
|
||||
)
|
||||
|
@ -9,7 +9,7 @@
|
||||
|
||||
package net.mamoe.mirai.internal.network
|
||||
|
||||
import net.mamoe.mirai.internal.QQAndroidBot
|
||||
import net.mamoe.mirai.internal.AbstractBot
|
||||
import net.mamoe.mirai.internal.network.handler.logger
|
||||
import net.mamoe.mirai.utils.MiraiLogger
|
||||
|
||||
@ -52,12 +52,12 @@ internal class ParseErrorPacket(
|
||||
) : Packet, Packet.NoLog {
|
||||
enum class Direction {
|
||||
TO_BOT_LOGGER {
|
||||
override fun getLogger(bot: QQAndroidBot): MiraiLogger = bot.logger
|
||||
override fun getLogger(bot: AbstractBot): MiraiLogger = bot.logger
|
||||
},
|
||||
TO_NETWORK_LOGGER {
|
||||
override fun getLogger(bot: QQAndroidBot): MiraiLogger = bot.network.logger
|
||||
override fun getLogger(bot: AbstractBot): MiraiLogger = bot.network.logger
|
||||
};
|
||||
|
||||
abstract fun getLogger(bot: QQAndroidBot): MiraiLogger
|
||||
abstract fun getLogger(bot: AbstractBot): MiraiLogger
|
||||
}
|
||||
}
|
||||
|
@ -13,17 +13,13 @@ import net.mamoe.mirai.event.BroadcastControllable
|
||||
import net.mamoe.mirai.event.CancellableEvent
|
||||
import net.mamoe.mirai.event.Event
|
||||
import net.mamoe.mirai.event.broadcast
|
||||
import net.mamoe.mirai.event.events.MessageEvent
|
||||
import net.mamoe.mirai.internal.QQAndroidBot
|
||||
import net.mamoe.mirai.internal.contact.logMessageReceived
|
||||
import net.mamoe.mirai.internal.contact.replaceMagicCodes
|
||||
import net.mamoe.mirai.internal.network.MultiPacket
|
||||
import net.mamoe.mirai.internal.network.Packet
|
||||
import net.mamoe.mirai.internal.network.ParseErrorPacket
|
||||
import net.mamoe.mirai.internal.network.component.ComponentKey
|
||||
import net.mamoe.mirai.internal.network.protocol.packet.*
|
||||
import net.mamoe.mirai.utils.MiraiLogger
|
||||
import net.mamoe.mirai.utils.cast
|
||||
import net.mamoe.mirai.utils.verbose
|
||||
|
||||
internal interface PacketHandler {
|
||||
suspend fun handlePacket(incomingPacket: IncomingPacket)
|
||||
@ -53,29 +49,15 @@ internal data class ExceptionInPacketHandlerException(
|
||||
override val cause: Throwable,
|
||||
) : IllegalStateException("Exception in PacketHandler '$packetHandler'.")
|
||||
|
||||
internal class LoggingPacketHandler(
|
||||
private val bot: QQAndroidBot,
|
||||
internal class LoggingPacketHandlerAdapter(
|
||||
private val logger: MiraiLogger,
|
||||
private val strategy: PacketLoggingStrategy,
|
||||
) : PacketHandler {
|
||||
override suspend fun handlePacket(incomingPacket: IncomingPacket) {
|
||||
val packet = incomingPacket.data ?: return
|
||||
if (!bot.logger.isEnabled && !logger.isEnabled) return
|
||||
when {
|
||||
packet is ParseErrorPacket -> {
|
||||
packet.direction.getLogger(bot).error(packet.error)
|
||||
}
|
||||
packet is MessageEvent -> packet.logMessageReceived()
|
||||
packet is Packet.NoLog -> {
|
||||
// nothing to do
|
||||
}
|
||||
packet is Event && packet !is Packet.NoEventLog -> bot.logger.verbose {
|
||||
"Event: $packet".replaceMagicCodes()
|
||||
}
|
||||
else -> logger.verbose { "Recv: ${incomingPacket.commandName} ${incomingPacket.data}".replaceMagicCodes() }
|
||||
}
|
||||
strategy.logReceived(logger, incomingPacket)
|
||||
}
|
||||
|
||||
override fun toString(): String = "LoggingPacketHandler"
|
||||
override fun toString(): String = "LoggingPacketHandlerAdapter"
|
||||
}
|
||||
|
||||
internal class EventBroadcasterPacketHandler(
|
||||
@ -83,7 +65,16 @@ internal class EventBroadcasterPacketHandler(
|
||||
) : PacketHandler {
|
||||
|
||||
override suspend fun handlePacket(incomingPacket: IncomingPacket) {
|
||||
val packet = incomingPacket.data ?: return
|
||||
val data = incomingPacket.data ?: return
|
||||
impl(data)
|
||||
}
|
||||
|
||||
private suspend fun impl(packet: Packet) {
|
||||
if (packet is MultiPacket<*>) {
|
||||
for (p in packet) {
|
||||
impl(p)
|
||||
}
|
||||
}
|
||||
when {
|
||||
packet is CancellableEvent && packet.isCancelled -> return
|
||||
packet is BroadcastControllable && !packet.shouldBroadcast -> return
|
||||
@ -105,7 +96,7 @@ internal class EventBroadcasterPacketHandler(
|
||||
return qualified.substringAfter("net.mamoe.mirai.event.events.")
|
||||
}
|
||||
|
||||
override fun toString(): String = "LoggingPacketHandler"
|
||||
override fun toString(): String = "EventBroadcasterPacketHandler"
|
||||
}
|
||||
|
||||
internal class CallPacketFactoryPacketHandler(
|
||||
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright 2019-2021 Mamoe Technologies and contributors.
|
||||
*
|
||||
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
|
||||
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
|
||||
*
|
||||
* https://github.com/mamoe/mirai/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package net.mamoe.mirai.internal.network.components
|
||||
|
||||
import net.mamoe.mirai.event.Event
|
||||
import net.mamoe.mirai.event.events.MessageEvent
|
||||
import net.mamoe.mirai.internal.AbstractBot
|
||||
import net.mamoe.mirai.internal.contact.logMessageReceived
|
||||
import net.mamoe.mirai.internal.contact.replaceMagicCodes
|
||||
import net.mamoe.mirai.internal.network.Packet
|
||||
import net.mamoe.mirai.internal.network.ParseErrorPacket
|
||||
import net.mamoe.mirai.internal.network.component.ComponentKey
|
||||
import net.mamoe.mirai.internal.network.protocol.packet.IncomingPacket
|
||||
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
|
||||
import net.mamoe.mirai.utils.MiraiLogger
|
||||
import net.mamoe.mirai.utils.systemProp
|
||||
import net.mamoe.mirai.utils.verbose
|
||||
|
||||
/**
|
||||
* Implementation must be fast and non-blocking, throwing no exception.
|
||||
*/
|
||||
internal interface PacketLoggingStrategy {
|
||||
fun logSent(logger: MiraiLogger, outgoingPacket: OutgoingPacket)
|
||||
fun logReceived(logger: MiraiLogger, incomingPacket: IncomingPacket)
|
||||
|
||||
companion object : ComponentKey<PacketLoggingStrategy>
|
||||
}
|
||||
|
||||
internal class PacketLoggingStrategyImpl(
|
||||
private val bot: AbstractBot,
|
||||
private val blacklist: Set<String> = DEFAULT_BLACKLIST,
|
||||
) : PacketLoggingStrategy {
|
||||
override fun logSent(logger: MiraiLogger, outgoingPacket: OutgoingPacket) {
|
||||
if (outgoingPacket.commandName in blacklist) return
|
||||
logger.verbose { "Send: ${outgoingPacket.commandName}" }
|
||||
}
|
||||
|
||||
override fun logReceived(logger: MiraiLogger, incomingPacket: IncomingPacket) {
|
||||
val packet = incomingPacket.data ?: return
|
||||
if (!bot.logger.isEnabled && !logger.isEnabled) return
|
||||
when {
|
||||
packet is ParseErrorPacket -> {
|
||||
packet.direction.getLogger(bot).error(packet.error)
|
||||
}
|
||||
packet is MessageEvent -> packet.logMessageReceived()
|
||||
packet is Packet.NoLog -> {
|
||||
// nothing to do
|
||||
}
|
||||
packet is Event && packet !is Packet.NoEventLog -> bot.logger.verbose {
|
||||
"Event: $packet".replaceMagicCodes()
|
||||
}
|
||||
else -> {
|
||||
if (SHOW_PACKET_DETAILS) {
|
||||
logger.verbose { "Recv: ${incomingPacket.commandName} ${incomingPacket.data}".replaceMagicCodes() }
|
||||
} else {
|
||||
logger.verbose { "Recv: ${incomingPacket.commandName}".replaceMagicCodes() }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
@JvmField
|
||||
val DEFAULT_BLACKLIST = setOf(
|
||||
"MessageSvc.PbDeleteMsg",
|
||||
"MessageSvc.PbGetMsg", // they are too verbose.
|
||||
"OnlinePush.RespPush"
|
||||
)
|
||||
|
||||
@JvmField
|
||||
var SHOW_PACKET_DETAILS = systemProp("mirai.debug.network.show.packet.details", false)
|
||||
}
|
||||
}
|
@ -9,6 +9,7 @@
|
||||
|
||||
package net.mamoe.mirai.internal.network.handler
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.selects.SelectClause1
|
||||
import net.mamoe.mirai.Bot
|
||||
import net.mamoe.mirai.internal.network.Packet
|
||||
@ -30,7 +31,7 @@ import net.mamoe.mirai.utils.MiraiLogger
|
||||
* @see NetworkHandlerSupport
|
||||
* @see NetworkHandlerFactory
|
||||
*/
|
||||
internal interface NetworkHandler {
|
||||
internal interface NetworkHandler : CoroutineScope {
|
||||
val context: NetworkHandlerContext
|
||||
|
||||
fun isOk() = state == State.OK
|
||||
|
@ -14,6 +14,7 @@ import kotlinx.coroutines.selects.SelectClause1
|
||||
import net.mamoe.mirai.internal.network.Packet
|
||||
import net.mamoe.mirai.internal.network.components.PacketCodec
|
||||
import net.mamoe.mirai.internal.network.components.PacketHandler
|
||||
import net.mamoe.mirai.internal.network.components.PacketLoggingStrategy
|
||||
import net.mamoe.mirai.internal.network.components.RawIncomingPacket
|
||||
import net.mamoe.mirai.internal.network.handler.state.StateObserver
|
||||
import net.mamoe.mirai.internal.network.protocol.packet.IncomingPacket
|
||||
@ -69,7 +70,7 @@ internal abstract class NetworkHandlerSupport(
|
||||
val listener = PacketListener(packet.commandName, packet.sequenceId)
|
||||
var exception: Throwable? = null
|
||||
repeat(attempts.coerceAtLeast(1)) {
|
||||
logger.verbose { "Send: ${packet.commandName}" }
|
||||
context[PacketLoggingStrategy].logSent(logger, packet)
|
||||
try {
|
||||
packetListeners.add(listener)
|
||||
sendPacketImpl(packet)
|
||||
@ -92,7 +93,7 @@ internal abstract class NetworkHandlerSupport(
|
||||
}
|
||||
|
||||
final override suspend fun sendWithoutExpect(packet: OutgoingPacket) {
|
||||
logger.verbose { "Send: ${packet.commandName}" }
|
||||
context[PacketLoggingStrategy].logSent(logger, packet)
|
||||
sendPacketImpl(packet)
|
||||
}
|
||||
|
||||
@ -102,7 +103,7 @@ internal abstract class NetworkHandlerSupport(
|
||||
// } else {
|
||||
// logger.info { "NetworkHandler '$this' closed: $cause" }
|
||||
// }
|
||||
coroutineContext.job.cancel("NetworkHandler closed.", cause)
|
||||
coroutineContext.job.cancel("NetworkHandler closed", cause)
|
||||
}
|
||||
|
||||
protected val packetLogger: MiraiLogger by lazy {
|
||||
|
@ -18,6 +18,7 @@ import net.mamoe.mirai.internal.network.handler.NetworkHandler
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
|
||||
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
|
||||
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
/**
|
||||
* A proxy to [NetworkHandler] that delegates calls to instance returned by [NetworkHandlerSelector.awaitResumeInstance].
|
||||
@ -52,6 +53,9 @@ internal class SelectorNetworkHandler(
|
||||
selector.getResumedInstance()?.close(cause)
|
||||
}
|
||||
|
||||
override val coroutineContext: CoroutineContext
|
||||
get() = selector.getResumedInstance()?.coroutineContext ?: scope.coroutineContext // merely use fallback
|
||||
|
||||
override fun toString(): String = "SelectorNetworkHandler(currentInstance=${selector.getResumedInstance()})"
|
||||
}
|
||||
|
||||
|
@ -81,7 +81,7 @@ internal open class NettyNetworkHandler(
|
||||
|
||||
private inner class OutgoingPacketEncoder : MessageToByteEncoder<OutgoingPacket>(OutgoingPacket::class.java) {
|
||||
override fun encode(ctx: ChannelHandlerContext, msg: OutgoingPacket, out: ByteBuf) {
|
||||
PacketCodec.PacketLogger.debug { "encode: $msg" }
|
||||
packetLogger.debug { "encode: $msg" }
|
||||
out.writeBytes(msg.delegate)
|
||||
}
|
||||
}
|
||||
@ -261,19 +261,29 @@ internal open class NettyNetworkHandler(
|
||||
connection.writeAndFlush(packet)
|
||||
}
|
||||
|
||||
private val configPush = this@NettyNetworkHandler.launch(CoroutineName("ConfigPush sync")) {
|
||||
try {
|
||||
context[ConfigPushProcessor].syncConfigPush(this@NettyNetworkHandler)
|
||||
} catch (e: ConfigPushProcessor.RequireReconnectException) {
|
||||
setState { StateConnecting(ExceptionCollector(e), false) }
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun resumeConnection0() {
|
||||
(coroutineContext.job as CompletableJob).run {
|
||||
complete()
|
||||
join()
|
||||
}
|
||||
setState { StateOK(connection) }
|
||||
joinCompleted(configPush) // throw exception
|
||||
setState { StateOK(connection, configPush) }
|
||||
} // noop
|
||||
|
||||
override fun toString(): String = "StateLoading"
|
||||
}
|
||||
|
||||
protected inner class StateOK(
|
||||
private val connection: NettyChannel
|
||||
private val connection: NettyChannel,
|
||||
private val configPush: Job,
|
||||
) : NettyState(State.OK) {
|
||||
init {
|
||||
coroutineContext.job.invokeOnCompletion {
|
||||
@ -307,14 +317,6 @@ internal open class NettyNetworkHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private val configPush = launch(CoroutineName("ConfigPush sync")) {
|
||||
try {
|
||||
context[ConfigPushProcessor].syncConfigPush(this@NettyNetworkHandler)
|
||||
} catch (e: ConfigPushProcessor.RequireReconnectException) {
|
||||
setState { StateClosed(e) }
|
||||
}
|
||||
}
|
||||
|
||||
// we can also move them as observers if needed.
|
||||
|
||||
private val keyRefresh = launch(CoroutineName("Key refresh")) {
|
||||
@ -332,10 +334,6 @@ internal open class NettyNetworkHandler(
|
||||
joinCompleted(keyRefresh)
|
||||
} // noop
|
||||
|
||||
private suspend inline fun joinCompleted(job: Job) {
|
||||
if (job.isCompleted) job.join()
|
||||
}
|
||||
|
||||
override fun toString(): String = "StateOK"
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,7 @@ import io.netty.buffer.ByteBufInputStream
|
||||
import io.netty.channel.ChannelFuture
|
||||
import kotlinx.coroutines.CoroutineExceptionHandler
|
||||
import kotlinx.coroutines.CoroutineName
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.io.core.ByteReadPacket
|
||||
import net.mamoe.mirai.utils.MiraiLogger
|
||||
import net.mamoe.mirai.utils.SimpleLogger
|
||||
@ -55,3 +56,8 @@ internal fun MiraiLogger.asCoroutineExceptionHandler(
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
internal suspend inline fun joinCompleted(job: Job) {
|
||||
if (job.isCompleted) job.join()
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.io.core.*
|
||||
import net.mamoe.mirai.Bot
|
||||
import net.mamoe.mirai.Mirai
|
||||
import net.mamoe.mirai.contact.Group
|
||||
import net.mamoe.mirai.contact.MemberPermission
|
||||
@ -55,7 +56,10 @@ import net.mamoe.mirai.message.data.MessageSourceKind.STRANGER
|
||||
import net.mamoe.mirai.message.data.MessageSourceKind.TEMP
|
||||
import net.mamoe.mirai.message.data.PlainText
|
||||
import net.mamoe.mirai.message.data.buildMessageChain
|
||||
import net.mamoe.mirai.utils.*
|
||||
import net.mamoe.mirai.utils.cast
|
||||
import net.mamoe.mirai.utils.debug
|
||||
import net.mamoe.mirai.utils.read
|
||||
import net.mamoe.mirai.utils.toUHexString
|
||||
import kotlin.random.Random
|
||||
|
||||
|
||||
@ -93,12 +97,13 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
|
||||
)
|
||||
}
|
||||
|
||||
open class GetMsgSuccess(delegate: List<Packet>, syncCookie: ByteArray?, val bot: QQAndroidBot) : Response(
|
||||
open class GetMsgSuccess(delegate: List<Packet>, syncCookie: ByteArray?, bot: QQAndroidBot) : Response(
|
||||
MsgSvc.SyncFlag.STOP, delegate,
|
||||
syncCookie
|
||||
syncCookie,
|
||||
bot
|
||||
), Event,
|
||||
Packet.NoLog {
|
||||
override fun toString(): String = "MessageSvcPbGetMsg.GetMsgSuccess(messages=<Iterable>))"
|
||||
override fun toString(): String = "MessageSvcPbGetMsg.GetMsgSuccess"
|
||||
}
|
||||
|
||||
/**
|
||||
@ -107,15 +112,16 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
|
||||
open class Response(
|
||||
internal val syncFlagFromServer: MsgSvc.SyncFlag,
|
||||
delegate: List<Packet>,
|
||||
val syncCookie: ByteArray?
|
||||
val syncCookie: ByteArray?, override val bot: Bot
|
||||
) :
|
||||
AbstractEvent(),
|
||||
MultiPacket<Packet>,
|
||||
Iterable<Packet> by (delegate),
|
||||
Packet.NoLog {
|
||||
Packet.NoLog,
|
||||
BotEvent {
|
||||
|
||||
override fun toString(): String =
|
||||
"MessageSvcPbGetMsg.Response(syncFlagFromServer=$syncFlagFromServer, messages=<Iterable>))"
|
||||
"MessageSvcPbGetMsg.Response(flag=$syncFlagFromServer)"
|
||||
}
|
||||
|
||||
class EmptyResponse(
|
||||
@ -128,9 +134,12 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
|
||||
val resp = readProtoBuf(MsgSvc.PbGetMsgResp.serializer())
|
||||
|
||||
if (resp.result != 0) {
|
||||
bot.network.logger
|
||||
.warning { "MessageSvcPushNotify: result != 0, result = ${resp.result}, errorMsg=${resp.errmsg}" }
|
||||
bot.launch(CoroutineName("MessageSvcPushNotify.retry")) {
|
||||
// this is normally recoverable, no need to log
|
||||
|
||||
|
||||
// bot.network.logger
|
||||
// .warning { "MessageSvcPushNotify: result != 0, result = ${resp.result}, errorMsg=${resp.errmsg}" }
|
||||
bot.network.launch(CoroutineName("MessageSvcPushNotify.retry")) {
|
||||
delay(500 + Random.nextLong(0, 1000))
|
||||
bot.network.run {
|
||||
MessageSvcPbGetMsg(bot.client, syncCookie = null).sendWithoutExpect()
|
||||
@ -188,7 +197,7 @@ internal object MessageSvcPbGetMsg : OutgoingPacketFactory<MessageSvcPbGetMsg.Re
|
||||
if (resp.syncFlag == MsgSvc.SyncFlag.STOP) {
|
||||
return GetMsgSuccess(list, resp.syncCookie, bot)
|
||||
}
|
||||
return Response(resp.syncFlag, list, resp.syncCookie)
|
||||
return Response(resp.syncFlag, list, resp.syncCookie, bot)
|
||||
}
|
||||
|
||||
override suspend fun QQAndroidBot.handle(packet: Response) {
|
||||
|
@ -110,7 +110,8 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
|
||||
set(ContactUpdater, ContactUpdaterImpl(bot, components, networkLogger))
|
||||
set(BdhSessionSyncer, BdhSessionSyncerImpl(configuration, networkLogger, components))
|
||||
set(ServerList, ServerListImpl())
|
||||
set(PacketHandler, LoggingPacketHandler(bot, networkLogger))
|
||||
set(PacketLoggingStrategy, PacketLoggingStrategyImpl(bot))
|
||||
set(PacketHandler, LoggingPacketHandlerAdapter(networkLogger, get(PacketLoggingStrategy)))
|
||||
set(PacketCodec, PacketCodecImpl())
|
||||
set(OtherClientUpdater, OtherClientUpdaterImpl(bot, components, bot.logger))
|
||||
set(ConfigPushSyncer, ConfigPushSyncerImpl())
|
||||
|
Loading…
Reference in New Issue
Block a user