1
0
mirror of https://github.com/mamoe/mirai.git synced 2025-04-25 04:50:26 +08:00

Fix LengthDelimitedPacketReader

This commit is contained in:
Him188 2022-06-02 17:50:16 +01:00
parent 853bcc22a4
commit 40c622dfad
No known key found for this signature in database
GPG Key ID: BA439CDDCF652375
3 changed files with 73 additions and 13 deletions
mirai-core-utils/src/commonMain/kotlin
mirai-core/src
nativeTest/kotlin/network
unixMain/kotlin/utils

View File

@ -121,8 +121,12 @@ public fun UByteArray.toUHexString(separator: String = " ", offset: Int = 0, len
}
}
public inline fun ByteArray.toReadPacket(offset: Int = 0, length: Int = this.size - offset): ByteReadPacket =
ByteReadPacket(this, offset = offset, length = length)
public inline fun ByteArray.toReadPacket(
offset: Int = 0,
length: Int = this.size - offset,
noinline release: (ByteArray) -> Unit = {}
): ByteReadPacket =
ByteReadPacket(this, offset = offset, length = length, block = release)
public inline fun <R> ByteArray.read(t: ByteReadPacket.() -> R): R {
contract {

File diff suppressed because one or more lines are too long

View File

@ -42,7 +42,6 @@ internal actual class PlatformSocket(
private val sendDispatcher: CoroutineDispatcher = newSingleThreadContext("PlatformSocket#$socket.dispatcher")
private val readLock = Mutex()
private val readBuffer = ByteArray(bufferSize).pin()
private val writeLock = Mutex()
private val writeBuffer = ByteArray(bufferSize).pin()
@ -53,6 +52,7 @@ internal actual class PlatformSocket(
close(socket)
(readDispatcher as CloseableCoroutineDispatcher).close()
(sendDispatcher as CloseableCoroutineDispatcher).close()
writeBuffer.unpin()
}
@OptIn(ExperimentalIoApi::class)
@ -93,17 +93,25 @@ internal actual class PlatformSocket(
actual override suspend fun read(): ByteReadPacket = readLock.withLock {
withContext(readDispatcher) {
logger.info { "Native socket reading." }
val readBuffer = readBuffer
val length = recv(socket, readBuffer.addressOf(0), readBuffer.get().size.convert(), 0).convert<Long>()
if (length <= 0L) {
throw EOFException("recv: $length, errno=$errno")
val readBuffer = ByteArrayPool.borrow()
try {
val length = readBuffer.usePinned { pinned ->
recv(socket, pinned.addressOf(0), pinned.get().size.convert(), 0).convert<Long>()
}
if (length <= 0L) throw EOFException("recv: $length, errno=$errno")
logger.info {
"Native socket read $length bytes: ${
readBuffer.copyOf(length.toInt()).toUHexString()
}"
}
readBuffer.toReadPacket(length = length.toInt()) { ByteArrayPool.recycle(it) }
} catch (e: Throwable) {
ByteArrayPool.recycle(readBuffer)
throw e
}
logger.info {
"Native socket read $length bytes: ${
readBuffer.get().copyOf(length.toInt()).toUHexString()
}"
}
readBuffer.get().toReadPacket(length = length.toInt())
}
}