Pass exceptions caught in subscriber context to subscriber only

This commit is contained in:
Him188 2022-04-06 14:01:54 +01:00
parent c192047361
commit 30dbd1b1c2
8 changed files with 252 additions and 86 deletions

View File

@ -84,7 +84,6 @@ public abstract interface class net/mamoe/mirai/IMirai : net/mamoe/mirai/LowLeve
public abstract fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;)V public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;)V
public abstract fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun broadcastEvent$suspendImpl (Lnet/mamoe/mirai/IMirai;Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun calculateGroupCodeByGroupUin (J)J public fun calculateGroupCodeByGroupUin (J)J
public fun calculateGroupUinByGroupCode (J)J public fun calculateGroupUinByGroupCode (J)J
public abstract fun constructMessageSource (JLnet/mamoe/mirai/message/data/MessageSourceKind;JJ[II[ILnet/mamoe/mirai/message/data/MessageChain;)Lnet/mamoe/mirai/message/data/OfflineMessageSource; public abstract fun constructMessageSource (JLnet/mamoe/mirai/message/data/MessageSourceKind;JJ[II[ILnet/mamoe/mirai/message/data/MessageChain;)Lnet/mamoe/mirai/message/data/OfflineMessageSource;
@ -1738,7 +1737,6 @@ public abstract class net/mamoe/mirai/event/EventChannel {
public static synthetic fun asChannel$default (Lnet/mamoe/mirai/event/EventChannel;ILkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; public static synthetic fun asChannel$default (Lnet/mamoe/mirai/event/EventChannel;ILkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow; public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow;
public abstract fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel; public abstract fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel;
protected abstract fun createListener (Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function2;)Lnet/mamoe/mirai/event/Listener;
public final fun exceptionHandler (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel; public final fun exceptionHandler (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel;
public final fun exceptionHandler (Lkotlinx/coroutines/CoroutineExceptionHandler;)Lnet/mamoe/mirai/event/EventChannel; public final fun exceptionHandler (Lkotlinx/coroutines/CoroutineExceptionHandler;)Lnet/mamoe/mirai/event/EventChannel;
public final fun filter (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel; public final fun filter (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel;
@ -1768,7 +1766,6 @@ public abstract class net/mamoe/mirai/event/EventChannel {
public final synthetic fun subscribeAlways (Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;)Lnet/mamoe/mirai/event/Listener; public final synthetic fun subscribeAlways (Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Ljava/util/function/Consumer;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener; public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Ljava/util/function/Consumer;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener; public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
protected abstract fun subscribeImpl (Lkotlin/reflect/KClass;Lnet/mamoe/mirai/event/Listener;)V
public final fun subscribeOnce (Ljava/lang/Class;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;

View File

@ -84,7 +84,6 @@ public abstract interface class net/mamoe/mirai/IMirai : net/mamoe/mirai/LowLeve
public abstract fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;)V public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;)V
public abstract fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun broadcastEvent$suspendImpl (Lnet/mamoe/mirai/IMirai;Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun calculateGroupCodeByGroupUin (J)J public fun calculateGroupCodeByGroupUin (J)J
public fun calculateGroupUinByGroupCode (J)J public fun calculateGroupUinByGroupCode (J)J
public abstract fun constructMessageSource (JLnet/mamoe/mirai/message/data/MessageSourceKind;JJ[II[ILnet/mamoe/mirai/message/data/MessageChain;)Lnet/mamoe/mirai/message/data/OfflineMessageSource; public abstract fun constructMessageSource (JLnet/mamoe/mirai/message/data/MessageSourceKind;JJ[II[ILnet/mamoe/mirai/message/data/MessageChain;)Lnet/mamoe/mirai/message/data/OfflineMessageSource;
@ -1738,7 +1737,6 @@ public abstract class net/mamoe/mirai/event/EventChannel {
public static synthetic fun asChannel$default (Lnet/mamoe/mirai/event/EventChannel;ILkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; public static synthetic fun asChannel$default (Lnet/mamoe/mirai/event/EventChannel;ILkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow; public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow;
public abstract fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel; public abstract fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel;
protected abstract fun createListener (Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function2;)Lnet/mamoe/mirai/event/Listener;
public final fun exceptionHandler (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel; public final fun exceptionHandler (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel;
public final fun exceptionHandler (Lkotlinx/coroutines/CoroutineExceptionHandler;)Lnet/mamoe/mirai/event/EventChannel; public final fun exceptionHandler (Lkotlinx/coroutines/CoroutineExceptionHandler;)Lnet/mamoe/mirai/event/EventChannel;
public final fun filter (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel; public final fun filter (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel;
@ -1768,7 +1766,6 @@ public abstract class net/mamoe/mirai/event/EventChannel {
public final synthetic fun subscribeAlways (Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;)Lnet/mamoe/mirai/event/Listener; public final synthetic fun subscribeAlways (Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Ljava/util/function/Consumer;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener; public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Ljava/util/function/Consumer;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener; public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
protected abstract fun subscribeImpl (Lkotlin/reflect/KClass;Lnet/mamoe/mirai/event/Listener;)V
public final fun subscribeOnce (Ljava/lang/Class;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;

View File

@ -27,6 +27,7 @@ import net.mamoe.mirai.event.ConcurrencyKind.LOCKED
import net.mamoe.mirai.event.events.BotEvent import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.internal.event.registerEventHandler import net.mamoe.mirai.internal.event.registerEventHandler
import net.mamoe.mirai.utils.* import net.mamoe.mirai.utils.*
import org.jetbrains.annotations.Contract
import java.util.function.Consumer import java.util.function.Consumer
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
@ -214,22 +215,7 @@ public abstract class EventChannel<out BaseEvent : Event> @MiraiInternalApi publ
*/ */
@JvmSynthetic @JvmSynthetic
public fun filter(filter: suspend (event: BaseEvent) -> Boolean): EventChannel<BaseEvent> { public fun filter(filter: suspend (event: BaseEvent) -> Boolean): EventChannel<BaseEvent> {
return object : DelegateEventChannel<BaseEvent>(this) { return FilterEventChannel<BaseEvent>(this, filter)
override fun <E : Event> intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus {
val thisIntercepted: suspend (E) -> ListeningStatus = { ev ->
val filterResult = try {
@Suppress("UNCHECKED_CAST")
baseEventClass.isInstance(ev) && filter(ev as BaseEvent)
} catch (e: Throwable) {
if (e is ExceptionInEventChannelFilterException) throw e // wrapped by another filter
throw ExceptionInEventChannelFilterException(ev, this, cause = e)
}
if (filterResult) block.invoke(ev)
else ListeningStatus.LISTENING
}
return delegate.intercept(thisIntercepted)
}
}
} }
/** /**
@ -684,30 +670,26 @@ public abstract class EventChannel<out BaseEvent : Event> @MiraiInternalApi publ
// region impl // region impl
/**
* 由子类实现可以为 handler 包装一个过滤器等. 每个 handler 都会经过此函数处理. // protected, to hide from users
*/ @MiraiInternalApi
@MiraiExperimentalApi protected abstract fun <E : Event> registerListener(eventClass: KClass<out E>, listener: Listener<E>)
protected open fun <E : Event> intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus {
return block
}
// to overcome visibility issue // to overcome visibility issue
internal fun <E : Event> intercept0(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus = internal fun <E : Event> registerListener0(eventClass: KClass<out E>, listener: Listener<E>) {
intercept(block) return registerListener(eventClass, listener)
protected abstract fun <E : Event> subscribeImpl(eventClass: KClass<out E>, listener: Listener<E>)
// to overcome visibility issue
internal fun <E : Event> subscribeImpl0(eventClass: KClass<out E>, listener: Listener<E>) {
return subscribeImpl(eventClass, listener)
} }
private fun <L : Listener<E>, E : Event> subscribeInternal(eventClass: KClass<out E>, listener: L): L { private fun <L : Listener<E>, E : Event> subscribeInternal(eventClass: KClass<out E>, listener: L): L {
subscribeImpl(eventClass, listener) registerListener(eventClass, listener)
return listener return listener
} }
/**
* Creates [Listener] instance using the [listenerBlock] action.
*/
@Contract("_ -> new") // always creates new instance
@MiraiInternalApi
protected abstract fun <E : Event> createListener( protected abstract fun <E : Event> createListener(
coroutineContext: CoroutineContext, coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind, concurrencyKind: ConcurrencyKind,
@ -727,19 +709,29 @@ public abstract class EventChannel<out BaseEvent : Event> @MiraiInternalApi publ
} }
private open class DelegateEventChannel<BaseEvent : Event>( // used by mirai-core
protected val delegate: EventChannel<BaseEvent>, internal open class FilterEventChannel<BaseEvent : Event>(
private val delegate: EventChannel<BaseEvent>,
private val filter: suspend (event: BaseEvent) -> Boolean,
) : EventChannel<BaseEvent>(delegate.baseEventClass, delegate.defaultCoroutineContext) { ) : EventChannel<BaseEvent>(delegate.baseEventClass, delegate.defaultCoroutineContext) {
private inline val innerThis get() = this private fun <E : Event> intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus {
return { ev ->
override fun <E : Event> intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus { val filterResult = try {
return delegate.intercept0(block) @Suppress("UNCHECKED_CAST")
baseEventClass.isInstance(ev) && filter(ev as BaseEvent)
} catch (e: Throwable) {
if (e is ExceptionInEventChannelFilterException) throw e // wrapped by another filter
throw ExceptionInEventChannelFilterException(ev, this, cause = e)
}
if (filterResult) block.invoke(ev)
else ListeningStatus.LISTENING
}
} }
override fun asFlow(): Flow<BaseEvent> = delegate.asFlow() override fun asFlow(): Flow<BaseEvent> = delegate.asFlow().filter(filter)
override fun <E : Event> subscribeImpl(eventClass: KClass<out E>, listener: Listener<E>) { override fun <E : Event> registerListener(eventClass: KClass<out E>, listener: Listener<E>) {
delegate.subscribeImpl0(eventClass, listener) delegate.registerListener0(eventClass, listener)
} }
override fun <E : Event> createListener( override fun <E : Event> createListener(
@ -747,7 +739,7 @@ private open class DelegateEventChannel<BaseEvent : Event>(
concurrencyKind: ConcurrencyKind, concurrencyKind: ConcurrencyKind,
priority: EventPriority, priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus listenerBlock: suspend (E) -> ListeningStatus
): Listener<E> = delegate.createListener0(coroutineContext, concurrencyKind, priority, listenerBlock) ): Listener<E> = delegate.createListener0(coroutineContext, concurrencyKind, priority, intercept(listenerBlock))
override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent> { override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent> {
return delegate.context(*coroutineContexts) return delegate.context(*coroutineContexts)

View File

@ -32,8 +32,8 @@ public object GlobalEventChannel : EventChannel<Event>(Event::class, EmptyCorout
} }
override fun asFlow(): Flow<Event> = instance.asFlow() override fun asFlow(): Flow<Event> = instance.asFlow()
override fun <E : Event> subscribeImpl(eventClass: KClass<out E>, listener: Listener<E>) { override fun <E : Event> registerListener(eventClass: KClass<out E>, listener: Listener<E>) {
return instance.subscribeImpl0(eventClass, listener) return instance.registerListener0(eventClass, listener)
} }
override fun <E : Event> createListener( override fun <E : Event> createListener(

View File

@ -27,7 +27,14 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass import kotlin.reflect.KClass
internal open class EventChannelImpl<E : Event>( // You probably should only use EventChannelToEventDispatcherAdapter.instance, or just use EventDispatchers. Event.broadcast is also good to use internally!
@RequiresOptIn(
"Every EventChannelImpl has dedicated EventListeners registries. Use the constructor only when you know what you are doing.",
level = RequiresOptIn.Level.ERROR
)
internal annotation class DangerousEventChannelImplConstructor
internal open class EventChannelImpl<E : Event> @DangerousEventChannelImplConstructor constructor(
baseEventClass: KClass<out E>, defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext baseEventClass: KClass<out E>, defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext
) : EventChannel<E>(baseEventClass, defaultCoroutineContext) { ) : EventChannel<E>(baseEventClass, defaultCoroutineContext) {
val eventListeners = EventListeners() val eventListeners = EventListeners()
@ -50,7 +57,7 @@ internal open class EventChannelImpl<E : Event>(
return flow.asSharedFlow().filter { baseEventClass.isInstance(it) } as Flow<E> return flow.asSharedFlow().filter { baseEventClass.isInstance(it) } as Flow<E>
} }
override fun <E : Event> subscribeImpl(eventClass: KClass<out E>, listener: Listener<E>) { override fun <E : Event> registerListener(eventClass: KClass<out E>, listener: Listener<E>) {
eventListeners.addListener(eventClass, listener) eventListeners.addListener(eventClass, listener)
} }
@ -64,7 +71,7 @@ internal open class EventChannelImpl<E : Event>(
return SafeListener( return SafeListener(
parentJob = context[Job], parentJob = context[Job],
subscriberContext = context, subscriberContext = context,
listenerBlock = intercept(listenerBlock), listenerBlock = listenerBlock,
concurrencyKind = concurrencyKind, concurrencyKind = concurrencyKind,
priority = priority priority = priority
) )
@ -114,11 +121,54 @@ internal open class EventChannelImpl<E : Event>(
} }
override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<E> { override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<E> {
return EventChannelImpl( val newDefaultContext = coroutineContexts.fold(defaultCoroutineContext) { acc, coroutineContext ->
baseEventClass, acc + coroutineContext
coroutineContexts.fold(defaultCoroutineContext) { acc, coroutineContext -> }
acc + coroutineContext
}) @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
return object : DelegateEventChannel<E>(this) {
override fun <E : Event> createListener(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus
): Listener<E> {
return super.createListener(
newDefaultContext + coroutineContext,
concurrencyKind,
priority,
listenerBlock
)
}
override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<E> {
return delegate.context(newDefaultContext, *coroutineContexts)
}
}
}
}
internal abstract class DelegateEventChannel<BaseEvent : Event>(
protected val delegate: EventChannel<BaseEvent>,
) : EventChannel<BaseEvent>(delegate.baseEventClass, delegate.defaultCoroutineContext) {
override fun asFlow(): Flow<BaseEvent> = delegate.asFlow()
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
override fun <E : Event> registerListener(eventClass: KClass<out E>, listener: Listener<E>) {
delegate.registerListener0(eventClass, listener)
}
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
override fun <E : Event> createListener(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus
): Listener<E> = delegate.createListener0(coroutineContext, concurrencyKind, priority, listenerBlock)
override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent> {
return delegate.context(*coroutineContexts)
} }
} }

View File

@ -17,6 +17,7 @@ import kotlin.reflect.KClass
/** /**
* @since 2.11 * @since 2.11
*/ */
@OptIn(DangerousEventChannelImplConstructor::class)
internal class EventChannelToEventDispatcherAdapter<E : Event> private constructor( internal class EventChannelToEventDispatcherAdapter<E : Event> private constructor(
baseEventClass: KClass<out E>, defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext baseEventClass: KClass<out E>, defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext
) : EventChannelImpl<E>(baseEventClass, defaultCoroutineContext) { ) : EventChannelImpl<E>(baseEventClass, defaultCoroutineContext) {

View File

@ -42,22 +42,25 @@ internal class SafeListener<in E : Event> internal constructor(
// Inherit context. // Inherit context.
withContext(subscriberContext) { listenerBlock.invoke(event) }.also { if (it == ListeningStatus.STOPPED) this.complete() } withContext(subscriberContext) { listenerBlock.invoke(event) }.also { if (it == ListeningStatus.STOPPED) this.complete() }
} catch (e: Throwable) { } catch (e: Throwable) {
subscriberContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e) // 若监听方使用了 EventChannel.exceptionHandler, 那么它就能处理异常, 否则将只记录异常.
?: currentCoroutineContext()[CoroutineExceptionHandler]?.handleException(subscriberContext, e) val subscriberExceptionHandler = subscriberContext[CoroutineExceptionHandler]
?: kotlin.run { if (subscriberExceptionHandler == null) {
val logger = if (event is BotEvent) event.bot.logger else logger val logger = if (event is BotEvent) event.bot.logger else logger
val subscriberName = subscriberContext[CoroutineName]?.name ?: "<unnamed>" val subscriberName =
val broadcasterName = currentCoroutineContext()[CoroutineName]?.name ?: "<unnamed>" subscriberContext[CoroutineName]?.name
val message = ?: "<unnamed>" // Bot 协程域有 CoroutineName, mirai-console 也会给插件域加入.
"An exception occurred when processing event. " + val broadcasterName = currentCoroutineContext()[CoroutineName]?.name ?: "<unnamed>"
"Subscriber scope: '$subscriberName'. " + val message =
"Broadcaster scope: '$broadcasterName'" "An exception occurred when processing event. " +
logger.warning(message, e) "Subscriber scope: '$subscriberName'. " +
} "Broadcaster scope: '$broadcasterName'"
// this.complete() // do not `completeExceptionally`, otherwise parentJob will fai`l. logger.warning(message, e)
// ListeningStatus.STOPPED
} else {
subscriberExceptionHandler.handleException(subscriberContext, e)
}
// not stopping listening.
ListeningStatus.LISTENING ListeningStatus.LISTENING
} }
} }

View File

@ -9,11 +9,10 @@
package net.mamoe.mirai.internal.event package net.mamoe.mirai.internal.event
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.Semaphore
import net.mamoe.mirai.event.* import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.events.FriendEvent import net.mamoe.mirai.event.events.FriendEvent
@ -23,21 +22,20 @@ import net.mamoe.mirai.event.events.MessageEvent
import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.resume import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine import kotlin.coroutines.suspendCoroutine
import kotlin.test.assertEquals import kotlin.test.*
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue
internal class EventChannelTest : AbstractEventTest() { internal class EventChannelTest : AbstractEventTest() {
suspend fun suspendCall() { suspend fun suspendCall() {
coroutineContext
} }
data class TE( data class TE(
val x: Int val x: Int,
val y: Int = 1,
) : AbstractEvent() ) : AbstractEvent()
val semaphore = Semaphore(1) val semaphore = Semaphore(1)
@ -53,7 +51,7 @@ internal class EventChannelTest : AbstractEventTest() {
} }
@Test @Test
fun testFilter() { fun singleFilter() {
runBlocking { runBlocking {
val received = suspendCoroutine<Int> { cont -> val received = suspendCoroutine<Int> { cont ->
GlobalEventChannel GlobalEventChannel
@ -84,6 +82,131 @@ internal class EventChannelTest : AbstractEventTest() {
} }
} }
@Test
fun multipleFilters() {
runBlocking {
val received = suspendCoroutine<Int> { cont ->
GlobalEventChannel
.filterIsInstance<TE>()
.filter {
true
}
.filter {
it.x == 2
}
.filter {
it.y == 2
}
.filter {
true
}
.subscribeOnce<TE> {
cont.resume(it.x)
}
launch {
println("Broadcast 1")
TE(1, 1).broadcast()
println("Broadcast 2")
TE(2, 1).broadcast()
println("Broadcast 2")
TE(2, 3).broadcast()
println("Broadcast 2")
TE(2, 2).broadcast()
println("Broadcast done")
}
}
assertEquals(2, received)
}
}
@Test
fun multipleContexts1() {
runBlocking {
val received = suspendCoroutine<Int> { cont ->
GlobalEventChannel
.parentScope(CoroutineScope(CoroutineName("1")))
.context(CoroutineName("2"))
.context(CoroutineName("3"))
.subscribeOnce<TE>(CoroutineName("4")) {
assertEquals("4", currentCoroutineContext()[CoroutineName]!!.name)
cont.resume(it.x)
}
launch {
TE(2, 2).broadcast()
}
}
assertEquals(2, received)
}
}
@Test
fun multipleContexts2() {
runBlocking {
val received = suspendCoroutine<Int> { cont ->
GlobalEventChannel
.parentScope(CoroutineScope(CoroutineName("1")))
.context(CoroutineName("2"))
.context(CoroutineName("3"))
.subscribeOnce<TE> {
assertEquals("3", currentCoroutineContext()[CoroutineName]!!.name)
cont.resume(it.x)
}
launch {
TE(2, 2).broadcast()
}
}
assertEquals(2, received)
}
}
@Test
fun multipleContexts3() {
runBlocking {
val received = suspendCoroutine<Int> { cont ->
GlobalEventChannel
.parentScope(CoroutineScope(CoroutineName("1")))
.context(CoroutineName("2"))
.subscribeOnce<TE> {
assertEquals("2", currentCoroutineContext()[CoroutineName]!!.name)
cont.resume(it.x)
}
launch {
TE(2, 2).broadcast()
}
}
assertEquals(2, received)
}
}
@Test
fun multipleContexts4() {
runBlocking {
val received = suspendCoroutine<Int> { cont ->
GlobalEventChannel
.parentScope(CoroutineScope(CoroutineName("1")))
.subscribeOnce<TE> {
assertEquals("1", currentCoroutineContext()[CoroutineName]!!.name)
cont.resume(it.x)
}
launch {
TE(2, 2).broadcast()
}
}
assertEquals(2, received)
}
}
@Test @Test
fun testAsChannel() { fun testAsChannel() {
runBlocking { runBlocking {
@ -193,11 +316,14 @@ internal class EventChannelTest : AbstractEventTest() {
runBlocking { runBlocking {
assertFailsWith<IllegalStateException> { assertFailsWith<IllegalStateException> {
suspendCoroutine<Int> { cont -> suspendCoroutine<Int> { cont ->
val handler = CoroutineExceptionHandler { _, throwable ->
cont.resumeWithException(throwable)
}
GlobalEventChannel GlobalEventChannel
.exceptionHandler { .context(handler)
cont.resumeWithException(it)
}
.subscribeOnce<TE> { .subscribeOnce<TE> {
assertSame(handler, currentCoroutineContext()[CoroutineExceptionHandler])
error("test error") error("test error")
} }
@ -217,7 +343,7 @@ internal class EventChannelTest : AbstractEventTest() {
@Test @Test
fun testVariance() { fun testVariance() {
var global: EventChannel<Event> = GlobalEventChannel var global: EventChannel<Event> = GlobalEventChannel
var a: EventChannel<MessageEvent> = global.filterIsInstance<MessageEvent>() val a: EventChannel<MessageEvent> = global.filterIsInstance()
val filterLambda: (ev: MessageEvent) -> Boolean = { true } val filterLambda: (ev: MessageEvent) -> Boolean = { true }