Add concurrency control to event listeners

This commit is contained in:
Him188 2020-03-20 18:27:04 +08:00
parent 1b152b28fb
commit 55ed5f9d70
3 changed files with 75 additions and 19 deletions

View File

@ -34,22 +34,33 @@ fun <L : Listener<E>, E : Event> KClass<out E>.subscribeInternal(listener: L): L
@Suppress("FunctionName") @Suppress("FunctionName")
internal fun <E : Event> CoroutineScope.Handler( internal fun <E : Event> CoroutineScope.Handler(
coroutineContext: CoroutineContext, coroutineContext: CoroutineContext,
concurrencyKind: Listener.ConcurrencyKind,
handler: suspend (E) -> ListeningStatus handler: suspend (E) -> ListeningStatus
): Handler<E> { ): Handler<E> {
@OptIn(ExperimentalCoroutinesApi::class) // don't remove @OptIn(ExperimentalCoroutinesApi::class) // don't remove
val context = this.newCoroutineContext(coroutineContext) val context = this.newCoroutineContext(coroutineContext)
return Handler(context[Job], context, handler) return Handler(context[Job], context, handler, concurrencyKind)
} }
private inline fun inline(block: () -> Unit) = block()
/** /**
* 事件处理器. * 事件处理器.
*/ */
@PublishedApi @PublishedApi
internal class Handler<in E : Event> internal class Handler<in E : Event>
@PublishedApi internal constructor(parentJob: Job?, private val subscriberContext: CoroutineContext, @JvmField val handler: suspend (E) -> ListeningStatus) : @PublishedApi internal constructor(
parentJob: Job?,
private val subscriberContext: CoroutineContext,
@JvmField val handler: suspend (E) -> ListeningStatus,
override val concurrencyKind: Listener.ConcurrencyKind
) :
Listener<E>, CompletableJob by Job(parentJob) { Listener<E>, CompletableJob by Job(parentJob) {
@MiraiInternalAPI
val lock: Mutex? = when (concurrencyKind) {
Listener.ConcurrencyKind.CONCURRENT -> null
Listener.ConcurrencyKind.LOCKED -> Mutex()
}
@OptIn(MiraiDebugAPI::class) @OptIn(MiraiDebugAPI::class)
override suspend fun onEvent(event: E): ListeningStatus { override suspend fun onEvent(event: E): ListeningStatus {
if (isCompleted || isCancelled) return ListeningStatus.STOPPED if (isCompleted || isCancelled) return ListeningStatus.STOPPED
@ -60,7 +71,7 @@ internal class Handler<in E : Event>
} catch (e: Throwable) { } catch (e: Throwable) {
subscriberContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e) subscriberContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e)
?: coroutineContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e) ?: coroutineContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e)
?: inline { ?: kotlin.run {
@Suppress("DEPRECATION") @Suppress("DEPRECATION")
MiraiLogger.warning( MiraiLogger.warning(
"""Event processing: An exception occurred but no CoroutineExceptionHandler found, """Event processing: An exception occurred but no CoroutineExceptionHandler found,
@ -75,9 +86,6 @@ internal class Handler<in E : Event>
ListeningStatus.LISTENING ListeningStatus.LISTENING
} }
} }
@MiraiInternalAPI
override val lock: Mutex = Mutex()
} }
/** /**
@ -161,7 +169,13 @@ private fun <E : Event> CoroutineScope.callAndRemoveIfRequired(event: E, listene
listeners.forEachNode { node -> listeners.forEachNode { node ->
launch { launch {
val listener = node.nodeValue val listener = node.nodeValue
listener.lock.withLock { if (listener.concurrencyKind == Listener.ConcurrencyKind.LOCKED) {
(listener as Handler).lock!!.withLock {
if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) {
listeners.remove(listener) // atomic remove
}
}
} else {
if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) { if (!node.isRemoved() && listener.onEvent(event) == ListeningStatus.STOPPED) {
listeners.remove(listener) // atomic remove listeners.remove(listener) // atomic remove
} }

View File

@ -52,11 +52,23 @@ enum class ListeningStatus {
* 取消监听: [complete] * 取消监听: [complete]
*/ */
interface Listener<in E : Event> : CompletableJob { interface Listener<in E : Event> : CompletableJob {
enum class ConcurrencyKind {
/** /**
* [onEvent] 的锁 * 并发地同时处理多个事件, 但无法保证 [onEvent] 返回 [ListeningStatus.STOPPED] 后立即停止事件监听.
*/ */
@MiraiInternalAPI CONCURRENT,
val lock: Mutex
/**
* 使用 [Mutex] 保证同一时刻只处理一个事件.
*/
LOCKED
}
/**
* 并发类型
*/
val concurrencyKind: ConcurrencyKind
suspend fun onEvent(event: E): ListeningStatus suspend fun onEvent(event: E): ListeningStatus
} }
@ -121,9 +133,10 @@ interface Listener<in E : Event> : CompletableJob {
@OptIn(MiraiInternalAPI::class) @OptIn(MiraiInternalAPI::class)
inline fun <reified E : Event> CoroutineScope.subscribe( inline fun <reified E : Event> CoroutineScope.subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
noinline handler: suspend E.(E) -> ListeningStatus noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = ): Listener<E> =
E::class.subscribeInternal(Handler(coroutineContext) { it.handler(it); }) E::class.subscribeInternal(Handler(coroutineContext, concurrency) { it.handler(it); })
/** /**
* 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件. * 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件.
@ -139,12 +152,17 @@ inline fun <reified E : Event> CoroutineScope.subscribe(
@OptIn(MiraiInternalAPI::class, ExperimentalContracts::class) @OptIn(MiraiInternalAPI::class, ExperimentalContracts::class)
inline fun <reified E : Event> CoroutineScope.subscribeAlways( inline fun <reified E : Event> CoroutineScope.subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
noinline listener: suspend E.(E) -> Unit noinline listener: suspend E.(E) -> Unit
): Listener<E> { ): Listener<E> {
contract { contract {
callsInPlace(listener, InvocationKind.UNKNOWN) callsInPlace(listener, InvocationKind.UNKNOWN)
} }
return E::class.subscribeInternal(Handler(coroutineContext) { it.listener(it); ListeningStatus.LISTENING }) return E::class.subscribeInternal(
Handler(
coroutineContext,
concurrency
) { it.listener(it); ListeningStatus.LISTENING })
} }
/** /**
@ -163,7 +181,11 @@ inline fun <reified E : Event> CoroutineScope.subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline listener: suspend E.(E) -> Unit noinline listener: suspend E.(E) -> Unit
): Listener<E> = ): Listener<E> =
E::class.subscribeInternal(Handler(coroutineContext) { it.listener(it); ListeningStatus.STOPPED }) E::class.subscribeInternal(
Handler(
coroutineContext,
Listener.ConcurrencyKind.LOCKED
) { it.listener(it); ListeningStatus.STOPPED })
// //
@ -187,9 +209,14 @@ inline fun <reified E : Event> CoroutineScope.subscribeOnce(
@OptIn(MiraiInternalAPI::class) @OptIn(MiraiInternalAPI::class)
inline fun <reified E : BotEvent> Bot.subscribe( inline fun <reified E : BotEvent> Bot.subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.LOCKED,
noinline handler: suspend E.(E) -> ListeningStatus noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = ): Listener<E> =
E::class.subscribeInternal(Handler(coroutineContext) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING }) E::class.subscribeInternal(
Handler(
coroutineContext,
concurrency
) { if (it.bot === this) it.handler(it) else ListeningStatus.LISTENING })
/** /**
@ -207,9 +234,14 @@ inline fun <reified E : BotEvent> Bot.subscribe(
@OptIn(MiraiInternalAPI::class) @OptIn(MiraiInternalAPI::class)
inline fun <reified E : BotEvent> Bot.subscribeAlways( inline fun <reified E : BotEvent> Bot.subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
noinline listener: suspend E.(E) -> Unit noinline listener: suspend E.(E) -> Unit
): Listener<E> { ): Listener<E> {
return E::class.subscribeInternal(Handler(coroutineContext) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING }) return E::class.subscribeInternal(
Handler(
coroutineContext,
concurrency
) { if (it.bot === this) it.listener(it); ListeningStatus.LISTENING })
} }
/** /**
@ -229,7 +261,7 @@ inline fun <reified E : BotEvent> Bot.subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline listener: suspend E.(E) -> Unit noinline listener: suspend E.(E) -> Unit
): Listener<E> = ): Listener<E> =
E::class.subscribeInternal(Handler(coroutineContext) { E::class.subscribeInternal(Handler(coroutineContext, Listener.ConcurrencyKind.LOCKED) {
if (it.bot === this) { if (it.bot === this) {
it.listener(it) it.listener(it)
ListeningStatus.STOPPED ListeningStatus.STOPPED

View File

@ -7,6 +7,8 @@
* https://github.com/mamoe/mirai/blob/master/LICENSE * https://github.com/mamoe/mirai/blob/master/LICENSE
*/ */
@file:Suppress("unused")
package net.mamoe.mirai.event.internal package net.mamoe.mirai.event.internal
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
@ -21,11 +23,19 @@ import kotlin.coroutines.EmptyCoroutineContext
@MiraiInternalAPI @MiraiInternalAPI
@Suppress("FunctionName") @Suppress("FunctionName")
fun <E : Event> Class<E>._subscribeEventForJaptOnly(scope: CoroutineScope, onEvent: Function<E, ListeningStatus>): Listener<E> { fun <E : Event> Class<E>._subscribeEventForJaptOnly(scope: CoroutineScope, onEvent: Function<E, ListeningStatus>): Listener<E> {
return this.kotlin.subscribeInternal(scope.Handler(EmptyCoroutineContext) { onEvent.apply(it) }) return this.kotlin.subscribeInternal(
scope.Handler(
EmptyCoroutineContext,
Listener.ConcurrencyKind.CONCURRENT
) { onEvent.apply(it) })
} }
@MiraiInternalAPI @MiraiInternalAPI
@Suppress("FunctionName") @Suppress("FunctionName")
fun <E : Event> Class<E>._subscribeEventForJaptOnly(scope: CoroutineScope, onEvent: Consumer<E>): Listener<E> { fun <E : Event> Class<E>._subscribeEventForJaptOnly(scope: CoroutineScope, onEvent: Consumer<E>): Listener<E> {
return this.kotlin.subscribeInternal(scope.Handler(EmptyCoroutineContext) { onEvent.accept(it); ListeningStatus.LISTENING; }) return this.kotlin.subscribeInternal(
scope.Handler(
EmptyCoroutineContext,
Listener.ConcurrencyKind.CONCURRENT
) { onEvent.accept(it); ListeningStatus.LISTENING; })
} }