@ -35,9 +35,11 @@ import kotlin.jvm.JvmSynthetic
interface Event {
Don't implement Event but extend AbstractEvent instead.
""", level = DeprecationLevel.HIDDEN) // so Kotlin class won't be compiled.
""", level = DeprecationLevel.HIDDEN
) // so Kotlin class won't be compiled.
@get:JvmSynthetic // so Java user won't see it
internal val DoNotImplementThisClassButExtendAbstractEvent: Nothing
@ -54,10 +56,9 @@ abstract class AbstractEvent : Event {
final override val DoNotImplementThisClassButExtendAbstractEvent: Nothing
get() = throw Error("Shouldn't be reached")
private val _intercepted = atomic(false)
private var _intercepted = false
private val _cancelled = atomic(false)
* 事件是否已被拦截.
@ -65,7 +66,7 @@ abstract class AbstractEvent : Event {
val isIntercepted: Boolean
get() = _intercepted.value
get() = _intercepted
* 拦截这个事件.
@ -73,7 +74,7 @@ abstract class AbstractEvent : Event {
fun intercept() {
_intercepted.value = true
_intercepted = true
@ -145,5 +146,6 @@ interface BroadcastControllable : Event {
"use AbstractEvent and implement CancellableEvent",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("AbstractEvent", "net.mamoe.mirai.event.AbstractEvent"))
replaceWith = ReplaceWith("AbstractEvent", "net.mamoe.mirai.event.AbstractEvent")
abstract class AbstractCancellableEvent : AbstractEvent(), CancellableEvent
@ -13,40 +13,41 @@ package net.mamoe.mirai.event.internal
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.EventDisabled
import net.mamoe.mirai.event.Listener
import net.mamoe.mirai.event.ListeningStatus
import net.mamoe.mirai.utils.LockFreeLinkedList
import net.mamoe.mirai.utils.MiraiInternalAPI
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.isRemoved
import net.mamoe.mirai.event.*
import net.mamoe.mirai.utils.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.jvm.JvmField
import kotlin.reflect.KClass
internal fun <L : Listener<E>, E : Event> KClass<out E>.subscribeInternal(listener: L): L {
with(this.listeners()) {
with(GlobalEventListeners[listener.priority]) {
val node = ListenerNode(listener as Listener<Event>, this@subscribeInternal)
listener.invokeOnCompletion {
return listener
internal fun <E : Event> CoroutineScope.Handler(
coroutineContext: CoroutineContext,
concurrencyKind: Listener.ConcurrencyKind,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
handler: suspend (E) -> ListeningStatus
): Handler<E> {
@OptIn(ExperimentalCoroutinesApi::class) // don't remove
val context = this.newCoroutineContext(coroutineContext)
return Handler(context[Job], context, handler, concurrencyKind)
return Handler(context[Job], context, handler, concurrencyKind, priority)
@ -58,15 +59,17 @@ internal class Handler<in E : Event>
parentJob: Job?,
subscriberContext: CoroutineContext,
@JvmField val handler: suspend (E) -> ListeningStatus,
override val concurrencyKind: Listener.ConcurrencyKind
override val concurrencyKind: Listener.ConcurrencyKind,
override val priority: Listener.EventPriority
) : Listener<E>, CompletableJob by SupervisorJob(parentJob) { // avoid being cancelled on handling event
private val subscriberContext: CoroutineContext = subscriberContext + this // override Job.
val lock: Mutex? = when (concurrencyKind) {
Listener.ConcurrencyKind.CONCURRENT -> null
Listener.ConcurrencyKind.LOCKED -> Mutex()
else -> null
@ -95,14 +98,12 @@ internal class Handler<in E : Event>
* 这个事件类的监听器 list
internal fun <E : Event> KClass<out E>.listeners(): EventListeners<E> = EventListenerManager.get(this)
internal expect class EventListeners<E : Event>(clazz: KClass<E>) : LockFreeLinkedList<Listener<E>> {
val supertypes: Set<KClass<out Event>>
internal class ListenerNode(
val listener: Listener<Event>,
val owner: KClass<out Event>
internal expect object GlobalEventListeners {
operator fun get(priority: Listener.EventPriority): LockFreeLinkedList<ListenerNode>
internal expect class MiraiAtomicBoolean(initial: Boolean) {
@ -112,63 +113,56 @@ internal expect class MiraiAtomicBoolean(initial: Boolean) {
var value: Boolean
* 管理每个事件 class 的 [EventListeners].
* [EventListeners] 是 lazy 的: 它们只会在被需要的时候才创建和存储.
internal object EventListenerManager {
private data class Registry<E : Event>(val clazz: KClass<E>, val listeners: EventListeners<E>)
private val registries = LockFreeLinkedList<Registry<*>>()
// 不要用 atomicfu. 在 publish 后会出现 VerifyError
private val lock: MiraiAtomicBoolean = MiraiAtomicBoolean(false)
@Suppress("UNCHECKED_CAST", "BooleanLiteralArgument")
internal tailrec fun <E : Event> get(clazz: KClass<out E>): EventListeners<E> {
registries.forEach {
if (it.clazz == clazz) {
return it.listeners as EventListeners<E>
if (lock.compareAndSet(false, true)) {
val registry = Registry(clazz as KClass<E>, EventListeners(clazz))
lock.value = false
return registry.listeners
return get(clazz)
// inline: NO extra Continuation
internal suspend inline fun Event.broadcastInternal() = coroutineScope {
if (EventDisabled) return@coroutineScope
val listeners = this@broadcastInternal::class.listeners()
callAndRemoveIfRequired(this@broadcastInternal, listeners)
listeners.supertypes.forEach {
callAndRemoveIfRequired(this@broadcastInternal, it.listeners())
callAndRemoveIfRequired(this@broadcastInternal as? AbstractEvent ?: error("Events must extends AbstractEvent"))
private fun <E : Event> CoroutineScope.callAndRemoveIfRequired(event: E, listeners: EventListeners<E>) {
// atomic foreach
listeners.forEachNode { node ->
launch {
val listener = node.nodeValue
if (listener.concurrencyKind == Listener.ConcurrencyKind.LOCKED) {
private suspend fun <E : AbstractEvent> callAndRemoveIfRequired(
event: E
) {
coroutineScope {
for (p in Listener.EventPriority.values()) {
GlobalEventListeners[p].forEachNode { eventNode ->
if (event.isIntercepted) {
val node = eventNode.nodeValue
if (!node.owner.isInstance(event)) return@forEachNode
val listener = node.listener
when (listener.concurrencyKind) {
Listener.ConcurrencyKind.LOCKED -> {
(listener as Handler).lock!!.withLock {
if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) {
listeners.remove(listener) // atomic remove
kotlin.runCatching {
when (listener.onEvent(event)) {
ListeningStatus.STOPPED -> {
else -> {
}.onFailure {
// TODO("Exception catching")
Listener.ConcurrencyKind.CONCURRENT -> {
kotlin.runCatching {
when (listener.onEvent(event)) {
ListeningStatus.STOPPED -> {
else -> {
}.onFailure {
// TODO("Exception catching")
} else {
if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) {
listeners.remove(listener) // atomic remove
@ -70,7 +70,15 @@ interface Listener<in E : Event> : CompletableJob {
val concurrencyKind: ConcurrencyKind
enum class EventPriority {
val priority: EventPriority get() = EventPriority.NORMAL
suspend fun onEvent(event: E): ListeningStatus
// region 顶层方法 创建当前 coroutineContext 下的子 Job
@ -120,6 +128,7 @@ interface Listener<in E : Event> : CompletableJob {
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext].
* @param concurrency 并发类型. 查看 [Listener.ConcurrencyKind]
* @param priority 监听优先级,优先级越高越先执行
* @see syncFromEvent 监听一个事件, 并尝试从这个事件中获取一个值.
* @see asyncFromEvent 异步监听一个事件, 并尝试从这个事件中获取一个值.
@ -137,8 +146,9 @@ interface Listener<in E : Event> : CompletableJob {
inline fun <reified E : Event> CoroutineScope.subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, handler)
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority, handler)
* @see CoroutineScope.subscribe
@ -148,8 +158,9 @@ fun <E : Event> CoroutineScope.subscribe(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
handler: suspend E.(E) -> ListeningStatus
): Listener<E> = eventClass.subscribeInternal(Handler(coroutineContext, concurrency) { it.handler(it); })
): Listener<E> = eventClass.subscribeInternal(Handler(coroutineContext, concurrency, priority) { it.handler(it); })
* 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件.
@ -159,14 +170,17 @@ fun <E : Event> CoroutineScope.subscribe(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
* @see CoroutineScope.subscribe 获取更多说明
inline fun <reified E : Event> CoroutineScope.subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, listener)
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority, listener)
* @see CoroutineScope.subscribeAlways
@ -176,9 +190,10 @@ fun <E : Event> CoroutineScope.subscribeAlways(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
listener: suspend E.(E) -> Unit
): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, concurrency) { it.listener(it); ListeningStatus.LISTENING }
Handler(coroutineContext, concurrency, priority) { it.listener(it); ListeningStatus.LISTENING }
@ -189,14 +204,16 @@ fun <E : Event> CoroutineScope.subscribeAlways(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
* @see subscribe 获取更多说明
inline fun <reified E : Event> CoroutineScope.subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(E::class, coroutineContext, listener)
): Listener<E> = subscribeOnce(E::class, coroutineContext, priority, listener)
* @see CoroutineScope.subscribeOnce
@ -205,9 +222,10 @@ inline fun <reified E : Event> CoroutineScope.subscribeOnce(
fun <E : Event> CoroutineScope.subscribeOnce(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
listener: suspend E.(E) -> Unit
): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED) { it.listener(it); ListeningStatus.STOPPED }
Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED, priority) { it.listener(it); ListeningStatus.STOPPED }
@ -224,6 +242,7 @@ fun <E : Event> CoroutineScope.subscribeOnce(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 事件优先级, 优先级高的先处理
* @see subscribe 获取更多说明
@ -233,8 +252,9 @@ fun <E : Event> CoroutineScope.subscribeOnce(
inline fun <reified E : BotEvent> Bot.subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = this.subscribe(E::class, coroutineContext, concurrency, handler)
): Listener<E> = this.subscribe(E::class, coroutineContext, concurrency, priority, handler)
* @see Bot.subscribe
@ -244,12 +264,16 @@ fun <E : BotEvent> Bot.subscribe(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
handler: suspend E.(E) -> ListeningStatus
): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, concurrency) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING }
) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING }
* 在 [Bot] 的 [CoroutineScope] 下订阅所有 [E] 及其子类事件.
* 每当 [事件广播][Event.broadcast] 时, [listener] 都会被执行.
@ -258,6 +282,7 @@ fun <E : BotEvent> Bot.subscribe(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 事件优先级, 优先级高的先处理
* @see subscribe 获取更多说明
@ -267,8 +292,9 @@ fun <E : BotEvent> Bot.subscribe(
inline fun <reified E : BotEvent> Bot.subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, listener)
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority, listener)
* @see Bot.subscribeAlways
@ -278,9 +304,10 @@ fun <E : BotEvent> Bot.subscribeAlways(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
listener: suspend E.(E) -> Unit
): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, concurrency) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING }
Handler(coroutineContext, concurrency, priority) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING }
@ -291,6 +318,7 @@ fun <E : BotEvent> Bot.subscribeAlways(
* [Bot] 被关闭后事件监听会被 [取消][Listener.cancel].
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 事件优先级, 高的先处理
* @see subscribe 获取更多说明
@ -298,8 +326,9 @@ fun <E : BotEvent> Bot.subscribeAlways(
inline fun <reified E : BotEvent> Bot.subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(E::class, coroutineContext, listener)
): Listener<E> = subscribeOnce(E::class, coroutineContext, priority, listener)
* @see Bot.subscribeOnce
@ -308,9 +337,10 @@ inline fun <reified E : BotEvent> Bot.subscribeOnce(
fun <E : BotEvent> Bot.subscribeOnce(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = Listener.EventPriority.NORMAL,
listener: suspend E.(E) -> Unit
): Listener<E> =
eventClass.subscribeInternal(Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED) {
eventClass.subscribeInternal(Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED, priority) {
if (it.bot === this) {
@ -318,3 +348,177 @@ fun <E : BotEvent> Bot.subscribeOnce(
// endregion
// region 为了兼容旧版本的方法
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
inline fun <reified E : Event> CoroutineScope.subscribeDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(
coroutineContext = coroutineContext,
concurrency = concurrency,
handler = handler
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
fun <E : Event> CoroutineScope.subscribeDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(
eventClass = eventClass,
coroutineContext = coroutineContext,
concurrency = concurrency,
handler = handler
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
inline fun <reified E : Event> CoroutineScope.subscribeAlwaysDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(
coroutineContext = coroutineContext,
concurrency = concurrency,
listener = listener
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
fun <E : Event> CoroutineScope.subscribeAlwaysDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(
eventClass = eventClass,
coroutineContext = coroutineContext,
concurrency = concurrency,
listener = listener
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
inline fun <reified E : Event> CoroutineScope.subscribeOnceDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(coroutineContext = coroutineContext, listener = listener)
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
fun <E : Event> CoroutineScope.subscribeOnceDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(
eventClass = eventClass,
coroutineContext = coroutineContext,
listener = listener
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
inline fun <reified E : BotEvent> Bot.subscribeDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = this.subscribe(
coroutineContext = coroutineContext,
concurrency = concurrency,
handler = handler
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
fun <E : BotEvent> Bot.subscribeDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(
eventClass = eventClass,
coroutineContext = coroutineContext,
concurrency = concurrency,
handler = handler
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
inline fun <reified E : BotEvent> Bot.subscribeAlwaysDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(
coroutineContext = coroutineContext,
concurrency = concurrency,
listener = listener
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
fun <E : BotEvent> Bot.subscribeAlwaysDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
listener: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(
eventClass = eventClass,
coroutineContext = coroutineContext,
concurrency = concurrency,
listener = listener
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
inline fun <reified E : BotEvent> Bot.subscribeOnceDeprecated(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(
coroutineContext = coroutineContext,
listener = listener
@Deprecated("for binary compatibility", level = DeprecationLevel.HIDDEN)
fun <E : BotEvent> Bot.subscribeOnceDeprecated(
eventClass: KClass<E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
listener: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(
eventClass = eventClass,
coroutineContext = coroutineContext,
listener = listener
// endregion
@ -156,6 +156,18 @@ open class LockFreeLinkedList<E> {
open fun tryInsertAfter(node: LockFreeLinkedListNode<E>, newValue: E): Boolean {
if (node == tail) {
error("Cannot insert value after tail")
if (node.isRemoved()) {
return false
val next = node.nextNodeRef.value
val newNode = newValue.asNode(next)
return node.nextNodeRef.compareAndSet(next, newNode)
* 先把元素建立好链表, 再加入到 list.
@ -329,7 +341,8 @@ open class LockFreeLinkedList<E> {
inline fun forEachNode(block: (LockFreeLinkedListNode<E>) -> Unit) {
inline fun forEachNode(block: LockFreeLinkedList<E>.(LockFreeLinkedListNode<E>) -> Unit) {
// Copy from forEach
var node: LockFreeLinkedListNode<E> = head
while (true) {
if (node === tail) return
@ -358,28 +371,41 @@ open class LockFreeLinkedList<E> {
open fun removeAll(elements: Collection<E>): Boolean = elements.all { remove(it) }
private fun removeNode(node: Node<E>): Boolean {
open fun removeNode(node: LockFreeLinkedListNode<E>): Boolean {
if (node == tail) {
return false
while (true) {
val before = head.iterateBeforeFirst { it === node }
val toRemove = before.nextNode
val next = toRemove.nextNode
if (toRemove == tail) { // This
return true
if (toRemove === tail) {
return false
if (toRemove.isRemoved()) {
@Suppress("BooleanLiteralArgument") // false positive
if (!toRemove.removed.compareAndSet(false, true)) {
// logically remove: all the operations will recognize this node invalid
toRemove.nodeValue = null // logically remove first, then all the operations will recognize this node invalid
if (before.nextNodeRef.compareAndSet(toRemove, next)) { // physically remove: try to fix the link
// physically remove: try to fix the link
var next: LockFreeLinkedListNode<E> = toRemove.nextNode
while (next !== tail && next.isRemoved()) {
next = next.nextNode
if (before.nextNodeRef.compareAndSet(toRemove, next)) {
return true
fun removeAt(index: Int): E {
require(index >= 0) { "index must be >= 0" }
val nodeBeforeIndex = head.iterateValidNodeNTimes(index)
@ -1,3 +1,12 @@
* Copyright 2020 Mamoe Technologies and contributors.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* https://github.com/mamoe/mirai/blob/master/LICENSE
package net.mamoe.mirai.event.internal
@ -5,7 +14,10 @@ package net.mamoe.mirai.event.internal
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.Listener
import net.mamoe.mirai.utils.LockFreeLinkedList
import net.mamoe.mirai.utils.LockFreeLinkedListNode
import net.mamoe.mirai.utils.isRemoved
import net.mamoe.mirai.utils.MiraiInternalAPI
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.reflect.KClass
@ -24,14 +36,30 @@ internal actual class MiraiAtomicBoolean actual constructor(initial: Boolean) {
internal actual object GlobalEventListeners {
private val map: Map<Listener.EventPriority, LockFreeLinkedList<ListenerNode>>
init {
val map = EnumMap<Listener.EventPriority, LockFreeLinkedList<ListenerNode>>(Listener.EventPriority::class.java)
Listener.EventPriority.values().forEach {
map[it] = LockFreeLinkedList()
this.map = map
actual operator fun get(priority: Listener.EventPriority): LockFreeLinkedList<ListenerNode> = map[priority]!!
internal actual class EventListeners<E : Event> actual constructor(clazz: KClass<E>) :
LockFreeLinkedList<Listener<E>>() {
actual val supertypes: Set<KClass<out Event>> by lazy {
val supertypes = mutableSetOf<KClass<out Event>>()
fun addSupertypes(clazz: KClass<out Event>) {
clazz.supertypes.forEach {
fun addSupertypes(klass: KClass<out Event>) {
klass.supertypes.forEach {
val classifier = it.classifier as? KClass<out Event>
if (classifier != null) {
@ -43,4 +71,6 @@ internal actual class EventListeners<E : Event> actual constructor(clazz: KClass
@ -9,9 +9,14 @@
package net.mamoe.mirai.event
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.GlobalScope
import kotlinx.atomicfu.AtomicInt
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import net.mamoe.mirai.utils.StepUtil
import net.mamoe.mirai.utils.internal.runBlocking
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.Test
import kotlin.test.assertTrue
@ -43,6 +48,96 @@ class EventTests {
fun `test concurrent listening`() {
var listeners = 0
val counter = AtomicInteger(0)
for (p in Listener.EventPriority.values()) {
repeat(2333) {
GlobalScope.subscribeAlways<ParentEvent> {
kotlinx.coroutines.runBlocking {
delay(5000L) // ?
val called = counter.get()
println("Registered $listeners listeners and $called called")
if (listeners != called) {
throw IllegalStateException("Registered $listeners listeners but only $called called")
fun `test concurrent listening 3`() {
runBlocking {
val called = AtomicInteger()
val registered = AtomicInteger()
coroutineScope {
println("Step 0")
for (priority in Listener.EventPriority.values()) {
launch {
repeat(5000) {
priority = priority
) {
println("Registeterd $priority")
println("Step 1")
println("Step 2")
println("Step 3")
check(called.get() == registered.get())
println("Called ${called.get()}, registered ${registered.get()}")
fun `test concurrent listening 2`() {
val registered = AtomicInteger()
val called = AtomicInteger()
val threads = mutableListOf<Thread>()
repeat(50) {
threads.add(thread {
repeat(444) {
GlobalScope.launch {
subscribeAlways<ParentEvent> {
Thread.sleep(5000L)// Wait all thread started.
threads.forEach {
it.join() // Wait all finished
println("All listeners registered")
val postCount = 3
kotlinx.coroutines.runBlocking {
repeat(postCount) {
val calledCount = called.get()
val shouldCalled = registered.get() * postCount
println("Should call $shouldCalled times and $called called")
if (shouldCalled != calledCount) {
throw IllegalStateException("?")
open class ParentEvent : Event, AbstractEvent() {
var triggered = false
@ -76,4 +171,111 @@ class EventTests {
open class PriorityTestEvent : AbstractEvent() {}
fun singleThreaded(step: StepUtil, invoke: suspend CoroutineScope.() -> Unit) {
// runBlocking 会完全堵死, 没法退出
val scope = CoroutineScope(Executor { it.run() }.asCoroutineDispatcher())
val job = scope.launch {
kotlinx.coroutines.runBlocking {
fun `test handler remvoe`() {
val step = StepUtil()
singleThreaded(step) {
subscribe<Event> {
fun `test boom`() {
val step = StepUtil()
singleThreaded(step) {
fun `test intercept with always`() {
val step = StepUtil()
singleThreaded(step) {
subscribeAlways<ParentEvent> {
subscribe<Event> {
step.step(-1, "Boom")
fun `test intercept`() {
val step = StepUtil()
singleThreaded(step) {
subscribeAlways<AbstractEvent> {
subscribe<Event> {
step.step(-1, "Boom")
fun `test listener complete`() {
val step = StepUtil()
singleThreaded(step) {
val listener = subscribeAlways<ParentEvent> {
step.step(0, "boom!")
fun `test event priority`() {
val step = StepUtil()
singleThreaded(step) {
subscribe<PriorityTestEvent> {
subscribe<PriorityTestEvent>(priority = Listener.EventPriority.HIGH) {
subscribe<PriorityTestEvent>(priority = Listener.EventPriority.LOW) {
subscribe<PriorityTestEvent> {
@ -0,0 +1,23 @@
package net.mamoe.mirai.utils
import kotlinx.atomicfu.atomic
import java.util.concurrent.ConcurrentLinkedDeque
class StepUtil {
val step = atomic(0)
val exceptions = ConcurrentLinkedDeque<Throwable>()
fun step(step: Int, message: String = "Wrong step") {
println("Invoking step $step")
if (step != this.step.getAndIncrement()) {
throw IllegalStateException(message).also { exceptions.add(it) }
fun throws() {
if (exceptions.isEmpty()) return
val root = exceptions.poll()!!
while (true) {
root.addSuppressed(exceptions.poll() ?: throw root)
