Deprecate experimental EventChannel.asChannel and add EventChannel.forwardToChannel (#1753)

* Deprecate experimental `EventChannel.asChannel` and add `EventChannel.forwardToChannel`

* Remove redundant opt-ins
This commit is contained in:
Him188 2021-12-22 19:02:11 +00:00 committed by GitHub
parent 45e3f58017
commit 26c099798b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 97 additions and 17 deletions

View File

@ -1705,6 +1705,8 @@ public class net/mamoe/mirai/event/EventChannel {
public final synthetic fun filter (Lkotlin/jvm/functions/Function2;)Lnet/mamoe/mirai/event/EventChannel;
public final fun filterIsInstance (Ljava/lang/Class;)Lnet/mamoe/mirai/event/EventChannel;
public final fun filterIsInstance (Lkotlin/reflect/KClass;)Lnet/mamoe/mirai/event/EventChannel;
public final fun forwardToChannel (Lkotlinx/coroutines/channels/SendChannel;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/EventPriority;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun forwardToChannel$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlinx/coroutines/channels/SendChannel;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
public final fun getBaseEventClass ()Lkotlin/reflect/KClass;
public final fun getDefaultCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public final fun parentJob (Lkotlinx/coroutines/Job;)Lnet/mamoe/mirai/event/EventChannel;

View File

@ -1705,6 +1705,8 @@ public class net/mamoe/mirai/event/EventChannel {
public final synthetic fun filter (Lkotlin/jvm/functions/Function2;)Lnet/mamoe/mirai/event/EventChannel;
public final fun filterIsInstance (Ljava/lang/Class;)Lnet/mamoe/mirai/event/EventChannel;
public final fun filterIsInstance (Lkotlin/reflect/KClass;)Lnet/mamoe/mirai/event/EventChannel;
public final fun forwardToChannel (Lkotlinx/coroutines/channels/SendChannel;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/EventPriority;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun forwardToChannel$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlinx/coroutines/channels/SendChannel;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
public final fun getBaseEventClass ()Lkotlin/reflect/KClass;
public final fun getDefaultCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public final fun parentJob (Lkotlinx/coroutines/Job;)Lnet/mamoe/mirai/event/EventChannel;

View File

@ -17,6 +17,8 @@ package net.mamoe.mirai.event
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.sync.Mutex
import net.mamoe.mirai.Bot
import net.mamoe.mirai.event.ConcurrencyKind.CONCURRENT
@ -26,10 +28,7 @@ import net.mamoe.mirai.internal.event.GlobalEventListeners
import net.mamoe.mirai.internal.event.Handler
import net.mamoe.mirai.internal.event.ListenerRegistry
import net.mamoe.mirai.internal.event.registerEventHandler
import net.mamoe.mirai.utils.MiraiExperimentalApi
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.cast
import net.mamoe.mirai.utils.runBIO
import net.mamoe.mirai.utils.*
import java.util.function.Consumer
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@ -69,29 +68,53 @@ public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal con
/**
* 创建事件监听并将监听结果发送在 [Channel]. 将返回值 [Channel] [关闭][Channel.close] 时将会同时关闭事件监听.
*
* 标注 [ExperimentalCoroutinesApi] 是因为使用了 [Channel.invokeOnClose]
*
* @param capacity Channel 容量. 详见 [Channel] 构造.
*
* @see subscribeAlways
* @see Channel
*/
@Deprecated(
"Please use forwardToChannel instead.",
replaceWith = ReplaceWith(
"Channel<BaseEvent>(capacity).apply { forwardToChannel(this, coroutineContext, priority) }",
"kotlinx.coroutines.channels.Channel"
),
level = DeprecationLevel.WARNING,
)
@DeprecatedSinceMirai(warningSince = "2.10.0-RC")
@MiraiExperimentalApi
@ExperimentalCoroutinesApi
public fun asChannel(
capacity: Int = Channel.RENDEZVOUS,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: ConcurrencyKind = CONCURRENT,
@Suppress("UNUSED_PARAMETER") concurrency: ConcurrencyKind = CONCURRENT,
priority: EventPriority = EventPriority.NORMAL,
): Channel<out BaseEvent> {
val channel = Channel<BaseEvent>(capacity)
val listener = subscribeAlways(baseEventClass, coroutineContext, concurrency, priority) { channel.send(it) }
channel.invokeOnClose {
if (it != null) listener.completeExceptionally(it)
else listener.complete()
}
): Channel<out BaseEvent> =
Channel<BaseEvent>(capacity).apply { forwardToChannel(this, coroutineContext, priority) }
return channel
/**
* 创建事件监听并将监听结果转发到 [channel]. [Channel.send] 抛出 [ClosedSendChannelException] 时停止 [Listener] 监听和转发.
*
* 返回创建的会转发监听到的所有事件到 [channel] [事件监听器][Listener]. [停止][Listener.complete] 该监听器会停止转发, 不会影响目标 [channel].
*
* [Channel.send] 挂起, 则监听器也会挂起, 也就可能会导致事件广播过程挂起.
*
* @see subscribeAlways
* @see Channel
* @since 2.10
*/
public fun forwardToChannel(
channel: SendChannel<@UnsafeVariance BaseEvent>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: EventPriority = EventPriority.MONITOR,
): Listener<@UnsafeVariance BaseEvent> {
return subscribe(baseEventClass, coroutineContext, priority = priority) {
try {
channel.send(it)
ListeningStatus.LISTENING
} catch (_: ClosedSendChannelException) {
ListeningStatus.STOPPED
}
}
}
// region transforming operations

View File

@ -29,6 +29,8 @@ import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue
internal class EventChannelTest : AbstractEventTest() {
suspend fun suspendCall() {
@ -89,10 +91,10 @@ internal class EventChannelTest : AbstractEventTest() {
}
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testAsChannel() {
runBlocking {
@Suppress("DEPRECATION")
val channel = GlobalEventChannel
.filterIsInstance<TE>()
.filter { true }
@ -114,6 +116,57 @@ internal class EventChannelTest : AbstractEventTest() {
}
}
@Test
fun `test forwardToChannel`() {
runBlocking {
val channel = Channel<TE>(Channel.BUFFERED)
val listener = GlobalEventChannel
.filterIsInstance<TE>()
.filter { true }
.filter { it.x == 2 }
.filter { true }
.forwardToChannel(channel)
TE(1).broadcast()
TE(2).broadcast()
listener.complete()
TE(2).broadcast()
channel.close()
val list = channel.receiveAsFlow().toList()
assertEquals(1, list.size)
assertEquals(TE(2), list.single())
}
}
@Test
fun `test forwardToChannel listener completes if channel closed`() {
runBlocking {
val channel = Channel<TE>(Channel.BUFFERED)
val listener = GlobalEventChannel
.filterIsInstance<TE>()
.filter { true }
.filter { it.x == 2 }
.filter { true }
.forwardToChannel(channel)
TE(1).broadcast()
TE(2).broadcast()
channel.close()
assertTrue { listener.isActive }
TE(2).broadcast()
assertTrue { listener.isCompleted }
assertFalse { listener.isActive }
val list = channel.receiveAsFlow().toList()
assertEquals(1, list.size)
assertEquals(TE(2), list.single())
}
}
@Test
fun testExceptionInFilter() {
runBlocking {