From c17e8a32638ffda4757a964edba6c6db610cfa1d Mon Sep 17 00:00:00 2001 From: Karlatemp Date: Fri, 1 May 2020 21:05:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E7=9B=91=E5=90=AC=E4=BC=98?= =?UTF-8?q?=E5=85=88=E7=BA=A7,=20=E4=BA=8B=E4=BB=B6=E4=BC=A0=E9=80=92?= =?UTF-8?q?=E6=8B=A6=E6=88=AA=20(#279)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 支持监听优先级, 事件传递拦截 * Fix test * 并发 * 优先级&并发 * Test * Fix unused * To GlobalEventListeners * Add tests * intercept with subscribeAlways * test listener.complete() * Add functions * Fix test and add new test * Test concurrent listening * Test concurrent listening * update broadcast * Fix Boom Co-authored-by: Him188 --- .../kotlin/net.mamoe.mirai/event/Event.kt | 16 +- .../event/internal/InternalEventListeners.kt | 134 +++++----- .../net.mamoe.mirai/event/subscriber.kt | 230 +++++++++++++++++- .../utils/LockFreeLinkedList.kt | 46 +++- .../event/internal/MiraiAtomicBoolean.kt | 36 ++- .../net/mamoe/mirai/event/EventTests.kt | 206 +++++++++++++++- .../kotlin/net/mamoe/mirai/utils/StepUtil.kt | 23 ++ 7 files changed, 586 insertions(+), 105 deletions(-) create mode 100644 mirai-core/src/jvmTest/kotlin/net/mamoe/mirai/utils/StepUtil.kt diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/Event.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/Event.kt index 7ca11cda5..3d7f83a78 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/Event.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/Event.kt @@ -35,9 +35,11 @@ import kotlin.jvm.JvmSynthetic */ interface Event { - @Deprecated(""" + @Deprecated( + """ Don't implement Event but extend AbstractEvent instead. - """, level = DeprecationLevel.HIDDEN) // so Kotlin class won't be compiled. + """, level = DeprecationLevel.HIDDEN + ) // so Kotlin class won't be compiled. @Suppress("WRONG_MODIFIER_CONTAINING_DECLARATION", "PropertyName") @get:JvmSynthetic // so Java user won't see it internal val DoNotImplementThisClassButExtendAbstractEvent: Nothing @@ -54,10 +56,9 @@ abstract class AbstractEvent : Event { final override val DoNotImplementThisClassButExtendAbstractEvent: Nothing get() = throw Error("Shouldn't be reached") - private val _intercepted = atomic(false) + private var _intercepted = false private val _cancelled = atomic(false) - /** * 事件是否已被拦截. * @@ -65,7 +66,7 @@ abstract class AbstractEvent : Event { */ @SinceMirai("1.0.0") val isIntercepted: Boolean - get() = _intercepted.value + get() = _intercepted /** * 拦截这个事件. @@ -73,7 +74,7 @@ abstract class AbstractEvent : Event { */ @SinceMirai("1.0.0") fun intercept() { - _intercepted.value = true + _intercepted = true } @@ -145,5 +146,6 @@ interface BroadcastControllable : Event { @Deprecated( "use AbstractEvent and implement CancellableEvent", level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("AbstractEvent", "net.mamoe.mirai.event.AbstractEvent")) + replaceWith = ReplaceWith("AbstractEvent", "net.mamoe.mirai.event.AbstractEvent") +) abstract class AbstractCancellableEvent : AbstractEvent(), CancellableEvent diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/internal/InternalEventListeners.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/internal/InternalEventListeners.kt index b64bc4063..ac76ec619 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/internal/InternalEventListeners.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/internal/InternalEventListeners.kt @@ -13,40 +13,41 @@ package net.mamoe.mirai.event.internal import kotlinx.coroutines.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -import net.mamoe.mirai.event.Event -import net.mamoe.mirai.event.EventDisabled -import net.mamoe.mirai.event.Listener -import net.mamoe.mirai.event.ListeningStatus -import net.mamoe.mirai.utils.LockFreeLinkedList -import net.mamoe.mirai.utils.MiraiInternalAPI -import net.mamoe.mirai.utils.MiraiLogger -import net.mamoe.mirai.utils.isRemoved +import net.mamoe.mirai.event.* +import net.mamoe.mirai.utils.* import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext import kotlin.jvm.JvmField import kotlin.reflect.KClass + @PublishedApi internal fun , E : Event> KClass.subscribeInternal(listener: L): L { - with(this.listeners()) { - addLast(listener) + with(GlobalEventListeners[listener.priority]) { + @Suppress("UNCHECKED_CAST") + val node = ListenerNode(listener as Listener, this@subscribeInternal) + @OptIn(MiraiInternalAPI::class) + addLast(node) listener.invokeOnCompletion { - this.remove(listener) + @OptIn(MiraiInternalAPI::class) + this.remove(node) } } return listener } + @PublishedApi @Suppress("FunctionName") internal fun CoroutineScope.Handler( coroutineContext: CoroutineContext, concurrencyKind: Listener.ConcurrencyKind, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, handler: suspend (E) -> ListeningStatus ): Handler { @OptIn(ExperimentalCoroutinesApi::class) // don't remove val context = this.newCoroutineContext(coroutineContext) - return Handler(context[Job], context, handler, concurrencyKind) + return Handler(context[Job], context, handler, concurrencyKind, priority) } /** @@ -58,15 +59,17 @@ internal class Handler parentJob: Job?, subscriberContext: CoroutineContext, @JvmField val handler: suspend (E) -> ListeningStatus, - override val concurrencyKind: Listener.ConcurrencyKind + override val concurrencyKind: Listener.ConcurrencyKind, + override val priority: Listener.EventPriority ) : Listener, CompletableJob by SupervisorJob(parentJob) { // avoid being cancelled on handling event private val subscriberContext: CoroutineContext = subscriberContext + this // override Job. + @MiraiInternalAPI val lock: Mutex? = when (concurrencyKind) { - Listener.ConcurrencyKind.CONCURRENT -> null Listener.ConcurrencyKind.LOCKED -> Mutex() + else -> null } @Suppress("unused") @@ -95,14 +98,12 @@ internal class Handler } } -/** - * 这个事件类的监听器 list - */ -internal fun KClass.listeners(): EventListeners = EventListenerManager.get(this) - -internal expect class EventListeners(clazz: KClass) : LockFreeLinkedList> { - @Suppress("UNCHECKED_CAST", "UNSUPPORTED", "NO_REFLECTION_IN_CLASS_PATH") - val supertypes: Set> +internal class ListenerNode( + val listener: Listener, + val owner: KClass +) +internal expect object GlobalEventListeners { + operator fun get(priority: Listener.EventPriority): LockFreeLinkedList } internal expect class MiraiAtomicBoolean(initial: Boolean) { @@ -112,63 +113,56 @@ internal expect class MiraiAtomicBoolean(initial: Boolean) { var value: Boolean } -/** - * 管理每个事件 class 的 [EventListeners]. - * [EventListeners] 是 lazy 的: 它们只会在被需要的时候才创建和存储. - */ -internal object EventListenerManager { - private data class Registry(val clazz: KClass, val listeners: EventListeners) - - private val registries = LockFreeLinkedList>() - - // 不要用 atomicfu. 在 publish 后会出现 VerifyError - private val lock: MiraiAtomicBoolean = MiraiAtomicBoolean(false) - - @OptIn(MiraiInternalAPI::class) - @Suppress("UNCHECKED_CAST", "BooleanLiteralArgument") - internal tailrec fun get(clazz: KClass): EventListeners { - registries.forEach { - if (it.clazz == clazz) { - return it.listeners as EventListeners - } - } - if (lock.compareAndSet(false, true)) { - val registry = Registry(clazz as KClass, EventListeners(clazz)) - registries.addLast(registry) - lock.value = false - return registry.listeners - } - return get(clazz) - } -} // inline: NO extra Continuation @Suppress("UNCHECKED_CAST") internal suspend inline fun Event.broadcastInternal() = coroutineScope { if (EventDisabled) return@coroutineScope - - val listeners = this@broadcastInternal::class.listeners() - callAndRemoveIfRequired(this@broadcastInternal, listeners) - listeners.supertypes.forEach { - callAndRemoveIfRequired(this@broadcastInternal, it.listeners()) - } + callAndRemoveIfRequired(this@broadcastInternal as? AbstractEvent ?: error("Events must extends AbstractEvent")) } @OptIn(MiraiInternalAPI::class) -private fun CoroutineScope.callAndRemoveIfRequired(event: E, listeners: EventListeners) { - // atomic foreach - listeners.forEachNode { node -> - launch { - val listener = node.nodeValue - if (listener.concurrencyKind == Listener.ConcurrencyKind.LOCKED) { - (listener as Handler).lock!!.withLock { - if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) { - listeners.remove(listener) // atomic remove - } +private suspend fun callAndRemoveIfRequired( + event: E +) { + coroutineScope { + for (p in Listener.EventPriority.values()) { + GlobalEventListeners[p].forEachNode { eventNode -> + if (event.isIntercepted) { + return@coroutineScope } - } else { - if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) { - listeners.remove(listener) // atomic remove + val node = eventNode.nodeValue + if (!node.owner.isInstance(event)) return@forEachNode + val listener = node.listener + when (listener.concurrencyKind) { + Listener.ConcurrencyKind.LOCKED -> { + (listener as Handler).lock!!.withLock { + kotlin.runCatching { + when (listener.onEvent(event)) { + ListeningStatus.STOPPED -> { + removeNode(eventNode) + } + else -> { + } + } + }.onFailure { + // TODO("Exception catching") + } + } + } + Listener.ConcurrencyKind.CONCURRENT -> { + kotlin.runCatching { + when (listener.onEvent(event)) { + ListeningStatus.STOPPED -> { + removeNode(eventNode) + } + else -> { + } + } + }.onFailure { + // TODO("Exception catching") + } + } } } } diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscriber.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscriber.kt index 365996e8b..9c2cee87d 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscriber.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscriber.kt @@ -70,7 +70,15 @@ interface Listener : CompletableJob { */ val concurrencyKind: ConcurrencyKind + enum class EventPriority { + MONITOR, HIGHEST, HIGH, NORMAL, LOW, LOWEST + + } + + val priority: EventPriority get() = EventPriority.NORMAL + suspend fun onEvent(event: E): ListeningStatus + } // region 顶层方法 创建当前 coroutineContext 下的子 Job @@ -120,6 +128,7 @@ interface Listener : CompletableJob { * * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]. * @param concurrency 并发类型. 查看 [Listener.ConcurrencyKind] + * @param priority 监听优先级,优先级越高越先执行 * * @see syncFromEvent 监听一个事件, 并尝试从这个事件中获取一个值. * @see asyncFromEvent 异步监听一个事件, 并尝试从这个事件中获取一个值. @@ -137,8 +146,9 @@ interface Listener : CompletableJob { inline fun CoroutineScope.subscribe( coroutineContext: CoroutineContext = EmptyCoroutineContext, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, noinline handler: suspend E.(E) -> ListeningStatus -): Listener = subscribe(E::class, coroutineContext, concurrency, handler) +): Listener = subscribe(E::class, coroutineContext, concurrency, priority, handler) /** * @see CoroutineScope.subscribe @@ -148,8 +158,9 @@ fun CoroutineScope.subscribe( eventClass: KClass, coroutineContext: CoroutineContext = EmptyCoroutineContext, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, handler: suspend E.(E) -> ListeningStatus -): Listener = eventClass.subscribeInternal(Handler(coroutineContext, concurrency) { it.handler(it); }) +): Listener = eventClass.subscribeInternal(Handler(coroutineContext, concurrency, priority) { it.handler(it); }) /** * 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件. @@ -159,14 +170,17 @@ fun CoroutineScope.subscribe( * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] + * @param priority 处理优先级, 优先级高的先执行 * * @see CoroutineScope.subscribe 获取更多说明 */ inline fun CoroutineScope.subscribeAlways( coroutineContext: CoroutineContext = EmptyCoroutineContext, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, noinline listener: suspend E.(E) -> Unit -): Listener = subscribeAlways(E::class, coroutineContext, concurrency, listener) +): Listener = subscribeAlways(E::class, coroutineContext, concurrency, priority, listener) + /** * @see CoroutineScope.subscribeAlways @@ -176,9 +190,10 @@ fun CoroutineScope.subscribeAlways( eventClass: KClass, coroutineContext: CoroutineContext = EmptyCoroutineContext, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, listener: suspend E.(E) -> Unit ): Listener = eventClass.subscribeInternal( - Handler(coroutineContext, concurrency) { it.listener(it); ListeningStatus.LISTENING } + Handler(coroutineContext, concurrency, priority) { it.listener(it); ListeningStatus.LISTENING } ) /** @@ -189,14 +204,16 @@ fun CoroutineScope.subscribeAlways( * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] + * @param priority 处理优先级, 优先级高的先执行 * * @see subscribe 获取更多说明 */ @JvmSynthetic inline fun CoroutineScope.subscribeOnce( coroutineContext: CoroutineContext = EmptyCoroutineContext, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, noinline listener: suspend E.(E) -> Unit -): Listener = subscribeOnce(E::class, coroutineContext, listener) +): Listener = subscribeOnce(E::class, coroutineContext, priority, listener) /** * @see CoroutineScope.subscribeOnce @@ -205,9 +222,10 @@ inline fun CoroutineScope.subscribeOnce( fun CoroutineScope.subscribeOnce( eventClass: KClass, coroutineContext: CoroutineContext = EmptyCoroutineContext, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, listener: suspend E.(E) -> Unit ): Listener = eventClass.subscribeInternal( - Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED) { it.listener(it); ListeningStatus.STOPPED } + Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED, priority) { it.listener(it); ListeningStatus.STOPPED } ) // @@ -224,6 +242,7 @@ fun CoroutineScope.subscribeOnce( * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] + * @param priority 事件优先级, 优先级高的先处理 * * @see subscribe 获取更多说明 */ @@ -233,8 +252,9 @@ fun CoroutineScope.subscribeOnce( inline fun Bot.subscribe( coroutineContext: CoroutineContext = EmptyCoroutineContext, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, noinline handler: suspend E.(E) -> ListeningStatus -): Listener = this.subscribe(E::class, coroutineContext, concurrency, handler) +): Listener = this.subscribe(E::class, coroutineContext, concurrency, priority, handler) /** * @see Bot.subscribe @@ -244,12 +264,16 @@ fun Bot.subscribe( eventClass: KClass, coroutineContext: CoroutineContext = EmptyCoroutineContext, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, handler: suspend E.(E) -> ListeningStatus ): Listener = eventClass.subscribeInternal( - Handler(coroutineContext, concurrency) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING } + Handler( + coroutineContext, + concurrency, + priority + ) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING } ) - /** * 在 [Bot] 的 [CoroutineScope] 下订阅所有 [E] 及其子类事件. * 每当 [事件广播][Event.broadcast] 时, [listener] 都会被执行. @@ -258,6 +282,7 @@ fun Bot.subscribe( * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] + * @param priority 事件优先级, 优先级高的先处理 * * @see subscribe 获取更多说明 */ @@ -267,8 +292,9 @@ fun Bot.subscribe( inline fun Bot.subscribeAlways( coroutineContext: CoroutineContext = EmptyCoroutineContext, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, noinline listener: suspend E.(E) -> Unit -): Listener = subscribeAlways(E::class, coroutineContext, concurrency, listener) +): Listener = subscribeAlways(E::class, coroutineContext, concurrency, priority, listener) /** * @see Bot.subscribeAlways @@ -278,9 +304,10 @@ fun Bot.subscribeAlways( eventClass: KClass, coroutineContext: CoroutineContext = EmptyCoroutineContext, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, listener: suspend E.(E) -> Unit ): Listener = eventClass.subscribeInternal( - Handler(coroutineContext, concurrency) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING } + Handler(coroutineContext, concurrency, priority) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING } ) /** @@ -291,6 +318,7 @@ fun Bot.subscribeAlways( * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] + * @param priority 事件优先级, 高的先处理 * * @see subscribe 获取更多说明 */ @@ -298,8 +326,9 @@ fun Bot.subscribeAlways( @JvmName("subscribeOnceForBot2") inline fun Bot.subscribeOnce( coroutineContext: CoroutineContext = EmptyCoroutineContext, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, noinline listener: suspend E.(E) -> Unit -): Listener = subscribeOnce(E::class, coroutineContext, listener) +): Listener = subscribeOnce(E::class, coroutineContext, priority, listener) /** * @see Bot.subscribeOnce @@ -308,13 +337,188 @@ inline fun Bot.subscribeOnce( fun Bot.subscribeOnce( eventClass: KClass, coroutineContext: CoroutineContext = EmptyCoroutineContext, + priority: Listener.EventPriority = Listener.EventPriority.NORMAL, listener: suspend E.(E) -> Unit ): Listener = - eventClass.subscribeInternal(Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED) { + eventClass.subscribeInternal(Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED, priority) { if (it.bot === this) { it.listener(it) ListeningStatus.STOPPED } else ListeningStatus.LISTENING }) +// endregion + +// region 为了兼容旧版本的方法 + +@JvmName("subscribe") +@JvmSynthetic +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +inline fun CoroutineScope.subscribeDeprecated( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + noinline handler: suspend E.(E) -> ListeningStatus +): Listener = subscribe( + coroutineContext = coroutineContext, + concurrency = concurrency, + handler = handler +) + +@JvmName("subscribe") +@JvmSynthetic +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +fun CoroutineScope.subscribeDeprecated( + eventClass: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + handler: suspend E.(E) -> ListeningStatus +): Listener = subscribe( + eventClass = eventClass, + coroutineContext = coroutineContext, + concurrency = concurrency, + handler = handler +) + +@JvmName("subscribeAlways") +@JvmSynthetic +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +inline fun CoroutineScope.subscribeAlwaysDeprecated( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + noinline listener: suspend E.(E) -> Unit +): Listener = subscribeAlways( + coroutineContext = coroutineContext, + concurrency = concurrency, + listener = listener +) + +@JvmName("subscribeAlways") +@JvmSynthetic +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +fun CoroutineScope.subscribeAlwaysDeprecated( + eventClass: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + listener: suspend E.(E) -> Unit +): Listener = subscribeAlways( + eventClass = eventClass, + coroutineContext = coroutineContext, + concurrency = concurrency, + listener = listener +) + +@JvmName("subscribeOnce") +@JvmSynthetic +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +inline fun CoroutineScope.subscribeOnceDeprecated( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + noinline listener: suspend E.(E) -> Unit +): Listener = subscribeOnce(coroutineContext = coroutineContext, listener = listener) + +@JvmName("subscribeOnce") +@JvmSynthetic +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +fun CoroutineScope.subscribeOnceDeprecated( + eventClass: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + listener: suspend E.(E) -> Unit +): Listener = subscribeOnce( + eventClass = eventClass, + coroutineContext = coroutineContext, + listener = listener +) + +@JvmSynthetic +@JvmName("subscribeAlwaysForBot") +@OptIn(MiraiInternalAPI::class) +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +inline fun Bot.subscribeDeprecated( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + noinline handler: suspend E.(E) -> ListeningStatus +): Listener = this.subscribe( + coroutineContext = coroutineContext, + concurrency = concurrency, + handler = handler +) + +@JvmSynthetic +@JvmName("subscribe") +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +fun Bot.subscribeDeprecated( + eventClass: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, + handler: suspend E.(E) -> ListeningStatus +): Listener = subscribe( + eventClass = eventClass, + coroutineContext = coroutineContext, + concurrency = concurrency, + handler = handler +) + +@JvmSynthetic +@JvmName("subscribeAlwaysForBot1") +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +@OptIn(MiraiInternalAPI::class) +inline fun Bot.subscribeAlwaysDeprecated( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, + noinline listener: suspend E.(E) -> Unit +): Listener = subscribeAlways( + coroutineContext = coroutineContext, + concurrency = concurrency, + listener = listener +) + +@JvmSynthetic +@JvmName("subscribeAlways") +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +fun Bot.subscribeAlwaysDeprecated( + eventClass: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, + listener: suspend E.(E) -> Unit +): Listener = subscribeAlways( + eventClass = eventClass, + coroutineContext = coroutineContext, + concurrency = concurrency, + listener = listener +) + +@JvmSynthetic +@JvmName("subscribeOnceForBot2") +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +inline fun Bot.subscribeOnceDeprecated( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + noinline listener: suspend E.(E) -> Unit +): Listener = subscribeOnce( + coroutineContext = coroutineContext, + listener = listener +) + +@JvmSynthetic +@JvmName("subscribeOnce") +@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN) +@Suppress("unused") +fun Bot.subscribeOnceDeprecated( + eventClass: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + listener: suspend E.(E) -> Unit +): Listener = subscribeOnce( + eventClass = eventClass, + coroutineContext = coroutineContext, + listener = listener +) // endregion \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/LockFreeLinkedList.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/LockFreeLinkedList.kt index adae98c1e..04db2c23e 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/LockFreeLinkedList.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/utils/LockFreeLinkedList.kt @@ -156,6 +156,18 @@ open class LockFreeLinkedList { } } + open fun tryInsertAfter(node: LockFreeLinkedListNode, newValue: E): Boolean { + if (node == tail) { + error("Cannot insert value after tail") + } + if (node.isRemoved()) { + return false + } + val next = node.nextNodeRef.value + val newNode = newValue.asNode(next) + return node.nextNodeRef.compareAndSet(next, newNode) + } + /** * 先把元素建立好链表, 再加入到 list. */ @@ -329,7 +341,8 @@ open class LockFreeLinkedList { } } - inline fun forEachNode(block: (LockFreeLinkedListNode) -> Unit) { + inline fun forEachNode(block: LockFreeLinkedList.(LockFreeLinkedListNode) -> Unit) { + // Copy from forEach var node: LockFreeLinkedListNode = head while (true) { if (node === tail) return @@ -358,28 +371,41 @@ open class LockFreeLinkedList { @Suppress("unused") open fun removeAll(elements: Collection): Boolean = elements.all { remove(it) } - /* - - - private fun removeNode(node: Node): Boolean { + @Suppress("DuplicatedCode") + open fun removeNode(node: LockFreeLinkedListNode): Boolean { if (node == tail) { return false } while (true) { val before = head.iterateBeforeFirst { it === node } val toRemove = before.nextNode - val next = toRemove.nextNode - if (toRemove == tail) { // This - return true + if (toRemove === tail) { + return false + } + if (toRemove.isRemoved()) { + continue + } + @Suppress("BooleanLiteralArgument") // false positive + if (!toRemove.removed.compareAndSet(false, true)) { + // logically remove: all the operations will recognize this node invalid + continue } - toRemove.nodeValue = null // logically remove first, then all the operations will recognize this node invalid - if (before.nextNodeRef.compareAndSet(toRemove, next)) { // physically remove: try to fix the link + + // physically remove: try to fix the link + var next: LockFreeLinkedListNode = toRemove.nextNode + while (next !== tail && next.isRemoved()) { + next = next.nextNode + } + if (before.nextNodeRef.compareAndSet(toRemove, next)) { return true } } } + /* + + fun removeAt(index: Int): E { require(index >= 0) { "index must be >= 0" } val nodeBeforeIndex = head.iterateValidNodeNTimes(index) diff --git a/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/internal/MiraiAtomicBoolean.kt b/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/internal/MiraiAtomicBoolean.kt index 7101b3adc..e89a7b165 100644 --- a/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/internal/MiraiAtomicBoolean.kt +++ b/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/internal/MiraiAtomicBoolean.kt @@ -1,3 +1,12 @@ +/* + * Copyright 2020 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + @file:OptIn(MiraiInternalAPI::class) package net.mamoe.mirai.event.internal @@ -5,7 +14,10 @@ package net.mamoe.mirai.event.internal import net.mamoe.mirai.event.Event import net.mamoe.mirai.event.Listener import net.mamoe.mirai.utils.LockFreeLinkedList +import net.mamoe.mirai.utils.LockFreeLinkedListNode +import net.mamoe.mirai.utils.isRemoved import net.mamoe.mirai.utils.MiraiInternalAPI +import java.util.* import java.util.concurrent.atomic.AtomicBoolean import kotlin.reflect.KClass @@ -24,14 +36,30 @@ internal actual class MiraiAtomicBoolean actual constructor(initial: Boolean) { } } +internal actual object GlobalEventListeners { + private val map: Map> + + init { + val map = EnumMap>(Listener.EventPriority::class.java) + Listener.EventPriority.values().forEach { + map[it] = LockFreeLinkedList() + } + this.map = map + } + + actual operator fun get(priority: Listener.EventPriority): LockFreeLinkedList = map[priority]!! + +} + +/* internal actual class EventListeners actual constructor(clazz: KClass) : LockFreeLinkedList>() { @Suppress("UNCHECKED_CAST", "UNSUPPORTED", "NO_REFLECTION_IN_CLASS_PATH") actual val supertypes: Set> by lazy { val supertypes = mutableSetOf>() - fun addSupertypes(clazz: KClass) { - clazz.supertypes.forEach { + fun addSupertypes(klass: KClass) { + klass.supertypes.forEach { val classifier = it.classifier as? KClass if (classifier != null) { supertypes.add(classifier) @@ -43,4 +71,6 @@ internal actual class EventListeners actual constructor(clazz: KClass supertypes } -} \ No newline at end of file + +} + */ \ No newline at end of file diff --git a/mirai-core/src/jvmTest/kotlin/net/mamoe/mirai/event/EventTests.kt b/mirai-core/src/jvmTest/kotlin/net/mamoe/mirai/event/EventTests.kt index 6b2b2f254..b740314f9 100644 --- a/mirai-core/src/jvmTest/kotlin/net/mamoe/mirai/event/EventTests.kt +++ b/mirai-core/src/jvmTest/kotlin/net/mamoe/mirai/event/EventTests.kt @@ -9,9 +9,14 @@ package net.mamoe.mirai.event -import kotlinx.coroutines.CompletableJob -import kotlinx.coroutines.GlobalScope +import kotlinx.atomicfu.AtomicInt +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.* +import net.mamoe.mirai.utils.StepUtil import net.mamoe.mirai.utils.internal.runBlocking +import java.util.concurrent.Executor +import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread import kotlin.test.Test import kotlin.test.assertTrue @@ -43,6 +48,96 @@ class EventTests { } } + @Test + fun `test concurrent listening`() { + var listeners = 0 + val counter = AtomicInteger(0) + for (p in Listener.EventPriority.values()) { + repeat(2333) { + listeners++ + GlobalScope.subscribeAlways { + counter.getAndIncrement() + } + } + } + kotlinx.coroutines.runBlocking { + ParentEvent().broadcast() + delay(5000L) // ? + } + val called = counter.get() + println("Registered $listeners listeners and $called called") + if (listeners != called) { + throw IllegalStateException("Registered $listeners listeners but only $called called") + } + } + + @Test + fun `test concurrent listening 3`() { + runBlocking { + val called = AtomicInteger() + val registered = AtomicInteger() + coroutineScope { + println("Step 0") + for (priority in Listener.EventPriority.values()) { + launch { + repeat(5000) { + registered.getAndIncrement() + GlobalScope.subscribeAlways( + priority = priority + ) { + called.getAndIncrement() + } + } + println("Registeterd $priority") + } + } + println("Step 1") + } + println("Step 2") + ParentEvent().broadcast() + println("Step 3") + check(called.get() == registered.get()) + println("Done") + println("Called ${called.get()}, registered ${registered.get()}") + } + } + + @Test + fun `test concurrent listening 2`() { + val registered = AtomicInteger() + val called = AtomicInteger() + val threads = mutableListOf() + repeat(50) { + threads.add(thread { + repeat(444) { + registered.getAndIncrement() + GlobalScope.launch { + subscribeAlways { + called.getAndIncrement() + } + } + } + }) + } + Thread.sleep(5000L)// Wait all thread started. + threads.forEach { + it.join() // Wait all finished + } + println("All listeners registered") + val postCount = 3 + kotlinx.coroutines.runBlocking { + repeat(postCount) { + ParentEvent().broadcast() + } + delay(5000L) + } + val calledCount = called.get() + val shouldCalled = registered.get() * postCount + println("Should call $shouldCalled times and $called called") + if (shouldCalled != calledCount) { + throw IllegalStateException("?") + } + } open class ParentEvent : Event, AbstractEvent() { var triggered = false @@ -76,4 +171,111 @@ class EventTests { job.complete() } } + + open class PriorityTestEvent : AbstractEvent() {} + + fun singleThreaded(step: StepUtil, invoke: suspend CoroutineScope.() -> Unit) { + // runBlocking 会完全堵死, 没法退出 + val scope = CoroutineScope(Executor { it.run() }.asCoroutineDispatcher()) + val job = scope.launch { + invoke(scope) + } + kotlinx.coroutines.runBlocking { + job.join() + } + step.throws() + } + + @Test + fun `test handler remvoe`() { + val step = StepUtil() + singleThreaded(step) { + subscribe { + step.step(0) + ListeningStatus.STOPPED + } + ParentEvent().broadcast() + ParentEvent().broadcast() + } + } + + /* + @Test + fun `test boom`() { + val step = StepUtil() + singleThreaded(step) { + step.step(0) + step.step(0) + } + } + */ + + @Test + fun `test intercept with always`() { + val step = StepUtil() + singleThreaded(step) { + subscribeAlways { + step.step(0) + intercept() + } + subscribe { + step.step(-1, "Boom") + ListeningStatus.LISTENING + } + ParentEvent().broadcast() + } + } + + @Test + fun `test intercept`() { + val step = StepUtil() + singleThreaded(step) { + subscribeAlways { + step.step(0) + intercept() + } + subscribe { + step.step(-1, "Boom") + ListeningStatus.LISTENING + } + ParentEvent().broadcast() + } + } + + @Test + fun `test listener complete`() { + val step = StepUtil() + singleThreaded(step) { + val listener = subscribeAlways { + step.step(0, "boom!") + } + ParentEvent().broadcast() + listener.complete() + ParentEvent().broadcast() + } + } + + @Test + fun `test event priority`() { + val step = StepUtil() + singleThreaded(step) { + subscribe { + step.step(1) + ListeningStatus.LISTENING + } + subscribe(priority = Listener.EventPriority.HIGH) { + step.step(0) + ListeningStatus.LISTENING + } + subscribe(priority = Listener.EventPriority.LOW) { + step.step(3) + ListeningStatus.LISTENING + } + subscribe { + step.step(2) + ListeningStatus.LISTENING + } + PriorityTestEvent().broadcast() + } + } } \ No newline at end of file diff --git a/mirai-core/src/jvmTest/kotlin/net/mamoe/mirai/utils/StepUtil.kt b/mirai-core/src/jvmTest/kotlin/net/mamoe/mirai/utils/StepUtil.kt new file mode 100644 index 000000000..a62bd1a29 --- /dev/null +++ b/mirai-core/src/jvmTest/kotlin/net/mamoe/mirai/utils/StepUtil.kt @@ -0,0 +1,23 @@ +package net.mamoe.mirai.utils + +import kotlinx.atomicfu.atomic +import java.util.concurrent.ConcurrentLinkedDeque + +class StepUtil { + val step = atomic(0) + val exceptions = ConcurrentLinkedDeque() + fun step(step: Int, message: String = "Wrong step") { + println("Invoking step $step") + if (step != this.step.getAndIncrement()) { + throw IllegalStateException(message).also { exceptions.add(it) } + } + } + + fun throws() { + if (exceptions.isEmpty()) return + val root = exceptions.poll()!! + while (true) { + root.addSuppressed(exceptions.poll() ?: throw root) + } + } +} \ No newline at end of file