Rearrange highway image upload

This commit is contained in:
Him188 2020-04-23 18:36:14 +08:00
parent 8e8f91bbee
commit 6f69497247

View File

@ -22,23 +22,24 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.io.ByteReadChannel import kotlinx.coroutines.io.ByteReadChannel
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.io.InputStream import kotlinx.io.InputStream
import kotlinx.io.core.Input import kotlinx.io.core.Input
import kotlinx.io.core.discardExact import kotlinx.io.core.discardExact
import kotlinx.io.core.readAvailable import kotlinx.io.core.readAvailable
import kotlinx.io.core.use import kotlinx.io.core.use
import kotlinx.serialization.InternalSerializationApi import kotlinx.serialization.InternalSerializationApi
import net.mamoe.mirai.qqandroid.QQAndroidBot
import net.mamoe.mirai.qqandroid.network.QQAndroidClient import net.mamoe.mirai.qqandroid.network.QQAndroidClient
import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead
import net.mamoe.mirai.qqandroid.utils.ByteArrayPool import net.mamoe.mirai.qqandroid.utils.*
import net.mamoe.mirai.qqandroid.utils.NoRouteToHostException
import net.mamoe.mirai.qqandroid.utils.PlatformSocket
import net.mamoe.mirai.qqandroid.utils.SocketException
import net.mamoe.mirai.qqandroid.utils.io.serialization.readProtoBuf import net.mamoe.mirai.qqandroid.utils.io.serialization.readProtoBuf
import net.mamoe.mirai.qqandroid.utils.io.withUse import net.mamoe.mirai.qqandroid.utils.io.withUse
import net.mamoe.mirai.utils.MiraiInternalAPI import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.copyAndClose
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
import kotlin.math.roundToInt
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@OptIn(MiraiInternalAPI::class, InternalSerializationApi::class) @OptIn(MiraiInternalAPI::class, InternalSerializationApi::class)
@Suppress("SpellCheckingInspection") @Suppress("SpellCheckingInspection")
@ -73,6 +74,7 @@ internal suspend fun HttpClient.postImage(
override val contentType: ContentType = ContentType.Image.Any override val contentType: ContentType = ContentType.Image.Any
override val contentLength: Long = inputSize override val contentLength: Long = inputSize
@OptIn(MiraiExperimentalAPI::class)
override suspend fun writeTo(channel: ByteWriteChannel) { override suspend fun writeTo(channel: ByteWriteChannel) {
ByteArrayPool.useInstance { buffer: ByteArray -> ByteArrayPool.useInstance { buffer: ByteArray ->
when (imageInput) { when (imageInput) {
@ -100,8 +102,45 @@ internal suspend fun HttpClient.postImage(
@OptIn(MiraiInternalAPI::class, InternalSerializationApi::class) @OptIn(MiraiInternalAPI::class, InternalSerializationApi::class)
internal object HighwayHelper { internal object HighwayHelper {
@OptIn(ExperimentalTime::class)
suspend fun uploadImageToServers(
bot: QQAndroidBot,
servers: List<Pair<Int, Int>>,
uKey: ByteArray,
image: ExternalImage,
kind: String,
commandId: Int
) = servers.retryWithServers(
(image.inputSize * 1000 / 1024 / 10).coerceAtLeast(5000),
onFail = {
throw IllegalStateException("cannot upload $kind image, failed on all servers.", it)
}
) { ip, port ->
bot.network.logger.verbose {
"[Highway] Uploading $kind image to ${ip}:$port, size=${image.inputSize / 1024} KiB"
}
val time = measureTime {
uploadImage(
client = bot.client,
serverIp = ip,
serverPort = port,
imageInput = image.input,
inputSize = image.inputSize.toInt(),
fileMd5 = image.md5,
ticket = uKey,
commandId = commandId
)
}
bot.network.logger.verbose {
"[Highway] Uploading $kind image: succeed at ${(image.inputSize.toDouble() / 1024 / time.inSeconds).roundToInt()} KiB/s"
}
}
@OptIn(InternalCoroutinesApi::class) @OptIn(InternalCoroutinesApi::class)
suspend fun uploadImage( private suspend fun uploadImage(
client: QQAndroidClient, client: QQAndroidClient,
serverIp: String, serverIp: String,
serverPort: Int, serverPort: Int,
@ -137,6 +176,7 @@ internal object HighwayHelper {
).collect { ).collect {
socket.send(it) socket.send(it)
//0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00 //0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00
socket.read().withUse { socket.read().withUse {
discardExact(1) discardExact(1)
val headLength = readInt() val headLength = readInt()
@ -147,4 +187,30 @@ internal object HighwayHelper {
} }
} }
} }
}
internal suspend inline fun List<Pair<Int, Int>>.retryWithServers(
timeoutMillis: Long,
onFail: (exception: Throwable?) -> Unit,
crossinline block: suspend (ip: String, port: Int) -> Unit
) {
require(this.isNotEmpty()) { "receiver of retryWithServers must not be empty" }
var exception: Throwable? = null
for (pair in this) {
return kotlin.runCatching {
withTimeoutOrNull(timeoutMillis) {
block(pair.first.toIpV4AddressString(), pair.second)
}
}.recover {
if (exception != null) {
exception!!.addSuppressedMirai(it)
}
exception = it
null
}.getOrNull() ?: continue
}
onFail(exception)
} }