States and connection impl

This commit is contained in:
Him188 2021-04-17 02:07:37 +08:00
parent 1fef89cf1c
commit 70551167ba
7 changed files with 98 additions and 38 deletions

View File

@ -41,7 +41,11 @@ internal class NetworkHandlerContextImpl(
override val ssoProcessor: SsoProcessor,
override val logger: MiraiLogger,
override val stateObserver: StateObserver?,
) : NetworkHandlerContext
) : NetworkHandlerContext {
override fun toString(): String {
return "NetworkHandlerContextImpl(bot=${bot.id}, stateObserver=$stateObserver)"
}
}
/**
* Basic interface available to application. Usually wrapped with [SelectorNetworkHandler].

View File

@ -42,6 +42,8 @@ internal class SelectorNetworkHandler(
override fun close(cause: Throwable?) {
selector.getResumedInstance()?.close(cause)
}
override fun toString(): String = "SelectorNetworkHandler(currentInstance=${selector.getResumedInstance()})"
}
internal class ExceptionInSelectorResumeException(

View File

@ -14,6 +14,7 @@ import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.internal.network.net.protocol.PacketCodec.PACKET_DEBUG
import net.mamoe.mirai.internal.network.net.protocol.RawIncomingPacket
import net.mamoe.mirai.internal.network.protocol.packet.IncomingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
@ -22,8 +23,6 @@ import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext
private val PACKET_DEBUG = systemProp("mirai.debug.network.packet.logger", true)
internal abstract class NetworkHandlerSupport(
override val context: NetworkHandlerContext,
final override val coroutineContext: CoroutineContext = SupervisorJob(),
@ -36,6 +35,7 @@ internal abstract class NetworkHandlerSupport(
* Called when a packet is received.
*/
protected fun collectReceived(packet: IncomingPacket) {
logger.verbose({ "Recv: ${packet.commandName} ${packet.data ?: packet.exception}" }, packet.exception)
for (listener in packetListeners) {
if (!listener.isExpected(packet)) continue
if (packetListeners.remove(listener)) {
@ -56,10 +56,11 @@ internal abstract class NetworkHandlerSupport(
final override suspend fun sendAndExpect(packet: OutgoingPacket, timeout: Long, attempts: Int): Packet? {
val listener = PacketListener(packet.commandName, packet.sequenceId)
packetListeners.add(listener)
var exception: Throwable? = null
repeat(attempts.coerceAtLeast(1)) {
logger.verbose { "Send: ${packet.commandName}" }
try {
packetListeners.add(listener)
sendPacketImpl(packet)
try {
return withTimeout(timeout) {
@ -72,19 +73,25 @@ internal abstract class NetworkHandlerSupport(
exception = e // show last exception
}
} finally {
packetListeners.remove()
listener.result.complete(null)
packetListeners.remove(listener)
}
}
throw exception!!
}
final override suspend fun sendWithoutExpect(packet: OutgoingPacket) {
logger.verbose { "Send: ${packet.commandName}" }
sendPacketImpl(packet)
}
override fun close(cause: Throwable?) {
logger.info { "NetworkHandler closed: $cause" }
coroutineContext.job.cancel("NetworkHandler closed.")
// if (cause == null) {
// logger.info { "NetworkHandler '$this' closed" }
// } else {
// logger.info { "NetworkHandler '$this' closed: $cause" }
// }
coroutineContext.job.cancel("NetworkHandler closed.", cause)
}
protected val packetLogger: MiraiLogger by lazy {
@ -153,6 +160,18 @@ internal abstract class NetworkHandlerSupport(
final override val state: NetworkHandler.State get() = _state.correspondingState
/**
* Can only be used in a job launched within the state scope.
*/
@Suppress("SuspendFunctionOnCoroutineScope")
protected suspend inline fun setStateForJobCompletion(crossinline new: () -> BaseStateImpl) {
val job = currentCoroutineContext()[Job]
this.launch {
job?.join()
setState(new)
}
}
/**
* You may need to call [BaseStateImpl.resumeConnection] since state is lazy.
*
@ -170,7 +189,7 @@ internal abstract class NetworkHandlerSupport(
val old = _state
check(old !== impl) { "Old and new states cannot be the same." }
old.cancel()
old.cancel(CancellationException("State is switched from $old to $impl"))
_state = impl
context.stateObserver?.stateChanged(this, old, impl)

View File

@ -11,6 +11,7 @@ package net.mamoe.mirai.internal.network.handler.impl
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.debug
import net.mamoe.mirai.utils.error
/**
@ -54,13 +55,16 @@ internal interface StateObserver {
internal class LoggingStateObserver(
val logger: MiraiLogger
) : StateObserver {
override fun toString(): String {
return "LoggingStateObserver(logger=$logger)"
}
override fun stateChanged(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
logger.error { "State changed: ${previous.correspondingState} -> ${new.correspondingState}" }
logger.debug { "State changed: ${previous.correspondingState} -> ${new.correspondingState}" }
}
override fun exceptionOnCreatingNewState(
@ -68,7 +72,7 @@ internal class LoggingStateObserver(
previousState: NetworkHandlerSupport.BaseStateImpl,
exception: Throwable
) {
logger.error({ "State changed: ${previousState.correspondingState} -> $exception" }, exception)
logger.debug({ "State changed: ${previousState.correspondingState} -> $exception" }, exception)
}
override fun afterStateResume(
@ -78,10 +82,10 @@ internal class LoggingStateObserver(
) {
result.fold(
onSuccess = {
logger.error { "State resumed: ${state.correspondingState}." }
logger.debug { "State resumed: ${state.correspondingState}." }
},
onFailure = {
logger.error(
logger.debug(
{ "State resumed: ${state.correspondingState} ${result.exceptionOrNull()}" },
result.exceptionOrNull()
)
@ -101,6 +105,11 @@ internal class SafeStateObserver(
val delegate: StateObserver,
val logger: MiraiLogger,
) : StateObserver {
override fun toString(): String {
return "SafeStateObserver(delegate=$delegate)"
}
override fun stateChanged(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,

View File

@ -11,14 +11,12 @@ package net.mamoe.mirai.internal.network.handler.impl.netty
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelInitializer
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.*
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.LengthFieldBasedFrameDecoder
import io.netty.handler.codec.MessageToByteEncoder
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.sendBlocking
@ -27,20 +25,24 @@ import kotlinx.coroutines.flow.consumeAsFlow
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.impl.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.logger
import net.mamoe.mirai.internal.network.net.protocol.PacketCodec
import net.mamoe.mirai.internal.network.net.protocol.RawIncomingPacket
import net.mamoe.mirai.internal.network.net.protocol.SsoProcessor
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.utils.childScope
import net.mamoe.mirai.utils.debug
import java.net.SocketAddress
import kotlin.coroutines.CoroutineContext
import io.netty.channel.Channel as NettyChannel
internal class NettyNetworkHandler(
context: NetworkHandlerContext,
private val address: SocketAddress,
) : NetworkHandlerSupport(context) {
override fun close(cause: Throwable?) {
setState { StateClosed(cause) }
setState { StateClosed(CancellationException("Closed manually.", cause)) }
// wrap an exception, more stacktrace information
}
private fun closeSuper(cause: Throwable?) = super.close(cause)
@ -50,6 +52,10 @@ internal class NettyNetworkHandler(
state.sendPacketImpl(packet)
}
override fun toString(): String {
return "NettyNetworkHandler(context=$context, address=$address)"
}
///////////////////////////////////////////////////////////////////////////
// netty conn.
///////////////////////////////////////////////////////////////////////////
@ -70,25 +76,30 @@ internal class NettyNetworkHandler(
}
}
private suspend fun createConnection(decodePipeline: PacketDecodePipeline): ChannelHandlerContext {
val contextResult = CompletableDeferred<ChannelHandlerContext>()
private inner class OutgoingPacketEncoder : MessageToByteEncoder<OutgoingPacket>(OutgoingPacket::class.java) {
override fun encode(ctx: ChannelHandlerContext, msg: OutgoingPacket, out: ByteBuf) {
logger.debug { "encode: $msg" }
out.writeBytes(msg.delegate)
}
}
private suspend fun createConnection(decodePipeline: PacketDecodePipeline): NettyChannel {
val contextResult = CompletableDeferred<NettyChannel>()
val eventLoopGroup = NioEventLoopGroup()
val future = Bootstrap().group(eventLoopGroup)
.channel(NioSocketChannel::class.java)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
ch.pipeline()
.addLast(object : ChannelInboundHandlerAdapter() {
override fun channelActive(ctx: ChannelHandlerContext) {
contextResult.complete(ctx)
}
override fun channelInactive(ctx: ChannelHandlerContext?) {
eventLoopGroup.shutdownGracefully()
}
})
.addLast(LengthFieldBasedFrameDecoder(Int.MAX_VALUE, 0, 4, -4, 0))
.addLast(OutgoingPacketEncoder())
.addLast(LengthFieldBasedFrameDecoder(Int.MAX_VALUE, 0, 4, -4, 4))
.addLast(ByteBufToIncomingPacketDecoder())
.addLast(RawIncomingPacketCollector(decodePipeline))
}
@ -96,6 +107,8 @@ internal class NettyNetworkHandler(
.connect(address)
.awaitKt()
contextResult.complete(future.channel())
future.channel().closeFuture().addListener {
setState { StateConnectionLost(it.cause()) }
}
@ -145,6 +158,8 @@ internal class NettyNetworkHandler(
setState { StateConnecting(PacketDecodePipeline(this@NettyNetworkHandler.coroutineContext)) }
.resumeConnection()
}
override fun toString(): String = "StateInitialized"
}
/**
@ -161,35 +176,44 @@ internal class NettyNetworkHandler(
private val connectResult = async {
val connection = connection.await()
context.ssoProcessor.login(this@NettyNetworkHandler)
setState { StateOK(connection) }
setStateForJobCompletion { StateOK(connection) }
}.apply {
invokeOnCompletion { error ->
if (error != null) setState { StateClosed(error) } // logon failure closes the network handler.
if (error != null) setState {
StateClosed(
CancellationException("Connection failure.", error)
)
} // logon failure closes the network handler.
// and this error will also be thrown by `StateConnecting.resumeConnection`
}
}
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
connection.await().writeAndFlush(packet)
connection.await() // split line number
.writeAndFlush(packet)
}
override suspend fun resumeConnection0() {
connectResult.await() // propagates exceptions
}
override fun toString(): String = "StateConnecting"
}
private inner class StateOK(
private val connection: ChannelHandlerContext
private val connection: NettyChannel
) : NettyState(NetworkHandler.State.OK) {
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
connection.writeAndFlush(packet)
}
override suspend fun resumeConnection0() {} // noop
override fun toString(): String = "StateOK"
}
private inner class StateConnectionLost(private val cause: Throwable) :
NettyState(NetworkHandler.State.CONNECTION_LOST) {
private inner class StateConnectionLost(
private val cause: Throwable
) : NettyState(NetworkHandler.State.CONNECTION_LOST) {
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
throw IllegalStateException("Connection is lost so cannot send packet. Call resumeConnection first.", cause)
}
@ -202,7 +226,7 @@ internal class NettyNetworkHandler(
private inner class StateClosed(
val exception: Throwable?
) : NettyState(NetworkHandler.State.OK) {
) : NettyState(NetworkHandler.State.CLOSED) {
init {
closeSuper(exception)
}
@ -211,6 +235,8 @@ internal class NettyNetworkHandler(
override suspend fun resumeConnection0() {
exception?.let { throw it }
} // noop
override fun toString(): String = "StateClosed"
}
override fun initialState(): BaseStateImpl = StateInitialized()

View File

@ -23,12 +23,14 @@ import kotlin.io.use
* - Transforms [ByteReadPacket] to [RawIncomingPacket]
*/
internal object PacketCodec {
val PACKET_DEBUG = systemProp("mirai.debug.network.packet.logger", true)
/**
* 数据包相关的调试输出.
* 它默认是关闭的.
*/
internal val PacketLogger: MiraiLoggerWithSwitch by lazy {
MiraiLogger.create("Packet").withSwitch(false)
MiraiLogger.create("Packet").withSwitch(PACKET_DEBUG)
}
/**
@ -88,7 +90,7 @@ internal object PacketCodec {
)
private fun parseSsoFrame(client: SsoSession, bytes: ByteArray): DecodeResult =
bytes.toReadPacket().use { input ->
bytes.toReadPacket().let { input ->
val commandName: String
val ssoSequenceId: Int
val dataCompressed: Int

View File

@ -10,10 +10,7 @@
package net.mamoe.mirai.internal.network.protocol.packet
import kotlinx.io.core.BytePacketBuilder
import kotlinx.io.core.ByteReadPacket
import kotlinx.io.core.buildPacket
import kotlinx.io.core.writeFully
import kotlinx.io.core.*
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.QQAndroidClient
@ -38,8 +35,9 @@ internal open class OutgoingPacket constructor(
name: String?,
val commandName: String,
val sequenceId: Int,
val delegate: ByteReadPacket
delegate: ByteReadPacket
) {
val delegate = delegate.readBytes()
val name: String = name ?: commandName
}