diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/QQAndroidBot.common.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/QQAndroidBot.common.kt index adde7d7d4..bc5013516 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/QQAndroidBot.common.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/QQAndroidBot.common.kt @@ -50,7 +50,6 @@ import net.mamoe.mirai.qqandroid.network.protocol.packet.list.FriendList import net.mamoe.mirai.qqandroid.utils.MiraiPlatformUtils import net.mamoe.mirai.qqandroid.utils.encodeToString import net.mamoe.mirai.qqandroid.utils.io.serialization.toByteArray -import net.mamoe.mirai.qqandroid.utils.toReadPacket import net.mamoe.mirai.utils.* import kotlin.collections.asSequence import kotlin.contracts.ExperimentalContracts @@ -678,8 +677,8 @@ internal abstract class QQAndroidBotBase constructor( response.proto.uint32UpIp.zip(response.proto.uint32UpPort), response.proto.msgSig, MiraiPlatformUtils.md5(body), - body.toReadPacket(), - body.size.toLong().and(0xFFFF_FFFF), // don't use toLongUnsigned: Overload resolution ambiguity + @Suppress("INVISIBLE_REFERENCE") + net.mamoe.mirai.utils.internal.asReusableInput0(body), // don't use toLongUnsigned: Overload resolution ambiguity "group long message", 27 ) diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/HighwayHelper.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/HighwayHelper.kt index 5c3accac4..dee0e811d 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/HighwayHelper.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/HighwayHelper.kt @@ -20,22 +20,24 @@ import io.ktor.utils.io.ByteWriteChannel import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.io.ByteReadChannel import kotlinx.coroutines.isActive import kotlinx.coroutines.withTimeoutOrNull -import kotlinx.io.InputStream -import kotlinx.io.core.Input import kotlinx.io.core.discardExact -import kotlinx.io.core.readAvailable import kotlinx.io.core.use import kotlinx.serialization.InternalSerializationApi import net.mamoe.mirai.qqandroid.QQAndroidBot import net.mamoe.mirai.qqandroid.network.QQAndroidClient import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead -import net.mamoe.mirai.qqandroid.utils.* +import net.mamoe.mirai.qqandroid.utils.PlatformSocket +import net.mamoe.mirai.qqandroid.utils.SocketException +import net.mamoe.mirai.qqandroid.utils.addSuppressedMirai import net.mamoe.mirai.qqandroid.utils.io.serialization.readProtoBuf import net.mamoe.mirai.qqandroid.utils.io.withUse -import net.mamoe.mirai.utils.* +import net.mamoe.mirai.qqandroid.utils.toIpV4AddressString +import net.mamoe.mirai.utils.ExternalImage +import net.mamoe.mirai.utils.MiraiExperimentalAPI +import net.mamoe.mirai.utils.MiraiInternalAPI +import net.mamoe.mirai.utils.verbose import kotlin.coroutines.EmptyCoroutineContext import kotlin.math.roundToInt import kotlin.time.ExperimentalTime @@ -75,26 +77,8 @@ internal suspend fun HttpClient.postImage( @OptIn(MiraiExperimentalAPI::class) override suspend fun writeTo(channel: ByteWriteChannel) { - ByteArrayPool.useInstance { buffer: ByteArray -> - when (imageInput) { - is Input -> { - var size: Int - while (imageInput.readAvailable(buffer).also { size = it } > 0) { - channel.writeFully(buffer, 0, size) - channel.flush() - } - } - is ByteReadChannel -> imageInput.copyAndClose(channel) - is InputStream -> { - var size: Int - while (imageInput.read(buffer).also { size = it } > 0) { - channel.writeFully(buffer, 0, size) - channel.flush() - } - } - else -> error("unsupported imageInput: ${imageInput::class.simpleName}") - } - } + imageInput.writeTo(channel) + } } } == HttpStatusCode.OK @@ -109,26 +93,26 @@ internal object HighwayHelper { image: ExternalImage.ReusableInput, kind: String, commandId: Int - ) = uploadImageToServers(bot, servers, uKey, image.md5, image.input(), image.size, kind, commandId) + ) = uploadImageToServers(bot, servers, uKey, image.md5, image, kind, commandId) + @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") @OptIn(ExperimentalTime::class) suspend fun uploadImageToServers( bot: QQAndroidBot, servers: List<Pair<Int, Int>>, uKey: ByteArray, md5: ByteArray, - input: Any, - inputSize: Long, + input: ExternalImage.ReusableInput, kind: String, commandId: Int ) = servers.retryWithServers( - (inputSize * 1000 / 1024 / 10).coerceAtLeast(5000), + (input.size * 1000 / 1024 / 10).coerceAtLeast(5000), onFail = { throw IllegalStateException("cannot upload $kind, failed on all servers.", it) } ) { ip, port -> bot.network.logger.verbose { - "[Highway] Uploading $kind to ${ip}:$port, size=${inputSize.sizeToString()}" + "[Highway] Uploading $kind to ${ip}:$port, size=${input.size.sizeToString()}" } val time = measureTime { @@ -137,7 +121,6 @@ internal object HighwayHelper { serverIp = ip, serverPort = port, imageInput = input, - inputSize = inputSize, fileMd5 = md5, ticket = uKey, commandId = commandId @@ -145,22 +128,21 @@ internal object HighwayHelper { } bot.network.logger.verbose { - "[Highway] Uploading $kind: succeed at ${(inputSize.toDouble() / 1024 / time.inSeconds).roundToInt()} KiB/s" + "[Highway] Uploading $kind: succeed at ${(input.size.toDouble() / 1024 / time.inSeconds).roundToInt()} KiB/s" } } + @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") @OptIn(InternalCoroutinesApi::class) internal suspend fun uploadImage( client: QQAndroidClient, serverIp: String, serverPort: Int, ticket: ByteArray, - imageInput: Any, - inputSize: Long, + imageInput: ExternalImage.ReusableInput, fileMd5: ByteArray, commandId: Int // group=2, friend=1 ) { - require(imageInput is Input || imageInput is InputStream || imageInput is ByteReadChannel) { "unsupported imageInput: ${imageInput::class.simpleName}" } require(fileMd5.size == 16) { "bad md5. Required size=16, got ${fileMd5.size}" } // require(ticket.size == 128) { "bad uKey. Required size=128, got ${ticket.size}" } // require(commandId == 2 || commandId == 1) { "bad commandId. Must be 1 or 2" } @@ -181,18 +163,19 @@ internal object HighwayHelper { commandId = commandId, ticket = ticket, data = imageInput, - dataSize = inputSize, fileMd5 = fileMd5 - ).collect { - socket.send(it) - //0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00 + ).withUse { + flow.collect { + socket.send(it) + //0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00 - socket.read().withUse { - discardExact(1) - val headLength = readInt() - discardExact(4) - val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength) - check(proto.errorCode == 0) { "highway transfer failed, error ${proto.errorCode}" } + socket.read().withUse { + discardExact(1) + val headLength = readInt() + discardExact(4) + val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength) + check(proto.errorCode == 0) { "highway transfer failed, error ${proto.errorCode}" } + } } } } diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/highway.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/highway.kt index 4621bdd2d..c84bdcf61 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/highway.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/highway.kt @@ -11,12 +11,7 @@ package net.mamoe.mirai.qqandroid.network.highway -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.io.ByteReadChannel -import kotlinx.io.InputStream import kotlinx.io.core.ByteReadPacket -import kotlinx.io.core.Input import kotlinx.io.core.buildPacket import kotlinx.io.core.writeFully import kotlinx.serialization.InternalSerializationApi @@ -25,9 +20,12 @@ import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead import net.mamoe.mirai.qqandroid.network.protocol.packet.EMPTY_BYTE_ARRAY import net.mamoe.mirai.qqandroid.utils.ByteArrayPool import net.mamoe.mirai.qqandroid.utils.MiraiPlatformUtils -import net.mamoe.mirai.qqandroid.utils.io.chunkedFlow import net.mamoe.mirai.qqandroid.utils.io.serialization.toByteArray +import net.mamoe.mirai.utils.ExternalImage import net.mamoe.mirai.utils.MiraiInternalAPI +import net.mamoe.mirai.utils.internal.ChunkedFlowSession +import net.mamoe.mirai.utils.internal.ChunkedInput +import net.mamoe.mirai.utils.internal.map @OptIn(MiraiInternalAPI::class, InternalSerializationApi::class) internal fun createImageDataPacketSequence( @@ -39,27 +37,17 @@ internal fun createImageDataPacketSequence( commandId: Int, localId: Int = 2052, ticket: ByteArray, - - data: Any, - dataSize: Long, + data: ExternalImage.ReusableInput, fileMd5: ByteArray, sizePerPacket: Int = ByteArrayPool.BUFFER_SIZE -): Flow<ByteReadPacket> { +): ChunkedFlowSession<ByteReadPacket> { ByteArrayPool.checkBufferSize(sizePerPacket) - require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" } // require(ticket.size == 128) { "bad uKey. Required size=128, got ${ticket.size}" } - require(data !is ByteReadPacket) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as ByteReadPacket).remaining}" } - val flow = when (data) { - is ByteReadPacket -> data.chunkedFlow(sizePerPacket) - is Input -> data.chunkedFlow(sizePerPacket) - is ByteReadChannel -> data.chunkedFlow(sizePerPacket) - is InputStream -> data.chunkedFlow(sizePerPacket) - else -> error("unreachable code") - } + val session: ChunkedFlowSession<ChunkedInput> = data.chunkedFlow(sizePerPacket) var offset = 0L - return flow.map { chunkedInput -> + return session.map { chunkedInput -> buildPacket { val head = CSDataHighwayHead.ReqDataHighwayHead( msgBasehead = CSDataHighwayHead.DataHighwayHead( @@ -82,7 +70,7 @@ internal fun createImageDataPacketSequence( // cacheAddr = 812157193, datalength = chunkedInput.bufferSize, dataoffset = offset, - filesize = dataSize, + filesize = data.size, serviceticket = ticket, md5 = MiraiPlatformUtils.md5(chunkedInput.buffer, 0, chunkedInput.bufferSize), fileMd5 = fileMd5, diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/PacketFactory.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/PacketFactory.kt index b99091331..a1ae4a7b7 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/PacketFactory.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/PacketFactory.kt @@ -197,6 +197,7 @@ internal object KnownPacketFactories { readString(readInt() - 4)// uinAccount + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") ByteArrayPool.useInstance(this.remaining.toInt()) { data -> val size = this.readAvailable(data) @@ -385,6 +386,7 @@ internal object KnownPacketFactories { } 0 -> { val data = if (bot.client.loginState == 0) { + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") ByteArrayPool.useInstance(this.remaining.toInt()) { byteArrayBuffer -> val size = (this.remaining - 1).toInt() this.readFully(byteArrayBuffer, 0, size) diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/login/ConfigPushSvc.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/login/ConfigPushSvc.kt index ae668d57a..2aa225d6b 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/login/ConfigPushSvc.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/login/ConfigPushSvc.kt @@ -70,6 +70,7 @@ internal class ConfigPushSvc { override suspend fun ByteReadPacket.decode(bot: QQAndroidBot, sequenceId: Int): PushReqResponse? { + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") ByteArrayPool.useInstance(this.remaining.toInt()) { buffer -> val length = this.readAvailable(buffer) diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/ByteArrayPool.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/ByteArrayPool.kt index 586633752..37c296940 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/ByteArrayPool.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/ByteArrayPool.kt @@ -1,41 +1,9 @@ package net.mamoe.mirai.qqandroid.utils -import kotlinx.io.pool.DefaultPool import kotlinx.io.pool.ObjectPool /** * 缓存 [ByteArray] 实例的 [ObjectPool] */ -internal object ByteArrayPool : DefaultPool<ByteArray>(256) { - /** - * 每一个 [ByteArray] 的大小 - */ - const val BUFFER_SIZE: Int = 8192 * 8 - - override fun produceInstance(): ByteArray = ByteArray(BUFFER_SIZE) - - override fun clearInstance(instance: ByteArray): ByteArray = instance - - fun checkBufferSize(size: Int) { - require(size <= BUFFER_SIZE) { "sizePerPacket is too large. Maximum buffer size=$BUFFER_SIZE" } - } - - fun checkBufferSize(size: Long) { - require(size <= BUFFER_SIZE) { "sizePerPacket is too large. Maximum buffer size=$BUFFER_SIZE" } - } - - /** - * 请求一个大小至少为 [requestedSize] 的 [ByteArray] 实例. - */ // 不要写为扩展函数. 它需要优先于 kotlinx.io 的扩展函数 resolve - inline fun <R> useInstance(requestedSize: Int = 0, block: (ByteArray) -> R): R { - if (requestedSize > BUFFER_SIZE) { - return ByteArray(requestedSize).run(block) - } - val instance = borrow() - try { - return block(instance) - } finally { - recycle(instance) - } - } -} \ No newline at end of file +@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") +internal typealias ByteArrayPool = net.mamoe.mirai.utils.internal.ByteArrayPool \ No newline at end of file diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/input.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/input.kt index a267e5b73..0b374df63 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/input.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/input.kt @@ -27,6 +27,7 @@ import kotlin.jvm.JvmMultifileClass import kotlin.jvm.JvmName import kotlin.jvm.JvmSynthetic +@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") internal inline fun <R> ByteReadPacket.useBytes( n: Int = remaining.toInt(),//not that safe but adequate block: (data: ByteArray, length: Int) -> R diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/ExternalImage.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/ExternalImage.kt index c956be61d..9e527093d 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/ExternalImage.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/ExternalImage.kt @@ -11,18 +11,15 @@ package net.mamoe.mirai.utils -import kotlinx.coroutines.io.ByteReadChannel -import kotlinx.io.InputStream -import kotlinx.io.core.ByteReadPacket -import kotlinx.io.core.Input -import kotlinx.serialization.InternalSerializationApi +import io.ktor.utils.io.ByteWriteChannel import net.mamoe.mirai.contact.Contact import net.mamoe.mirai.contact.Group import net.mamoe.mirai.contact.User import net.mamoe.mirai.message.MessageReceipt import net.mamoe.mirai.message.data.Image import net.mamoe.mirai.message.data.sendTo -import net.mamoe.mirai.utils.internal.asReusableInput +import net.mamoe.mirai.utils.internal.ChunkedFlowSession +import net.mamoe.mirai.utils.internal.ChunkedInput import kotlin.jvm.JvmField import kotlin.jvm.JvmSynthetic @@ -36,43 +33,20 @@ import kotlin.jvm.JvmSynthetic */ @OptIn(MiraiInternalAPI::class) class ExternalImage internal constructor( - val md5: ByteArray, @JvmField internal val input: ReusableInput // Input from kotlinx.io, InputStream from kotlinx.io MPP, ByteReadChannel from ktor ) { + val md5: ByteArray get() = this.input.md5 + @SinceMirai("1.0.0") internal interface ReusableInput { val md5: ByteArray val size: Long - /** - * Create a input for once usage - */ - fun input(): Input + fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> + suspend fun writeTo(out: ByteWriteChannel): Long } - - constructor( - md5: ByteArray, - input: ByteReadChannel - ) : this(md5, input.asReusableInput()) - - constructor( - md5: ByteArray, - input: Input - ) : this(md5, input.asReusableInput()) - - constructor( - md5: ByteArray, - input: ByteReadPacket - ) : this(md5, input.asReusableInput()) - - @OptIn(InternalSerializationApi::class) - constructor( - md5: ByteArray, - input: InputStream - ) : this(md5, input.asReusableInput()) - init { require(input.size < 30L * 1024 * 1024) { "Image file is too big. Maximum is 30 MiB, but recommended to be 20 MiB" } } diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/asReusableInput.common.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/asReusableInput.common.kt index c6cd87f5a..52a4ded47 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/asReusableInput.common.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/asReusableInput.common.kt @@ -9,15 +9,9 @@ package net.mamoe.mirai.utils.internal -import kotlinx.coroutines.io.ByteReadChannel -import kotlinx.io.InputStream -import kotlinx.io.core.Input -import kotlinx.serialization.InternalSerializationApi import net.mamoe.mirai.utils.ExternalImage -internal expect fun ByteReadChannel.asReusableInput(): ExternalImage.ReusableInput -internal expect fun Input.asReusableInput(): ExternalImage.ReusableInput +internal expect fun ByteArray.asReusableInput(): ExternalImage.ReusableInput -@OptIn(InternalSerializationApi::class) -internal expect fun InputStream.asReusableInput(): ExternalImage.ReusableInput +internal fun asReusableInput0(input: ByteArray): ExternalImage.ReusableInput = input.asReusableInput() \ No newline at end of file diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/chunked.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/chunked.kt similarity index 85% rename from mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/chunked.kt rename to mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/chunked.kt index 27608dad4..5ca9dcb8e 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/io/chunked.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/chunked.kt @@ -7,20 +7,33 @@ * https://github.com/mamoe/mirai/blob/master/LICENSE */ -package net.mamoe.mirai.qqandroid.utils.io +package net.mamoe.mirai.utils.internal import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.map import kotlinx.coroutines.io.ByteReadChannel import kotlinx.io.InputStream import kotlinx.io.core.ByteReadPacket +import kotlinx.io.core.Closeable import kotlinx.io.core.Input -import kotlinx.io.pool.useInstance -import net.mamoe.mirai.utils.MiraiInternalAPI import kotlinx.serialization.InternalSerializationApi -import net.mamoe.mirai.qqandroid.utils.ByteArrayPool +import kotlin.jvm.JvmField + + +internal interface ChunkedFlowSession<T> : Closeable { + val flow: Flow<T> + override fun close() +} + +internal inline fun <T, R> ChunkedFlowSession<T>.map(crossinline mapper: suspend ChunkedFlowSession<T>.(T) -> R): ChunkedFlowSession<R> { + return object : ChunkedFlowSession<R> { + override val flow: Flow<R> = this@map.flow.map { this@map.mapper(it) } + override fun close() = this@map.close() + } +} /** @@ -34,13 +47,13 @@ internal class ChunkedInput( * * **注意**: 不要将他带出 [Flow.collect] 作用域, 否则将造成内存泄露 */ - val buffer: ByteArray, - internal var size: Int + @JvmField val buffer: ByteArray, + @JvmField internal var size: Int ) { /** * [buffer] 的有效大小 */ - val bufferSize: Int get() = size + inline val bufferSize: Int get() = size } /** @@ -51,7 +64,6 @@ internal class ChunkedInput( * * 若 [ByteReadPacket.remaining] 小于 [sizePerPacket], 将会返回唯一元素 [this] 的 [Sequence] */ -@OptIn(MiraiInternalAPI::class) internal fun ByteReadPacket.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { ByteArrayPool.checkBufferSize(sizePerPacket) if (this.remaining <= sizePerPacket.toLong()) { @@ -81,7 +93,6 @@ internal fun ByteReadPacket.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> * 对于一个 1000 长度的 [ByteReadChannel] 和参数 [sizePerPacket] = 300, 将会产生含四个元素的 [Sequence], * 其长度分别为: 300, 300, 300, 100. */ -@OptIn(MiraiInternalAPI::class) internal fun ByteReadChannel.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { ByteArrayPool.checkBufferSize(sizePerPacket) if (this.isClosedForRead) { @@ -105,7 +116,7 @@ internal fun ByteReadChannel.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> * 对于一个 1000 长度的 [Input] 和参数 [sizePerPacket] = 300, 将会产生含四个元素的 [Sequence], * 其长度分别为: 300, 300, 300, 100. */ -@OptIn(MiraiInternalAPI::class, ExperimentalCoroutinesApi::class) +@OptIn(ExperimentalCoroutinesApi::class) internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { ByteArrayPool.checkBufferSize(sizePerPacket) @@ -132,9 +143,7 @@ internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { * * 若 [ByteReadPacket.remaining] 小于 [sizePerPacket], 将会返回唯一元素 [this] 的 [Sequence] */ -@OptIn( - MiraiInternalAPI::class, ExperimentalCoroutinesApi::class, InternalSerializationApi::class -) +@OptIn(ExperimentalCoroutinesApi::class, InternalSerializationApi::class) internal fun InputStream.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { ByteArrayPool.checkBufferSize(sizePerPacket) diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/md5.common.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/md5.common.kt index 9c9864351..6a30b73ac 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/md5.common.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/internal/md5.common.kt @@ -38,11 +38,11 @@ internal inline fun InputStream.readInSequence(block: (ByteArray, len: Int) -> U /** * 缓存 [ByteArray] 实例的 [ObjectPool] */ -internal object ByteArrayPool : DefaultPool<ByteArray>(32) { +internal object ByteArrayPool : DefaultPool<ByteArray>(256) { /** * 每一个 [ByteArray] 的大小 */ - const val BUFFER_SIZE: Int = 8192 + const val BUFFER_SIZE: Int = 8192 * 8 override fun produceInstance(): ByteArray = ByteArray(BUFFER_SIZE) diff --git a/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/utils/ExternalImageJvm.kt b/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/utils/ExternalImageJvm.kt index fc13c18d5..5f0e9746a 100644 --- a/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/utils/ExternalImageJvm.kt +++ b/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/utils/ExternalImageJvm.kt @@ -18,6 +18,7 @@ import kotlinx.io.core.Input import kotlinx.io.core.copyTo import kotlinx.io.errors.IOException import kotlinx.io.streams.asOutput +import net.mamoe.mirai.utils.internal.asReusableInput import net.mamoe.mirai.utils.internal.md5 import java.awt.image.BufferedImage import java.io.File @@ -63,7 +64,7 @@ fun BufferedImage.toExternalImage(formatName: String = "png"): ExternalImage { } @Suppress("DEPRECATION_ERROR") - return ExternalImage(digest.digest(), file.inputStream()) + return ExternalImage(file.asReusableInput()) } suspend inline fun BufferedImage.suspendToExternalImage(): ExternalImage = withContext(IO) { toExternalImage() } @@ -76,8 +77,7 @@ suspend inline fun BufferedImage.suspendToExternalImage(): ExternalImage = withC fun File.toExternalImage(): ExternalImage { @Suppress("DEPRECATION_ERROR") return ExternalImage( - md5 = this.inputStream().md5(), // dont change - input = this.inputStream() + input = this.asReusableInput() ) } diff --git a/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/utils/internal/asReusableInput.jvm.kt b/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/utils/internal/asReusableInput.jvm.kt index 22af57641..23ed10be3 100644 --- a/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/utils/internal/asReusableInput.jvm.kt +++ b/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/utils/internal/asReusableInput.jvm.kt @@ -1,18 +1,70 @@ package net.mamoe.mirai.utils.internal -import kotlinx.coroutines.io.ByteReadChannel -import kotlinx.io.core.Input +import io.ktor.utils.io.ByteWriteChannel +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.withContext +import net.mamoe.mirai.message.data.toLongUnsigned import net.mamoe.mirai.utils.ExternalImage +import java.io.File import java.io.InputStream -internal actual fun ByteReadChannel.asReusableInput(): ExternalImage.ReusableInput { - TODO("Not yet implemented") +internal actual fun ByteArray.asReusableInput(): ExternalImage.ReusableInput { + return object : ExternalImage.ReusableInput { + override val md5: ByteArray = md5() + override val size: Long get() = this@asReusableInput.size.toLongUnsigned() + + override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> { + return object : ChunkedFlowSession<ChunkedInput> { + override val flow: Flow<ChunkedInput> = inputStream().chunkedFlow(sizePerPacket) + + override fun close() { + // nothing to do + } + } + } + + override suspend fun writeTo(out: ByteWriteChannel): Long { + out.writeFully(this@asReusableInput, 0, this@asReusableInput.size) + out.flush() + return this@asReusableInput.size.toLongUnsigned() + } + } } -internal actual fun Input.asReusableInput(): ExternalImage.ReusableInput { - TODO("Not yet implemented") +internal fun File.asReusableInput(): ExternalImage.ReusableInput { + return object : ExternalImage.ReusableInput { + override val md5: ByteArray = inputStream().use { it.md5() } + override val size: Long get() = length() + + override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> { + val stream = inputStream() + return object : ChunkedFlowSession<ChunkedInput> { + override val flow: Flow<ChunkedInput> = stream.chunkedFlow(sizePerPacket) + override fun close() = stream.close() + } + } + + override suspend fun writeTo(out: ByteWriteChannel): Long { + return inputStream().use { it.copyTo(out) } + } + } } -internal actual fun InputStream.asReusableInput(): ExternalImage.ReusableInput { - TODO("Not yet implemented") + +private suspend fun InputStream.copyTo(out: ByteWriteChannel): Long = withContext(Dispatchers.IO) { + var bytesCopied: Long = 0 + + ByteArrayPool.useInstance { buffer -> + var bytes = read(buffer) + while (bytes >= 0) { + out.writeFully(buffer, 0, bytes) + bytesCopied += bytes + bytes = read(buffer) + } + } + + out.flush() + + return@withContext bytesCopied } \ No newline at end of file