From 30dbd1b1c2f4bce6540f4b3cc692f9211141e545 Mon Sep 17 00:00:00 2001 From: Him188 Date: Wed, 6 Apr 2022 14:01:54 +0100 Subject: [PATCH] Pass exceptions caught in subscriber context to subscriber only --- .../android/api/android.api | 3 - .../compatibility-validation/jvm/api/jvm.api | 3 - .../commonMain/kotlin/event/EventChannel.kt | 76 ++++----- .../kotlin/event/GlobalEventChannel.kt | 4 +- .../kotlin/event/EventChannelImpl.kt | 66 +++++++- .../EventChannelToEventDispatcherAdapter.kt | 1 + .../commonMain/kotlin/event/SafeListener.kt | 33 ++-- .../kotlin/event/EventChannelTest.kt | 152 ++++++++++++++++-- 8 files changed, 252 insertions(+), 86 deletions(-) diff --git a/mirai-core-api/compatibility-validation/android/api/android.api b/mirai-core-api/compatibility-validation/android/api/android.api index f52075655..c56d421d3 100644 --- a/mirai-core-api/compatibility-validation/android/api/android.api +++ b/mirai-core-api/compatibility-validation/android/api/android.api @@ -84,7 +84,6 @@ public abstract interface class net/mamoe/mirai/IMirai : net/mamoe/mirai/LowLeve public abstract fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;)V public abstract fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun broadcastEvent$suspendImpl (Lnet/mamoe/mirai/IMirai;Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun calculateGroupCodeByGroupUin (J)J public fun calculateGroupUinByGroupCode (J)J public abstract fun constructMessageSource (JLnet/mamoe/mirai/message/data/MessageSourceKind;JJ[II[ILnet/mamoe/mirai/message/data/MessageChain;)Lnet/mamoe/mirai/message/data/OfflineMessageSource; @@ -1738,7 +1737,6 @@ public abstract class net/mamoe/mirai/event/EventChannel { public static synthetic fun asChannel$default (Lnet/mamoe/mirai/event/EventChannel;ILkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow; public abstract fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel; - protected abstract fun createListener (Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function2;)Lnet/mamoe/mirai/event/Listener; public final fun exceptionHandler (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel; public final fun exceptionHandler (Lkotlinx/coroutines/CoroutineExceptionHandler;)Lnet/mamoe/mirai/event/EventChannel; public final fun filter (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel; @@ -1768,7 +1766,6 @@ public abstract class net/mamoe/mirai/event/EventChannel { public final synthetic fun subscribeAlways (Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;)Lnet/mamoe/mirai/event/Listener; public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Ljava/util/function/Consumer;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener; public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener; - protected abstract fun subscribeImpl (Lkotlin/reflect/KClass;Lnet/mamoe/mirai/event/Listener;)V public final fun subscribeOnce (Ljava/lang/Class;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; diff --git a/mirai-core-api/compatibility-validation/jvm/api/jvm.api b/mirai-core-api/compatibility-validation/jvm/api/jvm.api index e2f211892..d620955df 100644 --- a/mirai-core-api/compatibility-validation/jvm/api/jvm.api +++ b/mirai-core-api/compatibility-validation/jvm/api/jvm.api @@ -84,7 +84,6 @@ public abstract interface class net/mamoe/mirai/IMirai : net/mamoe/mirai/LowLeve public abstract fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;)V public abstract fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun broadcastEvent$suspendImpl (Lnet/mamoe/mirai/IMirai;Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun calculateGroupCodeByGroupUin (J)J public fun calculateGroupUinByGroupCode (J)J public abstract fun constructMessageSource (JLnet/mamoe/mirai/message/data/MessageSourceKind;JJ[II[ILnet/mamoe/mirai/message/data/MessageChain;)Lnet/mamoe/mirai/message/data/OfflineMessageSource; @@ -1738,7 +1737,6 @@ public abstract class net/mamoe/mirai/event/EventChannel { public static synthetic fun asChannel$default (Lnet/mamoe/mirai/event/EventChannel;ILkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow; public abstract fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel; - protected abstract fun createListener (Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function2;)Lnet/mamoe/mirai/event/Listener; public final fun exceptionHandler (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel; public final fun exceptionHandler (Lkotlinx/coroutines/CoroutineExceptionHandler;)Lnet/mamoe/mirai/event/EventChannel; public final fun filter (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel; @@ -1768,7 +1766,6 @@ public abstract class net/mamoe/mirai/event/EventChannel { public final synthetic fun subscribeAlways (Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;)Lnet/mamoe/mirai/event/Listener; public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Ljava/util/function/Consumer;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener; public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener; - protected abstract fun subscribeImpl (Lkotlin/reflect/KClass;Lnet/mamoe/mirai/event/Listener;)V public final fun subscribeOnce (Ljava/lang/Class;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener; diff --git a/mirai-core-api/src/commonMain/kotlin/event/EventChannel.kt b/mirai-core-api/src/commonMain/kotlin/event/EventChannel.kt index 52aa64f8c..620f01159 100644 --- a/mirai-core-api/src/commonMain/kotlin/event/EventChannel.kt +++ b/mirai-core-api/src/commonMain/kotlin/event/EventChannel.kt @@ -27,6 +27,7 @@ import net.mamoe.mirai.event.ConcurrencyKind.LOCKED import net.mamoe.mirai.event.events.BotEvent import net.mamoe.mirai.internal.event.registerEventHandler import net.mamoe.mirai.utils.* +import org.jetbrains.annotations.Contract import java.util.function.Consumer import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -214,22 +215,7 @@ public abstract class EventChannel @MiraiInternalApi publ */ @JvmSynthetic public fun filter(filter: suspend (event: BaseEvent) -> Boolean): EventChannel { - return object : DelegateEventChannel(this) { - override fun intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus { - val thisIntercepted: suspend (E) -> ListeningStatus = { ev -> - val filterResult = try { - @Suppress("UNCHECKED_CAST") - baseEventClass.isInstance(ev) && filter(ev as BaseEvent) - } catch (e: Throwable) { - if (e is ExceptionInEventChannelFilterException) throw e // wrapped by another filter - throw ExceptionInEventChannelFilterException(ev, this, cause = e) - } - if (filterResult) block.invoke(ev) - else ListeningStatus.LISTENING - } - return delegate.intercept(thisIntercepted) - } - } + return FilterEventChannel(this, filter) } /** @@ -684,30 +670,26 @@ public abstract class EventChannel @MiraiInternalApi publ // region impl - /** - * 由子类实现,可以为 handler 包装一个过滤器等. 每个 handler 都会经过此函数处理. - */ - @MiraiExperimentalApi - protected open fun intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus { - return block - } + + // protected, to hide from users + @MiraiInternalApi + protected abstract fun registerListener(eventClass: KClass, listener: Listener) // to overcome visibility issue - internal fun intercept0(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus = - intercept(block) - - protected abstract fun subscribeImpl(eventClass: KClass, listener: Listener) - - // to overcome visibility issue - internal fun subscribeImpl0(eventClass: KClass, listener: Listener) { - return subscribeImpl(eventClass, listener) + internal fun registerListener0(eventClass: KClass, listener: Listener) { + return registerListener(eventClass, listener) } private fun , E : Event> subscribeInternal(eventClass: KClass, listener: L): L { - subscribeImpl(eventClass, listener) + registerListener(eventClass, listener) return listener } + /** + * Creates [Listener] instance using the [listenerBlock] action. + */ + @Contract("_ -> new") // always creates new instance + @MiraiInternalApi protected abstract fun createListener( coroutineContext: CoroutineContext, concurrencyKind: ConcurrencyKind, @@ -727,19 +709,29 @@ public abstract class EventChannel @MiraiInternalApi publ } -private open class DelegateEventChannel( - protected val delegate: EventChannel, +// used by mirai-core +internal open class FilterEventChannel( + private val delegate: EventChannel, + private val filter: suspend (event: BaseEvent) -> Boolean, ) : EventChannel(delegate.baseEventClass, delegate.defaultCoroutineContext) { - private inline val innerThis get() = this - - override fun intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus { - return delegate.intercept0(block) + private fun intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus { + return { ev -> + val filterResult = try { + @Suppress("UNCHECKED_CAST") + baseEventClass.isInstance(ev) && filter(ev as BaseEvent) + } catch (e: Throwable) { + if (e is ExceptionInEventChannelFilterException) throw e // wrapped by another filter + throw ExceptionInEventChannelFilterException(ev, this, cause = e) + } + if (filterResult) block.invoke(ev) + else ListeningStatus.LISTENING + } } - override fun asFlow(): Flow = delegate.asFlow() + override fun asFlow(): Flow = delegate.asFlow().filter(filter) - override fun subscribeImpl(eventClass: KClass, listener: Listener) { - delegate.subscribeImpl0(eventClass, listener) + override fun registerListener(eventClass: KClass, listener: Listener) { + delegate.registerListener0(eventClass, listener) } override fun createListener( @@ -747,7 +739,7 @@ private open class DelegateEventChannel( concurrencyKind: ConcurrencyKind, priority: EventPriority, listenerBlock: suspend (E) -> ListeningStatus - ): Listener = delegate.createListener0(coroutineContext, concurrencyKind, priority, listenerBlock) + ): Listener = delegate.createListener0(coroutineContext, concurrencyKind, priority, intercept(listenerBlock)) override fun context(vararg coroutineContexts: CoroutineContext): EventChannel { return delegate.context(*coroutineContexts) diff --git a/mirai-core-api/src/commonMain/kotlin/event/GlobalEventChannel.kt b/mirai-core-api/src/commonMain/kotlin/event/GlobalEventChannel.kt index 1cee00fc8..f5e518ef5 100644 --- a/mirai-core-api/src/commonMain/kotlin/event/GlobalEventChannel.kt +++ b/mirai-core-api/src/commonMain/kotlin/event/GlobalEventChannel.kt @@ -32,8 +32,8 @@ public object GlobalEventChannel : EventChannel(Event::class, EmptyCorout } override fun asFlow(): Flow = instance.asFlow() - override fun subscribeImpl(eventClass: KClass, listener: Listener) { - return instance.subscribeImpl0(eventClass, listener) + override fun registerListener(eventClass: KClass, listener: Listener) { + return instance.registerListener0(eventClass, listener) } override fun createListener( diff --git a/mirai-core/src/commonMain/kotlin/event/EventChannelImpl.kt b/mirai-core/src/commonMain/kotlin/event/EventChannelImpl.kt index 17fbde04f..913d1fa34 100644 --- a/mirai-core/src/commonMain/kotlin/event/EventChannelImpl.kt +++ b/mirai-core/src/commonMain/kotlin/event/EventChannelImpl.kt @@ -27,7 +27,14 @@ import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KClass -internal open class EventChannelImpl( +// You probably should only use EventChannelToEventDispatcherAdapter.instance, or just use EventDispatchers. Event.broadcast is also good to use internally! +@RequiresOptIn( + "Every EventChannelImpl has dedicated EventListeners registries. Use the constructor only when you know what you are doing.", + level = RequiresOptIn.Level.ERROR +) +internal annotation class DangerousEventChannelImplConstructor + +internal open class EventChannelImpl @DangerousEventChannelImplConstructor constructor( baseEventClass: KClass, defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext ) : EventChannel(baseEventClass, defaultCoroutineContext) { val eventListeners = EventListeners() @@ -50,7 +57,7 @@ internal open class EventChannelImpl( return flow.asSharedFlow().filter { baseEventClass.isInstance(it) } as Flow } - override fun subscribeImpl(eventClass: KClass, listener: Listener) { + override fun registerListener(eventClass: KClass, listener: Listener) { eventListeners.addListener(eventClass, listener) } @@ -64,7 +71,7 @@ internal open class EventChannelImpl( return SafeListener( parentJob = context[Job], subscriberContext = context, - listenerBlock = intercept(listenerBlock), + listenerBlock = listenerBlock, concurrencyKind = concurrencyKind, priority = priority ) @@ -114,11 +121,54 @@ internal open class EventChannelImpl( } override fun context(vararg coroutineContexts: CoroutineContext): EventChannel { - return EventChannelImpl( - baseEventClass, - coroutineContexts.fold(defaultCoroutineContext) { acc, coroutineContext -> - acc + coroutineContext - }) + val newDefaultContext = coroutineContexts.fold(defaultCoroutineContext) { acc, coroutineContext -> + acc + coroutineContext + } + + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") + return object : DelegateEventChannel(this) { + override fun createListener( + coroutineContext: CoroutineContext, + concurrencyKind: ConcurrencyKind, + priority: EventPriority, + listenerBlock: suspend (E) -> ListeningStatus + ): Listener { + return super.createListener( + newDefaultContext + coroutineContext, + concurrencyKind, + priority, + listenerBlock + ) + } + + override fun context(vararg coroutineContexts: CoroutineContext): EventChannel { + return delegate.context(newDefaultContext, *coroutineContexts) + } + } + } +} + + +internal abstract class DelegateEventChannel( + protected val delegate: EventChannel, +) : EventChannel(delegate.baseEventClass, delegate.defaultCoroutineContext) { + override fun asFlow(): Flow = delegate.asFlow() + + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") + override fun registerListener(eventClass: KClass, listener: Listener) { + delegate.registerListener0(eventClass, listener) + } + + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") + override fun createListener( + coroutineContext: CoroutineContext, + concurrencyKind: ConcurrencyKind, + priority: EventPriority, + listenerBlock: suspend (E) -> ListeningStatus + ): Listener = delegate.createListener0(coroutineContext, concurrencyKind, priority, listenerBlock) + + override fun context(vararg coroutineContexts: CoroutineContext): EventChannel { + return delegate.context(*coroutineContexts) } } diff --git a/mirai-core/src/commonMain/kotlin/event/EventChannelToEventDispatcherAdapter.kt b/mirai-core/src/commonMain/kotlin/event/EventChannelToEventDispatcherAdapter.kt index 5b9bb80c4..01796d773 100644 --- a/mirai-core/src/commonMain/kotlin/event/EventChannelToEventDispatcherAdapter.kt +++ b/mirai-core/src/commonMain/kotlin/event/EventChannelToEventDispatcherAdapter.kt @@ -17,6 +17,7 @@ import kotlin.reflect.KClass /** * @since 2.11 */ +@OptIn(DangerousEventChannelImplConstructor::class) internal class EventChannelToEventDispatcherAdapter private constructor( baseEventClass: KClass, defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext ) : EventChannelImpl(baseEventClass, defaultCoroutineContext) { diff --git a/mirai-core/src/commonMain/kotlin/event/SafeListener.kt b/mirai-core/src/commonMain/kotlin/event/SafeListener.kt index 2f5c926ab..cdedf5b4b 100644 --- a/mirai-core/src/commonMain/kotlin/event/SafeListener.kt +++ b/mirai-core/src/commonMain/kotlin/event/SafeListener.kt @@ -42,22 +42,25 @@ internal class SafeListener internal constructor( // Inherit context. withContext(subscriberContext) { listenerBlock.invoke(event) }.also { if (it == ListeningStatus.STOPPED) this.complete() } } catch (e: Throwable) { - subscriberContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e) - ?: currentCoroutineContext()[CoroutineExceptionHandler]?.handleException(subscriberContext, e) - ?: kotlin.run { - val logger = if (event is BotEvent) event.bot.logger else logger - val subscriberName = subscriberContext[CoroutineName]?.name ?: "" - val broadcasterName = currentCoroutineContext()[CoroutineName]?.name ?: "" - val message = - "An exception occurred when processing event. " + - "Subscriber scope: '$subscriberName'. " + - "Broadcaster scope: '$broadcasterName'" - logger.warning(message, e) - } - // this.complete() // do not `completeExceptionally`, otherwise parentJob will fai`l. - // ListeningStatus.STOPPED + // 若监听方使用了 EventChannel.exceptionHandler, 那么它就能处理异常, 否则将只记录异常. + val subscriberExceptionHandler = subscriberContext[CoroutineExceptionHandler] + if (subscriberExceptionHandler == null) { + val logger = if (event is BotEvent) event.bot.logger else logger + val subscriberName = + subscriberContext[CoroutineName]?.name + ?: "" // Bot 协程域有 CoroutineName, mirai-console 也会给插件域加入. + val broadcasterName = currentCoroutineContext()[CoroutineName]?.name ?: "" + val message = + "An exception occurred when processing event. " + + "Subscriber scope: '$subscriberName'. " + + "Broadcaster scope: '$broadcasterName'" + logger.warning(message, e) + + } else { + subscriberExceptionHandler.handleException(subscriberContext, e) + } + - // not stopping listening. ListeningStatus.LISTENING } } diff --git a/mirai-core/src/commonTest/kotlin/event/EventChannelTest.kt b/mirai-core/src/commonTest/kotlin/event/EventChannelTest.kt index 71927ba5a..55c34721b 100644 --- a/mirai-core/src/commonTest/kotlin/event/EventChannelTest.kt +++ b/mirai-core/src/commonTest/kotlin/event/EventChannelTest.kt @@ -9,11 +9,10 @@ package net.mamoe.mirai.internal.event +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Semaphore import net.mamoe.mirai.event.* import net.mamoe.mirai.event.events.FriendEvent @@ -23,21 +22,20 @@ import net.mamoe.mirai.event.events.MessageEvent import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import kotlin.coroutines.coroutineContext import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine -import kotlin.test.assertEquals -import kotlin.test.assertFailsWith -import kotlin.test.assertFalse -import kotlin.test.assertTrue +import kotlin.test.* internal class EventChannelTest : AbstractEventTest() { suspend fun suspendCall() { - + coroutineContext } data class TE( - val x: Int + val x: Int, + val y: Int = 1, ) : AbstractEvent() val semaphore = Semaphore(1) @@ -53,7 +51,7 @@ internal class EventChannelTest : AbstractEventTest() { } @Test - fun testFilter() { + fun singleFilter() { runBlocking { val received = suspendCoroutine { cont -> GlobalEventChannel @@ -84,6 +82,131 @@ internal class EventChannelTest : AbstractEventTest() { } } + @Test + fun multipleFilters() { + runBlocking { + val received = suspendCoroutine { cont -> + GlobalEventChannel + .filterIsInstance() + .filter { + true + } + .filter { + it.x == 2 + } + .filter { + it.y == 2 + } + .filter { + true + } + .subscribeOnce { + cont.resume(it.x) + } + + launch { + println("Broadcast 1") + TE(1, 1).broadcast() + println("Broadcast 2") + TE(2, 1).broadcast() + println("Broadcast 2") + TE(2, 3).broadcast() + println("Broadcast 2") + TE(2, 2).broadcast() + println("Broadcast done") + } + } + + assertEquals(2, received) + } + } + + @Test + fun multipleContexts1() { + runBlocking { + val received = suspendCoroutine { cont -> + GlobalEventChannel + .parentScope(CoroutineScope(CoroutineName("1"))) + .context(CoroutineName("2")) + .context(CoroutineName("3")) + .subscribeOnce(CoroutineName("4")) { + assertEquals("4", currentCoroutineContext()[CoroutineName]!!.name) + cont.resume(it.x) + } + + launch { + TE(2, 2).broadcast() + } + } + + assertEquals(2, received) + } + } + + @Test + fun multipleContexts2() { + runBlocking { + val received = suspendCoroutine { cont -> + GlobalEventChannel + .parentScope(CoroutineScope(CoroutineName("1"))) + .context(CoroutineName("2")) + .context(CoroutineName("3")) + .subscribeOnce { + assertEquals("3", currentCoroutineContext()[CoroutineName]!!.name) + cont.resume(it.x) + } + + launch { + TE(2, 2).broadcast() + } + } + + assertEquals(2, received) + } + } + + + @Test + fun multipleContexts3() { + runBlocking { + val received = suspendCoroutine { cont -> + GlobalEventChannel + .parentScope(CoroutineScope(CoroutineName("1"))) + .context(CoroutineName("2")) + .subscribeOnce { + assertEquals("2", currentCoroutineContext()[CoroutineName]!!.name) + cont.resume(it.x) + } + + launch { + TE(2, 2).broadcast() + } + } + + assertEquals(2, received) + } + } + + @Test + fun multipleContexts4() { + runBlocking { + val received = suspendCoroutine { cont -> + GlobalEventChannel + .parentScope(CoroutineScope(CoroutineName("1"))) + .subscribeOnce { + assertEquals("1", currentCoroutineContext()[CoroutineName]!!.name) + cont.resume(it.x) + } + + launch { + TE(2, 2).broadcast() + } + } + + assertEquals(2, received) + } + } + @Test fun testAsChannel() { runBlocking { @@ -193,11 +316,14 @@ internal class EventChannelTest : AbstractEventTest() { runBlocking { assertFailsWith { suspendCoroutine { cont -> + val handler = CoroutineExceptionHandler { _, throwable -> + cont.resumeWithException(throwable) + } + GlobalEventChannel - .exceptionHandler { - cont.resumeWithException(it) - } + .context(handler) .subscribeOnce { + assertSame(handler, currentCoroutineContext()[CoroutineExceptionHandler]) error("test error") } @@ -217,7 +343,7 @@ internal class EventChannelTest : AbstractEventTest() { @Test fun testVariance() { var global: EventChannel = GlobalEventChannel - var a: EventChannel = global.filterIsInstance() + val a: EventChannel = global.filterIsInstance() val filterLambda: (ev: MessageEvent) -> Boolean = { true }