Make member functions in LockFreeLinkedList open

This commit is contained in:
Him188 2020-01-06 18:27:42 +08:00
parent 5fe23800f0
commit ce6da7d193

View File

@ -4,7 +4,6 @@ package net.mamoe.mirai.timpc.network
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.io.core.* import kotlinx.io.core.*
import kotlinx.io.pool.useInstance
import net.mamoe.mirai.Bot import net.mamoe.mirai.Bot
import net.mamoe.mirai.data.LoginResult import net.mamoe.mirai.data.LoginResult
import net.mamoe.mirai.data.OnlineStatus import net.mamoe.mirai.data.OnlineStatus
@ -21,13 +20,10 @@ import net.mamoe.mirai.timpc.network.handler.DataPacketSocketAdapter
import net.mamoe.mirai.timpc.network.handler.TemporaryPacketHandler import net.mamoe.mirai.timpc.network.handler.TemporaryPacketHandler
import net.mamoe.mirai.timpc.network.packet.* import net.mamoe.mirai.timpc.network.packet.*
import net.mamoe.mirai.timpc.network.packet.login.* import net.mamoe.mirai.timpc.network.packet.login.*
import net.mamoe.mirai.utils.LockFreeLinkedList import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.LoginFailedException
import net.mamoe.mirai.utils.NoLog
import net.mamoe.mirai.utils.cryptor.Decrypter import net.mamoe.mirai.utils.cryptor.Decrypter
import net.mamoe.mirai.utils.cryptor.NoDecrypter import net.mamoe.mirai.utils.cryptor.NoDecrypter
import net.mamoe.mirai.utils.io.* import net.mamoe.mirai.utils.io.*
import net.mamoe.mirai.utils.unsafeWeakRef
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.random.Random import kotlin.random.Random
@ -109,6 +105,7 @@ internal class TIMPCBotNetworkHandler internal constructor(coroutineContext: Cor
this.socket.close() this.socket.close()
} }
@UseExperimental(MiraiInternalAPI::class)
internal inner class BotSocketAdapter(override val serverIp: String) : internal inner class BotSocketAdapter(override val serverIp: String) :
DataPacketSocketAdapter { DataPacketSocketAdapter {
@ -120,10 +117,8 @@ internal class TIMPCBotNetworkHandler internal constructor(coroutineContext: Cor
private suspend fun processReceive() { private suspend fun processReceive() {
while (channel.isOpen) { while (channel.isOpen) {
val buffer = IoBuffer.Pool.borrow() val input: ByteReadPacket = try {
channel.read()// JVM: withContext(IO)
try {
channel.read(buffer)// JVM: withContext(IO)
} catch (e: ClosedChannelException) { } catch (e: ClosedChannelException) {
close() close()
return return
@ -135,25 +130,13 @@ internal class TIMPCBotNetworkHandler internal constructor(coroutineContext: Cor
} catch (e: Throwable) { } catch (e: Throwable) {
bot.logger.error("Caught unexpected exceptions", e) bot.logger.error("Caught unexpected exceptions", e)
continue continue
} finally {
if (!buffer.canRead() || buffer.readRemaining == 0) {//size==0
//bot.logger.debug("processReceive: Buffer cannot be read")
buffer.release(IoBuffer.Pool)
continue
}// sometimes exceptions are thrown without this `if` clause
} }
//buffer.resetForRead() //buffer.resetForRead()
launch(CoroutineName("handleServerPacket")) { launch(CoroutineName("handleServerPacket")) {
// `.use`: Ensure that the packet is consumed **totally** // `.use`: Ensure that the packet is consumed **totally**
// so that all the buffers are released // so that all the buffers are released
ByteArrayPool.useInstance { input.use {
val length = buffer.readRemaining - 1
buffer.readFully(it, 0, length)
buffer.resetForWrite()
buffer.writeFully(it, 0, length)
}
ByteReadPacket(buffer, IoBuffer.Pool).use { input ->
try { try {
input.discardExact(3) input.discardExact(3)
@ -165,7 +148,7 @@ internal class TIMPCBotNetworkHandler internal constructor(coroutineContext: Cor
val packet = try { val packet = try {
with(id.factory) { with(id.factory) {
loginHandler!!.provideDecrypter(id.factory) loginHandler!!.provideDecrypter(id.factory)
.decrypt(input) .decrypt(input, 0, (input.remaining - 1).toInt()) // tail
.decode(id, sequenceId, this@TIMPCBotNetworkHandler) .decode(id, sequenceId, this@TIMPCBotNetworkHandler)
} }
} finally { } finally {
@ -252,16 +235,9 @@ internal class TIMPCBotNetworkHandler internal constructor(coroutineContext: Cor
internal suspend fun sendPacket(packet: OutgoingPacket): Unit = withContext(coroutineContext + CoroutineName("sendPacket")) { internal suspend fun sendPacket(packet: OutgoingPacket): Unit = withContext(coroutineContext + CoroutineName("sendPacket")) {
check(channel.isOpen) { "channel is not open" } check(channel.isOpen) { "channel is not open" }
packet.delegate.use { built -> packet.delegate.use {
val buffer = IoBuffer.Pool.borrow()
try { try {
built.readAvailable(buffer) check(channel.send(it) ) { "Send packet failed" }
val shouldBeSent = buffer.readRemaining
check(channel.send(buffer) == shouldBeSent) {
"Buffer is not entirely sent. " +
"Required sent length=$shouldBeSent, but after channel.send, " +
"buffer remains ${buffer.readBytes().toUHexString()}"
}//JVM: withContext(IO)
} catch (e: SendPacketInternalException) { } catch (e: SendPacketInternalException) {
if (e.cause !is CancellationException) { if (e.cause !is CancellationException) {
bot.logger.error("Caught SendPacketInternalException: ${e.cause?.message}") bot.logger.error("Caught SendPacketInternalException: ${e.cause?.message}")
@ -269,8 +245,6 @@ internal class TIMPCBotNetworkHandler internal constructor(coroutineContext: Cor
delay(bot.configuration.firstReconnectDelayMillis) delay(bot.configuration.firstReconnectDelayMillis)
bot.tryReinitializeNetworkHandler(e) bot.tryReinitializeNetworkHandler(e)
return@withContext return@withContext
} finally {
buffer.release(IoBuffer.Pool)
} }
} }