mirror of
https://github.com/mamoe/mirai.git
synced 2025-03-24 14:30:09 +08:00
Implement new ExternalImage and image chunk strategy
This commit is contained in:
parent
b14bfde694
commit
6b5cbc3dd7
@ -50,7 +50,6 @@ import net.mamoe.mirai.qqandroid.network.protocol.packet.list.FriendList
|
||||
import net.mamoe.mirai.qqandroid.utils.MiraiPlatformUtils
|
||||
import net.mamoe.mirai.qqandroid.utils.encodeToString
|
||||
import net.mamoe.mirai.qqandroid.utils.io.serialization.toByteArray
|
||||
import net.mamoe.mirai.qqandroid.utils.toReadPacket
|
||||
import net.mamoe.mirai.utils.*
|
||||
import kotlin.collections.asSequence
|
||||
import kotlin.contracts.ExperimentalContracts
|
||||
@ -678,8 +677,8 @@ internal abstract class QQAndroidBotBase constructor(
|
||||
response.proto.uint32UpIp.zip(response.proto.uint32UpPort),
|
||||
response.proto.msgSig,
|
||||
MiraiPlatformUtils.md5(body),
|
||||
body.toReadPacket(),
|
||||
body.size.toLong().and(0xFFFF_FFFF), // don't use toLongUnsigned: Overload resolution ambiguity
|
||||
@Suppress("INVISIBLE_REFERENCE")
|
||||
net.mamoe.mirai.utils.internal.asReusableInput0(body), // don't use toLongUnsigned: Overload resolution ambiguity
|
||||
"group long message",
|
||||
27
|
||||
)
|
||||
|
@ -20,22 +20,24 @@ import io.ktor.utils.io.ByteWriteChannel
|
||||
import kotlinx.coroutines.InternalCoroutinesApi
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.io.ByteReadChannel
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.withTimeoutOrNull
|
||||
import kotlinx.io.InputStream
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.discardExact
|
||||
import kotlinx.io.core.readAvailable
|
||||
import kotlinx.io.core.use
|
||||
import kotlinx.serialization.InternalSerializationApi
|
||||
import net.mamoe.mirai.qqandroid.QQAndroidBot
|
||||
import net.mamoe.mirai.qqandroid.network.QQAndroidClient
|
||||
import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead
|
||||
import net.mamoe.mirai.qqandroid.utils.*
|
||||
import net.mamoe.mirai.qqandroid.utils.PlatformSocket
|
||||
import net.mamoe.mirai.qqandroid.utils.SocketException
|
||||
import net.mamoe.mirai.qqandroid.utils.addSuppressedMirai
|
||||
import net.mamoe.mirai.qqandroid.utils.io.serialization.readProtoBuf
|
||||
import net.mamoe.mirai.qqandroid.utils.io.withUse
|
||||
import net.mamoe.mirai.utils.*
|
||||
import net.mamoe.mirai.qqandroid.utils.toIpV4AddressString
|
||||
import net.mamoe.mirai.utils.ExternalImage
|
||||
import net.mamoe.mirai.utils.MiraiExperimentalAPI
|
||||
import net.mamoe.mirai.utils.MiraiInternalAPI
|
||||
import net.mamoe.mirai.utils.verbose
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
import kotlin.math.roundToInt
|
||||
import kotlin.time.ExperimentalTime
|
||||
@ -75,26 +77,8 @@ internal suspend fun HttpClient.postImage(
|
||||
|
||||
@OptIn(MiraiExperimentalAPI::class)
|
||||
override suspend fun writeTo(channel: ByteWriteChannel) {
|
||||
ByteArrayPool.useInstance { buffer: ByteArray ->
|
||||
when (imageInput) {
|
||||
is Input -> {
|
||||
var size: Int
|
||||
while (imageInput.readAvailable(buffer).also { size = it } > 0) {
|
||||
channel.writeFully(buffer, 0, size)
|
||||
channel.flush()
|
||||
}
|
||||
}
|
||||
is ByteReadChannel -> imageInput.copyAndClose(channel)
|
||||
is InputStream -> {
|
||||
var size: Int
|
||||
while (imageInput.read(buffer).also { size = it } > 0) {
|
||||
channel.writeFully(buffer, 0, size)
|
||||
channel.flush()
|
||||
}
|
||||
}
|
||||
else -> error("unsupported imageInput: ${imageInput::class.simpleName}")
|
||||
}
|
||||
}
|
||||
imageInput.writeTo(channel)
|
||||
|
||||
}
|
||||
}
|
||||
} == HttpStatusCode.OK
|
||||
@ -109,26 +93,26 @@ internal object HighwayHelper {
|
||||
image: ExternalImage.ReusableInput,
|
||||
kind: String,
|
||||
commandId: Int
|
||||
) = uploadImageToServers(bot, servers, uKey, image.md5, image.input(), image.size, kind, commandId)
|
||||
) = uploadImageToServers(bot, servers, uKey, image.md5, image, kind, commandId)
|
||||
|
||||
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
|
||||
@OptIn(ExperimentalTime::class)
|
||||
suspend fun uploadImageToServers(
|
||||
bot: QQAndroidBot,
|
||||
servers: List<Pair<Int, Int>>,
|
||||
uKey: ByteArray,
|
||||
md5: ByteArray,
|
||||
input: Any,
|
||||
inputSize: Long,
|
||||
input: ExternalImage.ReusableInput,
|
||||
kind: String,
|
||||
commandId: Int
|
||||
) = servers.retryWithServers(
|
||||
(inputSize * 1000 / 1024 / 10).coerceAtLeast(5000),
|
||||
(input.size * 1000 / 1024 / 10).coerceAtLeast(5000),
|
||||
onFail = {
|
||||
throw IllegalStateException("cannot upload $kind, failed on all servers.", it)
|
||||
}
|
||||
) { ip, port ->
|
||||
bot.network.logger.verbose {
|
||||
"[Highway] Uploading $kind to ${ip}:$port, size=${inputSize.sizeToString()}"
|
||||
"[Highway] Uploading $kind to ${ip}:$port, size=${input.size.sizeToString()}"
|
||||
}
|
||||
|
||||
val time = measureTime {
|
||||
@ -137,7 +121,6 @@ internal object HighwayHelper {
|
||||
serverIp = ip,
|
||||
serverPort = port,
|
||||
imageInput = input,
|
||||
inputSize = inputSize,
|
||||
fileMd5 = md5,
|
||||
ticket = uKey,
|
||||
commandId = commandId
|
||||
@ -145,22 +128,21 @@ internal object HighwayHelper {
|
||||
}
|
||||
|
||||
bot.network.logger.verbose {
|
||||
"[Highway] Uploading $kind: succeed at ${(inputSize.toDouble() / 1024 / time.inSeconds).roundToInt()} KiB/s"
|
||||
"[Highway] Uploading $kind: succeed at ${(input.size.toDouble() / 1024 / time.inSeconds).roundToInt()} KiB/s"
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
|
||||
@OptIn(InternalCoroutinesApi::class)
|
||||
internal suspend fun uploadImage(
|
||||
client: QQAndroidClient,
|
||||
serverIp: String,
|
||||
serverPort: Int,
|
||||
ticket: ByteArray,
|
||||
imageInput: Any,
|
||||
inputSize: Long,
|
||||
imageInput: ExternalImage.ReusableInput,
|
||||
fileMd5: ByteArray,
|
||||
commandId: Int // group=2, friend=1
|
||||
) {
|
||||
require(imageInput is Input || imageInput is InputStream || imageInput is ByteReadChannel) { "unsupported imageInput: ${imageInput::class.simpleName}" }
|
||||
require(fileMd5.size == 16) { "bad md5. Required size=16, got ${fileMd5.size}" }
|
||||
// require(ticket.size == 128) { "bad uKey. Required size=128, got ${ticket.size}" }
|
||||
// require(commandId == 2 || commandId == 1) { "bad commandId. Must be 1 or 2" }
|
||||
@ -181,18 +163,19 @@ internal object HighwayHelper {
|
||||
commandId = commandId,
|
||||
ticket = ticket,
|
||||
data = imageInput,
|
||||
dataSize = inputSize,
|
||||
fileMd5 = fileMd5
|
||||
).collect {
|
||||
socket.send(it)
|
||||
//0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00
|
||||
).withUse {
|
||||
flow.collect {
|
||||
socket.send(it)
|
||||
//0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00
|
||||
|
||||
socket.read().withUse {
|
||||
discardExact(1)
|
||||
val headLength = readInt()
|
||||
discardExact(4)
|
||||
val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength)
|
||||
check(proto.errorCode == 0) { "highway transfer failed, error ${proto.errorCode}" }
|
||||
socket.read().withUse {
|
||||
discardExact(1)
|
||||
val headLength = readInt()
|
||||
discardExact(4)
|
||||
val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength)
|
||||
check(proto.errorCode == 0) { "highway transfer failed, error ${proto.errorCode}" }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -11,12 +11,7 @@
|
||||
|
||||
package net.mamoe.mirai.qqandroid.network.highway
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.io.ByteReadChannel
|
||||
import kotlinx.io.InputStream
|
||||
import kotlinx.io.core.ByteReadPacket
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.buildPacket
|
||||
import kotlinx.io.core.writeFully
|
||||
import kotlinx.serialization.InternalSerializationApi
|
||||
@ -25,9 +20,12 @@ import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead
|
||||
import net.mamoe.mirai.qqandroid.network.protocol.packet.EMPTY_BYTE_ARRAY
|
||||
import net.mamoe.mirai.qqandroid.utils.ByteArrayPool
|
||||
import net.mamoe.mirai.qqandroid.utils.MiraiPlatformUtils
|
||||
import net.mamoe.mirai.qqandroid.utils.io.chunkedFlow
|
||||
import net.mamoe.mirai.qqandroid.utils.io.serialization.toByteArray
|
||||
import net.mamoe.mirai.utils.ExternalImage
|
||||
import net.mamoe.mirai.utils.MiraiInternalAPI
|
||||
import net.mamoe.mirai.utils.internal.ChunkedFlowSession
|
||||
import net.mamoe.mirai.utils.internal.ChunkedInput
|
||||
import net.mamoe.mirai.utils.internal.map
|
||||
|
||||
@OptIn(MiraiInternalAPI::class, InternalSerializationApi::class)
|
||||
internal fun createImageDataPacketSequence(
|
||||
@ -39,27 +37,17 @@ internal fun createImageDataPacketSequence(
|
||||
commandId: Int,
|
||||
localId: Int = 2052,
|
||||
ticket: ByteArray,
|
||||
|
||||
data: Any,
|
||||
dataSize: Long,
|
||||
data: ExternalImage.ReusableInput,
|
||||
fileMd5: ByteArray,
|
||||
sizePerPacket: Int = ByteArrayPool.BUFFER_SIZE
|
||||
): Flow<ByteReadPacket> {
|
||||
): ChunkedFlowSession<ByteReadPacket> {
|
||||
ByteArrayPool.checkBufferSize(sizePerPacket)
|
||||
require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" }
|
||||
// require(ticket.size == 128) { "bad uKey. Required size=128, got ${ticket.size}" }
|
||||
require(data !is ByteReadPacket) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as ByteReadPacket).remaining}" }
|
||||
|
||||
val flow = when (data) {
|
||||
is ByteReadPacket -> data.chunkedFlow(sizePerPacket)
|
||||
is Input -> data.chunkedFlow(sizePerPacket)
|
||||
is ByteReadChannel -> data.chunkedFlow(sizePerPacket)
|
||||
is InputStream -> data.chunkedFlow(sizePerPacket)
|
||||
else -> error("unreachable code")
|
||||
}
|
||||
val session: ChunkedFlowSession<ChunkedInput> = data.chunkedFlow(sizePerPacket)
|
||||
|
||||
var offset = 0L
|
||||
return flow.map { chunkedInput ->
|
||||
return session.map { chunkedInput ->
|
||||
buildPacket {
|
||||
val head = CSDataHighwayHead.ReqDataHighwayHead(
|
||||
msgBasehead = CSDataHighwayHead.DataHighwayHead(
|
||||
@ -82,7 +70,7 @@ internal fun createImageDataPacketSequence(
|
||||
// cacheAddr = 812157193,
|
||||
datalength = chunkedInput.bufferSize,
|
||||
dataoffset = offset,
|
||||
filesize = dataSize,
|
||||
filesize = data.size,
|
||||
serviceticket = ticket,
|
||||
md5 = MiraiPlatformUtils.md5(chunkedInput.buffer, 0, chunkedInput.bufferSize),
|
||||
fileMd5 = fileMd5,
|
||||
|
@ -197,6 +197,7 @@ internal object KnownPacketFactories {
|
||||
|
||||
readString(readInt() - 4)// uinAccount
|
||||
|
||||
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
|
||||
ByteArrayPool.useInstance(this.remaining.toInt()) { data ->
|
||||
val size = this.readAvailable(data)
|
||||
|
||||
@ -385,6 +386,7 @@ internal object KnownPacketFactories {
|
||||
}
|
||||
0 -> {
|
||||
val data = if (bot.client.loginState == 0) {
|
||||
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
|
||||
ByteArrayPool.useInstance(this.remaining.toInt()) { byteArrayBuffer ->
|
||||
val size = (this.remaining - 1).toInt()
|
||||
this.readFully(byteArrayBuffer, 0, size)
|
||||
|
@ -70,6 +70,7 @@ internal class ConfigPushSvc {
|
||||
|
||||
|
||||
override suspend fun ByteReadPacket.decode(bot: QQAndroidBot, sequenceId: Int): PushReqResponse? {
|
||||
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
|
||||
ByteArrayPool.useInstance(this.remaining.toInt()) { buffer ->
|
||||
val length = this.readAvailable(buffer)
|
||||
|
||||
|
@ -1,41 +1,9 @@
|
||||
package net.mamoe.mirai.qqandroid.utils
|
||||
|
||||
import kotlinx.io.pool.DefaultPool
|
||||
import kotlinx.io.pool.ObjectPool
|
||||
|
||||
/**
|
||||
* 缓存 [ByteArray] 实例的 [ObjectPool]
|
||||
*/
|
||||
internal object ByteArrayPool : DefaultPool<ByteArray>(256) {
|
||||
/**
|
||||
* 每一个 [ByteArray] 的大小
|
||||
*/
|
||||
const val BUFFER_SIZE: Int = 8192 * 8
|
||||
|
||||
override fun produceInstance(): ByteArray = ByteArray(BUFFER_SIZE)
|
||||
|
||||
override fun clearInstance(instance: ByteArray): ByteArray = instance
|
||||
|
||||
fun checkBufferSize(size: Int) {
|
||||
require(size <= BUFFER_SIZE) { "sizePerPacket is too large. Maximum buffer size=$BUFFER_SIZE" }
|
||||
}
|
||||
|
||||
fun checkBufferSize(size: Long) {
|
||||
require(size <= BUFFER_SIZE) { "sizePerPacket is too large. Maximum buffer size=$BUFFER_SIZE" }
|
||||
}
|
||||
|
||||
/**
|
||||
* 请求一个大小至少为 [requestedSize] 的 [ByteArray] 实例.
|
||||
*/ // 不要写为扩展函数. 它需要优先于 kotlinx.io 的扩展函数 resolve
|
||||
inline fun <R> useInstance(requestedSize: Int = 0, block: (ByteArray) -> R): R {
|
||||
if (requestedSize > BUFFER_SIZE) {
|
||||
return ByteArray(requestedSize).run(block)
|
||||
}
|
||||
val instance = borrow()
|
||||
try {
|
||||
return block(instance)
|
||||
} finally {
|
||||
recycle(instance)
|
||||
}
|
||||
}
|
||||
}
|
||||
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
|
||||
internal typealias ByteArrayPool = net.mamoe.mirai.utils.internal.ByteArrayPool
|
@ -27,6 +27,7 @@ import kotlin.jvm.JvmMultifileClass
|
||||
import kotlin.jvm.JvmName
|
||||
import kotlin.jvm.JvmSynthetic
|
||||
|
||||
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
|
||||
internal inline fun <R> ByteReadPacket.useBytes(
|
||||
n: Int = remaining.toInt(),//not that safe but adequate
|
||||
block: (data: ByteArray, length: Int) -> R
|
||||
|
@ -11,18 +11,15 @@
|
||||
|
||||
package net.mamoe.mirai.utils
|
||||
|
||||
import kotlinx.coroutines.io.ByteReadChannel
|
||||
import kotlinx.io.InputStream
|
||||
import kotlinx.io.core.ByteReadPacket
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.serialization.InternalSerializationApi
|
||||
import io.ktor.utils.io.ByteWriteChannel
|
||||
import net.mamoe.mirai.contact.Contact
|
||||
import net.mamoe.mirai.contact.Group
|
||||
import net.mamoe.mirai.contact.User
|
||||
import net.mamoe.mirai.message.MessageReceipt
|
||||
import net.mamoe.mirai.message.data.Image
|
||||
import net.mamoe.mirai.message.data.sendTo
|
||||
import net.mamoe.mirai.utils.internal.asReusableInput
|
||||
import net.mamoe.mirai.utils.internal.ChunkedFlowSession
|
||||
import net.mamoe.mirai.utils.internal.ChunkedInput
|
||||
import kotlin.jvm.JvmField
|
||||
import kotlin.jvm.JvmSynthetic
|
||||
|
||||
@ -36,43 +33,20 @@ import kotlin.jvm.JvmSynthetic
|
||||
*/
|
||||
@OptIn(MiraiInternalAPI::class)
|
||||
class ExternalImage internal constructor(
|
||||
val md5: ByteArray,
|
||||
@JvmField
|
||||
internal val input: ReusableInput // Input from kotlinx.io, InputStream from kotlinx.io MPP, ByteReadChannel from ktor
|
||||
) {
|
||||
val md5: ByteArray get() = this.input.md5
|
||||
|
||||
@SinceMirai("1.0.0")
|
||||
internal interface ReusableInput {
|
||||
val md5: ByteArray
|
||||
val size: Long
|
||||
|
||||
/**
|
||||
* Create a input for once usage
|
||||
*/
|
||||
fun input(): Input
|
||||
fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput>
|
||||
suspend fun writeTo(out: ByteWriteChannel): Long
|
||||
}
|
||||
|
||||
|
||||
constructor(
|
||||
md5: ByteArray,
|
||||
input: ByteReadChannel
|
||||
) : this(md5, input.asReusableInput())
|
||||
|
||||
constructor(
|
||||
md5: ByteArray,
|
||||
input: Input
|
||||
) : this(md5, input.asReusableInput())
|
||||
|
||||
constructor(
|
||||
md5: ByteArray,
|
||||
input: ByteReadPacket
|
||||
) : this(md5, input.asReusableInput())
|
||||
|
||||
@OptIn(InternalSerializationApi::class)
|
||||
constructor(
|
||||
md5: ByteArray,
|
||||
input: InputStream
|
||||
) : this(md5, input.asReusableInput())
|
||||
|
||||
init {
|
||||
require(input.size < 30L * 1024 * 1024) { "Image file is too big. Maximum is 30 MiB, but recommended to be 20 MiB" }
|
||||
}
|
||||
|
@ -9,15 +9,9 @@
|
||||
|
||||
package net.mamoe.mirai.utils.internal
|
||||
|
||||
import kotlinx.coroutines.io.ByteReadChannel
|
||||
import kotlinx.io.InputStream
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.serialization.InternalSerializationApi
|
||||
import net.mamoe.mirai.utils.ExternalImage
|
||||
|
||||
|
||||
internal expect fun ByteReadChannel.asReusableInput(): ExternalImage.ReusableInput
|
||||
internal expect fun Input.asReusableInput(): ExternalImage.ReusableInput
|
||||
internal expect fun ByteArray.asReusableInput(): ExternalImage.ReusableInput
|
||||
|
||||
@OptIn(InternalSerializationApi::class)
|
||||
internal expect fun InputStream.asReusableInput(): ExternalImage.ReusableInput
|
||||
internal fun asReusableInput0(input: ByteArray): ExternalImage.ReusableInput = input.asReusableInput()
|
@ -7,20 +7,33 @@
|
||||
* https://github.com/mamoe/mirai/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package net.mamoe.mirai.qqandroid.utils.io
|
||||
package net.mamoe.mirai.utils.internal
|
||||
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.flowOf
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.io.ByteReadChannel
|
||||
import kotlinx.io.InputStream
|
||||
import kotlinx.io.core.ByteReadPacket
|
||||
import kotlinx.io.core.Closeable
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.pool.useInstance
|
||||
import net.mamoe.mirai.utils.MiraiInternalAPI
|
||||
import kotlinx.serialization.InternalSerializationApi
|
||||
import net.mamoe.mirai.qqandroid.utils.ByteArrayPool
|
||||
import kotlin.jvm.JvmField
|
||||
|
||||
|
||||
internal interface ChunkedFlowSession<T> : Closeable {
|
||||
val flow: Flow<T>
|
||||
override fun close()
|
||||
}
|
||||
|
||||
internal inline fun <T, R> ChunkedFlowSession<T>.map(crossinline mapper: suspend ChunkedFlowSession<T>.(T) -> R): ChunkedFlowSession<R> {
|
||||
return object : ChunkedFlowSession<R> {
|
||||
override val flow: Flow<R> = this@map.flow.map { this@map.mapper(it) }
|
||||
override fun close() = this@map.close()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
@ -34,13 +47,13 @@ internal class ChunkedInput(
|
||||
*
|
||||
* **注意**: 不要将他带出 [Flow.collect] 作用域, 否则将造成内存泄露
|
||||
*/
|
||||
val buffer: ByteArray,
|
||||
internal var size: Int
|
||||
@JvmField val buffer: ByteArray,
|
||||
@JvmField internal var size: Int
|
||||
) {
|
||||
/**
|
||||
* [buffer] 的有效大小
|
||||
*/
|
||||
val bufferSize: Int get() = size
|
||||
inline val bufferSize: Int get() = size
|
||||
}
|
||||
|
||||
/**
|
||||
@ -51,7 +64,6 @@ internal class ChunkedInput(
|
||||
*
|
||||
* 若 [ByteReadPacket.remaining] 小于 [sizePerPacket], 将会返回唯一元素 [this] 的 [Sequence]
|
||||
*/
|
||||
@OptIn(MiraiInternalAPI::class)
|
||||
internal fun ByteReadPacket.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
|
||||
ByteArrayPool.checkBufferSize(sizePerPacket)
|
||||
if (this.remaining <= sizePerPacket.toLong()) {
|
||||
@ -81,7 +93,6 @@ internal fun ByteReadPacket.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput>
|
||||
* 对于一个 1000 长度的 [ByteReadChannel] 和参数 [sizePerPacket] = 300, 将会产生含四个元素的 [Sequence],
|
||||
* 其长度分别为: 300, 300, 300, 100.
|
||||
*/
|
||||
@OptIn(MiraiInternalAPI::class)
|
||||
internal fun ByteReadChannel.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
|
||||
ByteArrayPool.checkBufferSize(sizePerPacket)
|
||||
if (this.isClosedForRead) {
|
||||
@ -105,7 +116,7 @@ internal fun ByteReadChannel.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput>
|
||||
* 对于一个 1000 长度的 [Input] 和参数 [sizePerPacket] = 300, 将会产生含四个元素的 [Sequence],
|
||||
* 其长度分别为: 300, 300, 300, 100.
|
||||
*/
|
||||
@OptIn(MiraiInternalAPI::class, ExperimentalCoroutinesApi::class)
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
|
||||
ByteArrayPool.checkBufferSize(sizePerPacket)
|
||||
|
||||
@ -132,9 +143,7 @@ internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
|
||||
*
|
||||
* 若 [ByteReadPacket.remaining] 小于 [sizePerPacket], 将会返回唯一元素 [this] 的 [Sequence]
|
||||
*/
|
||||
@OptIn(
|
||||
MiraiInternalAPI::class, ExperimentalCoroutinesApi::class, InternalSerializationApi::class
|
||||
)
|
||||
@OptIn(ExperimentalCoroutinesApi::class, InternalSerializationApi::class)
|
||||
internal fun InputStream.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
|
||||
ByteArrayPool.checkBufferSize(sizePerPacket)
|
||||
|
@ -38,11 +38,11 @@ internal inline fun InputStream.readInSequence(block: (ByteArray, len: Int) -> U
|
||||
/**
|
||||
* 缓存 [ByteArray] 实例的 [ObjectPool]
|
||||
*/
|
||||
internal object ByteArrayPool : DefaultPool<ByteArray>(32) {
|
||||
internal object ByteArrayPool : DefaultPool<ByteArray>(256) {
|
||||
/**
|
||||
* 每一个 [ByteArray] 的大小
|
||||
*/
|
||||
const val BUFFER_SIZE: Int = 8192
|
||||
const val BUFFER_SIZE: Int = 8192 * 8
|
||||
|
||||
override fun produceInstance(): ByteArray = ByteArray(BUFFER_SIZE)
|
||||
|
||||
|
@ -18,6 +18,7 @@ import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.copyTo
|
||||
import kotlinx.io.errors.IOException
|
||||
import kotlinx.io.streams.asOutput
|
||||
import net.mamoe.mirai.utils.internal.asReusableInput
|
||||
import net.mamoe.mirai.utils.internal.md5
|
||||
import java.awt.image.BufferedImage
|
||||
import java.io.File
|
||||
@ -63,7 +64,7 @@ fun BufferedImage.toExternalImage(formatName: String = "png"): ExternalImage {
|
||||
}
|
||||
|
||||
@Suppress("DEPRECATION_ERROR")
|
||||
return ExternalImage(digest.digest(), file.inputStream())
|
||||
return ExternalImage(file.asReusableInput())
|
||||
}
|
||||
|
||||
suspend inline fun BufferedImage.suspendToExternalImage(): ExternalImage = withContext(IO) { toExternalImage() }
|
||||
@ -76,8 +77,7 @@ suspend inline fun BufferedImage.suspendToExternalImage(): ExternalImage = withC
|
||||
fun File.toExternalImage(): ExternalImage {
|
||||
@Suppress("DEPRECATION_ERROR")
|
||||
return ExternalImage(
|
||||
md5 = this.inputStream().md5(), // dont change
|
||||
input = this.inputStream()
|
||||
input = this.asReusableInput()
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -1,18 +1,70 @@
|
||||
package net.mamoe.mirai.utils.internal
|
||||
|
||||
import kotlinx.coroutines.io.ByteReadChannel
|
||||
import kotlinx.io.core.Input
|
||||
import io.ktor.utils.io.ByteWriteChannel
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.withContext
|
||||
import net.mamoe.mirai.message.data.toLongUnsigned
|
||||
import net.mamoe.mirai.utils.ExternalImage
|
||||
import java.io.File
|
||||
import java.io.InputStream
|
||||
|
||||
internal actual fun ByteReadChannel.asReusableInput(): ExternalImage.ReusableInput {
|
||||
TODO("Not yet implemented")
|
||||
internal actual fun ByteArray.asReusableInput(): ExternalImage.ReusableInput {
|
||||
return object : ExternalImage.ReusableInput {
|
||||
override val md5: ByteArray = md5()
|
||||
override val size: Long get() = this@asReusableInput.size.toLongUnsigned()
|
||||
|
||||
override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> {
|
||||
return object : ChunkedFlowSession<ChunkedInput> {
|
||||
override val flow: Flow<ChunkedInput> = inputStream().chunkedFlow(sizePerPacket)
|
||||
|
||||
override fun close() {
|
||||
// nothing to do
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun writeTo(out: ByteWriteChannel): Long {
|
||||
out.writeFully(this@asReusableInput, 0, this@asReusableInput.size)
|
||||
out.flush()
|
||||
return this@asReusableInput.size.toLongUnsigned()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal actual fun Input.asReusableInput(): ExternalImage.ReusableInput {
|
||||
TODO("Not yet implemented")
|
||||
internal fun File.asReusableInput(): ExternalImage.ReusableInput {
|
||||
return object : ExternalImage.ReusableInput {
|
||||
override val md5: ByteArray = inputStream().use { it.md5() }
|
||||
override val size: Long get() = length()
|
||||
|
||||
override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> {
|
||||
val stream = inputStream()
|
||||
return object : ChunkedFlowSession<ChunkedInput> {
|
||||
override val flow: Flow<ChunkedInput> = stream.chunkedFlow(sizePerPacket)
|
||||
override fun close() = stream.close()
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun writeTo(out: ByteWriteChannel): Long {
|
||||
return inputStream().use { it.copyTo(out) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal actual fun InputStream.asReusableInput(): ExternalImage.ReusableInput {
|
||||
TODO("Not yet implemented")
|
||||
|
||||
private suspend fun InputStream.copyTo(out: ByteWriteChannel): Long = withContext(Dispatchers.IO) {
|
||||
var bytesCopied: Long = 0
|
||||
|
||||
ByteArrayPool.useInstance { buffer ->
|
||||
var bytes = read(buffer)
|
||||
while (bytes >= 0) {
|
||||
out.writeFully(buffer, 0, bytes)
|
||||
bytesCopied += bytes
|
||||
bytes = read(buffer)
|
||||
}
|
||||
}
|
||||
|
||||
out.flush()
|
||||
|
||||
return@withContext bytesCopied
|
||||
}
|
Loading…
Reference in New Issue
Block a user