Adjust timeout, add retry for several servers

This commit is contained in:
Him188 2020-04-07 14:36:51 +08:00
parent 7049cc4591
commit 5c95ab299e
5 changed files with 54 additions and 28 deletions

View File

@ -20,6 +20,7 @@ import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.async import kotlinx.coroutines.async
import kotlinx.coroutines.io.ByteReadChannel import kotlinx.coroutines.io.ByteReadChannel
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.serialization.UnstableDefault import kotlinx.serialization.UnstableDefault
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonConfiguration import kotlinx.serialization.json.JsonConfiguration
@ -417,11 +418,12 @@ internal abstract class QQAndroidBotBase constructor(
val group = getGroup(groupCode) val group = getGroup(groupCode)
val time = currentTimeSeconds val time = currentTimeSeconds
val sequenceId = client.atomicNextMessageSequenceId()
message.firstIsInstanceOrNull<QuoteReply>()?.source?.ensureSequenceIdAvailable() message.firstIsInstanceOrNull<QuoteReply>()?.source?.ensureSequenceIdAvailable()
network.run { network.run {
val data = message.calculateValidationDataForGroup( val data = message.calculateValidationDataForGroup(
sequenceId = client.atomicNextMessageSequenceId(), sequenceId = sequenceId,
time = time.toInt(), time = time.toInt(),
random = Random.nextInt().absoluteValue.toUInt(), random = Random.nextInt().absoluteValue.toUInt(),
groupCode = groupCode, groupCode = groupCode,
@ -464,16 +466,28 @@ internal abstract class QQAndroidBotBase constructor(
) )
).toByteArray(LongMsg.ReqBody.serializer()) ).toByteArray(LongMsg.ReqBody.serializer())
val success = response.proto.uint32UpIp.zip(response.proto.uint32UpPort).any { (ip, port) ->
withTimeoutOrNull((body.size * 1000L / 1024 / 10).coerceAtLeast(5000L)) {
network.logger.verbose { "[Highway] Uploading group long message#$sequenceId to ${ip.toIpV4AddressString()}:$port: size=${body.size}" }
HighwayHelper.uploadImage( HighwayHelper.uploadImage(
client, client,
serverIp = response.proto.uint32UpIp!!.first().toIpV4AddressString(), serverIp = ip.toIpV4AddressString(),
serverPort = response.proto.uint32UpPort!!.first(), serverPort = port,
ticket = response.proto.msgSig, // 104 ticket = response.proto.msgSig, // 104
imageInput = body.toReadPacket(), imageInput = body.toReadPacket(),
inputSize = body.size, inputSize = body.size,
fileMd5 = MiraiPlatformUtils.md5(body), fileMd5 = MiraiPlatformUtils.md5(body),
commandId = 27 // long msg commandId = 27 // long msg
) )
network.logger.verbose { "[Highway] Uploading group long message#$sequenceId: succeed" }
true
} ?: kotlin.run {
network.logger.verbose { "[Highway] Uploading group long message: timeout, retrying next server" }
false
}
}
check(success) { "cannot upload group image, failed on all servers." }
} }
} }

View File

@ -389,19 +389,30 @@ internal class GroupImpl(
).also { ImageUploadEvent.Succeed(this@GroupImpl, image, it).broadcast() } ).also { ImageUploadEvent.Succeed(this@GroupImpl, image, it).broadcast() }
} }
is ImgStore.GroupPicUp.Response.RequireUpload -> { is ImgStore.GroupPicUp.Response.RequireUpload -> {
// 每 10KB 等 1 秒 // 每 10KB 等 1 秒, 最少等待 5 秒
withTimeoutOrNull(image.inputSize * 1000 / 1024 / 10) { val success = response.uploadIpList.zip(response.uploadPortList).any { (ip, port) ->
withTimeoutOrNull((image.inputSize * 1000 / 1024 / 10).coerceAtLeast(5000)) {
bot.network.logger.verbose { "[Highway] Uploading group image to ${ip.toIpV4AddressString()}:$port: size=${image.inputSize / 1024} KiB" }
HighwayHelper.uploadImage( HighwayHelper.uploadImage(
client = bot.client, client = bot.client,
serverIp = response.uploadIpList.first().toIpV4AddressString(), serverIp = ip.toIpV4AddressString(),
serverPort = response.uploadPortList.first(), serverPort = port,
imageInput = image.input, imageInput = image.input,
inputSize = image.inputSize.toInt(), inputSize = image.inputSize.toInt(),
fileMd5 = image.md5, fileMd5 = image.md5,
ticket = response.uKey, ticket = response.uKey,
commandId = 2 commandId = 2
) )
} ?: error("timeout uploading image: ${image.filename}") bot.network.logger.verbose { "[Highway] Uploading group image: succeed" }
true
} ?: kotlin.run {
bot.network.logger.verbose { "[Highway] Uploading group image: timeout, retrying next server" }
false
}
}
check(success) { "cannot upload group image, failed on all servers." }
val resourceId = image.calculateImageResourceId() val resourceId = image.calculateImageResourceId()
// return NotOnlineImageFromFile( // return NotOnlineImageFromFile(
// resourceId = resourceId, // resourceId = resourceId,

View File

@ -206,7 +206,8 @@ internal class OfflineMessageSourceImplBySourceMsg( // from others' quotation
*/ */
override val id: Int override val id: Int
get() = delegate.pbReserve.loadAs(SourceMsg.ResvAttr.serializer()).origUids!!.toInt() get() = delegate.pbReserve.loadAs(SourceMsg.ResvAttr.serializer()).origUids?.toInt()
?: error("在读取 OfflineMessageSourceImplBySourceMsg.id 时找不到 origUids, delegate=${delegate._miraiContentToString()}")
// override val sourceMessage: MessageChain get() = delegate.toMessageChain() // override val sourceMessage: MessageChain get() = delegate.toMessageChain()
override val fromId: Long get() = delegate.senderUin override val fromId: Long get() = delegate.senderUin

View File

@ -42,7 +42,7 @@ internal fun createImageDataPacketSequence( // RequestDataTrans
data: Any, data: Any,
dataSize: Int, dataSize: Int,
fileMd5: ByteArray, fileMd5: ByteArray,
sizePerPacket: Int = 8192 sizePerPacket: Int = 8192.coerceAtMost(ByteArrayPool.BUFFER_SIZE)
): Flow<ByteReadPacket> { ): Flow<ByteReadPacket> {
ByteArrayPool.checkBufferSize(sizePerPacket) ByteArrayPool.checkBufferSize(sizePerPacket)
require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" } require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" }

View File

@ -46,8 +46,8 @@ internal class MultiMsg : ProtoBuf {
@ProtoId(1) val result: Int = 0, @ProtoId(1) val result: Int = 0,
@ProtoId(2) val msgResid: String = "", @ProtoId(2) val msgResid: String = "",
@ProtoId(3) val msgUkey: ByteArray = EMPTY_BYTE_ARRAY, @ProtoId(3) val msgUkey: ByteArray = EMPTY_BYTE_ARRAY,
@ProtoId(4) val uint32UpIp: List<Int>? = null, @ProtoId(4) val uint32UpIp: List<Int>,
@ProtoId(5) val uint32UpPort: List<Int>? = null, @ProtoId(5) val uint32UpPort: List<Int>,
@ProtoId(6) val blockSize: Long = 0L, @ProtoId(6) val blockSize: Long = 0L,
@ProtoId(7) val upOffset: Long = 0L, @ProtoId(7) val upOffset: Long = 0L,
@ProtoId(8) val applyId: Int = 0, @ProtoId(8) val applyId: Int = 0,