支持监听优先级, 事件传递拦截 (#279)

* 支持监听优先级, 事件传递拦截

* Fix test

* 并发

* 优先级&并发

* Test

* Fix unused

* To GlobalEventListeners

* Add tests

* intercept with subscribeAlways

* test listener.complete()

* Add functions

* Fix test and add new test

* Test concurrent listening

* Test concurrent listening

* update broadcast

* Fix Boom

Co-authored-by: Him188 <Him188@mamoe.net>
This commit is contained in:
Karlatemp 2020-05-01 21:05:04 +08:00 committed by GitHub
parent 659cfa2288
commit c17e8a3263
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 586 additions and 105 deletions

View File

@ -35,9 +35,11 @@ import kotlin.jvm.JvmSynthetic
*/ */
interface Event { interface Event {
@Deprecated(""" @Deprecated(
"""
Don't implement Event but extend AbstractEvent instead. Don't implement Event but extend AbstractEvent instead.
""", level = DeprecationLevel.HIDDEN) // so Kotlin class won't be compiled. """, level = DeprecationLevel.HIDDEN
) // so Kotlin class won't be compiled.
@Suppress("WRONG_MODIFIER_CONTAINING_DECLARATION", "PropertyName") @Suppress("WRONG_MODIFIER_CONTAINING_DECLARATION", "PropertyName")
@get:JvmSynthetic // so Java user won't see it @get:JvmSynthetic // so Java user won't see it
internal val DoNotImplementThisClassButExtendAbstractEvent: Nothing internal val DoNotImplementThisClassButExtendAbstractEvent: Nothing
@ -54,10 +56,9 @@ abstract class AbstractEvent : Event {
final override val DoNotImplementThisClassButExtendAbstractEvent: Nothing final override val DoNotImplementThisClassButExtendAbstractEvent: Nothing
get() = throw Error("Shouldn't be reached") get() = throw Error("Shouldn't be reached")
private val _intercepted = atomic(false) private var _intercepted = false
private val _cancelled = atomic(false) private val _cancelled = atomic(false)
/** /**
* 事件是否已被拦截. * 事件是否已被拦截.
* *
@ -65,7 +66,7 @@ abstract class AbstractEvent : Event {
*/ */
@SinceMirai("1.0.0") @SinceMirai("1.0.0")
val isIntercepted: Boolean val isIntercepted: Boolean
get() = _intercepted.value get() = _intercepted
/** /**
* 拦截这个事件. * 拦截这个事件.
@ -73,7 +74,7 @@ abstract class AbstractEvent : Event {
*/ */
@SinceMirai("1.0.0") @SinceMirai("1.0.0")
fun intercept() { fun intercept() {
_intercepted.value = true _intercepted = true
} }
@ -145,5 +146,6 @@ interface BroadcastControllable : Event {
@Deprecated( @Deprecated(
"use AbstractEvent and implement CancellableEvent", "use AbstractEvent and implement CancellableEvent",
level = DeprecationLevel.ERROR, level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("AbstractEvent", "net.mamoe.mirai.event.AbstractEvent")) replaceWith = ReplaceWith("AbstractEvent", "net.mamoe.mirai.event.AbstractEvent")
)
abstract class AbstractCancellableEvent : AbstractEvent(), CancellableEvent abstract class AbstractCancellableEvent : AbstractEvent(), CancellableEvent

View File

@ -13,40 +13,41 @@ package net.mamoe.mirai.event.internal
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.event.Event import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.EventDisabled import net.mamoe.mirai.utils.*
import net.mamoe.mirai.event.Listener
import net.mamoe.mirai.event.ListeningStatus
import net.mamoe.mirai.utils.LockFreeLinkedList
import net.mamoe.mirai.utils.MiraiInternalAPI
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.isRemoved
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext import kotlin.coroutines.coroutineContext
import kotlin.jvm.JvmField import kotlin.jvm.JvmField
import kotlin.reflect.KClass import kotlin.reflect.KClass
@PublishedApi @PublishedApi
internal fun <L : Listener<E>, E : Event> KClass<out E>.subscribeInternal(listener: L): L { internal fun <L : Listener<E>, E : Event> KClass<out E>.subscribeInternal(listener: L): L {
with(this.listeners()) { with(GlobalEventListeners[listener.priority]) {
addLast(listener) @Suppress("UNCHECKED_CAST")
val node = ListenerNode(listener as Listener<Event>, this@subscribeInternal)
@OptIn(MiraiInternalAPI::class)
addLast(node)
listener.invokeOnCompletion { listener.invokeOnCompletion {
this.remove(listener) @OptIn(MiraiInternalAPI::class)
this.remove(node)
} }
} }
return listener return listener
} }
@PublishedApi @PublishedApi
@Suppress("FunctionName") @Suppress("FunctionName")
internal fun <E : Event> CoroutineScope.Handler( internal fun <E : Event> CoroutineScope.Handler(
coroutineContext: CoroutineContext, coroutineContext: CoroutineContext,
concurrencyKind: Listener.ConcurrencyKind, concurrencyKind: Listener.ConcurrencyKind,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
handler: suspend (E) -> ListeningStatus handler: suspend (E) -> ListeningStatus
): Handler<E> { ): Handler<E> {
@OptIn(ExperimentalCoroutinesApi::class) // don't remove @OptIn(ExperimentalCoroutinesApi::class) // don't remove
val context = this.newCoroutineContext(coroutineContext) val context = this.newCoroutineContext(coroutineContext)
return Handler(context[Job], context, handler, concurrencyKind) return Handler(context[Job], context, handler, concurrencyKind, priority)
} }
/** /**
@ -58,15 +59,17 @@ internal class Handler<in E : Event>
parentJob: Job?, parentJob: Job?,
subscriberContext: CoroutineContext, subscriberContext: CoroutineContext,
@JvmField val handler: suspend (E) -> ListeningStatus, @JvmField val handler: suspend (E) -> ListeningStatus,
override val concurrencyKind: Listener.ConcurrencyKind override val concurrencyKind: Listener.ConcurrencyKind,
override val priority: Listener.EventPriority
) : Listener<E>, CompletableJob by SupervisorJob(parentJob) { // avoid being cancelled on handling event ) : Listener<E>, CompletableJob by SupervisorJob(parentJob) { // avoid being cancelled on handling event
private val subscriberContext: CoroutineContext = subscriberContext + this // override Job. private val subscriberContext: CoroutineContext = subscriberContext + this // override Job.
@MiraiInternalAPI @MiraiInternalAPI
val lock: Mutex? = when (concurrencyKind) { val lock: Mutex? = when (concurrencyKind) {
Listener.ConcurrencyKind.CONCURRENT -> null
Listener.ConcurrencyKind.LOCKED -> Mutex() Listener.ConcurrencyKind.LOCKED -> Mutex()
else -> null
} }
@Suppress("unused") @Suppress("unused")
@ -95,14 +98,12 @@ internal class Handler<in E : Event>
} }
} }
/** internal class ListenerNode(
* 这个事件类的监听器 list val listener: Listener<Event>,
*/ val owner: KClass<out Event>
internal fun <E : Event> KClass<out E>.listeners(): EventListeners<E> = EventListenerManager.get(this) )
internal expect object GlobalEventListeners {
internal expect class EventListeners<E : Event>(clazz: KClass<E>) : LockFreeLinkedList<Listener<E>> { operator fun get(priority: Listener.EventPriority): LockFreeLinkedList<ListenerNode>
@Suppress("UNCHECKED_CAST", "UNSUPPORTED", "NO_REFLECTION_IN_CLASS_PATH")
val supertypes: Set<KClass<out Event>>
} }
internal expect class MiraiAtomicBoolean(initial: Boolean) { internal expect class MiraiAtomicBoolean(initial: Boolean) {
@ -112,63 +113,56 @@ internal expect class MiraiAtomicBoolean(initial: Boolean) {
var value: Boolean var value: Boolean
} }
/**
* 管理每个事件 class [EventListeners].
* [EventListeners] lazy : 它们只会在被需要的时候才创建和存储.
*/
internal object EventListenerManager {
private data class Registry<E : Event>(val clazz: KClass<E>, val listeners: EventListeners<E>)
private val registries = LockFreeLinkedList<Registry<*>>()
// 不要用 atomicfu. 在 publish 后会出现 VerifyError
private val lock: MiraiAtomicBoolean = MiraiAtomicBoolean(false)
@OptIn(MiraiInternalAPI::class)
@Suppress("UNCHECKED_CAST", "BooleanLiteralArgument")
internal tailrec fun <E : Event> get(clazz: KClass<out E>): EventListeners<E> {
registries.forEach {
if (it.clazz == clazz) {
return it.listeners as EventListeners<E>
}
}
if (lock.compareAndSet(false, true)) {
val registry = Registry(clazz as KClass<E>, EventListeners(clazz))
registries.addLast(registry)
lock.value = false
return registry.listeners
}
return get(clazz)
}
}
// inline: NO extra Continuation // inline: NO extra Continuation
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
internal suspend inline fun Event.broadcastInternal() = coroutineScope { internal suspend inline fun Event.broadcastInternal() = coroutineScope {
if (EventDisabled) return@coroutineScope if (EventDisabled) return@coroutineScope
callAndRemoveIfRequired(this@broadcastInternal as? AbstractEvent ?: error("Events must extends AbstractEvent"))
val listeners = this@broadcastInternal::class.listeners()
callAndRemoveIfRequired(this@broadcastInternal, listeners)
listeners.supertypes.forEach {
callAndRemoveIfRequired(this@broadcastInternal, it.listeners())
}
} }
@OptIn(MiraiInternalAPI::class) @OptIn(MiraiInternalAPI::class)
private fun <E : Event> CoroutineScope.callAndRemoveIfRequired(event: E, listeners: EventListeners<E>) { private suspend fun <E : AbstractEvent> callAndRemoveIfRequired(
// atomic foreach event: E
listeners.forEachNode { node -> ) {
launch { coroutineScope {
val listener = node.nodeValue for (p in Listener.EventPriority.values()) {
if (listener.concurrencyKind == Listener.ConcurrencyKind.LOCKED) { GlobalEventListeners[p].forEachNode { eventNode ->
(listener as Handler).lock!!.withLock { if (event.isIntercepted) {
if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) { return@coroutineScope
listeners.remove(listener) // atomic remove
}
} }
} else { val node = eventNode.nodeValue
if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) { if (!node.owner.isInstance(event)) return@forEachNode
listeners.remove(listener) // atomic remove val listener = node.listener
when (listener.concurrencyKind) {
Listener.ConcurrencyKind.LOCKED -> {
(listener as Handler).lock!!.withLock {
kotlin.runCatching {
when (listener.onEvent(event)) {
ListeningStatus.STOPPED -> {
removeNode(eventNode)
}
else -> {
}
}
}.onFailure {
// TODO("Exception catching")
}
}
}
Listener.ConcurrencyKind.CONCURRENT -> {
kotlin.runCatching {
when (listener.onEvent(event)) {
ListeningStatus.STOPPED -> {
removeNode(eventNode)
}
else -> {
}
}
}.onFailure {
// TODO("Exception catching")
}
}
} }
} }
} }

View File

@ -70,7 +70,15 @@ interface Listener<in E : Event> : CompletableJob {
*/ */
val concurrencyKind: ConcurrencyKind val concurrencyKind: ConcurrencyKind
enum class EventPriority {
MONITOR, HIGHEST, HIGH, NORMAL, LOW, LOWEST
}
val priority: EventPriority get() = EventPriority.NORMAL
suspend fun onEvent(event: E): ListeningStatus suspend fun onEvent(event: E): ListeningStatus
} }
// region 顶层方法 创建当前 coroutineContext 下的子 Job // region 顶层方法 创建当前 coroutineContext 下的子 Job
@ -120,6 +128,7 @@ interface Listener<in E : Event> : CompletableJob {
* *
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]. * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext].
* @param concurrency 并发类型. 查看 [Listener.ConcurrencyKind] * @param concurrency 并发类型. 查看 [Listener.ConcurrencyKind]
* @param priority 监听优先级优先级越高越先执行
* *
* @see syncFromEvent 监听一个事件, 并尝试从这个事件中获取一个值. * @see syncFromEvent 监听一个事件, 并尝试从这个事件中获取一个值.
* @see asyncFromEvent 异步监听一个事件, 并尝试从这个事件中获取一个值. * @see asyncFromEvent 异步监听一个事件, 并尝试从这个事件中获取一个值.
@ -137,8 +146,9 @@ interface Listener<in E : Event> : CompletableJob {
inline fun <reified E : Event> CoroutineScope.subscribe( inline fun <reified E : Event> CoroutineScope.subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline handler: suspend E.(E) -> ListeningStatus noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, handler) ): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority, handler)
/** /**
* @see CoroutineScope.subscribe * @see CoroutineScope.subscribe
@ -148,8 +158,9 @@ fun <E : Event> CoroutineScope.subscribe(
eventClass: KClass<E>, eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
handler: suspend E.(E) -> ListeningStatus handler: suspend E.(E) -> ListeningStatus
): Listener<E> = eventClass.subscribeInternal(Handler(coroutineContext, concurrency) { it.handler(it); }) ): Listener<E> = eventClass.subscribeInternal(Handler(coroutineContext, concurrency, priority) { it.handler(it); })
/** /**
* 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件. * 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件.
@ -159,14 +170,17 @@ fun <E : Event> CoroutineScope.subscribe(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* *
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
* *
* @see CoroutineScope.subscribe 获取更多说明 * @see CoroutineScope.subscribe 获取更多说明
*/ */
inline fun <reified E : Event> CoroutineScope.subscribeAlways( inline fun <reified E : Event> CoroutineScope.subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline listener: suspend E.(E) -> Unit noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, listener) ): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority, listener)
/** /**
* @see CoroutineScope.subscribeAlways * @see CoroutineScope.subscribeAlways
@ -176,9 +190,10 @@ fun <E : Event> CoroutineScope.subscribeAlways(
eventClass: KClass<E>, eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
listener: suspend E.(E) -> Unit listener: suspend E.(E) -> Unit
): Listener<E> = eventClass.subscribeInternal( ): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, concurrency) { it.listener(it); ListeningStatus.LISTENING } Handler(coroutineContext, concurrency, priority) { it.listener(it); ListeningStatus.LISTENING }
) )
/** /**
@ -189,14 +204,16 @@ fun <E : Event> CoroutineScope.subscribeAlways(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* *
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
* *
* @see subscribe 获取更多说明 * @see subscribe 获取更多说明
*/ */
@JvmSynthetic @JvmSynthetic
inline fun <reified E : Event> CoroutineScope.subscribeOnce( inline fun <reified E : Event> CoroutineScope.subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline listener: suspend E.(E) -> Unit noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(E::class, coroutineContext, listener) ): Listener<E> = subscribeOnce(E::class, coroutineContext, priority, listener)
/** /**
* @see CoroutineScope.subscribeOnce * @see CoroutineScope.subscribeOnce
@ -205,9 +222,10 @@ inline fun <reified E : Event> CoroutineScope.subscribeOnce(
fun <E : Event> CoroutineScope.subscribeOnce( fun <E : Event> CoroutineScope.subscribeOnce(
eventClass: KClass<E>, eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
listener: suspend E.(E) -> Unit listener: suspend E.(E) -> Unit
): Listener<E> = eventClass.subscribeInternal( ): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED) { it.listener(it); ListeningStatus.STOPPED } Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED, priority) { it.listener(it); ListeningStatus.STOPPED }
) )
// //
@ -224,6 +242,7 @@ fun <E : Event> CoroutineScope.subscribeOnce(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* *
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 事件优先级, 优先级高的先处理
* *
* @see subscribe 获取更多说明 * @see subscribe 获取更多说明
*/ */
@ -233,8 +252,9 @@ fun <E : Event> CoroutineScope.subscribeOnce(
inline fun <reified E : BotEvent> Bot.subscribe( inline fun <reified E : BotEvent> Bot.subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline handler: suspend E.(E) -> ListeningStatus noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = this.subscribe(E::class, coroutineContext, concurrency, handler) ): Listener<E> = this.subscribe(E::class, coroutineContext, concurrency, priority, handler)
/** /**
* @see Bot.subscribe * @see Bot.subscribe
@ -244,12 +264,16 @@ fun <E : BotEvent> Bot.subscribe(
eventClass: KClass<E>, eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
handler: suspend E.(E) -> ListeningStatus handler: suspend E.(E) -> ListeningStatus
): Listener<E> = eventClass.subscribeInternal( ): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, concurrency) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING } Handler(
coroutineContext,
concurrency,
priority
) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING }
) )
/** /**
* [Bot] [CoroutineScope] 下订阅所有 [E] 及其子类事件. * [Bot] [CoroutineScope] 下订阅所有 [E] 及其子类事件.
* 每当 [事件广播][Event.broadcast] , [listener] 都会被执行. * 每当 [事件广播][Event.broadcast] , [listener] 都会被执行.
@ -258,6 +282,7 @@ fun <E : BotEvent> Bot.subscribe(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* *
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 事件优先级, 优先级高的先处理
* *
* @see subscribe 获取更多说明 * @see subscribe 获取更多说明
*/ */
@ -267,8 +292,9 @@ fun <E : BotEvent> Bot.subscribe(
inline fun <reified E : BotEvent> Bot.subscribeAlways( inline fun <reified E : BotEvent> Bot.subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline listener: suspend E.(E) -> Unit noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, listener) ): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority, listener)
/** /**
* @see Bot.subscribeAlways * @see Bot.subscribeAlways
@ -278,9 +304,10 @@ fun <E : BotEvent> Bot.subscribeAlways(
eventClass: KClass<E>, eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
listener: suspend E.(E) -> Unit listener: suspend E.(E) -> Unit
): Listener<E> = eventClass.subscribeInternal( ): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, concurrency) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING } Handler(coroutineContext, concurrency, priority) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING }
) )
/** /**
@ -291,6 +318,7 @@ fun <E : BotEvent> Bot.subscribeAlways(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel]. * [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* *
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext] * @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 事件优先级, 高的先处理
* *
* @see subscribe 获取更多说明 * @see subscribe 获取更多说明
*/ */
@ -298,8 +326,9 @@ fun <E : BotEvent> Bot.subscribeAlways(
@JvmName("subscribeOnceForBot2") @JvmName("subscribeOnceForBot2")
inline fun <reified E : BotEvent> Bot.subscribeOnce( inline fun <reified E : BotEvent> Bot.subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline listener: suspend E.(E) -> Unit noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(E::class, coroutineContext, listener) ): Listener<E> = subscribeOnce(E::class, coroutineContext, priority, listener)
/** /**
* @see Bot.subscribeOnce * @see Bot.subscribeOnce
@ -308,13 +337,188 @@ inline fun <reified E : BotEvent> Bot.subscribeOnce(
fun <E : BotEvent> Bot.subscribeOnce( fun <E : BotEvent> Bot.subscribeOnce(
eventClass: KClass<E>, eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
listener: suspend E.(E) -> Unit listener: suspend E.(E) -> Unit
): Listener<E> = ): Listener<E> =
eventClass.subscribeInternal(Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED) { eventClass.subscribeInternal(Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED, priority) {
if (it.bot === this) { if (it.bot === this) {
it.listener(it) it.listener(it)
ListeningStatus.STOPPED ListeningStatus.STOPPED
} else ListeningStatus.LISTENING } else ListeningStatus.LISTENING
}) })
// endregion
// region 为了兼容旧版本的方法
@JvmName("subscribe")
@JvmSynthetic
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
inline fun <reified E : Event> CoroutineScope.subscribeDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(
coroutineContext = coroutineContext,
concurrency = concurrency,
handler = handler
)
@JvmName("subscribe")
@JvmSynthetic
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
fun <E : Event> CoroutineScope.subscribeDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(
eventClass = eventClass,
coroutineContext = coroutineContext,
concurrency = concurrency,
handler = handler
)
@JvmName("subscribeAlways")
@JvmSynthetic
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
inline fun <reified E : Event> CoroutineScope.subscribeAlwaysDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(
coroutineContext = coroutineContext,
concurrency = concurrency,
listener = listener
)
@JvmName("subscribeAlways")
@JvmSynthetic
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
fun <E : Event> CoroutineScope.subscribeAlwaysDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(
eventClass = eventClass,
coroutineContext = coroutineContext,
concurrency = concurrency,
listener = listener
)
@JvmName("subscribeOnce")
@JvmSynthetic
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
inline fun <reified E : Event> CoroutineScope.subscribeOnceDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(coroutineContext = coroutineContext, listener = listener)
@JvmName("subscribeOnce")
@JvmSynthetic
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
fun <E : Event> CoroutineScope.subscribeOnceDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(
eventClass = eventClass,
coroutineContext = coroutineContext,
listener = listener
)
@JvmSynthetic
@JvmName("subscribeAlwaysForBot")
@OptIn(MiraiInternalAPI::class)
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
inline fun <reified E : BotEvent> Bot.subscribeDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = this.subscribe(
coroutineContext = coroutineContext,
concurrency = concurrency,
handler = handler
)
@JvmSynthetic
@JvmName("subscribe")
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
fun <E : BotEvent> Bot.subscribeDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(
eventClass = eventClass,
coroutineContext = coroutineContext,
concurrency = concurrency,
handler = handler
)
@JvmSynthetic
@JvmName("subscribeAlwaysForBot1")
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
@OptIn(MiraiInternalAPI::class)
inline fun <reified E : BotEvent> Bot.subscribeAlwaysDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(
coroutineContext = coroutineContext,
concurrency = concurrency,
listener = listener
)
@JvmSynthetic
@JvmName("subscribeAlways")
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
fun <E : BotEvent> Bot.subscribeAlwaysDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(
eventClass = eventClass,
coroutineContext = coroutineContext,
concurrency = concurrency,
listener = listener
)
@JvmSynthetic
@JvmName("subscribeOnceForBot2")
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
inline fun <reified E : BotEvent> Bot.subscribeOnceDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(
coroutineContext = coroutineContext,
listener = listener
)
@JvmSynthetic
@JvmName("subscribeOnce")
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("unused")
fun <E : BotEvent> Bot.subscribeOnceDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(
eventClass = eventClass,
coroutineContext = coroutineContext,
listener = listener
)
// endregion // endregion

View File

@ -156,6 +156,18 @@ open class LockFreeLinkedList<E> {
} }
} }
open fun tryInsertAfter(node: LockFreeLinkedListNode<E>, newValue: E): Boolean {
if (node == tail) {
error("Cannot insert value after tail")
}
if (node.isRemoved()) {
return false
}
val next = node.nextNodeRef.value
val newNode = newValue.asNode(next)
return node.nextNodeRef.compareAndSet(next, newNode)
}
/** /**
* 先把元素建立好链表, 再加入到 list. * 先把元素建立好链表, 再加入到 list.
*/ */
@ -329,7 +341,8 @@ open class LockFreeLinkedList<E> {
} }
} }
inline fun forEachNode(block: (LockFreeLinkedListNode<E>) -> Unit) { inline fun forEachNode(block: LockFreeLinkedList<E>.(LockFreeLinkedListNode<E>) -> Unit) {
// Copy from forEach
var node: LockFreeLinkedListNode<E> = head var node: LockFreeLinkedListNode<E> = head
while (true) { while (true) {
if (node === tail) return if (node === tail) return
@ -358,28 +371,41 @@ open class LockFreeLinkedList<E> {
@Suppress("unused") @Suppress("unused")
open fun removeAll(elements: Collection<E>): Boolean = elements.all { remove(it) } open fun removeAll(elements: Collection<E>): Boolean = elements.all { remove(it) }
/* @Suppress("DuplicatedCode")
open fun removeNode(node: LockFreeLinkedListNode<E>): Boolean {
private fun removeNode(node: Node<E>): Boolean {
if (node == tail) { if (node == tail) {
return false return false
} }
while (true) { while (true) {
val before = head.iterateBeforeFirst { it === node } val before = head.iterateBeforeFirst { it === node }
val toRemove = before.nextNode val toRemove = before.nextNode
val next = toRemove.nextNode if (toRemove === tail) {
if (toRemove == tail) { // This return false
return true }
if (toRemove.isRemoved()) {
continue
}
@Suppress("BooleanLiteralArgument") // false positive
if (!toRemove.removed.compareAndSet(false, true)) {
// logically remove: all the operations will recognize this node invalid
continue
} }
toRemove.nodeValue = null // logically remove first, then all the operations will recognize this node invalid
if (before.nextNodeRef.compareAndSet(toRemove, next)) { // physically remove: try to fix the link
// physically remove: try to fix the link
var next: LockFreeLinkedListNode<E> = toRemove.nextNode
while (next !== tail && next.isRemoved()) {
next = next.nextNode
}
if (before.nextNodeRef.compareAndSet(toRemove, next)) {
return true return true
} }
} }
} }
/*
fun removeAt(index: Int): E { fun removeAt(index: Int): E {
require(index >= 0) { "index must be >= 0" } require(index >= 0) { "index must be >= 0" }
val nodeBeforeIndex = head.iterateValidNodeNTimes(index) val nodeBeforeIndex = head.iterateValidNodeNTimes(index)

View File

@ -1,3 +1,12 @@
/*
* Copyright 2020 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
@file:OptIn(MiraiInternalAPI::class) @file:OptIn(MiraiInternalAPI::class)
package net.mamoe.mirai.event.internal package net.mamoe.mirai.event.internal
@ -5,7 +14,10 @@ package net.mamoe.mirai.event.internal
import net.mamoe.mirai.event.Event import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.Listener import net.mamoe.mirai.event.Listener
import net.mamoe.mirai.utils.LockFreeLinkedList import net.mamoe.mirai.utils.LockFreeLinkedList
import net.mamoe.mirai.utils.LockFreeLinkedListNode
import net.mamoe.mirai.utils.isRemoved
import net.mamoe.mirai.utils.MiraiInternalAPI import net.mamoe.mirai.utils.MiraiInternalAPI
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -24,14 +36,30 @@ internal actual class MiraiAtomicBoolean actual constructor(initial: Boolean) {
} }
} }
internal actual object GlobalEventListeners {
private val map: Map<Listener.EventPriority, LockFreeLinkedList<ListenerNode>>
init {
val map = EnumMap<Listener.EventPriority, LockFreeLinkedList<ListenerNode>>(Listener.EventPriority::class.java)
Listener.EventPriority.values().forEach {
map[it] = LockFreeLinkedList()
}
this.map = map
}
actual operator fun get(priority: Listener.EventPriority): LockFreeLinkedList<ListenerNode> = map[priority]!!
}
/*
internal actual class EventListeners<E : Event> actual constructor(clazz: KClass<E>) : internal actual class EventListeners<E : Event> actual constructor(clazz: KClass<E>) :
LockFreeLinkedList<Listener<E>>() { LockFreeLinkedList<Listener<E>>() {
@Suppress("UNCHECKED_CAST", "UNSUPPORTED", "NO_REFLECTION_IN_CLASS_PATH") @Suppress("UNCHECKED_CAST", "UNSUPPORTED", "NO_REFLECTION_IN_CLASS_PATH")
actual val supertypes: Set<KClass<out Event>> by lazy { actual val supertypes: Set<KClass<out Event>> by lazy {
val supertypes = mutableSetOf<KClass<out Event>>() val supertypes = mutableSetOf<KClass<out Event>>()
fun addSupertypes(clazz: KClass<out Event>) { fun addSupertypes(klass: KClass<out Event>) {
clazz.supertypes.forEach { klass.supertypes.forEach {
val classifier = it.classifier as? KClass<out Event> val classifier = it.classifier as? KClass<out Event>
if (classifier != null) { if (classifier != null) {
supertypes.add(classifier) supertypes.add(classifier)
@ -43,4 +71,6 @@ internal actual class EventListeners<E : Event> actual constructor(clazz: KClass
supertypes supertypes
} }
}
}
*/

View File

@ -9,9 +9,14 @@
package net.mamoe.mirai.event package net.mamoe.mirai.event
import kotlinx.coroutines.CompletableJob import kotlinx.atomicfu.AtomicInt
import kotlinx.coroutines.GlobalScope import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import net.mamoe.mirai.utils.StepUtil
import net.mamoe.mirai.utils.internal.runBlocking import net.mamoe.mirai.utils.internal.runBlocking
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertTrue import kotlin.test.assertTrue
@ -43,6 +48,96 @@ class EventTests {
} }
} }
@Test
fun `test concurrent listening`() {
var listeners = 0
val counter = AtomicInteger(0)
for (p in Listener.EventPriority.values()) {
repeat(2333) {
listeners++
GlobalScope.subscribeAlways<ParentEvent> {
counter.getAndIncrement()
}
}
}
kotlinx.coroutines.runBlocking {
ParentEvent().broadcast()
delay(5000L) // ?
}
val called = counter.get()
println("Registered $listeners listeners and $called called")
if (listeners != called) {
throw IllegalStateException("Registered $listeners listeners but only $called called")
}
}
@Test
fun `test concurrent listening 3`() {
runBlocking {
val called = AtomicInteger()
val registered = AtomicInteger()
coroutineScope {
println("Step 0")
for (priority in Listener.EventPriority.values()) {
launch {
repeat(5000) {
registered.getAndIncrement()
GlobalScope.subscribeAlways<ParentEvent>(
priority = priority
) {
called.getAndIncrement()
}
}
println("Registeterd $priority")
}
}
println("Step 1")
}
println("Step 2")
ParentEvent().broadcast()
println("Step 3")
check(called.get() == registered.get())
println("Done")
println("Called ${called.get()}, registered ${registered.get()}")
}
}
@Test
fun `test concurrent listening 2`() {
val registered = AtomicInteger()
val called = AtomicInteger()
val threads = mutableListOf<Thread>()
repeat(50) {
threads.add(thread {
repeat(444) {
registered.getAndIncrement()
GlobalScope.launch {
subscribeAlways<ParentEvent> {
called.getAndIncrement()
}
}
}
})
}
Thread.sleep(5000L)// Wait all thread started.
threads.forEach {
it.join() // Wait all finished
}
println("All listeners registered")
val postCount = 3
kotlinx.coroutines.runBlocking {
repeat(postCount) {
ParentEvent().broadcast()
}
delay(5000L)
}
val calledCount = called.get()
val shouldCalled = registered.get() * postCount
println("Should call $shouldCalled times and $called called")
if (shouldCalled != calledCount) {
throw IllegalStateException("?")
}
}
open class ParentEvent : Event, AbstractEvent() { open class ParentEvent : Event, AbstractEvent() {
var triggered = false var triggered = false
@ -76,4 +171,111 @@ class EventTests {
job.complete() job.complete()
} }
} }
open class PriorityTestEvent : AbstractEvent() {}
fun singleThreaded(step: StepUtil, invoke: suspend CoroutineScope.() -> Unit) {
// runBlocking 会完全堵死, 没法退出
val scope = CoroutineScope(Executor { it.run() }.asCoroutineDispatcher())
val job = scope.launch {
invoke(scope)
}
kotlinx.coroutines.runBlocking {
job.join()
}
step.throws()
}
@Test
fun `test handler remvoe`() {
val step = StepUtil()
singleThreaded(step) {
subscribe<Event> {
step.step(0)
ListeningStatus.STOPPED
}
ParentEvent().broadcast()
ParentEvent().broadcast()
}
}
/*
@Test
fun `test boom`() {
val step = StepUtil()
singleThreaded(step) {
step.step(0)
step.step(0)
}
}
*/
@Test
fun `test intercept with always`() {
val step = StepUtil()
singleThreaded(step) {
subscribeAlways<ParentEvent> {
step.step(0)
intercept()
}
subscribe<Event> {
step.step(-1, "Boom")
ListeningStatus.LISTENING
}
ParentEvent().broadcast()
}
}
@Test
fun `test intercept`() {
val step = StepUtil()
singleThreaded(step) {
subscribeAlways<AbstractEvent> {
step.step(0)
intercept()
}
subscribe<Event> {
step.step(-1, "Boom")
ListeningStatus.LISTENING
}
ParentEvent().broadcast()
}
}
@Test
fun `test listener complete`() {
val step = StepUtil()
singleThreaded(step) {
val listener = subscribeAlways<ParentEvent> {
step.step(0, "boom!")
}
ParentEvent().broadcast()
listener.complete()
ParentEvent().broadcast()
}
}
@Test
fun `test event priority`() {
val step = StepUtil()
singleThreaded(step) {
subscribe<PriorityTestEvent> {
step.step(1)
ListeningStatus.LISTENING
}
subscribe<PriorityTestEvent>(priority = Listener.EventPriority.HIGH) {
step.step(0)
ListeningStatus.LISTENING
}
subscribe<PriorityTestEvent>(priority = Listener.EventPriority.LOW) {
step.step(3)
ListeningStatus.LISTENING
}
subscribe<PriorityTestEvent> {
step.step(2)
ListeningStatus.LISTENING
}
PriorityTestEvent().broadcast()
}
}
} }

View File

@ -0,0 +1,23 @@
package net.mamoe.mirai.utils
import kotlinx.atomicfu.atomic
import java.util.concurrent.ConcurrentLinkedDeque
class StepUtil {
val step = atomic(0)
val exceptions = ConcurrentLinkedDeque<Throwable>()
fun step(step: Int, message: String = "Wrong step") {
println("Invoking step $step")
if (step != this.step.getAndIncrement()) {
throw IllegalStateException(message).also { exceptions.add(it) }
}
}
fun throws() {
if (exceptions.isEmpty()) return
val root = exceptions.poll()!!
while (true) {
root.addSuppressed(exceptions.poll() ?: throw root)
}
}
}