From ccf5df944e5fa7edc8e51d6afd1fe1a1a3ef4abf Mon Sep 17 00:00:00 2001 From: Him188 Date: Fri, 28 Feb 2020 19:17:32 +0800 Subject: [PATCH] ImageUpload: enhance performance using `ByteArrayPool`, send chunked packets separately --- .../mirai/qqandroid/network/highway/Codec.kt | 119 ------------------ .../network/highway/HighwayHelper.kt | 52 ++++---- .../qqandroid/network/highway/highway.kt | 89 +++++++++++++ 3 files changed, 115 insertions(+), 145 deletions(-) delete mode 100644 mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/Codec.kt create mode 100644 mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/highway.kt diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/Codec.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/Codec.kt deleted file mode 100644 index 900688dc5..000000000 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/Codec.kt +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2020 Mamoe Technologies and contributors. - * - * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. - * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. - * - * https://github.com/mamoe/mirai/blob/master/LICENSE - */ - -package net.mamoe.mirai.qqandroid.network.highway - -import io.ktor.utils.io.ByteReadChannel -import kotlinx.io.InputStream -import kotlinx.io.core.* -import kotlinx.io.pool.useInstance -import net.mamoe.mirai.qqandroid.io.serialization.toByteArray -import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead -import net.mamoe.mirai.qqandroid.network.protocol.packet.EMPTY_BYTE_ARRAY -import net.mamoe.mirai.utils.io.ByteArrayPool - -object Highway { - suspend fun RequestDataTrans( - uin: Long, - command: String, - sequenceId: Int, - appId: Int = 537062845, - dataFlag: Int = 4096, - commandId: Int, - localId: Int = 2052, - uKey: ByteArray, - - data: Any, - dataSize: Int, - md5: ByteArray - ): ByteReadPacket { - require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" } - require(uKey.size == 128) { "bad uKey. Required size=128, got ${uKey.size}" } - require(data !is ByteReadPacket || data.remaining.toInt() == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as ByteReadPacket).remaining}" } - require(data !is IoBuffer || data.readRemaining == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as IoBuffer).readRemaining}" } - - val dataHighwayHead = CSDataHighwayHead.DataHighwayHead( - version = 1, - uin = uin.toString(), - command = command, - seq = sequenceId, - retryTimes = 0, - appid = appId, - dataflag = dataFlag, - commandId = commandId, - localeId = localId - ) - val segHead = CSDataHighwayHead.SegHead( - datalength = dataSize, - filesize = dataSize.toLong(), - serviceticket = uKey, - md5 = md5, - fileMd5 = md5, - flag = 0, - rtcode = 0 - ) - //println(data.readBytes().toUHexString()) - return Codec.buildC2SData(dataHighwayHead, segHead, EMPTY_BYTE_ARRAY, null, data, dataSize) - } - - private object Codec { - suspend fun buildC2SData( - dataHighwayHead: CSDataHighwayHead.DataHighwayHead, - segHead: CSDataHighwayHead.SegHead, - extendInfo: ByteArray, - loginSigHead: CSDataHighwayHead.LoginSigHead?, - body: Any, - bodySize: Int - ): ByteReadPacket { - require(body is Input || body is InputStream || body is ByteReadChannel) { "unsupported body: ${body::class.simpleName}" } - val head = CSDataHighwayHead.ReqDataHighwayHead( - msgBasehead = dataHighwayHead, - msgSeghead = segHead, - reqExtendinfo = extendInfo, - msgLoginSigHead = loginSigHead - ).toByteArray(CSDataHighwayHead.ReqDataHighwayHead.serializer()) - - return buildPacket { - writeByte(40) - writeInt(head.size) - writeInt(bodySize) - writeFully(head) - when (body) { - is ByteReadPacket -> writePacket(body) - is Input -> body.use { - ByteArrayPool.useInstance { buffer -> - var size: Int - while (body.readAvailable(buffer).also { size = it } != 0) { - this@buildPacket.writeFully(buffer, 0, size) - } - } - } - is ByteReadChannel -> ByteArrayPool.useInstance { buffer -> - var size: Int - while (body.readAvailable(buffer, 0, buffer.size).also { size = it } != 0) { - this@buildPacket.writeFully(buffer, 0, size) - } - } - is InputStream -> try { - ByteArrayPool.useInstance { buffer -> - var size: Int - while (body.read(buffer).also { size = it } != 0) { - this@buildPacket.writeFully(buffer, 0, size) - } - } - } finally { - body.close() - } - } - - writeByte(41) - } - } - } -} \ No newline at end of file diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/HighwayHelper.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/HighwayHelper.kt index b2e351eb0..3dd7be0ee 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/HighwayHelper.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/HighwayHelper.kt @@ -18,8 +18,11 @@ import io.ktor.http.content.OutgoingContent import io.ktor.http.userAgent import io.ktor.utils.io.ByteReadChannel import io.ktor.utils.io.copyAndClose +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.flow.collect import kotlinx.io.InputStream import kotlinx.io.core.Input +import kotlinx.io.core.discardExact import kotlinx.io.core.readAvailable import kotlinx.io.core.use import kotlinx.io.pool.useInstance @@ -30,9 +33,8 @@ import net.mamoe.mirai.qqandroid.network.protocol.packet.withUse import net.mamoe.mirai.utils.MiraiInternalAPI import net.mamoe.mirai.utils.io.ByteArrayPool import net.mamoe.mirai.utils.io.PlatformSocket -import net.mamoe.mirai.utils.io.discardExact - +@UseExperimental(MiraiInternalAPI::class) @Suppress("SpellCheckingInspection") internal suspend fun HttpClient.postImage( htcmd: String, @@ -90,6 +92,7 @@ internal suspend fun HttpClient.postImage( @UseExperimental(MiraiInternalAPI::class) internal object HighwayHelper { + @UseExperimental(InternalCoroutinesApi::class) suspend fun uploadImage( client: QQAndroidClient, serverIp: String, @@ -108,30 +111,27 @@ internal object HighwayHelper { val socket = PlatformSocket() socket.connect(serverIp, serverPort) socket.use { - - // TODO: 2020/2/23 使用缓存, 或使用 HTTP 发送更好 (因为无需读取到内存) - socket.send( - Highway.RequestDataTrans( - uin = client.uin, - command = "PicUp.DataUp", - sequenceId = - if (commandId == 2) client.nextHighwayDataTransSequenceIdForGroup() - else client.nextHighwayDataTransSequenceIdForFriend(), - uKey = uKey, - data = imageInput, - dataSize = inputSize, - md5 = md5, - commandId = commandId - ) - ) - - //0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00 - socket.read().withUse { - discardExact(1) - val headLength = readInt() - discardExact(4) - val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength) - check(proto.errorCode == 0) { "image upload failed: Transfer errno=${proto.errorCode}" } + createImageDataPacketSequence( + uin = client.uin, + command = "PicUp.DataUp", + sequenceId = + if (commandId == 2) client.nextHighwayDataTransSequenceIdForGroup() + else client.nextHighwayDataTransSequenceIdForFriend(), + commandId = commandId, + uKey = uKey, + data = imageInput, + dataSize = inputSize, + md5 = md5 + ).collect { + socket.send(it) + //0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00 + socket.read().withUse { + discardExact(1) + val headLength = readInt() + discardExact(4) + val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength) + check(proto.errorCode == 0) { "image upload failed: Transfer errno=${proto.errorCode}" } + } } } } diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/highway.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/highway.kt new file mode 100644 index 000000000..7fa4c0f00 --- /dev/null +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/highway.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2020 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") + +package net.mamoe.mirai.qqandroid.network.highway + +import io.ktor.utils.io.ByteReadChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.io.InputStream +import kotlinx.io.core.* +import net.mamoe.mirai.qqandroid.io.serialization.toByteArray +import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead +import net.mamoe.mirai.qqandroid.network.protocol.packet.EMPTY_BYTE_ARRAY +import net.mamoe.mirai.utils.MiraiInternalAPI +import net.mamoe.mirai.utils.io.* + +@UseExperimental(MiraiInternalAPI::class) +internal fun createImageDataPacketSequence( // RequestDataTrans + uin: Long, + command: String, + sequenceId: Int, + appId: Int = 537062845, + dataFlag: Int = 4096, + commandId: Int, + localId: Int = 2052, + uKey: ByteArray, + + data: Any, + dataSize: Int, + md5: ByteArray, + sizePerPacket: Int = 8192 +): Flow { + require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" } + require(uKey.size == 128) { "bad uKey. Required size=128, got ${uKey.size}" } + require(data !is ByteReadPacket || data.remaining.toInt() == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as ByteReadPacket).remaining}" } + require(data !is IoBuffer || data.readRemaining == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as IoBuffer).readRemaining}" } + + val flow = when (data) { + is ByteReadPacket -> data.chunkedFlow(sizePerPacket) + is Input -> data.chunkedFlow(sizePerPacket) + is ByteReadChannel -> data.chunkedFlow(sizePerPacket) + is InputStream -> data.chunkedFlow(sizePerPacket) + else -> error("unreachable code") + } + + return flow.map { chunkedInput -> + buildPacket { + val head = CSDataHighwayHead.ReqDataHighwayHead( + msgBasehead = CSDataHighwayHead.DataHighwayHead( + version = 1, + uin = uin.toString(), + command = command, + seq = sequenceId, + retryTimes = 0, + appid = appId, + dataflag = dataFlag, + commandId = commandId, + localeId = localId + ), + msgSeghead = CSDataHighwayHead.SegHead( + datalength = dataSize, + filesize = dataSize.toLong(), + serviceticket = uKey, + md5 = md5, + fileMd5 = md5, + flag = 0, + rtcode = 0 + ), + reqExtendinfo = EMPTY_BYTE_ARRAY, + msgLoginSigHead = null + ).toByteArray(CSDataHighwayHead.ReqDataHighwayHead.serializer()) + + writeByte(40) + writeInt(head.size) + writeInt(dataSize) + writeFully(head) + writeFully(chunkedInput.buffer, 0, chunkedInput.bufferSize) + writeByte(41) + } + } +} \ No newline at end of file