1
0
mirror of https://github.com/mamoe/mirai.git synced 2025-03-25 06:50:09 +08:00

Introduce AtomicResizeCacheList to reduce duplication, fix

This commit is contained in:
Him188 2020-04-15 17:51:32 +08:00
parent b30c380637
commit e1c1c35d70
3 changed files with 95 additions and 35 deletions
mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid
network
QQAndroidClient.kt
protocol/packet/chat/receive
utils

View File

@ -165,8 +165,8 @@ internal open class QQAndroidClient(
private val highwayDataTransSequenceIdForApplyUp: AtomicInt = atomic(77918)
internal fun nextHighwayDataTransSequenceIdForApplyUp(): Int = highwayDataTransSequenceIdForApplyUp.getAndAdd(2)
internal val onlinePushCacheList: LockFreeLinkedList<Short> = LockFreeLinkedList()
internal val pbPushTransMsgCacheList: LockFreeLinkedList<Int> = LockFreeLinkedList()
internal val onlinePushCacheList: AtomicResizeCacheList<Short> = AtomicResizeCacheList(20.secondsToMillis)
internal val pbPushTransMsgCacheList: AtomicResizeCacheList<Int> = AtomicResizeCacheList(20.secondsToMillis)
val appClientVersion: Int = 0

View File

@ -46,7 +46,6 @@ import net.mamoe.mirai.qqandroid.utils.io.readString
import net.mamoe.mirai.qqandroid.utils.io.serialization.*
import net.mamoe.mirai.qqandroid.utils.read
import net.mamoe.mirai.qqandroid.utils.toUHexString
import net.mamoe.mirai.utils.LockFreeLinkedList
import net.mamoe.mirai.utils.MiraiInternalAPI
import net.mamoe.mirai.utils.currentTimeSeconds
import net.mamoe.mirai.utils.debug
@ -123,11 +122,8 @@ internal class OnlinePush {
override suspend fun ByteReadPacket.decode(bot: QQAndroidBot, sequenceId: Int): Packet? {
val content = this.readProtoBuf(OnlinePushTrans.PbMsgInfo.serializer())
val cache = bot.client.pbPushTransMsgCacheList.removeUntilFirst { it == content.msgSeq }
if (cache == null) {
bot.client.pbPushTransMsgCacheList.addLast(content.msgSeq)
} else {
bot.client.pbPushTransMsgCacheList.remove(cache)
if (!bot.client.pbPushTransMsgCacheList.ensureNoDuplication(content.msgSeq)) {
return null
}
@ -234,14 +230,7 @@ internal class OnlinePush {
mapper: ByteReadPacket.(msgInfo: MsgInfo) -> Sequence<Packet>
): Sequence<Packet> {
return asSequence().filter { msg ->
val cache = client.onlinePushCacheList.removeUntilFirst { it == msg.shMsgSeq }
if (cache == null) {
client.onlinePushCacheList.addLast(msg.shMsgSeq)
true
} else {
client.onlinePushCacheList.remove(cache)
false
}
client.onlinePushCacheList.ensureNoDuplication(msg.shMsgSeq)
}.flatMap { it.vMsg.read { mapper(it) } }
}
@ -558,14 +547,7 @@ internal class OnlinePush {
fromUin = msg.lFromUin,
shMsgSeq = msg.shMsgSeq,
vMsgCookies = msg.vMsgCookies,
uMsgTime = 0,
clientIp = 0,
sendTime = 0,
ssoIp = 0,
ssoSeq = 0,
uAppId = 0,
uMsgType = 0,
wCmd = 0
uMsgTime = msg.uMsgTime // captured 0
)
}
)
@ -577,14 +559,3 @@ internal class OnlinePush {
}
}
private inline fun <E> LockFreeLinkedList<E>.removeUntilFirst(block: (E) -> Boolean): E? {
this.forEach {
if (!block(it)) {
this.remove(it)
} else {
return it
}
}
return null
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2020 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.qqandroid.utils
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.locks.reentrantLock
import kotlinx.atomicfu.locks.withLock
import net.mamoe.mirai.utils.currentTimeMillis
import kotlin.jvm.JvmField
import kotlin.jvm.Volatile
/**
* Dynamically sized cache list with retention period.
* No concurrency guaranteed on same elements.
*/
internal class AtomicResizeCacheList<E>(private val retention: Long) {
private inner class Cache {
@Volatile
@JvmField
var element: E? = null
val time: AtomicLong = atomic(0L)
}
companion object {
const val initialCapacity: Int = 32
}
private val list: MutableList<Cache> = ArrayList(initialCapacity)
private val lock = reentrantLock()
/**
* Adds an element, also cleanup outdated caches, but no duplication is removed.
* No concurrency guaranteed on same [element].
*/
private fun add(element: E) {
val currentTime = currentTimeMillis
findAvailable@ while (true) {
for (cache in list) {
val instant = cache.time.value
when {
instant == currentTime -> {
if (cache.time.compareAndSet(instant, currentTime + retention)) {
cache.element = element
return
} else continue@findAvailable
}
// outdated
instant < currentTime -> cache.time.compareAndSet(instant, 0)
}
}
// no more Cache instance available
lock.withLock {
list.add(Cache().apply {
this.element = element
this.time.value = currentTime + retention
})
}
return
}
}
/**
* No concurrency guaranteed on same [element]
*/
private fun removeDuplication(element: E): Boolean {
val duplicate = list.firstOrNull { it.element == element } ?: return false
duplicate.time.value = 0
return true
}
fun ensureNoDuplication(element: E): Boolean {
return if (removeDuplication(element)) {
false
} else {
add(element)
true
}
}
}