[core] Review atomic calls and enable atomicfu compiler. Close #2315

This commit is contained in:
Him188 2022-10-29 12:54:30 +01:00
parent ab8e53fc76
commit 564a7ce8f8
No known key found for this signature in database
GPG Key ID: BA439CDDCF652375
33 changed files with 232 additions and 131 deletions

View File

@ -14,7 +14,7 @@ plugins {
kotlin("multiplatform")
kotlin("plugin.serialization")
//id("kotlinx-atomicfu")
id("kotlinx-atomicfu")
id("signing")
id("me.him188.kotlin-jvm-blocking-bridge")
id("me.him188.kotlin-dynamic-delegation")
@ -43,7 +43,6 @@ kotlin {
implementation(project(":mirai-core-utils"))
implementation(project(":mirai-console-compiler-annotations"))
implementation(`kotlinx-serialization-protobuf`)
implementation(`kotlinx-atomicfu`)
implementation(`ktor-io`)
}
}

View File

@ -86,8 +86,6 @@ public data class NewFriendRequestEvent @MiraiInternalApi public constructor(
*/
public val fromNick: String,
) : BotEvent, Packet, AbstractEvent(), FriendInfoChangeEvent {
internal val responded: AtomicBoolean = atomic(false)
/**
* @return 申请人来自的群. 当申请人来自其他途径申请时为 `null`
*/

View File

@ -16,8 +16,6 @@
package net.mamoe.mirai.event.events
import kotlinx.atomicfu.AtomicBoolean
import kotlinx.atomicfu.atomic
import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
import net.mamoe.mirai.Bot
import net.mamoe.mirai.Mirai
@ -357,8 +355,6 @@ public data class BotInvitedJoinGroupRequestEvent @MiraiInternalApi constructor(
*/
public val invitor: Friend? get() = this.bot.getFriend(invitorId)
internal val responded: AtomicBoolean = atomic(false)
@JvmBlockingBridge
public suspend fun accept(): Unit = Mirai.acceptInvitedJoinGroupRequest(this)
@ -369,7 +365,6 @@ public data class BotInvitedJoinGroupRequestEvent @MiraiInternalApi constructor(
/**
* 一个账号请求加入群事件, [Bot] 在此群中是管理员或群主.
*/
@Suppress("DEPRECATION")
public data class MemberJoinRequestEvent @MiraiInternalApi constructor(
override val bot: Bot,
/**
@ -405,8 +400,6 @@ public data class MemberJoinRequestEvent @MiraiInternalApi constructor(
*/
public val invitor: NormalMember? by lazy { invitorId?.let { group?.get(it) } }
internal val responded: AtomicBoolean = atomic(false)
/**
* 同意这个请求
*/

View File

@ -35,7 +35,6 @@ kotlin {
api(`kotlinx-serialization-json`)
api(`kotlinx-coroutines-core`)
implementation(`kotlinx-atomicfu`)
implementation(`kotlinx-serialization-protobuf`)
implementation(`ktor-io`)
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.utils
import kotlinx.atomicfu.atomic
/**
* Wraps a [atomic] to allow more complicated usages.
*/
@TestOnly
public class AtomicInteger(
value: Int
) {
private val delegate = atomic(value)
public var value: Int
get() = delegate.value
set(value) {
delegate.value = value
}
public fun compareAndSet(expect: Int, update: Int): Boolean = delegate.compareAndSet(expect, update)
public fun getAndIncrement(): Int = delegate.getAndIncrement()
public fun incrementAndGet(): Int = delegate.incrementAndGet()
}
/**
* Wraps a [atomic] to allow more complicated usages.
*/
@TestOnly
public class AtomicBoolean(
value: Boolean
) {
private val delegate = atomic(value)
public var value: Boolean
get() = delegate.value
set(value) {
delegate.value = value
}
public fun compareAndSet(expect: Boolean, update: Boolean): Boolean = delegate.compareAndSet(expect, update)
}

View File

@ -15,7 +15,7 @@ import org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTarget
plugins {
kotlin("multiplatform")
// id("kotlinx-atomicfu")
id("kotlinx-atomicfu")
kotlin("plugin.serialization")
id("me.him188.kotlin-jvm-blocking-bridge")
id("me.him188.kotlin-dynamic-delegation")
@ -42,7 +42,6 @@ kotlin {
implementation(project(":mirai-core-utils"))
implementation(`kotlinx-serialization-protobuf`)
implementation(`kotlinx-atomicfu`)
implementation(`ktor-io`)
implementation(`ktor-client-core`)
}

View File

@ -93,15 +93,9 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
override var FileCacheStrategy: FileCacheStrategy = net.mamoe.mirai.utils.FileCacheStrategy.PlatformDefault
@Suppress("PrivatePropertyName")
private val httpClient: HttpClient = createDefaultHttpClient()
override suspend fun acceptNewFriendRequest(event: NewFriendRequestEvent) {
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
check(event.responded.compareAndSet(false, true)) {
"the request $this has already been responded"
}
check(!event.bot.friends.contains(event.fromId)) {
"the request $event is outdated: You had already responded it on another device."
}
@ -125,11 +119,6 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
}
override suspend fun rejectNewFriendRequest(event: NewFriendRequestEvent, blackList: Boolean) {
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
check(event.responded.compareAndSet(false, true)) {
"the request $event has already been responded"
}
check(!event.bot.friends.contains(event.fromId)) {
"the request $event is outdated: You had already responded it on another device."
}
@ -147,10 +136,6 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
override suspend fun acceptMemberJoinRequest(event: MemberJoinRequestEvent) {
@Suppress("DuplicatedCode")
checkGroupPermission(event.bot, event.groupId) { event::class.simpleName ?: "<anonymous class>" }
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
check(event.responded.compareAndSet(false, true)) {
"the request $this has already been responded"
}
if (event.group?.contains(event.fromId) == true) return
@ -168,10 +153,6 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
@Suppress("DuplicatedCode")
override suspend fun rejectMemberJoinRequest(event: MemberJoinRequestEvent, blackList: Boolean, message: String) {
checkGroupPermission(event.bot, event.groupId) { event::class.simpleName ?: "<anonymous class>" }
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
check(event.responded.compareAndSet(false, true)) {
"the request $this has already been responded"
}
if (event.group?.contains(event.fromId) == true) return
@ -222,10 +203,6 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
override suspend fun ignoreMemberJoinRequest(event: MemberJoinRequestEvent, blackList: Boolean) {
checkGroupPermission(event.bot, event.groupId) { event::class.simpleName ?: "<anonymous class>" }
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
check(event.responded.compareAndSet(false, true)) {
"the request $this has already been responded"
}
solveMemberJoinRequestEvent(
bot = event.bot,
@ -257,11 +234,6 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
}
private suspend fun solveInvitedJoinGroupRequest(event: BotInvitedJoinGroupRequestEvent, accept: Boolean) {
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
check(event.responded.compareAndSet(false, true)) {
"the request $this has already been responded"
}
check(!event.bot.groups.contains(event.groupId)) {
"the request $this is outdated: Bot has been already in the group."
}
@ -391,14 +363,12 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
return response is PbMessageSvc.PbMsgWithDraw.Response.Success
}
@Suppress("RemoveExplicitTypeArguments") // false positive
override suspend fun recallMessage(bot: Bot, source: MessageSource) = bot.asQQAndroidBot().run {
check(source is MessageSourceInternal)
source.ensureSequenceIdAvailable()
@Suppress("BooleanLiteralArgument", "INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") // false positive
check(!source.isRecalledOrPlanned.value && source.isRecalledOrPlanned.compareAndSet(false, true)) {
check(!source.isRecalledOrPlanned && source.setRecalled()) {
"$source had already been recalled."
}
@ -649,7 +619,7 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
override fun createUnsupportedMessage(struct: ByteArray): UnsupportedMessage =
UnsupportedMessageImpl(struct.loadAs(ImMsgBody.Elem.serializer()))
@Suppress("DEPRECATION", "OverridingDeprecatedMember")
@Suppress("OverridingDeprecatedMember")
override suspend fun queryImageUrl(bot: Bot, image: Image): String = when (image) {
is ConstOriginUrlAware -> image.originUrl
is DeferredOriginUrlAware -> image.getUrl(bot)

View File

@ -12,6 +12,7 @@
package net.mamoe.mirai.internal.contact
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import net.mamoe.mirai.Bot
import net.mamoe.mirai.LowLevelApi
@ -149,7 +150,18 @@ internal abstract class CommonGroupImpl constructor(
final override val files: RemoteFiles by lazy { RemoteFilesImpl(this) }
val lastTalkative = atomic(members.find { GroupHonorType.TALKATIVE in it.active.honors })
private val _lastTalkative: AtomicRef<NormalMemberImpl?> = atomic(null)
init {
// Cannot move to argument of `atomic`, compiler error.
val value = members.find { GroupHonorType.TALKATIVE in it.active.honors }
_lastTalkative.value = value
}
val lastTalkative get() = _lastTalkative.value
fun casLastTalkative(expect: NormalMemberImpl?, update: NormalMemberImpl?): Boolean =
_lastTalkative.compareAndSet(expect, update)
final override val announcements: Announcements by lazy {
AnnouncementsImpl(

View File

@ -9,7 +9,6 @@
package net.mamoe.mirai.internal.message.source
import kotlinx.atomicfu.AtomicBoolean
import kotlinx.serialization.Transient
import net.mamoe.mirai.contact.Contact
import net.mamoe.mirai.internal.message.LightMessageRefiner.dropMiraiInternalFlags
@ -37,7 +36,8 @@ internal interface MessageSourceInternal : MessageMetadata {
val ids: IntArray
@Transient
val isRecalledOrPlanned: AtomicBoolean
val isRecalledOrPlanned: Boolean
fun setRecalled(): Boolean // CAS
fun toJceData(): ImMsgBody.SourceMsg

View File

@ -11,7 +11,6 @@
package net.mamoe.mirai.internal.message.source
import kotlinx.atomicfu.AtomicBoolean
import kotlinx.atomicfu.atomic
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
@ -50,7 +49,12 @@ internal class OnlineMessageSourceFromFriendImpl(
object Serializer : KSerializer<MessageSource> by MessageSourceSerializerImpl("OnlineMessageSourceFromFriend")
override val sequenceIds: IntArray = msg.mapToIntArray { it.msgHead.msgSeq }
override var isRecalledOrPlanned: AtomicBoolean = atomic(false)
private val _isRecalledOrPlanned = atomic(false)
@Transient
override val isRecalledOrPlanned: Boolean = _isRecalledOrPlanned.value
override fun setRecalled(): Boolean = _isRecalledOrPlanned.compareAndSet(expect = false, update = true)
override val ids: IntArray get() = sequenceIds // msg.msgBody.richText.attr!!.random
override val internalIds: IntArray = msg.mapToIntArray {
it.msgBody.richText.attr?.random ?: 0
@ -83,7 +87,13 @@ internal class OnlineMessageSourceFromStrangerImpl(
object Serializer : KSerializer<MessageSource> by MessageSourceSerializerImpl("OnlineMessageSourceFromStranger")
override val sequenceIds: IntArray = msg.mapToIntArray { it.msgHead.msgSeq }
override var isRecalledOrPlanned: AtomicBoolean = atomic(false)
private val _isRecalledOrPlanned = atomic(false)
@Transient
override val isRecalledOrPlanned: Boolean = _isRecalledOrPlanned.value
override fun setRecalled(): Boolean = _isRecalledOrPlanned.compareAndSet(expect = false, update = true)
override val ids: IntArray get() = sequenceIds // msg.msgBody.richText.attr!!.random
override val internalIds: IntArray = msg.mapToIntArray {
it.msgBody.richText.attr?.random ?: 0
@ -156,7 +166,13 @@ internal class OnlineMessageSourceFromTempImpl(
override val sequenceIds: IntArray = msg.mapToIntArray { it.msgHead.msgSeq }
override val internalIds: IntArray = msg.mapToIntArray { it.msgBody.richText.attr!!.random }
override var isRecalledOrPlanned: AtomicBoolean = atomic(false)
private val _isRecalledOrPlanned = atomic(false)
@Transient
override val isRecalledOrPlanned: Boolean = _isRecalledOrPlanned.value
override fun setRecalled(): Boolean = _isRecalledOrPlanned.compareAndSet(expect = false, update = true)
override val ids: IntArray get() = sequenceIds //
override val time: Int = msg.first().msgHead.msgTime
override var originalMessageLazy = lazy {
@ -191,8 +207,12 @@ internal class OnlineMessageSourceFromGroupImpl(
) : OnlineMessageSource.Incoming.FromGroup(), IncomingMessageSourceInternal {
object Serializer : KSerializer<MessageSource> by MessageSourceSerializerImpl("OnlineMessageSourceFromGroupImpl")
private val _isRecalledOrPlanned = atomic(false)
@Transient
override var isRecalledOrPlanned: AtomicBoolean = atomic(false)
override val isRecalledOrPlanned: Boolean = _isRecalledOrPlanned.value
override fun setRecalled(): Boolean = _isRecalledOrPlanned.compareAndSet(expect = false, update = true)
override val sequenceIds: IntArray = msg.mapToIntArray { it.msgHead.msgSeq }
override val internalIds: IntArray = msg.mapToIntArray { it.msgBody.richText.attr!!.random }
override val ids: IntArray get() = sequenceIds

View File

@ -10,7 +10,6 @@
package net.mamoe.mirai.internal.message.source
import kotlinx.atomicfu.AtomicBoolean
import kotlinx.atomicfu.atomic
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
@ -60,9 +59,11 @@ internal class OfflineMessageSourceImplData(
@Transient
var jceData: ImMsgBody.SourceMsg? = null
private val _isRecalledOrPlanned = atomic(false)
@Transient
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
override var isRecalledOrPlanned: AtomicBoolean = atomic(false)
override val isRecalledOrPlanned: Boolean = _isRecalledOrPlanned.value
override fun setRecalled(): Boolean = _isRecalledOrPlanned.compareAndSet(expect = false, update = true)
override fun toJceData(): ImMsgBody.SourceMsg {
return jceData ?: ImMsgBody.SourceMsg(

View File

@ -11,11 +11,11 @@
package net.mamoe.mirai.internal.message.source
import kotlinx.atomicfu.AtomicBoolean
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
import net.mamoe.mirai.Bot
import net.mamoe.mirai.contact.*
import net.mamoe.mirai.event.EventPriority
@ -98,7 +98,13 @@ internal class OnlineMessageSourceToFriendImpl(
get() = sender
override val ids: IntArray
get() = sequenceIds
override var isRecalledOrPlanned: AtomicBoolean = atomic(false)
private val _isRecalledOrPlanned = atomic(false)
@Transient
override val isRecalledOrPlanned: Boolean = _isRecalledOrPlanned.value
override fun setRecalled(): Boolean = _isRecalledOrPlanned.compareAndSet(expect = false, update = true)
private val jceData: ImMsgBody.SourceMsg by lazy { toJceDataImpl(subject) }
override fun toJceData(): ImMsgBody.SourceMsg = jceData
@ -131,7 +137,13 @@ internal class OnlineMessageSourceToStrangerImpl(
get() = sender
override val ids: IntArray
get() = sequenceIds
override var isRecalledOrPlanned: AtomicBoolean = atomic(false)
private val _isRecalledOrPlanned = atomic(false)
@Transient
override val isRecalledOrPlanned: Boolean = _isRecalledOrPlanned.value
override fun setRecalled(): Boolean = _isRecalledOrPlanned.compareAndSet(expect = false, update = true)
private val jceData: ImMsgBody.SourceMsg by lazy { toJceDataImpl(subject) }
override fun toJceData(): ImMsgBody.SourceMsg = jceData
@ -164,7 +176,13 @@ internal class OnlineMessageSourceToTempImpl(
get() = sender
override val ids: IntArray
get() = sequenceIds
override var isRecalledOrPlanned: AtomicBoolean = atomic(false)
private val _isRecalledOrPlanned = atomic(false)
@Transient
override val isRecalledOrPlanned: Boolean = _isRecalledOrPlanned.value
override fun setRecalled(): Boolean = _isRecalledOrPlanned.compareAndSet(expect = false, update = true)
private val jceData: ImMsgBody.SourceMsg by lazy { toJceDataImpl(subject) }
override fun toJceData(): ImMsgBody.SourceMsg = jceData
@ -193,7 +211,13 @@ internal class OnlineMessageSourceToGroupImpl(
get() = sequenceIds
override val bot: Bot
get() = sender
override var isRecalledOrPlanned: AtomicBoolean = atomic(false)
private val _isRecalledOrPlanned = atomic(false)
@Transient
override val isRecalledOrPlanned: Boolean = _isRecalledOrPlanned.value
override fun setRecalled(): Boolean = _isRecalledOrPlanned.compareAndSet(expect = false, update = true)
/**
* Note that in tests result of this Deferred is always `null`. See TestMessageSourceSequenceIdAwaiter.

View File

@ -106,10 +106,10 @@ internal class BotInitProcessorImpl(
}
state.value = INITIALIZED
bot.components[SsoProcessor].firstLoginResult.compareAndSet(null, FirstLoginResult.PASSED)
bot.components[SsoProcessor].casFirstLoginResult(null, FirstLoginResult.PASSED)
} catch (e: Throwable) {
setLoginHalted()
bot.components[SsoProcessor].firstLoginResult.compareAndSet(null, FirstLoginResult.OTHER_FAILURE)
bot.components[SsoProcessor].casFirstLoginResult(null, FirstLoginResult.OTHER_FAILURE)
throw e
}
}

View File

@ -45,7 +45,7 @@ internal class ConfigPushProcessorImpl(
val e = IllegalStateException("Timeout waiting for ConfigPush.")
bdhSyncer.bdhSession.completeExceptionally(e)
logger.warning { "Missing ConfigPush. Switching server..." }
network.context[SsoProcessor].firstLoginResult.compareAndSet(null, FirstLoginResult.CHANGE_SERVER)
network.context[SsoProcessor].casFirstLoginResult(null, FirstLoginResult.CHANGE_SERVER)
network.context.bot.components[EventDispatcher].broadcastAsync(
BotOfflineEvent.RequireReconnect(
network.context.bot,

View File

@ -44,8 +44,15 @@ internal interface SsoProcessor {
val client: QQAndroidClient
val ssoSession: SsoSession
val firstLoginResult: AtomicRef<FirstLoginResult?> // null means just initialized
val firstLoginSucceed: Boolean get() = firstLoginResult.value?.success ?: false
val firstLoginResult: FirstLoginResult? // null means just initialized
fun casFirstLoginResult(
expect: FirstLoginResult?,
update: FirstLoginResult?
): Boolean // enable compiler optimization
fun setFirstLoginResult(value: FirstLoginResult?)
val firstLoginSucceed: Boolean get() = firstLoginResult?.success ?: false
val registerResp: StatSvc.Register.Response?
/**
@ -114,7 +121,14 @@ internal class SsoProcessorImpl(
// public
///////////////////////////////////////////////////////////////////////////
override val firstLoginResult: AtomicRef<FirstLoginResult?> = atomic(null)
private val _firstLoginResult: AtomicRef<FirstLoginResult?> = atomic(null)
override val firstLoginResult = _firstLoginResult.value
override fun casFirstLoginResult(expect: FirstLoginResult?, update: FirstLoginResult?): Boolean =
_firstLoginResult.compareAndSet(expect, update)
override fun setFirstLoginResult(value: FirstLoginResult?) {
_firstLoginResult.value = value
}
@Volatile
override var registerResp: StatSvc.Register.Response? = null

View File

@ -23,7 +23,9 @@ import net.mamoe.mirai.utils.currentTimeSeconds
import kotlin.jvm.Volatile
internal interface SyncController {
val firstNotify: AtomicBoolean
val firstNotify: Boolean
fun casFirstNotify(expect: Boolean, update: Boolean): Boolean
var latestMsgNewGroupTime: Long
var latestMsgNewFriendTime: Long
@ -69,7 +71,9 @@ internal fun SyncController.syncOnlinePush(
)
internal class SyncControllerImpl : SyncController {
override val firstNotify: AtomicBoolean = atomic(true)
private val _firstNotify: AtomicBoolean = atomic(true)
override val firstNotify get() = _firstNotify.value
override fun casFirstNotify(expect: Boolean, update: Boolean): Boolean = _firstNotify.compareAndSet(expect, update)
@Volatile
override var latestMsgNewGroupTime: Long = currentTimeSeconds()

View File

@ -253,7 +253,7 @@ internal abstract class CommonNetworkHandler<Conn>(
this@CommonNetworkHandler.launch { resumeConnection() } // go to next state.
} else {
// failed in SSO stage
context[SsoProcessor].firstLoginResult.compareAndSet(null, FirstLoginResult.OTHER_FAILURE)
context[SsoProcessor].casFirstLoginResult(null, FirstLoginResult.OTHER_FAILURE)
if (error is CancellationException) {
// CancellationException is either caused by parent cancellation or manual `connectResult.cancel`.

View File

@ -119,7 +119,7 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandl
lastNetwork = current
if (current != null) {
if (current.context[SsoProcessor].firstLoginResult.value?.canRecoverOnFirstLogin == false) {
if (current.context[SsoProcessor].firstLoginResult?.canRecoverOnFirstLogin == false) {
// == null 只表示
// == false 表示第一次登录失败, 且此失败没必要重试
logIfEnabled { "[FIRST LOGIN ERROR] current = $current" }
@ -176,7 +176,7 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandl
// This may return false, meaning the error causing the state to be CLOSED is recoverable.
// Otherwise, it throws, meaning it is unrecoverable.
if (this@AbstractKeepAliveNetworkHandlerSelector.current.compareAndSet(current, null)) {
if (compareAndSetCurrent(current, null)) {
logIfEnabled { "... Set current to null." }
// invalidate the instance and try again.
@ -233,6 +233,9 @@ internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandl
}
}
private fun compareAndSetCurrent(expect: H?, update: H?) =
current.compareAndSet(expect, update) // to enable compiler optimization
private val lock = SynchronizedObject()
protected open fun refreshInstance() {
synchronized(lock) { // avoid concurrent `createInstance()`

View File

@ -30,7 +30,7 @@ internal class ChunkedFlowSession<T>(
input.close()
}
private var offset = atomic(0L)
private val offset = atomic(0L)
internal suspend inline fun useAll(crossinline block: suspend (T) -> Unit) {
contract { callsInPlace(block, InvocationKind.UNKNOWN) }

View File

@ -10,7 +10,6 @@
package net.mamoe.mirai.internal.network.highway
import io.ktor.utils.io.core.*
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
@ -381,8 +380,6 @@ internal fun highwayPacketSession(
ByteArrayPool.checkBufferSize(sizePerPacket)
// require(ticket.size == 128) { "bad uKey. Required size=128, got ${ticket.size}" }
val ticket = atomic(initialTicket)
return ChunkedFlowSession(data.input(), ByteArray(sizePerPacket), callback) { buffer, size, offset ->
val head = CSDataHighwayHead.ReqDataHighwayHead(
msgBasehead = CSDataHighwayHead.DataHighwayHead(
@ -401,7 +398,7 @@ internal fun highwayPacketSession(
datalength = size,
dataoffset = offset,
filesize = data.size,
serviceticket = ticket.value,
serviceticket = initialTicket,
md5 = buffer.md5(0, size),
fileMd5 = fileMd5,
flag = 0,

View File

@ -363,9 +363,9 @@ internal class GroupNotificationProcessor(
val now = grayTip.msgTemplParam["uin"]?.findMember() ?: group.botAsMember
val previous = grayTip.msgTemplParam["uin_last"]?.findMember()
val lastTalkative = group.lastTalkative.value
val lastTalkative = group.lastTalkative
if (lastTalkative == now) return // duplicate
if (!group.lastTalkative.compareAndSet(lastTalkative, now)) return
if (!group.casLastTalkative(lastTalkative, now)) return
if (previous == null) {
collect(MemberHonorChangeEvent.Achieve(now, GroupHonorType.TALKATIVE))

View File

@ -10,7 +10,6 @@
package net.mamoe.mirai.internal.network.protocol.packet.chat.receive
import io.ktor.utils.io.core.*
import kotlinx.atomicfu.loop
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.components.SyncController.Companion.syncController
import net.mamoe.mirai.internal.network.protocol.data.jce.RequestPushNotify
@ -30,19 +29,23 @@ internal object MessageSvcPushNotify : IncomingPacketFactory<RequestPushNotify>(
}
override suspend fun QQAndroidBot.handle(packet: RequestPushNotify, sequenceId: Int): OutgoingPacket {
syncController.firstNotify.loop { firstNotify ->
network.run {
return MessageSvcPbGetMsg(
client,
MsgSvc.SyncFlag.START,
if (firstNotify) {
if (!syncController.firstNotify.compareAndSet(firstNotify, false)) {
return@loop
}
null
} else packet.vNotifyCookie,
)
while (true) {
val firstNotify = syncController.firstNotify
val cookie = if (firstNotify) {
if (!syncController.casFirstNotify(firstNotify, false)) {
continue
}
null
} else {
packet.vNotifyCookie
}
return MessageSvcPbGetMsg(
client,
MsgSvc.SyncFlag.START,
cookie,
)
}
}
}

View File

@ -16,23 +16,23 @@ import kotlinx.atomicfu.AtomicInt
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import net.mamoe.mirai.utils.getRandomUnsignedInt
import kotlin.jvm.JvmInline
import kotlin.jvm.JvmStatic
internal object AtomicIntSeq {
@JvmStatic
inline fun forMessageSeq(): AtomicIntMaxSeq = AtomicIntMaxSeq(atomic(0))
fun forMessageSeq(): AtomicIntMaxSeq = AtomicIntMaxSeq(0)
@JvmStatic
inline fun forPrivateSync(): AtomicInt65535Seq = AtomicInt65535Seq(atomic(getRandomUnsignedInt()))
fun forPrivateSync(): AtomicInt65535Seq = AtomicInt65535Seq(getRandomUnsignedInt())
}
// value classes to optimize space
@JvmInline
internal value class AtomicIntMaxSeq(
private val value: AtomicInt
internal class AtomicIntMaxSeq(
value: Int
) {
private val value: AtomicInt = atomic(value)
/**
* Increment [value] within the range from `0` (inclusive) to [Int.MAX_VALUE] (exclusive).
*/
@ -64,10 +64,11 @@ internal value class AtomicIntMaxSeq(
}
}
@JvmInline
internal value class AtomicInt65535Seq(
private val value: AtomicInt = atomic(0)
internal class AtomicInt65535Seq(
value: Int
) {
private val value: AtomicInt = atomic(value)
/**
* Increment [value] within the range from `0` (inclusive) to `65535` (exclusive).
*/

View File

@ -15,6 +15,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import net.mamoe.mirai.internal.test.AbstractTest
import net.mamoe.mirai.internal.utils.ScheduledJob
import net.mamoe.mirai.utils.AtomicInteger
import kotlin.test.Test
import kotlin.test.assertEquals
@ -25,7 +26,7 @@ internal class ScheduledJobTest : AbstractTest() {
val scope = CoroutineScope(CoroutineExceptionHandler { _, throwable ->
throwable.printStackTrace()
})
val invoked = atomic(0)
val invoked = AtomicInteger(0)
val job = ScheduledJob(scope.coroutineContext, 1000) {
invoked.incrementAndGet()
}

View File

@ -9,10 +9,10 @@
package net.mamoe.mirai.internal.event
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runTest
import net.mamoe.mirai.event.*
import net.mamoe.mirai.utils.AtomicInteger
import net.mamoe.mirai.utils.childScope
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.AfterTest
@ -61,7 +61,7 @@ internal class EventTests : AbstractEventTest() {
fun `test concurrent listening`() {
resetEventListeners()
var listeners = 0
val counter = atomic(0)
val counter = AtomicInteger(0)
val channel = scope.globalEventChannel()
for (p in EventPriority.values()) {
repeat(2333) {
@ -85,8 +85,8 @@ internal class EventTests : AbstractEventTest() {
fun `test concurrent listening 3`() {
resetEventListeners()
runBlocking {
val called = atomic(0)
val registered = atomic(0)
val called = AtomicInteger(0)
val registered = AtomicInteger(0)
coroutineScope {
println("Step 0")
for (priority in EventPriority.values()) {
@ -116,8 +116,8 @@ internal class EventTests : AbstractEventTest() {
@Test
fun `test concurrent listening 2`() = runTest {
resetEventListeners()
val registered = atomic(0)
val called = atomic(0)
val registered = AtomicInteger(0)
val called = AtomicInteger(0)
val supervisor = CoroutineScope(SupervisorJob())

View File

@ -151,7 +151,7 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
override suspend fun init() {
nhEvents.add(NHEvent.Init)
networkLogger.debug { "BotInitProcessor.init" }
bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED
bot.components[SsoProcessor].setFirstLoginResult(FirstLoginResult.PASSED)
}
})
set(ServerList, ServerListImpl())
@ -207,9 +207,9 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
val eventDispatcher get() = bot.components[EventDispatcher]
var firstLoginResult: FirstLoginResult?
get() = bot.components[SsoProcessor].firstLoginResult.value
get() = bot.components[SsoProcessor].firstLoginResult
set(value) {
bot.components[SsoProcessor].firstLoginResult.value = value
bot.components[SsoProcessor].setFirstLoginResult(value)
}
}

View File

@ -36,7 +36,16 @@ internal open class TestSsoProcessor(private val bot: QQAndroidBot) : SsoProcess
)
}
override val ssoSession: SsoSession get() = bot.client
override val firstLoginResult: AtomicRef<FirstLoginResult?> = atomic(null)
private val _firstLoginResult: AtomicRef<FirstLoginResult?> = atomic(null)
override val firstLoginResult get() = _firstLoginResult.value
override fun casFirstLoginResult(expect: FirstLoginResult?, update: FirstLoginResult?): Boolean {
return _firstLoginResult.compareAndSet(expect, update)
}
override fun setFirstLoginResult(value: FirstLoginResult?) {
_firstLoginResult.value = value
}
override var registerResp: StatSvc.Register.Response? = null
override suspend fun login(handler: NetworkHandler) {
bot.network.logger.debug { "SsoProcessor.login" }

View File

@ -53,7 +53,7 @@ internal class SelectorRecoveryTest : AbstractCommonNHTestWithSelector() {
throwExceptionOnLogin?.invoke()
}
}.apply {
firstLoginResult.value = FirstLoginResult.PASSED
setFirstLoginResult(FirstLoginResult.PASSED)
}
}

View File

@ -18,6 +18,7 @@ import net.mamoe.mirai.internal.network.framework.TestCommonNetworkHandler
import net.mamoe.mirai.internal.network.handler.selector.MaxAttemptsReachedException
import net.mamoe.mirai.internal.network.handler.selector.NetworkException
import net.mamoe.mirai.internal.test.runBlockingUnit
import net.mamoe.mirai.utils.AtomicInteger
import kotlin.test.*
/**
@ -59,8 +60,8 @@ internal class StandaloneSelectorTests : AbstractCommonNHTest() {
// Since #1963, any error during first login will close the bot. So we assume first login succeed to do our test.
@BeforeTest
private fun setFirstLoginPassed() {
assertEquals(null, bot.components[SsoProcessor].firstLoginResult.value)
bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED
assertEquals(null, bot.components[SsoProcessor].firstLoginResult)
bot.components[SsoProcessor].setFirstLoginResult(FirstLoginResult.PASSED)
}
@Test
@ -79,7 +80,7 @@ internal class StandaloneSelectorTests : AbstractCommonNHTest() {
@Test
fun `NetworkException does not cause retrying if recoverable=false`() = runBlockingUnit {
// selector should not tolerant any exception during state initialization, or in the Jobs launched in states.
val times = atomic(0)
val times = AtomicInteger(0)
val theException = object : NetworkException(false) {}
throwExceptionOnConnecting = {
times.incrementAndGet()

View File

@ -50,7 +50,7 @@ internal class CommonNHEventTest : AbstractCommonNHTest() {
fun `BotOnlineEvent after successful reconnection`() = runBlockingUnit {
assertEquals(INITIALIZED, network.state)
bot.login()
bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED
bot.components[SsoProcessor].setFirstLoginResult(FirstLoginResult.PASSED)
assertEquals(OK, network.state)
eventDispatcher.joinBroadcast() // `login` launches a job which broadcasts the event
assertEventBroadcasts<BotOnlineEvent>(1) {
@ -65,7 +65,7 @@ internal class CommonNHEventTest : AbstractCommonNHTest() {
fun `BotOfflineEvent after successful reconnection`() = runBlockingUnit {
assertEquals(INITIALIZED, network.state)
bot.login()
bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED
bot.components[SsoProcessor].setFirstLoginResult(FirstLoginResult.PASSED)
assertEquals(OK, network.state)
eventDispatcher.joinBroadcast() // `login` launches a job which broadcasts the event
assertEventBroadcasts<BotOfflineEvent>(1) {
@ -95,24 +95,26 @@ internal class CommonNHEventTest : AbstractCommonNHTest() {
@Test
fun `from CONNECTING TO OK the second time`() = runBlockingUnit {
val ok = atomic(CompletableDeferred<Unit>())
val ok = object {
val v = atomic(CompletableDeferred<Unit>())
}
setSsoProcessor {
ok.value.join()
ok.v.value.join()
}
assertState(INITIALIZED)
network.setStateConnecting()
ok.value.complete(Unit)
ok.v.value.complete(Unit)
network.resumeConnection()
assertState(OK)
ok.value = CompletableDeferred()
ok.v.value = CompletableDeferred()
network.setStateConnecting()
eventDispatcher.joinBroadcast()
println("Starting receiving events")
assertEventBroadcasts<Event>(2) {
ok.value.complete(Unit)
ok.v.value.complete(Unit)
network.resumeConnection()
eventDispatcher.joinBroadcast()
}.let { event ->
@ -169,7 +171,7 @@ internal class CommonNHEventTest : AbstractCommonNHTest() {
assertState(INITIALIZED)
bot.login()
assertState(OK)
bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED
bot.components[SsoProcessor].setFirstLoginResult(FirstLoginResult.PASSED)
network.setStateConnecting()
network.resumeConnection()
assertState(OK)

View File

@ -10,7 +10,6 @@
package net.mamoe.mirai.internal.network.impl.common
import io.ktor.utils.io.core.*
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
@ -19,6 +18,7 @@ import net.mamoe.mirai.internal.network.framework.AbstractCommonNHTest
import net.mamoe.mirai.internal.network.protocol.packet.IncomingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.test.runBlockingUnit
import net.mamoe.mirai.utils.AtomicBoolean
import kotlin.test.Test
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@ -29,7 +29,7 @@ internal class SendPacketTest : AbstractCommonNHTest() {
@Test
fun `sendPacketImpl suspends until a valid state`() = runBlockingUnit(singleThreadDispatcher) {
val expectStop = atomic(false)
val expectStop = AtomicBoolean(false)
// coroutine starts immediately and suspends at `net.mamoe.mirai.internal.network.impl.netty.NettyNetworkHandler.sendPacketImpl`
launch(singleThreadDispatcher, start = CoroutineStart.UNDISPATCHED) {
@ -48,7 +48,7 @@ internal class SendPacketTest : AbstractCommonNHTest() {
@Test
fun `sendPacketImpl does not suspend if state is valid`() = runBlockingUnit(singleThreadDispatcher) {
network.setStateOK(conn) // then we can send packet.
val expectStop = atomic(false)
val expectStop = AtomicBoolean(false)
val job = launch(singleThreadDispatcher, start = CoroutineStart.UNDISPATCHED) {
assertNotNull(network.sendAndExpect(OutgoingPacket("name", "cmd", 1, ByteReadPacket.Empty)))

View File

@ -60,7 +60,7 @@ internal abstract class AbstractNoticeProcessorTest : AbstractCommonNHTest(), Gr
pipeline: NoticeProcessorPipeline = bot.components.noticeProcessorPipeline,
block: UseTestContext.() -> ProtocolStruct
): Collection<Packet> {
bot.components[SsoProcessor].firstLoginResult.value = FirstLoginResult.PASSED
bot.components[SsoProcessor].setFirstLoginResult(FirstLoginResult.PASSED)
val handler = LoggingPacketHandlerAdapter(PacketLoggingStrategyImpl(bot), bot.logger)
val context = UseTestContext(attributes.toMutableTypeSafeMap())
return pipeline.process(block(context), context.attributes).collected.also { list ->

View File

@ -9,10 +9,10 @@
package net.mamoe.mirai.internal.event
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import net.mamoe.mirai.event.*
import net.mamoe.mirai.utils.AtomicBoolean
import net.mamoe.mirai.utils.JavaFriendlyAPI
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.Test
@ -23,10 +23,11 @@ import kotlin.test.Test
internal class SimpleListenerHostTestJava : AbstractEventTest() {
@Test
fun testJavaSimpleListenerHostWork() {
val called = atomic(false)
val called = AtomicBoolean(false)
val host: SimpleListenerHost = object : SimpleListenerHost() {
@EventHandler
@net.mamoe.mirai.utils.EventListenerLikeJava
@Suppress("unused")
fun testListen(
event: AbstractEvent?
) {