From e1c1c35d701e61ab0286162adf496b1396df8e71 Mon Sep 17 00:00:00 2001 From: Him188 <Him188@mamoe.net> Date: Wed, 15 Apr 2020 17:51:32 +0800 Subject: [PATCH] Introduce AtomicResizeCacheList to reduce duplication, fix #225 --- .../qqandroid/network/QQAndroidClient.kt | 4 +- .../packet/chat/receive/OnlinePush.kt | 37 +------- .../qqandroid/utils/AtomicResizeCacheList.kt | 89 +++++++++++++++++++ 3 files changed, 95 insertions(+), 35 deletions(-) create mode 100644 mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/AtomicResizeCacheList.kt diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/QQAndroidClient.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/QQAndroidClient.kt index 62d6dcb8a..c82eaa35c 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/QQAndroidClient.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/QQAndroidClient.kt @@ -165,8 +165,8 @@ internal open class QQAndroidClient( private val highwayDataTransSequenceIdForApplyUp: AtomicInt = atomic(77918) internal fun nextHighwayDataTransSequenceIdForApplyUp(): Int = highwayDataTransSequenceIdForApplyUp.getAndAdd(2) - internal val onlinePushCacheList: LockFreeLinkedList<Short> = LockFreeLinkedList() - internal val pbPushTransMsgCacheList: LockFreeLinkedList<Int> = LockFreeLinkedList() + internal val onlinePushCacheList: AtomicResizeCacheList<Short> = AtomicResizeCacheList(20.secondsToMillis) + internal val pbPushTransMsgCacheList: AtomicResizeCacheList<Int> = AtomicResizeCacheList(20.secondsToMillis) val appClientVersion: Int = 0 diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/chat/receive/OnlinePush.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/chat/receive/OnlinePush.kt index 5dc21c6d5..c910b9a8b 100644 --- a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/chat/receive/OnlinePush.kt +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/protocol/packet/chat/receive/OnlinePush.kt @@ -46,7 +46,6 @@ import net.mamoe.mirai.qqandroid.utils.io.readString import net.mamoe.mirai.qqandroid.utils.io.serialization.* import net.mamoe.mirai.qqandroid.utils.read import net.mamoe.mirai.qqandroid.utils.toUHexString -import net.mamoe.mirai.utils.LockFreeLinkedList import net.mamoe.mirai.utils.MiraiInternalAPI import net.mamoe.mirai.utils.currentTimeSeconds import net.mamoe.mirai.utils.debug @@ -123,11 +122,8 @@ internal class OnlinePush { override suspend fun ByteReadPacket.decode(bot: QQAndroidBot, sequenceId: Int): Packet? { val content = this.readProtoBuf(OnlinePushTrans.PbMsgInfo.serializer()) - val cache = bot.client.pbPushTransMsgCacheList.removeUntilFirst { it == content.msgSeq } - if (cache == null) { - bot.client.pbPushTransMsgCacheList.addLast(content.msgSeq) - } else { - bot.client.pbPushTransMsgCacheList.remove(cache) + + if (!bot.client.pbPushTransMsgCacheList.ensureNoDuplication(content.msgSeq)) { return null } @@ -234,14 +230,7 @@ internal class OnlinePush { mapper: ByteReadPacket.(msgInfo: MsgInfo) -> Sequence<Packet> ): Sequence<Packet> { return asSequence().filter { msg -> - val cache = client.onlinePushCacheList.removeUntilFirst { it == msg.shMsgSeq } - if (cache == null) { - client.onlinePushCacheList.addLast(msg.shMsgSeq) - true - } else { - client.onlinePushCacheList.remove(cache) - false - } + client.onlinePushCacheList.ensureNoDuplication(msg.shMsgSeq) }.flatMap { it.vMsg.read { mapper(it) } } } @@ -558,14 +547,7 @@ internal class OnlinePush { fromUin = msg.lFromUin, shMsgSeq = msg.shMsgSeq, vMsgCookies = msg.vMsgCookies, - uMsgTime = 0, - clientIp = 0, - sendTime = 0, - ssoIp = 0, - ssoSeq = 0, - uAppId = 0, - uMsgType = 0, - wCmd = 0 + uMsgTime = msg.uMsgTime // captured 0 ) } ) @@ -577,14 +559,3 @@ internal class OnlinePush { } } -private inline fun <E> LockFreeLinkedList<E>.removeUntilFirst(block: (E) -> Boolean): E? { - this.forEach { - if (!block(it)) { - this.remove(it) - } else { - return it - } - } - return null -} - diff --git a/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/AtomicResizeCacheList.kt b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/AtomicResizeCacheList.kt new file mode 100644 index 000000000..fb15ba96f --- /dev/null +++ b/mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/utils/AtomicResizeCacheList.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2020 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.qqandroid.utils + +import kotlinx.atomicfu.AtomicLong +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.locks.reentrantLock +import kotlinx.atomicfu.locks.withLock +import net.mamoe.mirai.utils.currentTimeMillis +import kotlin.jvm.JvmField +import kotlin.jvm.Volatile + + +/** + * Dynamically sized cache list with retention period. + * No concurrency guaranteed on same elements. + */ +internal class AtomicResizeCacheList<E>(private val retention: Long) { + private inner class Cache { + @Volatile + @JvmField + var element: E? = null + + val time: AtomicLong = atomic(0L) + } + + companion object { + const val initialCapacity: Int = 32 + } + + private val list: MutableList<Cache> = ArrayList(initialCapacity) + private val lock = reentrantLock() + + /** + * Adds an element, also cleanup outdated caches, but no duplication is removed. + * No concurrency guaranteed on same [element]. + */ + private fun add(element: E) { + val currentTime = currentTimeMillis + findAvailable@ while (true) { + for (cache in list) { + val instant = cache.time.value + when { + instant == currentTime -> { + if (cache.time.compareAndSet(instant, currentTime + retention)) { + cache.element = element + return + } else continue@findAvailable + } + // outdated + instant < currentTime -> cache.time.compareAndSet(instant, 0) + } + } + // no more Cache instance available + lock.withLock { + list.add(Cache().apply { + this.element = element + this.time.value = currentTime + retention + }) + } + return + } + } + + /** + * No concurrency guaranteed on same [element] + */ + private fun removeDuplication(element: E): Boolean { + val duplicate = list.firstOrNull { it.element == element } ?: return false + duplicate.time.value = 0 + return true + } + + fun ensureNoDuplication(element: E): Boolean { + return if (removeDuplication(element)) { + false + } else { + add(element) + true + } + } +} \ No newline at end of file