Move event implementations to EventDispatcher and add EventChannel.asFlow.

This commit is contained in:
Him188 2022-03-06 12:53:31 +00:00
parent bc8fea2195
commit c192047361
32 changed files with 786 additions and 530 deletions

View File

@ -83,7 +83,7 @@ public abstract interface class net/mamoe/mirai/IMirai : net/mamoe/mirai/LowLeve
public fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;)V
public abstract fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;)V
public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun broadcastEvent$suspendImpl (Lnet/mamoe/mirai/IMirai;Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun calculateGroupCodeByGroupUin (J)J
public fun calculateGroupUinByGroupCode (J)J
@ -1704,6 +1704,8 @@ public final class net/mamoe/mirai/data/UserProfile$Sex : java/lang/Enum {
}
public abstract class net/mamoe/mirai/event/AbstractEvent : net/mamoe/mirai/event/Event {
public field _intercepted Z
public final field broadCastLock Lkotlinx/coroutines/sync/Mutex;
public fun <init> ()V
public final fun cancel ()V
public fun intercept ()V
@ -1732,10 +1734,11 @@ public abstract interface class net/mamoe/mirai/event/Event {
public abstract fun isIntercepted ()Z
}
public class net/mamoe/mirai/event/EventChannel {
public fun <init> (Lkotlin/reflect/KClass;)V
public abstract class net/mamoe/mirai/event/EventChannel {
public static synthetic fun asChannel$default (Lnet/mamoe/mirai/event/EventChannel;ILkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public final fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel;
public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow;
public abstract fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel;
protected abstract fun createListener (Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function2;)Lnet/mamoe/mirai/event/Listener;
public final fun exceptionHandler (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel;
public final fun exceptionHandler (Lkotlinx/coroutines/CoroutineExceptionHandler;)Lnet/mamoe/mirai/event/EventChannel;
public final fun filter (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel;
@ -1765,6 +1768,7 @@ public class net/mamoe/mirai/event/EventChannel {
public final synthetic fun subscribeAlways (Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Ljava/util/function/Consumer;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
protected abstract fun subscribeImpl (Lkotlin/reflect/KClass;Lnet/mamoe/mirai/event/Listener;)V
public final fun subscribeOnce (Ljava/lang/Class;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
@ -1793,14 +1797,12 @@ public final class net/mamoe/mirai/event/EventKt {
}
public final class net/mamoe/mirai/event/EventPriority : java/lang/Enum {
public static final field Companion Lnet/mamoe/mirai/event/EventPriority$Companion;
public static final field HIGH Lnet/mamoe/mirai/event/EventPriority;
public static final field HIGHEST Lnet/mamoe/mirai/event/EventPriority;
public static final field LOW Lnet/mamoe/mirai/event/EventPriority;
public static final field LOWEST Lnet/mamoe/mirai/event/EventPriority;
public static final field MONITOR Lnet/mamoe/mirai/event/EventPriority;
public static final field NORMAL Lnet/mamoe/mirai/event/EventPriority;
public static final fun getPrioritiesExcludedMonitor$mirai_core_api ()[Lnet/mamoe/mirai/event/EventPriority;
public static fun valueOf (Ljava/lang/String;)Lnet/mamoe/mirai/event/EventPriority;
public static fun values ()[Lnet/mamoe/mirai/event/EventPriority;
}
@ -1833,6 +1835,8 @@ public final class net/mamoe/mirai/event/ExtensionsKt {
public final class net/mamoe/mirai/event/GlobalEventChannel : net/mamoe/mirai/event/EventChannel {
public static final field INSTANCE Lnet/mamoe/mirai/event/GlobalEventChannel;
public fun asFlow ()Lkotlinx/coroutines/flow/Flow;
public fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel;
}
public abstract interface class net/mamoe/mirai/event/Listener : kotlinx/coroutines/CompletableJob {

View File

@ -83,7 +83,7 @@ public abstract interface class net/mamoe/mirai/IMirai : net/mamoe/mirai/LowLeve
public fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;)V
public abstract fun acceptNewFriendRequest (Lnet/mamoe/mirai/event/events/NewFriendRequestEvent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;)V
public fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun broadcastEvent (Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun broadcastEvent$suspendImpl (Lnet/mamoe/mirai/IMirai;Lnet/mamoe/mirai/event/Event;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun calculateGroupCodeByGroupUin (J)J
public fun calculateGroupUinByGroupCode (J)J
@ -1704,6 +1704,8 @@ public final class net/mamoe/mirai/data/UserProfile$Sex : java/lang/Enum {
}
public abstract class net/mamoe/mirai/event/AbstractEvent : net/mamoe/mirai/event/Event {
public field _intercepted Z
public final field broadCastLock Lkotlinx/coroutines/sync/Mutex;
public fun <init> ()V
public final fun cancel ()V
public fun intercept ()V
@ -1732,10 +1734,11 @@ public abstract interface class net/mamoe/mirai/event/Event {
public abstract fun isIntercepted ()Z
}
public class net/mamoe/mirai/event/EventChannel {
public fun <init> (Lkotlin/reflect/KClass;)V
public abstract class net/mamoe/mirai/event/EventChannel {
public static synthetic fun asChannel$default (Lnet/mamoe/mirai/event/EventChannel;ILkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public final fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel;
public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow;
public abstract fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel;
protected abstract fun createListener (Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function2;)Lnet/mamoe/mirai/event/Listener;
public final fun exceptionHandler (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel;
public final fun exceptionHandler (Lkotlinx/coroutines/CoroutineExceptionHandler;)Lnet/mamoe/mirai/event/EventChannel;
public final fun filter (Lkotlin/jvm/functions/Function1;)Lnet/mamoe/mirai/event/EventChannel;
@ -1765,6 +1768,7 @@ public class net/mamoe/mirai/event/EventChannel {
public final synthetic fun subscribeAlways (Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Ljava/util/function/Consumer;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
public static synthetic fun subscribeAlways$default (Lnet/mamoe/mirai/event/EventChannel;Lkotlin/reflect/KClass;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Lnet/mamoe/mirai/event/EventPriority;Lkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lnet/mamoe/mirai/event/Listener;
protected abstract fun subscribeImpl (Lkotlin/reflect/KClass;Lnet/mamoe/mirai/event/Listener;)V
public final fun subscribeOnce (Ljava/lang/Class;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
public final fun subscribeOnce (Ljava/lang/Class;Lkotlin/coroutines/CoroutineContext;Lnet/mamoe/mirai/event/ConcurrencyKind;Ljava/util/function/Consumer;)Lnet/mamoe/mirai/event/Listener;
@ -1793,14 +1797,12 @@ public final class net/mamoe/mirai/event/EventKt {
}
public final class net/mamoe/mirai/event/EventPriority : java/lang/Enum {
public static final field Companion Lnet/mamoe/mirai/event/EventPriority$Companion;
public static final field HIGH Lnet/mamoe/mirai/event/EventPriority;
public static final field HIGHEST Lnet/mamoe/mirai/event/EventPriority;
public static final field LOW Lnet/mamoe/mirai/event/EventPriority;
public static final field LOWEST Lnet/mamoe/mirai/event/EventPriority;
public static final field MONITOR Lnet/mamoe/mirai/event/EventPriority;
public static final field NORMAL Lnet/mamoe/mirai/event/EventPriority;
public static final fun getPrioritiesExcludedMonitor$mirai_core_api ()[Lnet/mamoe/mirai/event/EventPriority;
public static fun valueOf (Ljava/lang/String;)Lnet/mamoe/mirai/event/EventPriority;
public static fun values ()[Lnet/mamoe/mirai/event/EventPriority;
}
@ -1833,6 +1835,8 @@ public final class net/mamoe/mirai/event/ExtensionsKt {
public final class net/mamoe/mirai/event/GlobalEventChannel : net/mamoe/mirai/event/EventChannel {
public static final field INSTANCE Lnet/mamoe/mirai/event/GlobalEventChannel;
public fun asFlow ()Lkotlinx/coroutines/flow/Flow;
public fun context ([Lkotlin/coroutines/CoroutineContext;)Lnet/mamoe/mirai/event/EventChannel;
}
public abstract interface class net/mamoe/mirai/event/Listener : kotlinx/coroutines/CompletableJob {

View File

@ -20,7 +20,6 @@ import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
import net.mamoe.mirai.contact.*
import net.mamoe.mirai.data.UserProfile
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event._EventBroadcast
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.events.BotInvitedJoinGroupRequestEvent
import net.mamoe.mirai.event.events.MemberJoinRequestEvent
@ -315,9 +314,7 @@ public interface IMirai : LowLevelApiAccessor {
/**
* 广播一个事件. [Event.broadcast] 调用.
*/
public suspend fun broadcastEvent(event: Event) {
_EventBroadcast.implementation.broadcastImpl(event)
}
public suspend fun broadcastEvent(event: Event)
}
/**

View File

@ -1,10 +1,10 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
@file:Suppress("unused")
@ -12,33 +12,35 @@
package net.mamoe.mirai.event
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
import net.mamoe.mirai.IMirai
import net.mamoe.mirai.Mirai
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.event.events.MessageEvent
import net.mamoe.mirai.internal.event.VerboseEvent
import net.mamoe.mirai.internal.event.callAndRemoveIfRequired
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.utils.*
/**
* 可被监听的类, 可以是任何 class object.
* 表示一个事件.
*
* 若监听这个类, 监听器将会接收所有事件的广播.
* 实现时应继承 [AbstractEvent] 而不要直接实现 [Event]. 否则将无法广播.
*
* 所有 [Event] 都应继承 [AbstractEvent] 而不要直接实现 [Event]. 否则将无法广播也无法监听.
* ## 广播事件
*
* ### 广播
* 广播事件的唯一方式为 [broadcast].
* 使用 [Event.broadcast] [IMirai.broadcastEvent].
*
* @see EventChannel.subscribeAlways
* @see EventChannel.subscribeOnce
* Kotlin:
* ```
* val event: Event = ...
* event.broadcast()
* ```
*
* @see EventChannel.subscribeMessages
* Java:
* ```
* Event event = ...;
* Mirai.getInstance().broadcastEvent(event);
* ```
*
* @see [broadcast] 广播事件
* @see [EventChannel.subscribe] 监听事件
* ## 监听事件
*
* 参阅 [EventChannel].
*
* @see CancellableEvent 可被取消的事件
*/
@ -72,12 +74,14 @@ public interface Event {
public abstract class AbstractEvent : Event {
/** 限制一个事件实例不能并行广播. (适用于 object 广播的情况) */
@JvmField
internal val broadCastLock = Mutex()
@MiraiInternalApi
public val broadCastLock: Mutex = Mutex()
@Suppress("PropertyName")
@JvmField
@Volatile
internal var _intercepted = false
@MiraiInternalApi
public var _intercepted: Boolean = false
@Volatile
private var _cancelled = false
@ -104,6 +108,7 @@ public abstract class AbstractEvent : Event {
/**
* @see CancellableEvent.cancel
* @throws IllegalStateException 当事件未实现接口 [CancellableEvent] 时抛出
*/
public fun cancel() {
check(this is CancellableEvent) {
@ -128,8 +133,6 @@ public interface CancellableEvent : Event {
/**
* 取消这个事件.
* 事件需实现 [CancellableEvent] 接口才可以被取消
*
* @throws IllegalStateException 当事件未实现接口 [CancellableEvent] 时抛出
*/
public fun cancel()
}
@ -143,66 +146,16 @@ public interface CancellableEvent : Event {
* @see __broadcastJava Java 使用
*/
@JvmBlockingBridge
public suspend fun <E : Event> E.broadcast(): E = _EventBroadcast.implementation.broadcastPublic(this)
public suspend fun <E : Event> E.broadcast(): E {
@OptIn(TestOnly::class)
EventBroadcast.implementation.invoke(this)
return this
}
/**
* @since 2.7-M1
*/
@Suppress("ClassName")
internal open class _EventBroadcast {
companion object {
@Volatile
@JvmStatic
var implementation: _EventBroadcast = _EventBroadcast()
private val SHOW_VERBOSE_EVENT_ALWAYS = systemProp("mirai.event.show.verbose.events", false)
}
open suspend fun <E : Event> broadcastPublic(event: E): E = event.apply { Mirai.broadcastEvent(this) }
@JvmName("broadcastImpl") // avoid mangling
internal suspend fun <E : Event> broadcastImpl(event: E): E {
check(event is AbstractEvent) { "Events must extend AbstractEvent" }
if (event is BroadcastControllable && !event.shouldBroadcast) {
return event
}
event.broadCastLock.withLock {
event._intercepted = false
if (EventDisabled) return@withLock
logEvent(event)
callAndRemoveIfRequired(event)
}
return event
}
private fun isVerboseEvent(event: Event): Boolean {
if (SHOW_VERBOSE_EVENT_ALWAYS) return false
if (event is VerboseEvent) {
if (event is BotEvent) {
return !event.bot.configuration.isShowingVerboseEventLog
}
return true
}
return false
}
private fun logEvent(event: Event) {
if (event is Packet.NoEventLog) return
if (event is Packet.NoLog) return
if (event is MessageEvent) return // specially handled in [LoggingPacketHandlerAdapter]
// if (this is Packet) return@withLock // all [Packet]s are logged in [LoggingPacketHandlerAdapter]
if (isVerboseEvent(event)) return
if (event is BotEvent) {
event.bot.logger.verbose { "Event: $event" }
} else {
topLevelEventLogger.verbose { "Event: $event" }
}
}
private val topLevelEventLogger by lazy { MiraiLogger.Factory.create(Event::class, "EventPipeline") }
// 由测试覆盖
@TestOnly
internal object EventBroadcast {
internal val implementation: suspend (Event) -> Unit = { Mirai.broadcastEvent(it) }
}
/**
@ -210,6 +163,11 @@ internal open class _EventBroadcast {
* 所有的 `subscribe` 都能正常添加到监听器列表, 但所有的广播都会直接返回.
*/
@MiraiExperimentalApi
@Deprecated(
"Deprecated without replacement. If you really need this, please file an issue.",
level = DeprecationLevel.WARNING
)
@DeprecatedSinceMirai(warningSince = "2.11")
public var EventDisabled: Boolean = false
/**

View File

@ -1,48 +1,58 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "MemberVisibilityCanBePrivate", "unused")
@file:JvmMultifileClass
@file:JvmName("EventChannelKt")
package net.mamoe.mirai.event
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.sync.Mutex
import net.mamoe.mirai.Bot
import net.mamoe.mirai.IMirai
import net.mamoe.mirai.event.ConcurrencyKind.CONCURRENT
import net.mamoe.mirai.event.ConcurrencyKind.LOCKED
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.internal.event.GlobalEventListeners
import net.mamoe.mirai.internal.event.Handler
import net.mamoe.mirai.internal.event.ListenerRegistry
import net.mamoe.mirai.internal.event.registerEventHandler
import net.mamoe.mirai.utils.*
import java.util.function.Consumer
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.internal.LowPriorityInOverloadResolution
import kotlin.reflect.KClass
/**
* 事件通道. 事件通道是监听事件的入口. **在不同的事件通道中可以监听到不同类型的事件**.
* 事件通道.
*
* [GlobalEventChannel] 是最大的通道: 所有的事件都可以在 [GlobalEventChannel] 监听到.
* 通过 [Bot.eventChannel] 得到的通道只能监听到来自这个 [Bot] 的事件.
* 事件通道是监听事件的入口, 但不负责广播事件. 要广播事件, 使用 [Event.broadcast] [IMirai.broadcastEvent].
*
* ## 获取事件通道
*
* [EventChannel] 不可自行构造, 只能通过 [GlobalEventChannel], [BotEvent], 或基于一个通道的过滤等操作获得.
*
* ### 全局事件通道
*
* [GlobalEventChannel] 是单例对象, 表示全局事件通道, 可以获取到在其中广播的所有事件.
*
* ### [BotEvent] 事件通道
*
* 若只需要监听某个 [Bot] 的事件, 可通过 [Bot.eventChannel] 获取到这样的 [EventChannel].
*
* ## 通道操作
*
* ### 对通道的操作
* - "缩窄" 通道: 通过 [EventChannel.filter]. 例如 `filter { it is BotEvent }` 得到一个只能监听到 [BotEvent] 的事件通道.
* - 过滤通道: 通过 [EventChannel.filter]. 例如 `filter { it is BotEvent }` 得到一个只能监听到 [BotEvent] 的事件通道.
* - 转换为 Kotlin 协程 [Channel]: [EventChannel.asChannel]
* - 添加 [CoroutineContext]: [context], [parentJob], [parentScope], [exceptionHandler]
*
@ -51,20 +61,31 @@ import kotlin.reflect.KClass
* - [EventChannel.subscribeAlways] 创建一个总是监听事件的事件监听器.
* - [EventChannel.subscribeOnce] 创建一个只监听单次的事件监听器.
*
* ### 获取事件通道
* - 全局事件通道: [GlobalEventChannel]
* - [BotEvent] 通道: [Bot.eventChannel]
* ### 监听器生命周期
*
* @see subscribe
* 阅读 [EventChannel.subscribe] 以获取监听器生命周期相关信息.
*
* ## kotlinx-coroutines 交互
*
* mirai [EventChannel] 设计比 kotlinx-coroutines [Flow] 稳定版更早.
* [EventChannel] 的功能与 [Flow] 类似, 不过 [EventChannel] [subscribe] (类似 [Flow.collect]) 时有优先级判定, 也允许[拦截][Event.intercept].
*
* ### 通过 [Flow] 接收事件
*
* 使用 [EventChannel.asFlow] 获得 [Flow], 然后可使用 [Flow.collect] 等操作.
*
* ### 转发事件到 [SendChannel]
*
* 使用 [EventChannel.forwardToChannel] 可将事件转发到指定 [SendChannel].
*/
public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal constructor(
@NotStableForInheritance // since 2.11, before it was `final class`.
public abstract class EventChannel<out BaseEvent : Event> @MiraiInternalApi public constructor(
public val baseEventClass: KClass<out BaseEvent>,
/**
* 此事件通道的默认 [CoroutineScope.coroutineContext]. 将会被添加给所有注册的事件监听器.
*/
public val defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext,
public val defaultCoroutineContext: CoroutineContext,
) {
/**
* 创建事件监听并将监听结果发送在 [Channel]. 将返回值 [Channel] [关闭][Channel.close] 时将会同时关闭事件监听.
*
@ -98,6 +119,17 @@ public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal con
*
* [Channel.send] 挂起, 则监听器也会挂起, 也就可能会导致事件广播过程挂起.
*
* 示例:
*
* ```
* val eventChannel: EventChannel<BotEvent> = ...
* val channel = Channel<BotEvent>() // kotlinx.coroutines.channels.Channel
* eventChannel.forwardToChannel(channel, priority = ...)
*
* // 其他地方
* val event: BotEvent = channel.receive() // 挂起并接收一个事件
* ```
*
* @see subscribeAlways
* @see Channel
* @since 2.10
@ -117,6 +149,35 @@ public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal con
}
}
/**
* 通过 [Flow] 接收此通道内的所有事件.
*
* ```
* val eventChannel: EventChannel<BotEvent> = ...
* val flow: Flow<BotEvent> = eventChannel.asFlow()
*
* flow.collect { // it
* //
* }
*
* flow.filterIsInstance<GroupMessageEvent>.collect { // it: GroupMessageEvent
* // 处理事件 ...
* }
*
* flow.filterIsInstance<FriendMessageEvent>.collect { // it: FriendMessageEvent
* // 处理事件 ...
* }
* ```
*
* 类似于 [SharedFlow], [EventChannel.asFlow] 返回的 [Flow] 永远都不会停止. 因此上述示例 [Flow.collect] 永远都不会正常 (以抛出异常之外的) 结束.
*
* 通过 [asFlow] 接收事件相当于通过 [subscribeAlways] [EventPriority.MONITOR] 监听事件.
*
* @see Flow
* @since 2.11
*/
public abstract fun asFlow(): Flow<BaseEvent>
// region transforming operations
/**
@ -153,23 +214,20 @@ public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal con
*/
@JvmSynthetic
public fun filter(filter: suspend (event: BaseEvent) -> Boolean): EventChannel<BaseEvent> {
val parent = this
return object : EventChannel<BaseEvent>(baseEventClass, defaultCoroutineContext) {
private inline val innerThis get() = this
override fun <E : Event> (suspend (E) -> ListeningStatus).intercepted(): suspend (E) -> ListeningStatus {
return object : DelegateEventChannel<BaseEvent>(this) {
override fun <E : Event> intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus {
val thisIntercepted: suspend (E) -> ListeningStatus = { ev ->
val filterResult = try {
@Suppress("UNCHECKED_CAST")
baseEventClass.isInstance(ev) && filter(ev as BaseEvent)
} catch (e: Throwable) {
if (e is ExceptionInEventChannelFilterException) throw e // wrapped by another filter
throw ExceptionInEventChannelFilterException(ev, innerThis, cause = e)
throw ExceptionInEventChannelFilterException(ev, this, cause = e)
}
if (filterResult) this@intercepted.invoke(ev)
if (filterResult) block.invoke(ev)
else ListeningStatus.LISTENING
}
return parent.intercept(thisIntercepted)
return delegate.intercept(thisIntercepted)
}
}
}
@ -246,23 +304,14 @@ public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal con
*
* 此操作不会修改 [`this.coroutineContext`][defaultCoroutineContext], 只会创建一个新的 [EventChannel].
*/
public fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent> {
val origin = this
return object : EventChannel<BaseEvent>(
baseEventClass,
coroutineContexts.fold(this.defaultCoroutineContext) { acc, element -> acc + element }
) {
override fun <E : Event> (suspend (E) -> ListeningStatus).intercepted(): suspend (E) -> ListeningStatus {
return origin.intercept(this)
}
}
}
public abstract fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent>
/**
* 创建一个新的 [EventChannel], [EventChannel] 包含 [this.coroutineContext][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
* @see context
*/
@LowPriorityInOverloadResolution
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@kotlin.internal.LowPriorityInOverloadResolution
public fun exceptionHandler(coroutineExceptionHandler: CoroutineExceptionHandler): EventChannel<BaseEvent> {
return context(coroutineExceptionHandler)
}
@ -561,7 +610,8 @@ public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal con
* @see subscribeAlways
*/
@JvmOverloads
@LowPriorityInOverloadResolution
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@kotlin.internal.LowPriorityInOverloadResolution
public fun <E : Event> subscribeAlways(
eventClass: Class<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -588,7 +638,8 @@ public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal con
* @see subscribe
*/
@JvmOverloads
@LowPriorityInOverloadResolution
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@kotlin.internal.LowPriorityInOverloadResolution
public fun <E : Event> subscribe(
eventClass: Class<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -613,7 +664,8 @@ public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal con
* @see subscribeOnce
*/
@JvmOverloads
@LowPriorityInOverloadResolution
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@kotlin.internal.LowPriorityInOverloadResolution
public fun <E : Event> subscribeOnce(
eventClass: Class<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -636,43 +688,68 @@ public open class EventChannel<out BaseEvent : Event> @JvmOverloads internal con
* 由子类实现可以为 handler 包装一个过滤器等. 每个 handler 都会经过此函数处理.
*/
@MiraiExperimentalApi
protected open fun <E : Event> (suspend (E) -> ListeningStatus).intercepted(): (suspend (E) -> ListeningStatus) {
return this
protected open fun <E : Event> intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus {
return block
}
private fun <E : Event> intercept(listener: (suspend (E) -> ListeningStatus)): suspend (E) -> ListeningStatus {
return listener.intercepted()
// to overcome visibility issue
internal fun <E : Event> intercept0(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus =
intercept(block)
protected abstract fun <E : Event> subscribeImpl(eventClass: KClass<out E>, listener: Listener<E>)
// to overcome visibility issue
internal fun <E : Event> subscribeImpl0(eventClass: KClass<out E>, listener: Listener<E>) {
return subscribeImpl(eventClass, listener)
}
private fun <L : Listener<E>, E : Event> subscribeInternal(eventClass: KClass<out E>, listener: L): L {
with(GlobalEventListeners[listener.priority]) {
@Suppress("UNCHECKED_CAST")
val node = ListenerRegistry(listener as Listener<Event>, eventClass)
add(node)
listener.invokeOnCompletion {
this.remove(node)
}
}
subscribeImpl(eventClass, listener)
return listener
}
@Suppress("FunctionName")
private fun <E : Event> createListener(
protected abstract fun <E : Event> createListener(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend (E) -> ListeningStatus,
): Listener<E> {
val context = this.defaultCoroutineContext + coroutineContext
return Handler(
parentJob = context[Job],
subscriberContext = context,
handler = handler.intercepted(),
concurrencyKind = concurrencyKind,
priority = priority
)
}
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus,
): Listener<E>
// to overcome visibility issue
internal fun <E : Event> createListener0(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus,
): Listener<E> = createListener(coroutineContext, concurrencyKind, priority, listenerBlock)
// endregion
}
private open class DelegateEventChannel<BaseEvent : Event>(
protected val delegate: EventChannel<BaseEvent>,
) : EventChannel<BaseEvent>(delegate.baseEventClass, delegate.defaultCoroutineContext) {
private inline val innerThis get() = this
override fun <E : Event> intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus {
return delegate.intercept0(block)
}
override fun asFlow(): Flow<BaseEvent> = delegate.asFlow()
override fun <E : Event> subscribeImpl(eventClass: KClass<out E>, listener: Listener<E>) {
delegate.subscribeImpl0(eventClass, listener)
}
override fun <E : Event> createListener(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus
): Listener<E> = delegate.createListener0(coroutineContext, concurrencyKind, priority, listenerBlock)
override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent> {
return delegate.context(*coroutineContexts)
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
@ -13,16 +13,40 @@
package net.mamoe.mirai.event
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import net.mamoe.mirai.Bot
import net.mamoe.mirai.utils.MiraiInternalApi
import net.mamoe.mirai.utils.loadService
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
/**
* 全局事件通道. 此通道包含来自所有 [Bot] 的所有类型的事件. 可通过 [EventChannel.filter] 过滤得到范围更小的 [EventChannel].
*
* @see EventChannel
*/
public object GlobalEventChannel : EventChannel<Event>(Event::class, EmptyCoroutineContext)
public object GlobalEventChannel : EventChannel<Event>(Event::class, EmptyCoroutineContext) {
private val instance by lazy {
loadService(InternalGlobalEventChannelProvider::class).getInstance()
}
override fun asFlow(): Flow<Event> = instance.asFlow()
override fun <E : Event> subscribeImpl(eventClass: KClass<out E>, listener: Listener<E>) {
return instance.subscribeImpl0(eventClass, listener)
}
override fun <E : Event> createListener(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus
): Listener<E> = instance.createListener0(coroutineContext, concurrencyKind, priority, listenerBlock)
override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<Event> {
return instance.context(*coroutineContexts)
}
}
/**
* 在此 [CoroutineScope] 下创建一个监听所有事件的 [EventChannel]. 相当于 `GlobalEventChannel.parentScope(this).context(coroutineContext)`.
@ -38,3 +62,11 @@ public fun CoroutineScope.globalEventChannel(coroutineContext: CoroutineContext
return if (coroutineContext === EmptyCoroutineContext) GlobalEventChannel.parentScope(this)
else GlobalEventChannel.parentScope(this).context(coroutineContext)
}
/**
* @since 2.11
*/
@MiraiInternalApi
public interface InternalGlobalEventChannelProvider {
public fun getInstance(): EventChannel<Event>
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
@ -49,8 +49,8 @@ public interface Listener<in E : Event> : CompletableJob {
// Impl notes:
// Inheriting CompletableJob is a bad idea. See #1224.
// However we cannot change it as it leads to binary changes.
// We can do it in 3.0 or when we found incompatibility with kotlinx.serialization.
// However, we cannot change it as it leads to binary changes.
// We can do it in 3.0 or when we found incompatibility with kotlinx.coroutines.
/**
* 并发类型
@ -108,11 +108,4 @@ public enum class EventPriority {
* - [拦截事件][Event.intercept]
*/
MONITOR;
internal companion object {
@JvmStatic
internal val prioritiesExcludedMonitor: Array<EventPriority> = run {
values().filter { it != MONITOR }.toTypedArray()
}
}
}

View File

@ -1,171 +0,0 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.lateinitMutableProperty
import net.mamoe.mirai.utils.systemProp
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext
import kotlin.reflect.KClass
/**
* 事件处理器.
*/
internal class Handler<in E : Event> internal constructor(
parentJob: Job?,
subscriberContext: CoroutineContext,
@JvmField val handler: suspend (E) -> ListeningStatus,
override val concurrencyKind: ConcurrencyKind,
override val priority: EventPriority,
) : Listener<E>, CompletableJob by SupervisorJob(parentJob) { // avoid being cancelled on handling event
private val subscriberContext: CoroutineContext = subscriberContext + this // override Job.
val lock: Mutex? = when (concurrencyKind) {
ConcurrencyKind.LOCKED -> Mutex()
else -> null
}
@Suppress("unused")
override suspend fun onEvent(event: E): ListeningStatus {
if (isCompleted || isCancelled) return ListeningStatus.STOPPED
if (!isActive) return ListeningStatus.LISTENING
return try {
// Inherit context.
withContext(subscriberContext) { handler.invoke(event) }.also { if (it == ListeningStatus.STOPPED) this.complete() }
} catch (e: Throwable) {
subscriberContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e)
?: currentCoroutineContext()[CoroutineExceptionHandler]?.handleException(subscriberContext, e)
?: kotlin.run {
val logger = if (event is BotEvent) event.bot.logger else logger
val subscriberName = subscriberContext[CoroutineName]?.name ?: "<unnamed>"
val broadcasterName = currentCoroutineContext()[CoroutineName]?.name ?: "<unnamed>"
val message =
"An exception occurred when processing event. " +
"Subscriber scope: '$subscriberName'. " +
"Broadcaster scope: '$broadcasterName'"
logger.warning(message, e)
}
// this.complete() // do not `completeExceptionally`, otherwise parentJob will fai`l.
// ListeningStatus.STOPPED
// not stopping listening.
ListeningStatus.LISTENING
}
}
companion object {
private val logger by lazy {
MiraiLogger.Factory.create(Handler::class)
}
}
}
internal class ListenerRegistry(
val listener: Listener<Event>,
val type: KClass<out Event>,
)
internal object GlobalEventListeners {
private val ALL_LEVEL_REGISTRIES: Map<EventPriority, ConcurrentLinkedQueue<ListenerRegistry>>
fun clear() {
ALL_LEVEL_REGISTRIES.forEach { (_, u) ->
u.clear()
}
}
init {
val map =
EnumMap<EventPriority, ConcurrentLinkedQueue<ListenerRegistry>>(EventPriority::class.java)
EventPriority.values().forEach {
map[it] = ConcurrentLinkedQueue()
}
ALL_LEVEL_REGISTRIES = map
}
operator fun get(priority: EventPriority): ConcurrentLinkedQueue<ListenerRegistry> =
ALL_LEVEL_REGISTRIES[priority]!!
}
internal suspend fun <E : AbstractEvent> callAndRemoveIfRequired(event: E) {
for (p in EventPriority.prioritiesExcludedMonitor) {
val container = GlobalEventListeners[p]
for (registry in container) {
if (event.isIntercepted) return
if (!registry.type.isInstance(event)) continue
val listener = registry.listener
process(container, registry, listener, event)
}
}
if (event.isIntercepted) return
val container = GlobalEventListeners[EventPriority.MONITOR]
when (container.size) {
0 -> return
1 -> {
val registry = container.firstOrNull() ?: return
if (!registry.type.isInstance(event)) return
process(container, registry, registry.listener, event)
}
else -> supervisorScope {
for (registry in GlobalEventListeners[EventPriority.MONITOR]) {
if (!registry.type.isInstance(event)) continue
launch(start = if (EVENT_LAUNCH_UNDISPATCHED) CoroutineStart.UNDISPATCHED else CoroutineStart.DEFAULT) {
process(container, registry, registry.listener, event)
}
}
}
}
}
/**
* If `true`, all event listeners runs directly in the broadcaster's thread until first suspension.
*
* If there is not suspension point in the listener, the coroutine executing [Event.broadcast] will not suspend,
* so the thread before and after execution will be the same and no other code is being executed if there is only one thread.
*
* This is useful for tests to not to depend on `delay`
*/
internal var EVENT_LAUNCH_UNDISPATCHED: Boolean by lateinitMutableProperty {
systemProp("mirai.event.launch.undispatched", false)
}
private suspend fun <E : AbstractEvent> process(
container: ConcurrentLinkedQueue<ListenerRegistry>,
registry: ListenerRegistry,
listener: Listener<Event>,
event: E,
) {
when (listener.concurrencyKind) {
ConcurrencyKind.LOCKED -> {
(listener as Handler).lock!!.withLock {
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
container.remove(registry)
}
}
}
ConcurrencyKind.CONCURRENT -> {
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
container.remove(registry)
}
}
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.event
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
internal abstract class AbstractEventTest {
@BeforeEach
fun loadEventBroadcast() {
_EventBroadcast.implementation = object : _EventBroadcast() {
override suspend fun <E : Event> broadcastPublic(event: E): E =
broadcastImpl(event) // do not call MiraiImpl
}
}
@AfterEach
fun unloadEventBroadcast() {
_EventBroadcast.implementation = _EventBroadcast() // restore
}
}

View File

@ -1,38 +0,0 @@
/*
* Copyright 2019-2020 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.message
import kotlinx.coroutines.*
import net.mamoe.mirai.event.TestEvent
import net.mamoe.mirai.event.syncFromEvent
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.Test
import kotlin.test.assertFailsWith
internal class SubscribingGetTest {
@Test
fun testSyncFromEventTimeout() {
runBlockingWithTimeout(500) {
assertFailsWith<TimeoutCancellationException> {
syncFromEvent(100) { _: TestEvent -> }
}
}
}
}
internal fun <R> runBlockingWithTimeout(
millis: Long,
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> R
): R = runBlocking(context) {
withTimeout(millis, block)
}

View File

@ -14,6 +14,7 @@ import io.ktor.client.engine.okhttp.*
import io.ktor.client.features.*
import io.ktor.client.request.*
import io.ktor.client.request.forms.*
import io.ktor.util.*
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.io.core.discardExact
import kotlinx.io.core.readBytes
@ -32,10 +33,10 @@ import net.mamoe.mirai.internal.contact.info.FriendInfoImpl
import net.mamoe.mirai.internal.contact.info.FriendInfoImpl.Companion.impl
import net.mamoe.mirai.internal.contact.info.MemberInfoImpl
import net.mamoe.mirai.internal.contact.info.StrangerInfoImpl.Companion.impl
import net.mamoe.mirai.internal.event.EventChannelToEventDispatcherAdapter
import net.mamoe.mirai.internal.message.*
import net.mamoe.mirai.internal.message.DeepMessageRefiner.refineDeep
import net.mamoe.mirai.internal.network.components.EventDispatcher
import net.mamoe.mirai.internal.network.components.EventDispatcherScopeFlag
import net.mamoe.mirai.internal.network.highway.ChannelKind
import net.mamoe.mirai.internal.network.highway.ResourceKind
import net.mamoe.mirai.internal.network.highway.tryDownload
@ -299,17 +300,13 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
solveInvitedJoinGroupRequest(event, accept = false)
override suspend fun broadcastEvent(event: Event) {
if (currentCoroutineContext()[EventDispatcherScopeFlag] != null) {
// called by [EventDispatcher]
return super.broadcastEvent(event)
}
if (event is BotEvent) {
val bot = event.bot
if (bot is AbstractBot) {
bot.components[EventDispatcher].broadcast(event)
}
} else {
super.broadcastEvent(event)
EventChannelToEventDispatcherAdapter.instance.callListeners(event)
}
}

View File

@ -0,0 +1,124 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.event.events.MessageEvent
import net.mamoe.mirai.internal.network.Packet
import net.mamoe.mirai.internal.network.components.SHOW_VERBOSE_EVENT
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.verbose
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
internal open class EventChannelImpl<E : Event>(
baseEventClass: KClass<out E>, defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext
) : EventChannel<E>(baseEventClass, defaultCoroutineContext) {
val eventListeners = EventListeners()
// drop any unsubscribed events
private val flow = MutableSharedFlow<Event>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
companion object {
private val logger by lazy { MiraiLogger.Factory.create(EventChannelImpl::class, "EventChannelImpl") }
}
suspend fun callListeners(event: Event) {
event as AbstractEvent
eventListeners.callListeners(event)
flow.emit(event)
}
override fun asFlow(): Flow<E> {
@Suppress("UNCHECKED_CAST")
return flow.asSharedFlow().filter { baseEventClass.isInstance(it) } as Flow<E>
}
override fun <E : Event> subscribeImpl(eventClass: KClass<out E>, listener: Listener<E>) {
eventListeners.addListener(eventClass, listener)
}
override fun <E : Event> createListener(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus
): Listener<E> {
val context = this.defaultCoroutineContext + coroutineContext
return SafeListener(
parentJob = context[Job],
subscriberContext = context,
listenerBlock = intercept(listenerBlock),
concurrencyKind = concurrencyKind,
priority = priority
)
}
private suspend fun <E : Event> broadcastImpl(event: E): E {
check(event is AbstractEvent) { "Events must extend AbstractEvent" }
if (event is BroadcastControllable && !event.shouldBroadcast) {
return event
}
event.broadCastLock.withLock {
event._intercepted = false
@Suppress("DEPRECATION")
if (EventDisabled) return@withLock
logEvent(event)
eventListeners.callListeners(event)
}
return event
}
private fun isVerboseEvent(event: Event): Boolean {
if (SHOW_VERBOSE_EVENT) return false
if (event is VerboseEvent) {
if (event is BotEvent) {
return !event.bot.configuration.isShowingVerboseEventLog
}
return true
}
return false
}
private fun logEvent(event: Event) {
if (event is Packet.NoEventLog) return
if (event is Packet.NoLog) return
if (event is MessageEvent) return // specially handled in [LoggingPacketHandlerAdapter]
// if (this is Packet) return@withLock // all [Packet]s are logged in [LoggingPacketHandlerAdapter]
if (isVerboseEvent(event)) return
if (event is BotEvent) {
event.bot.logger.verbose { "Event: $event" }
} else {
logger.verbose { "Event: $event" }
}
}
override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<E> {
return EventChannelImpl(
baseEventClass,
coroutineContexts.fold(defaultCoroutineContext) { acc, coroutineContext ->
acc + coroutineContext
})
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.event
import net.mamoe.mirai.event.Event
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
/**
* @since 2.11
*/
internal class EventChannelToEventDispatcherAdapter<E : Event> private constructor(
baseEventClass: KClass<out E>, defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext
) : EventChannelImpl<E>(baseEventClass, defaultCoroutineContext) {
companion object {
val instance by lazy { EventChannelToEventDispatcherAdapter(Event::class, EmptyCoroutineContext) }
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.event.*
import net.mamoe.mirai.internal.network.components.EVENT_LAUNCH_UNDISPATCHED
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KClass
internal class ListenerRegistry(
val listener: Listener<Event>,
val type: KClass<out Event>,
)
internal class EventListeners {
private val map: Map<EventPriority, ConcurrentLinkedQueue<ListenerRegistry>>
init {
val map =
EnumMap<EventPriority, ConcurrentLinkedQueue<ListenerRegistry>>(EventPriority::class.java)
EventPriority.values().forEach {
map[it] = ConcurrentLinkedQueue()
}
this.map = map
}
fun clear() {
map.forEach { (_, u) ->
u.clear()
}
}
operator fun get(priority: EventPriority): MutableCollection<ListenerRegistry> =
map[priority] ?: error("Internal error: map[$priority] == null")
private val prioritiesExcludedMonitor: Array<EventPriority> = run {
EventPriority.values().filter { it != EventPriority.MONITOR }.toTypedArray()
}
internal suspend fun <E : AbstractEvent> callListeners(event: E) {
for (p in prioritiesExcludedMonitor) {
val container = get(p)
for (registry in container) {
if (event.isIntercepted) return
if (!registry.type.isInstance(event)) continue
val listener = registry.listener
process(container, registry, listener, event)
}
}
if (event.isIntercepted) return
val container = get(EventPriority.MONITOR)
when (container.size) {
0 -> return
1 -> {
val registry = container.firstOrNull() ?: return
if (!registry.type.isInstance(event)) return
process(container, registry, registry.listener, event)
}
else -> supervisorScope {
for (registry in get(EventPriority.MONITOR)) {
if (!registry.type.isInstance(event)) continue
launch(start = if (EVENT_LAUNCH_UNDISPATCHED) CoroutineStart.UNDISPATCHED else CoroutineStart.DEFAULT) {
process(container, registry, registry.listener, event)
}
}
}
}
}
internal fun <E : Event> addListener(eventClass: KClass<E>, listener: Listener<E>) {
val listeners = get(listener.priority)
@Suppress("UNCHECKED_CAST")
val node = ListenerRegistry(listener as Listener<Event>, eventClass)
listeners.add(node)
listener.invokeOnCompletion {
listeners.remove(node)
}
}
private suspend fun <E : AbstractEvent> process(
container: MutableCollection<ListenerRegistry>,
registry: ListenerRegistry,
listener: Listener<Event>,
event: E,
) {
when (listener.concurrencyKind) {
ConcurrencyKind.LOCKED -> {
(listener as SafeListener).lock!!.withLock {
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
container.remove(registry)
}
}
}
ConcurrencyKind.CONCURRENT -> {
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
container.remove(registry)
}
}
}
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.event
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.EventChannel
import net.mamoe.mirai.event.InternalGlobalEventChannelProvider
internal class GlobalEventChannelProviderImpl : InternalGlobalEventChannelProvider {
val instance = EventChannelToEventDispatcherAdapter.instance
override fun getInstance(): EventChannel<Event> = instance
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.utils.MiraiLogger
import kotlin.coroutines.CoroutineContext
/**
* 包装用户的 [Listener], 增加异常处理, 实现 [ConcurrencyKind].
*/
internal class SafeListener<in E : Event> internal constructor(
parentJob: Job?,
subscriberContext: CoroutineContext,
private val listenerBlock: suspend (E) -> ListeningStatus,
override val concurrencyKind: ConcurrencyKind,
override val priority: EventPriority,
) : Listener<E>, CompletableJob by SupervisorJob(parentJob) { // avoid being cancelled on handling event
private val subscriberContext: CoroutineContext = subscriberContext + this // override Job.
val lock: Mutex? = when (concurrencyKind) {
ConcurrencyKind.LOCKED -> Mutex()
else -> null
}
@Suppress("unused")
override suspend fun onEvent(event: E): ListeningStatus {
if (isCompleted || isCancelled) return ListeningStatus.STOPPED
if (!isActive) return ListeningStatus.LISTENING
return try {
// Inherit context.
withContext(subscriberContext) { listenerBlock.invoke(event) }.also { if (it == ListeningStatus.STOPPED) this.complete() }
} catch (e: Throwable) {
subscriberContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e)
?: currentCoroutineContext()[CoroutineExceptionHandler]?.handleException(subscriberContext, e)
?: kotlin.run {
val logger = if (event is BotEvent) event.bot.logger else logger
val subscriberName = subscriberContext[CoroutineName]?.name ?: "<unnamed>"
val broadcasterName = currentCoroutineContext()[CoroutineName]?.name ?: "<unnamed>"
val message =
"An exception occurred when processing event. " +
"Subscriber scope: '$subscriberName'. " +
"Broadcaster scope: '$broadcasterName'"
logger.warning(message, e)
}
// this.complete() // do not `completeExceptionally`, otherwise parentJob will fai`l.
// ListeningStatus.STOPPED
// not stopping listening.
ListeningStatus.LISTENING
}
}
companion object {
private val logger by lazy {
MiraiLogger.Factory.create(SafeListener::class)
}
}
}

View File

@ -0,0 +1,11 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.event

View File

@ -1,22 +1,20 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.network.components
import kotlinx.coroutines.*
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.*
import net.mamoe.mirai.internal.event.EventChannelToEventDispatcherAdapter
import net.mamoe.mirai.internal.network.component.ComponentKey
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.TestOnly
import net.mamoe.mirai.utils.addNameHierarchically
import net.mamoe.mirai.utils.childScope
import net.mamoe.mirai.utils.*
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@ -25,7 +23,7 @@ import kotlin.coroutines.EmptyCoroutineContext
*/
internal interface EventDispatcher {
/**
* Implementor must call `event.broadcast()` within a coroutine with [EventDispatcherScopeFlag]
* Implement [Event.broadcast]
*/
suspend fun broadcast(event: Event)
@ -53,10 +51,6 @@ internal interface EventDispatcher {
companion object : ComponentKey<EventDispatcher>
}
internal object EventDispatcherScopeFlag : CoroutineContext.Element, CoroutineContext.Key<EventDispatcherScopeFlag> {
override val key: CoroutineContext.Key<*> get() = this
}
@JvmInline
internal value class EventBroadcastJob(
val job: Job
@ -75,6 +69,19 @@ internal value class EventBroadcastJob(
}
}
/**
* If `true`, all event listeners runs directly in the broadcaster's thread until first suspension.
*
* If there is no suspension point in the listener, the coroutine executing [Event.broadcast] will not suspend,
* so the thread before and after execution will be the same and no other code is being executed if there is only one thread.
*
* This is useful for tests to not depend on `delay`
*/
internal var EVENT_LAUNCH_UNDISPATCHED: Boolean by lateinitMutableProperty {
systemProp("mirai.event.launch.undispatched", false)
}
internal val SHOW_VERBOSE_EVENT: Boolean by lazy { systemProp("mirai.event.show.verbose.events", false) }
internal open class EventDispatcherImpl(
lifecycleContext: CoroutineContext,
@ -86,9 +93,7 @@ internal open class EventDispatcherImpl(
override suspend fun broadcast(event: Event) {
try {
withContext(EventDispatcherScopeFlag) {
event.broadcast()
}
EventChannelToEventDispatcherAdapter.instance.callListeners(event)
} catch (e: Exception) {
if (e is CancellationException) return
if (logger.isEnabled) {
@ -100,7 +105,7 @@ internal open class EventDispatcherImpl(
override fun broadcastAsync(event: Event, additionalContext: CoroutineContext): EventBroadcastJob {
val job = launch(
additionalContext + EventDispatcherScopeFlag,
additionalContext,
start = CoroutineStart.UNDISPATCHED
) { broadcast(event) }
// UNDISPATCHED: starts the coroutine NOW in the current thread until its first suspension point,
@ -110,7 +115,7 @@ internal open class EventDispatcherImpl(
override fun broadcastAsync(additionalContext: CoroutineContext, event: suspend () -> Event?): EventBroadcastJob {
val job = launch(
additionalContext + EventDispatcherScopeFlag,
additionalContext,
start = CoroutineStart.UNDISPATCHED
) {
event()?.let { broadcast(it) }
@ -124,4 +129,9 @@ internal open class EventDispatcherImpl(
val qualified = event::class.java.canonicalName ?: return event.toString()
return qualified.substringAfter("net.mamoe.mirai.event.events.", "").ifEmpty { event.toString() }
}
///////////////////////////////////////////////////////////////////////////
// broadcast
///////////////////////////////////////////////////////////////////////////
}

View File

@ -0,0 +1,10 @@
#
# Copyright 2019-2022 Mamoe Technologies and contributors.
#
# 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
# Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
#
# https://github.com/mamoe/mirai/blob/dev/LICENSE
#
net.mamoe.mirai.internal.event.GlobalEventChannelProviderImpl

View File

@ -0,0 +1,14 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.event
import net.mamoe.mirai.internal.test.AbstractTest
internal abstract class AbstractEventTest : AbstractTest()

View File

@ -1,14 +1,16 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.event
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.*
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.globalEventChannel
import org.junit.jupiter.api.Test
import kotlin.test.assertFalse

View File

@ -0,0 +1,34 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.first
import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
import net.mamoe.mirai.event.GlobalEventChannel
import net.mamoe.mirai.event.broadcast
import org.junit.jupiter.api.Test
import kotlin.test.assertIs
@JvmBlockingBridge
internal class EventChannelFlowTest : AbstractEventTest() {
@Test
suspend fun asFlow(): Unit = coroutineScope {
val channel = GlobalEventChannel
val job = async(start = CoroutineStart.UNDISPATCHED) {
channel.asFlow().first()
}
TestEvent().broadcast()
assertIs<TestEvent>(job.await())
}
}

View File

@ -1,19 +1,20 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package event;
import net.mamoe.mirai.event.Event;
import net.mamoe.mirai.event.GlobalEventChannel;
import net.mamoe.mirai.event.events.GroupMessageEvent;
import net.mamoe.mirai.event.events.MessageEvent;
import java.util.Objects;
/**
* 仅测试可以使用, 不会被编译运行
*/
@ -21,9 +22,7 @@ public class EventChannelJavaTest {
public static void main(String[] args) {
GlobalEventChannel.INSTANCE
.filter(event -> {
return event.toString() == "test";
})
.filter(event -> Objects.equals(event.toString(), "test"))
.filterIsInstance(MessageEvent.class)
.subscribeAlways(GroupMessageEvent.class, groupMessageEvent -> {

View File

@ -1,26 +1,25 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.event
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.events.FriendEvent
import net.mamoe.mirai.event.events.GroupEvent
import net.mamoe.mirai.event.events.GroupMessageEvent
import net.mamoe.mirai.event.events.MessageEvent
import net.mamoe.mirai.internal.event.GlobalEventListeners
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@ -46,17 +45,11 @@ internal class EventChannelTest : AbstractEventTest() {
@BeforeEach
fun x() {
runBlocking { semaphore.acquire() }
_EventBroadcast.implementation = object : _EventBroadcast() {
override suspend fun <E : Event> broadcastPublic(event: E): E =
broadcastImpl(event) // do not call MiraiImpl
}
}
@AfterEach
fun s() {
GlobalEventListeners.clear()
runBlocking { semaphore.release() }
_EventBroadcast.implementation = _EventBroadcast() // restore
EventChannelToEventDispatcherAdapter.instance.eventListeners.clear()
}
@Test

View File

@ -1,19 +1,23 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
@file:JvmBlockingBridge
package net.mamoe.mirai.event
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.*
import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
import net.mamoe.mirai.internal.event.EVENT_LAUNCH_UNDISPATCHED
import net.mamoe.mirai.event.AbstractEvent
import net.mamoe.mirai.event.EventPriority
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.globalEventChannel
import net.mamoe.mirai.internal.network.components.EVENT_LAUNCH_UNDISPATCHED
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

View File

@ -1,16 +1,16 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.event
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.*
import net.mamoe.mirai.utils.StepUtil
import net.mamoe.mirai.event.*
import org.junit.jupiter.api.AfterEach
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicInteger
@ -26,7 +26,7 @@ internal class EventTests : AbstractEventTest() {
var scope = CoroutineScope(EmptyCoroutineContext)
@AfterEach
fun finiallyReset() {
fun finallyReset() {
resetEventListeners()
}
@ -69,7 +69,7 @@ internal class EventTests : AbstractEventTest() {
}
}
}
kotlinx.coroutines.runBlocking {
runBlocking {
ParentEvent().broadcast()
}
val called = counter.get()
@ -97,7 +97,7 @@ internal class EventTests : AbstractEventTest() {
called.getAndIncrement()
}
}
println("Registeterd $priority")
println("Registered $priority")
}
}
println("Step 1")
@ -189,13 +189,13 @@ internal class EventTests : AbstractEventTest() {
open class PriorityTestEvent : AbstractEvent()
fun singleThreaded(step: StepUtil, invoke: suspend EventChannel<Event>.() -> Unit) {
private fun singleThreaded(step: StepUtil, invoke: suspend EventChannel<Event>.() -> Unit) {
// runBlocking 会完全堵死, 没法退出
val scope = CoroutineScope(Executor { it.run() }.asCoroutineDispatcher())
val job = scope.launch {
invoke(scope.globalEventChannel())
}
kotlinx.coroutines.runBlocking {
runBlocking {
job.join()
}
scope.cancel()
@ -203,7 +203,7 @@ internal class EventTests : AbstractEventTest() {
}
@Test
fun `test handler remvoe`() {
fun `test handler remove`() {
resetEventListeners()
val step = StepUtil()
singleThreaded(step) {
@ -226,7 +226,7 @@ internal class EventTests : AbstractEventTest() {
}
}
*/
fun resetEventListeners() {
private fun resetEventListeners() {
scope.cancel()
runBlocking { scope.coroutineContext[Job]?.join() }
scope = CoroutineScope(EmptyCoroutineContext)

View File

@ -1,19 +1,20 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
@file:Suppress("RedundantSuspendModifier", "unused", "UNUSED_PARAMETER")
package net.mamoe.mirai.event
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.runBlocking
import net.mamoe.mirai.event.*
import org.jetbrains.annotations.NotNull
import org.junit.jupiter.api.Test
import java.util.concurrent.atomic.AtomicInteger

View File

@ -1,17 +1,17 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.event
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.cancel
import kotlinx.coroutines.runBlocking
import net.mamoe.mirai.utils.EventListenerLikeJava
import net.mamoe.mirai.event.*
import net.mamoe.mirai.utils.JavaFriendlyAPI
import org.junit.jupiter.api.Test
import java.util.concurrent.atomic.AtomicInteger
@ -31,8 +31,11 @@ internal class JvmMethodEventsTestJava : AbstractEventTest() {
}
}
@EventListenerLikeJava
@Suppress("UNUSED_PARAMETER", "RedundantVisibilityModifier", "RedundantNullableReturnType")
@Suppress(
"INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "RedundantVisibilityModifier", "UNUSED_PARAMETER", "unused",
"RedundantNullableReturnType"
)
@net.mamoe.mirai.utils.EventListenerLikeJava
internal class TestHost(
private val called: AtomicInteger
) : SimpleListenerHost() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
@ -9,10 +9,11 @@
@file:Suppress("DEPRECATION")
package net.mamoe.mirai.event
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.*
import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
import net.mamoe.mirai.event.*
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows

View File

@ -1,31 +1,32 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.event
package net.mamoe.mirai.internal.event
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import net.mamoe.mirai.utils.EventListenerLikeJava
import net.mamoe.mirai.event.*
import net.mamoe.mirai.utils.JavaFriendlyAPI
import org.junit.jupiter.api.Test
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.EmptyCoroutineContext
@JavaFriendlyAPI
@EventListenerLikeJava
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@net.mamoe.mirai.utils.EventListenerLikeJava
internal class SimpleListenerHostTestJava : AbstractEventTest() {
@Test
fun testJavaSimpleListenerHostWork() {
val called = AtomicBoolean()
val host: SimpleListenerHost = object : SimpleListenerHost() {
@EventHandler
@EventListenerLikeJava
@net.mamoe.mirai.utils.EventListenerLikeJava
fun testListen(
event: AbstractEvent?
) {

View File

@ -1,21 +1,13 @@
/*
* Copyright 2019-2020 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AFFERO GENERAL PUBLIC LICENSE version 3 license that can be found via the following link.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.utils
package net.mamoe.mirai.internal.event
import kotlinx.atomicfu.atomic
import java.util.concurrent.ConcurrentLinkedDeque

View File

@ -15,7 +15,6 @@ import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import net.mamoe.mirai.event.Event
import net.mamoe.mirai.internal.network.components.EventDispatcherImpl
import net.mamoe.mirai.internal.network.components.EventDispatcherScopeFlag
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.TestOnly
import net.mamoe.mirai.utils.runUnwrapCancellationException
@ -32,13 +31,12 @@ internal open class TestEventDispatcherImpl(
// so that [joinBroadcast] works.
launch(
EventDispatcherScopeFlag,
start = CoroutineStart.UNDISPATCHED
) {
super.broadcast(event)
}.join()
} else {
// Scoped closed, typically when broadcasting `BotOfflineEvent` by StateObserver from `bot.close`
// Scope closed, typically when broadcasting `BotOfflineEvent` by StateObserver from `bot.close`
super.broadcast(event)
}
}