Use ConcurrentLinkedQueue for EventSystem, #630

This commit is contained in:
Karlatemp 2020-12-20 10:49:17 +08:00
parent 565abae671
commit c934ff5b89
No known key found for this signature in database
GPG Key ID: 21FBDDF664FF06F8

View File

@ -14,9 +14,9 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.event.* import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.events.BotEvent import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.utils.LockFreeLinkedList
import net.mamoe.mirai.utils.MiraiLogger import net.mamoe.mirai.utils.MiraiLogger
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext import kotlin.coroutines.coroutineContext
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -26,7 +26,7 @@ internal fun <L : Listener<E>, E : Event> KClass<out E>.subscribeInternal(listen
with(GlobalEventListeners[listener.priority]) { with(GlobalEventListeners[listener.priority]) {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
val node = ListenerRegistry(listener as Listener<Event>, this@subscribeInternal) val node = ListenerRegistry(listener as Listener<Event>, this@subscribeInternal)
addLast(node) add(node)
listener.invokeOnCompletion { listener.invokeOnCompletion {
this.remove(node) this.remove(node)
} }
@ -98,18 +98,18 @@ internal class ListenerRegistry(
internal object GlobalEventListeners { internal object GlobalEventListeners {
private val ALL_LEVEL_REGISTRIES: Map<EventPriority, LockFreeLinkedList<ListenerRegistry>> private val ALL_LEVEL_REGISTRIES: Map<EventPriority, ConcurrentLinkedQueue<ListenerRegistry>>
init { init {
val map = val map =
EnumMap<Listener.EventPriority, LockFreeLinkedList<ListenerRegistry>>(Listener.EventPriority::class.java) EnumMap<Listener.EventPriority, ConcurrentLinkedQueue<ListenerRegistry>>(Listener.EventPriority::class.java)
EventPriority.values().forEach { EventPriority.values().forEach {
map[it] = LockFreeLinkedList() map[it] = ConcurrentLinkedQueue()
} }
this.ALL_LEVEL_REGISTRIES = map this.ALL_LEVEL_REGISTRIES = map
} }
operator fun get(priority: Listener.EventPriority): LockFreeLinkedList<ListenerRegistry> = operator fun get(priority: Listener.EventPriority): ConcurrentLinkedQueue<ListenerRegistry> =
ALL_LEVEL_REGISTRIES[priority]!! ALL_LEVEL_REGISTRIES[priority]!!
} }
@ -121,54 +121,56 @@ internal suspend inline fun AbstractEvent.broadcastInternal() {
callAndRemoveIfRequired(this@broadcastInternal) callAndRemoveIfRequired(this@broadcastInternal)
} }
internal inline fun <E, T : Iterable<E>> T.forEach0(block: T.(E) -> Unit) {
forEach { block(it) }
}
@Suppress("DuplicatedCode") @Suppress("DuplicatedCode")
internal suspend inline fun <E : AbstractEvent> callAndRemoveIfRequired( internal suspend inline fun <E : AbstractEvent> callAndRemoveIfRequired(
event: E event: E
) { ) {
for (p in Listener.EventPriority.prioritiesExcludedMonitor) { for (p in Listener.EventPriority.prioritiesExcludedMonitor) {
GlobalEventListeners[p].forEachNode { registeredRegistryNode -> GlobalEventListeners[p].forEach0 { registeredRegistry ->
if (event.isIntercepted) { if (event.isIntercepted) {
return return
} }
val listenerRegistry = registeredRegistryNode.nodeValue if (!registeredRegistry.type.isInstance(event)) return@forEach0
if (!listenerRegistry.type.isInstance(event)) return@forEachNode val listener = registeredRegistry.listener
val listener = listenerRegistry.listener
when (listener.concurrencyKind) { when (listener.concurrencyKind) {
Listener.ConcurrencyKind.LOCKED -> { Listener.ConcurrencyKind.LOCKED -> {
(listener as Handler).lock!!.withLock { (listener as Handler).lock!!.withLock {
if (listener.onEvent(event) == ListeningStatus.STOPPED) { if (listener.onEvent(event) == ListeningStatus.STOPPED) {
removeNode(registeredRegistryNode) remove(registeredRegistry)
} }
} }
} }
Listener.ConcurrencyKind.CONCURRENT -> { Listener.ConcurrencyKind.CONCURRENT -> {
if (listener.onEvent(event) == ListeningStatus.STOPPED) { if (listener.onEvent(event) == ListeningStatus.STOPPED) {
removeNode(registeredRegistryNode) remove(registeredRegistry)
} }
} }
} }
} }
} }
coroutineScope { coroutineScope {
GlobalEventListeners[EventPriority.MONITOR].forEachNode { registeredRegistryNode -> GlobalEventListeners[EventPriority.MONITOR].forEach0 { registeredRegistry ->
if (event.isIntercepted) { if (event.isIntercepted) {
return@coroutineScope return@coroutineScope
} }
val listenerRegistry = registeredRegistryNode.nodeValue if (!registeredRegistry.type.isInstance(event)) return@forEach0
if (!listenerRegistry.type.isInstance(event)) return@forEachNode val listener = registeredRegistry.listener
val listener = listenerRegistry.listener
launch { launch {
when (listener.concurrencyKind) { when (listener.concurrencyKind) {
Listener.ConcurrencyKind.LOCKED -> { Listener.ConcurrencyKind.LOCKED -> {
(listener as Handler).lock!!.withLock { (listener as Handler).lock!!.withLock {
if (listener.onEvent(event) == ListeningStatus.STOPPED) { if (listener.onEvent(event) == ListeningStatus.STOPPED) {
removeNode(registeredRegistryNode) remove(registeredRegistry)
} }
} }
} }
Listener.ConcurrencyKind.CONCURRENT -> { Listener.ConcurrencyKind.CONCURRENT -> {
if (listener.onEvent(event) == ListeningStatus.STOPPED) { if (listener.onEvent(event) == ListeningStatus.STOPPED) {
removeNode(registeredRegistryNode) remove(registeredRegistry)
} }
} }
} }