diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/internal/InternalEventListeners.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/internal/InternalEventListeners.kt index 1d068c826..da0c25082 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/internal/InternalEventListeners.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/internal/InternalEventListeners.kt @@ -34,22 +34,33 @@ fun , E : Event> KClass.subscribeInternal(listener: L): L @Suppress("FunctionName") internal fun CoroutineScope.Handler( coroutineContext: CoroutineContext, + concurrencyKind: Listener.ConcurrencyKind, handler: suspend (E) -> ListeningStatus ): Handler { @OptIn(ExperimentalCoroutinesApi::class) // don't remove val context = this.newCoroutineContext(coroutineContext) - return Handler(context[Job], context, handler) + return Handler(context[Job], context, handler, concurrencyKind) } -private inline fun inline(block: () -> Unit) = block() /** * 事件处理器. */ @PublishedApi internal class Handler -@PublishedApi internal constructor(parentJob: Job?, private val subscriberContext: CoroutineContext, @JvmField val handler: suspend (E) -> ListeningStatus) : +@PublishedApi internal constructor( + parentJob: Job?, + private val subscriberContext: CoroutineContext, + @JvmField val handler: suspend (E) -> ListeningStatus, + override val concurrencyKind: Listener.ConcurrencyKind +) : Listener, CompletableJob by Job(parentJob) { + @MiraiInternalAPI + val lock: Mutex? = when (concurrencyKind) { + Listener.ConcurrencyKind.CONCURRENT -> null + Listener.ConcurrencyKind.LOCKED -> Mutex() + } + @OptIn(MiraiDebugAPI::class) override suspend fun onEvent(event: E): ListeningStatus { if (isCompleted || isCancelled) return ListeningStatus.STOPPED @@ -60,7 +71,7 @@ internal class Handler } catch (e: Throwable) { subscriberContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e) ?: coroutineContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e) - ?: inline { + ?: kotlin.run { @Suppress("DEPRECATION") MiraiLogger.warning( """Event processing: An exception occurred but no CoroutineExceptionHandler found, @@ -75,9 +86,6 @@ internal class Handler ListeningStatus.LISTENING } } - - @MiraiInternalAPI - override val lock: Mutex = Mutex() } /** @@ -161,7 +169,13 @@ private fun CoroutineScope.callAndRemoveIfRequired(event: E, listene listeners.forEachNode { node -> launch { val listener = node.nodeValue - listener.lock.withLock { + if (listener.concurrencyKind == Listener.ConcurrencyKind.LOCKED) { + (listener as Handler).lock!!.withLock { + if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) { + listeners.remove(listener) // atomic remove + } + } + } else { if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) { listeners.remove(listener) // atomic remove } diff --git a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscriber.kt b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscriber.kt index c652d88d5..b478f0a49 100644 --- a/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscriber.kt +++ b/mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/subscriber.kt @@ -52,11 +52,23 @@ enum class ListeningStatus { * 取消监听: [complete] */ interface Listener : CompletableJob { + + enum class ConcurrencyKind { + /** + * 并发地同时处理多个事件, 但无法保证 [onEvent] 返回 [ListeningStatus.STOPPED] 后立即停止事件监听. + */ + CONCURRENT, + + /** + * 使用 [Mutex] 保证同一时刻只处理一个事件. + */ + LOCKED + } + /** - * [onEvent] 的锁 + * 并发类型 */ - @MiraiInternalAPI - val lock: Mutex + val concurrencyKind: ConcurrencyKind suspend fun onEvent(event: E): ListeningStatus } @@ -121,9 +133,10 @@ interface Listener : CompletableJob { @OptIn(MiraiInternalAPI::class) inline fun CoroutineScope.subscribe( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, noinline handler: suspend E.(E) -> ListeningStatus ): Listener = - E::class.subscribeInternal(Handler(coroutineContext) { it.handler(it); }) + E::class.subscribeInternal(Handler(coroutineContext, concurrency) { it.handler(it); }) /** * 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件. @@ -139,12 +152,17 @@ inline fun CoroutineScope.subscribe( @OptIn(MiraiInternalAPI::class, ExperimentalContracts::class) inline fun CoroutineScope.subscribeAlways( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, noinline listener: suspend E.(E) -> Unit ): Listener { contract { callsInPlace(listener, InvocationKind.UNKNOWN) } - return E::class.subscribeInternal(Handler(coroutineContext) { it.listener(it); ListeningStatus.LISTENING }) + return E::class.subscribeInternal( + Handler( + coroutineContext, + concurrency + ) { it.listener(it); ListeningStatus.LISTENING }) } /** @@ -163,7 +181,11 @@ inline fun CoroutineScope.subscribeOnce( coroutineContext: CoroutineContext = EmptyCoroutineContext, noinline listener: suspend E.(E) -> Unit ): Listener = - E::class.subscribeInternal(Handler(coroutineContext) { it.listener(it); ListeningStatus.STOPPED }) + E::class.subscribeInternal( + Handler( + coroutineContext, + Listener.ConcurrencyKind.LOCKED + ) { it.listener(it); ListeningStatus.STOPPED }) // @@ -187,9 +209,14 @@ inline fun CoroutineScope.subscribeOnce( @OptIn(MiraiInternalAPI::class) inline fun Bot.subscribe( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED, noinline handler: suspend E.(E) -> ListeningStatus ): Listener = - E::class.subscribeInternal(Handler(coroutineContext) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING }) + E::class.subscribeInternal( + Handler( + coroutineContext, + concurrency + ) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING }) /** @@ -207,9 +234,14 @@ inline fun Bot.subscribe( @OptIn(MiraiInternalAPI::class) inline fun Bot.subscribeAlways( coroutineContext: CoroutineContext = EmptyCoroutineContext, + concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT, noinline listener: suspend E.(E) -> Unit ): Listener { - return E::class.subscribeInternal(Handler(coroutineContext) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING }) + return E::class.subscribeInternal( + Handler( + coroutineContext, + concurrency + ) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING }) } /** @@ -229,7 +261,7 @@ inline fun Bot.subscribeOnce( coroutineContext: CoroutineContext = EmptyCoroutineContext, noinline listener: suspend E.(E) -> Unit ): Listener = - E::class.subscribeInternal(Handler(coroutineContext) { + E::class.subscribeInternal(Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED) { if (it.bot === this) { it.listener(it) ListeningStatus.STOPPED diff --git a/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/internal/EventInternalJvm.kt b/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/internal/EventInternalJvm.kt index ddae0cdd2..cccc0ad63 100644 --- a/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/internal/EventInternalJvm.kt +++ b/mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/internal/EventInternalJvm.kt @@ -7,6 +7,8 @@ * https://github.com/mamoe/mirai/blob/master/LICENSE */ +@file:Suppress("unused") + package net.mamoe.mirai.event.internal import kotlinx.coroutines.CoroutineScope @@ -21,11 +23,19 @@ import kotlin.coroutines.EmptyCoroutineContext @MiraiInternalAPI @Suppress("FunctionName") fun Class._subscribeEventForJaptOnly(scope: CoroutineScope, onEvent: Function): Listener { - return this.kotlin.subscribeInternal(scope.Handler(EmptyCoroutineContext) { onEvent.apply(it) }) + return this.kotlin.subscribeInternal( + scope.Handler( + EmptyCoroutineContext, + Listener.ConcurrencyKind.CONCURRENT + ) { onEvent.apply(it) }) } @MiraiInternalAPI @Suppress("FunctionName") fun Class._subscribeEventForJaptOnly(scope: CoroutineScope, onEvent: Consumer): Listener { - return this.kotlin.subscribeInternal(scope.Handler(EmptyCoroutineContext) { onEvent.accept(it); ListeningStatus.LISTENING; }) + return this.kotlin.subscribeInternal( + scope.Handler( + EmptyCoroutineContext, + Listener.ConcurrencyKind.CONCURRENT + ) { onEvent.accept(it); ListeningStatus.LISTENING; }) } \ No newline at end of file