Fix image uploading

This commit is contained in:
Him188 2020-02-28 20:23:09 +08:00
parent cfbf4f8ccc
commit e2170175c9
4 changed files with 27 additions and 26 deletions

View File

@ -602,15 +602,15 @@ internal class GroupImpl(
).also { ImageUploadEvent.Succeed(this@GroupImpl, image, it).broadcast() } ).also { ImageUploadEvent.Succeed(this@GroupImpl, image, it).broadcast() }
} }
is ImgStore.GroupPicUp.Response.RequireUpload -> { is ImgStore.GroupPicUp.Response.RequireUpload -> {
// 每 100KB 等 1 秒 // 每 10KB 等 1 秒
withTimeoutOrNull(image.inputSize / 1024 / 100) { withTimeoutOrNull(image.inputSize * 1000 / 1024 / 10) {
HighwayHelper.uploadImage( HighwayHelper.uploadImage(
client = bot.client, client = bot.client,
serverIp = response.uploadIpList.first().toIpV4AddressString(), serverIp = response.uploadIpList.first().toIpV4AddressString(),
serverPort = response.uploadPortList.first(), serverPort = response.uploadPortList.first(),
imageInput = image.input, imageInput = image.input,
inputSize = image.inputSize.toInt(), inputSize = image.inputSize.toInt(),
md5 = image.md5, fileMd5 = image.md5,
uKey = response.uKey, uKey = response.uKey,
commandId = 2 commandId = 2
) )

View File

@ -100,11 +100,11 @@ internal object HighwayHelper {
uKey: ByteArray, uKey: ByteArray,
imageInput: Any, imageInput: Any,
inputSize: Int, inputSize: Int,
md5: ByteArray, fileMd5: ByteArray,
commandId: Int // group=2, friend=1 commandId: Int // group=2, friend=1
) { ) {
require(imageInput is Input || imageInput is InputStream || imageInput is ByteReadChannel) { "unsupported imageInput: ${imageInput::class.simpleName}" } require(imageInput is Input || imageInput is InputStream || imageInput is ByteReadChannel) { "unsupported imageInput: ${imageInput::class.simpleName}" }
require(md5.size == 16) { "bad md5. Required size=16, got ${md5.size}" } require(fileMd5.size == 16) { "bad md5. Required size=16, got ${fileMd5.size}" }
require(uKey.size == 128) { "bad uKey. Required size=128, got ${uKey.size}" } require(uKey.size == 128) { "bad uKey. Required size=128, got ${uKey.size}" }
require(commandId == 2 || commandId == 1) { "bad commandId. Must be 1 or 2" } require(commandId == 2 || commandId == 1) { "bad commandId. Must be 1 or 2" }
@ -112,18 +112,16 @@ internal object HighwayHelper {
socket.connect(serverIp, serverPort) socket.connect(serverIp, serverPort)
socket.use { socket.use {
createImageDataPacketSequence( createImageDataPacketSequence(
uin = client.uin, client = client,
command = "PicUp.DataUp", command = "PicUp.DataUp",
sequenceId =
if (commandId == 2) client.nextHighwayDataTransSequenceIdForGroup()
else client.nextHighwayDataTransSequenceIdForFriend(),
commandId = commandId, commandId = commandId,
uKey = uKey, uKey = uKey,
data = imageInput, data = imageInput,
dataSize = inputSize, dataSize = inputSize,
md5 = md5 fileMd5 = fileMd5
).collect { ).collect {
socket.send(it) socket.send(it)
println("sent")
//0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00 //0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00
socket.read().withUse { socket.read().withUse {
discardExact(1) discardExact(1)
@ -132,6 +130,7 @@ internal object HighwayHelper {
val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength) val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength)
check(proto.errorCode == 0) { "image upload failed: Transfer errno=${proto.errorCode}" } check(proto.errorCode == 0) { "image upload failed: Transfer errno=${proto.errorCode}" }
} }
println("read")
} }
} }
} }

View File

@ -17,6 +17,7 @@ import kotlinx.coroutines.flow.map
import kotlinx.io.InputStream import kotlinx.io.InputStream
import kotlinx.io.core.* import kotlinx.io.core.*
import net.mamoe.mirai.qqandroid.io.serialization.toByteArray import net.mamoe.mirai.qqandroid.io.serialization.toByteArray
import net.mamoe.mirai.qqandroid.network.QQAndroidClient
import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead
import net.mamoe.mirai.qqandroid.network.protocol.packet.EMPTY_BYTE_ARRAY import net.mamoe.mirai.qqandroid.network.protocol.packet.EMPTY_BYTE_ARRAY
import net.mamoe.mirai.utils.MiraiInternalAPI import net.mamoe.mirai.utils.MiraiInternalAPI
@ -24,9 +25,8 @@ import net.mamoe.mirai.utils.io.*
@UseExperimental(MiraiInternalAPI::class) @UseExperimental(MiraiInternalAPI::class)
internal fun createImageDataPacketSequence( // RequestDataTrans internal fun createImageDataPacketSequence( // RequestDataTrans
uin: Long, client: QQAndroidClient,
command: String, command: String,
sequenceId: Int,
appId: Int = 537062845, appId: Int = 537062845,
dataFlag: Int = 4096, dataFlag: Int = 4096,
commandId: Int, commandId: Int,
@ -35,9 +35,10 @@ internal fun createImageDataPacketSequence( // RequestDataTrans
data: Any, data: Any,
dataSize: Int, dataSize: Int,
md5: ByteArray, fileMd5: ByteArray,
sizePerPacket: Int = 8192 sizePerPacket: Int = 8000
): Flow<ByteReadPacket> { ): Flow<ByteReadPacket> {
ByteArrayPool.checkBufferSize(sizePerPacket)
require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" } require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" }
require(uKey.size == 128) { "bad uKey. Required size=128, got ${uKey.size}" } require(uKey.size == 128) { "bad uKey. Required size=128, got ${uKey.size}" }
require(data !is ByteReadPacket || data.remaining.toInt() == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as ByteReadPacket).remaining}" } require(data !is ByteReadPacket || data.remaining.toInt() == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as ByteReadPacket).remaining}" }
@ -56,9 +57,10 @@ internal fun createImageDataPacketSequence( // RequestDataTrans
val head = CSDataHighwayHead.ReqDataHighwayHead( val head = CSDataHighwayHead.ReqDataHighwayHead(
msgBasehead = CSDataHighwayHead.DataHighwayHead( msgBasehead = CSDataHighwayHead.DataHighwayHead(
version = 1, version = 1,
uin = uin.toString(), uin = client.uin.toString(),
command = command, command = command,
seq = sequenceId, seq = if (commandId == 2) client.nextHighwayDataTransSequenceIdForGroup()
else client.nextHighwayDataTransSequenceIdForFriend(),
retryTimes = 0, retryTimes = 0,
appid = appId, appid = appId,
dataflag = dataFlag, dataflag = dataFlag,
@ -66,11 +68,11 @@ internal fun createImageDataPacketSequence( // RequestDataTrans
localeId = localId localeId = localId
), ),
msgSeghead = CSDataHighwayHead.SegHead( msgSeghead = CSDataHighwayHead.SegHead(
datalength = dataSize, datalength = chunkedInput.bufferSize,
filesize = dataSize.toLong(), filesize = dataSize.toLong(),
serviceticket = uKey, serviceticket = uKey,
md5 = md5, md5 = net.mamoe.mirai.utils.md5(chunkedInput.buffer, 0, chunkedInput.bufferSize),
fileMd5 = md5, fileMd5 = fileMd5,
flag = 0, flag = 0,
rtcode = 0 rtcode = 0
), ),
@ -80,8 +82,9 @@ internal fun createImageDataPacketSequence( // RequestDataTrans
writeByte(40) writeByte(40)
writeInt(head.size) writeInt(head.size)
writeInt(dataSize) writeInt(chunkedInput.bufferSize)
writeFully(head) writeFully(head)
println(chunkedInput.bufferSize)
writeFully(chunkedInput.buffer, 0, chunkedInput.bufferSize) writeFully(chunkedInput.buffer, 0, chunkedInput.bufferSize)
writeByte(41) writeByte(41)
} }

View File

@ -17,7 +17,6 @@ import kotlinx.coroutines.flow.flowOf
import kotlinx.io.InputStream import kotlinx.io.InputStream
import kotlinx.io.core.ByteReadPacket import kotlinx.io.core.ByteReadPacket
import kotlinx.io.core.Input import kotlinx.io.core.Input
import kotlinx.io.core.readAvailable
import kotlinx.io.pool.useInstance import kotlinx.io.pool.useInstance
import net.mamoe.mirai.utils.MiraiInternalAPI import net.mamoe.mirai.utils.MiraiInternalAPI
@ -55,14 +54,14 @@ fun ByteReadPacket.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
ByteArrayPool.checkBufferSize(sizePerPacket) ByteArrayPool.checkBufferSize(sizePerPacket)
if (this.remaining <= sizePerPacket.toLong()) { if (this.remaining <= sizePerPacket.toLong()) {
ByteArrayPool.useInstance { buffer -> ByteArrayPool.useInstance { buffer ->
return flowOf(ChunkedInput(buffer, this.readAvailable(buffer))) return flowOf(ChunkedInput(buffer, this.readAvailable(buffer, 0, sizePerPacket)))
} }
} }
return flow { return flow {
ByteArrayPool.useInstance { buffer -> ByteArrayPool.useInstance { buffer ->
val chunkedInput = ChunkedInput(buffer, 0) val chunkedInput = ChunkedInput(buffer, 0)
do { do {
chunkedInput.size = this@chunkedFlow.readAvailable(buffer) chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, sizePerPacket)
emit(chunkedInput) emit(chunkedInput)
} while (this@chunkedFlow.isNotEmpty) } while (this@chunkedFlow.isNotEmpty)
} }
@ -85,7 +84,7 @@ fun ByteReadChannel.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
ByteArrayPool.useInstance { buffer -> ByteArrayPool.useInstance { buffer ->
val chunkedInput = ChunkedInput(buffer, 0) val chunkedInput = ChunkedInput(buffer, 0)
do { do {
chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, buffer.size) chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, sizePerPacket)
emit(chunkedInput) emit(chunkedInput)
} while (!this@chunkedFlow.isClosedForRead) } while (!this@chunkedFlow.isClosedForRead)
} }
@ -111,7 +110,7 @@ internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
ByteArrayPool.useInstance { buffer -> ByteArrayPool.useInstance { buffer ->
val chunkedInput = ChunkedInput(buffer, 0) val chunkedInput = ChunkedInput(buffer, 0)
while (!this@chunkedFlow.endOfInput) { while (!this@chunkedFlow.endOfInput) {
chunkedInput.size = this@chunkedFlow.readAvailable(buffer) chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, sizePerPacket)
emit(chunkedInput) emit(chunkedInput)
} }
} }
@ -134,7 +133,7 @@ internal fun InputStream.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
ByteArrayPool.useInstance { buffer -> ByteArrayPool.useInstance { buffer ->
val chunkedInput = ChunkedInput(buffer, 0) val chunkedInput = ChunkedInput(buffer, 0)
while (this@chunkedFlow.available() != 0) { while (this@chunkedFlow.available() != 0) {
chunkedInput.size = this@chunkedFlow.read(buffer) chunkedInput.size = this@chunkedFlow.read(buffer, 0, sizePerPacket)
emit(chunkedInput) emit(chunkedInput)
} }
} }