Introduce EventChannel (#753)
* Introduce EventChannel, close #514 * Fix generics in EventChannel * Don't require Job in CoroutineScope for EventChannel.parentScope * Don't attach child job on EventChannel.parentJob * Improve tests * Fix tests * Merge with dev
package net.mamoe.mirai
import kotlinx.coroutines.*
import net.mamoe.kjbb.JvmBlockingBridge
import net.mamoe.mirai.contact.*
import net.mamoe.mirai.event.EventChannel
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.message.action.BotNudge
import net.mamoe.mirai.message.action.MemberNudge
import net.mamoe.mirai.network.LoginFailedException
public interface Bot : CoroutineScope, ContactOrBot, UserOrBot {
    public val isOnline: Boolean
public val isOnline: Boolean
* 来自这个 [Bot] 的 [BotEvent] 的事件通道.
* @see EventChannel
public val eventChannel: EventChannel<@UnsafeVariance BotEvent>
// region contacts
* Copyright 2019-2020 Mamoe Technologies and contributors.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* https://github.com/mamoe/mirai/blob/master/LICENSE
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "MemberVisibilityCanBePrivate", "unused")
package net.mamoe.mirai.event
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import net.mamoe.mirai.Bot
import net.mamoe.mirai.event.Listener.ConcurrencyKind.CONCURRENT
import net.mamoe.mirai.event.Listener.ConcurrencyKind.LOCKED
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.event.internal.GlobalEventListeners
import net.mamoe.mirai.event.internal.Handler
import net.mamoe.mirai.event.internal.ListenerRegistry
import net.mamoe.mirai.utils.MiraiExperimentalApi
import net.mamoe.mirai.utils.MiraiLogger
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.internal.LowPriorityInOverloadResolution
import kotlin.reflect.KClass
* 包装 [EventChannel.filter] 的 `filter` lambda 抛出的异常并重新抛出.
public class ExceptionInEventChannelFilterException(
* 当时正在处理的事件
public val event: Event,
public val eventChannel: EventChannel<*>,
override val message: String = "Exception in EventHandler",
* 原异常
override val cause: Throwable
) : IllegalStateException()
* 在此 [CoroutineScope] 下创建一个监听所有事件的 [EventChannel]. 相当于 `GlobalEventChannel.parentScope(this).context(coroutineContext)`.
* 在返回的 [EventChannel] 中的事件监听器都会以 [this] 作为父协程作用域. 即会 使用 [this]
* @param coroutineContext 额外的 [CoroutineContext]
* @throws IllegalStateException 当 [this] 和 [coroutineContext] 均不包含 [CoroutineContext]
public fun CoroutineScope.globalEventChannel(coroutineContext: CoroutineContext = EmptyCoroutineContext): EventChannel<Event> {
return if (coroutineContext === EmptyCoroutineContext) GlobalEventChannel.parentScope(this)
else GlobalEventChannel.parentScope(this).context(coroutineContext)
* 全局事件通道. 此通道包含来自所有 [Bot] 的所有类型的事件. 可通过 [EventChannel.filter] 过滤得到范围更小的 [EventChannel].
* @see EventChannel
public object GlobalEventChannel : EventChannel<Event>(Event::class, EmptyCoroutineContext)
* 事件通道. 事件通道是监听事件的入口. **在不同的事件通道中可以监听到不同类型的事件**.
* [GlobalEventChannel] 是最大的通道: 所有的事件都可以在 [GlobalEventChannel] 监听到.
* ### 对通道的操作
* - "缩窄" 通道: 通过 [EventChannel.filter]. 例如 `filter { it is BotEvent }` 得到一个只能监听到 [BotEvent] 的事件通道.
* - 转换为 Kotlin 协程 [Channel]: [EventChannel.asChannel]
* - 添加 [CoroutineContext]: [context], [parentJob], [parentScope], [exceptionHandler]
* ### 创建事件监听
* - [EventChannel.subscribe] 创建带条件的一个事件监听器.
* - [EventChannel.subscribeAlways] 创建一个总是监听事件的事件监听器.
* - [EventChannel.subscribeOnce] 创建一个只监听单次的事件监听器.
* ### 获取事件通道
* - [GlobalEventChannel]
* - [Bot.eventChannel]
* @see EventChannel.subscribe
public open class EventChannel<out BaseEvent : Event> @JvmOverloads constructor(
public val baseEventClass: KClass<out BaseEvent>,
* 此事件通道的默认 [CoroutineScope.coroutineContext]. 将会被添加给所有注册的事件监听器.
public val defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext
) {
* 创建事件监听并将监听结果发送在 [Channel]. 将返回值 [Channel] [关闭][Channel.close] 时将会同时关闭事件监听.
* 标注 [ExperimentalCoroutinesApi] 是因为使用了 [Channel.invokeOnClose]
* @param capacity Channel 容量. 详见 [Channel] 构造.
* @see subscribeAlways
* @see Channel
public fun asChannel(
capacity: Int = Channel.RENDEZVOUS,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
priority: EventPriority = EventPriority.NORMAL,
): Channel<out BaseEvent> {
val channel = Channel<BaseEvent>(capacity)
val listener = subscribeAlways(baseEventClass, coroutineContext, concurrency, priority) { channel.send(it) }
channel.invokeOnClose {
if (it != null) listener.completeExceptionally(it)
else listener.complete()
return channel
// region transforming operations
* 添加一个过滤器. 过滤器将在收到任何事件之后, 传递给通过 [subscribe] 注册的监听器之前调用.
* 若 [filter] 返回 `true`, 该事件将会被传给监听器. 否则将会被忽略, **监听器继续监听**.
* ### 线性顺序
* 多个 [filter] 的处理是线性且有顺序的. 若一个 [filter] 已经返回了 `false` (代表忽略这个事件), 则会立即忽略, 而不会传递给后续过滤器.
* 示例:
* ```
* GlobalEventChannel // GlobalEventChannel 会收到全局所有事件, 事件类型是 Event
* .filterIsInstance<BotEvent>() // 过滤, 只接受 BotEvent
* .filter { event: BotEvent ->
* // 此时的 event 一定是 BotEvent
* event.bot.id == 123456 // 再过滤 event 的 bot.id
* }
* .subscribeAlways { event: BotEvent ->
* // 现在 event 是 BotEvent, 且 bot.id == 123456
* }
* ```
* ### 过滤器挂起
* [filter] 允许挂起协程. **过滤器的挂起将被认为是事件监听器的挂起**.
* 过滤器挂起是否会影响事件处理,
* 取决于 [subscribe] 时的 [Listener.ConcurrencyKind] 和 [Listener.EventPriority].
* ### 过滤器异常处理
* 若 [filter] 抛出异常, 将被包装为 [ExceptionInEventChannelFilterException] 并重新抛出.
* @see filterIsInstance 过滤指定类型的事件
public fun filter(filter: suspend (event: @UnsafeVariance BaseEvent) -> Boolean): EventChannel<BaseEvent> {
return object : EventChannel<BaseEvent>(baseEventClass, defaultCoroutineContext) {
private inline val innerThis get() = this
override fun <E : Event> (suspend (E) -> ListeningStatus).intercepted(): suspend (E) -> ListeningStatus {
return { ev ->
val filterResult = try {
baseEventClass.isInstance(ev) && filter(ev as BaseEvent)
} catch (e: Throwable) {
if (e is ExceptionInEventChannelFilterException) throw e // wrapped by another filter
throw ExceptionInEventChannelFilterException(ev, innerThis, cause = e)
if (filterResult) this.invoke(ev)
else ListeningStatus.LISTENING
* 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
* @see filter 获取更多信息
public inline fun <reified E : Event> filterIsInstance(): EventChannel<E> =
* 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
* @see filter 获取更多信息
public fun <E : Event> filterIsInstance(kClass: KClass<out E>): EventChannel<E> {
return object : EventChannel<E>(kClass, defaultCoroutineContext) {
private inline val innerThis get() = this
override fun <E1 : Event> (suspend (E1) -> ListeningStatus).intercepted(): suspend (E1) -> ListeningStatus {
return { ev ->
if (kClass.isInstance(ev)) this.invoke(ev)
else ListeningStatus.LISTENING
* 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
* @see filter 获取更多信息
public fun <E : Event> filterIsInstance(clazz: Class<out E>): EventChannel<E> =
* 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [coroutineContexts].
* [coroutineContexts] 会覆盖 [defaultCoroutineContext] 中的重复元素.
* 此操作不会修改 [`this.coroutineContext`][defaultCoroutineContext], 只会创建一个新的 [EventChannel].
public fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent> =
coroutineContexts.fold(this.defaultCoroutineContext) { acc, element -> acc + element }
* 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [this.coroutineContext][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
* @see context
public fun exceptionHandler(coroutineExceptionHandler: CoroutineExceptionHandler): EventChannel<BaseEvent> {
return context(coroutineExceptionHandler)
* 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
* @see context
public fun exceptionHandler(coroutineExceptionHandler: (exception: Throwable) -> Unit): EventChannel<BaseEvent> {
return context(CoroutineExceptionHandler { _, throwable ->
* 将 [coroutineScope] 作为这个 [EventChannel] 的父作用域.
* 实际作用为创建一个新的 [EventChannel],
* 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [CoroutineScope.coroutineContext],
* 并以 [CoroutineScope] 中 [Job] (如果有) [作为父 Job][parentJob]
* @see parentJob
* @see context
* @see CoroutineScope.globalEventChannel `GlobalEventChannel.parentScope()` 的扩展
public fun parentScope(coroutineScope: CoroutineScope): EventChannel<BaseEvent> {
return context(coroutineScope.coroutineContext).apply {
val job = coroutineScope.coroutineContext[Job]
if (job != null) parentJob(job)
* 指定协程父 [Job]. 之后在此 [EventChannel] 下创建的事件监听器都会成为 [job] 的子任务, 当 [job] 被取消时, 所有的事件监听器都会被取消.
* 注意: 监听器不会失败 ([Job.cancel]). 监听器处理过程的异常都会被捕获然后交由 [CoroutineExceptionHandler] 处理, 因此 [job] 不会因为子任务监听器的失败而被取消.
* @see parentScope
* @see context
public fun parentJob(job: Job): EventChannel<BaseEvent> {
return context(job)
// endregion
// region subscribe
* 在指定的 [协程作用域][CoroutineScope] 下创建一个事件监听器, 监听所有 [E] 及其子类事件.
* 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
* ### 创建监听
* 调用本函数:
* ```
* subscribe<Event> { /* 会收到来自全部 Bot 的事件和与 Bot 不相关的事件 */ }
* ```
* ### 生命周期
* #### 通过协程作用域管理监听器
* 本函数将会创建一个 [Job], 成为 [coroutineContext] 中的子任务. 可创建一个 [CoroutineScope] 来管理所有的监听器:
* ```
* val scope = CoroutineScope(SupervisorJob())
* scope.subscribeAlways<MemberJoinEvent> { /* ... */ }
* scope.subscribeAlways<MemberMuteEvent> { /* ... */ }
* scope.cancel() // 停止上文两个监听
* ```
* **注意**, 这个函数返回 [Listener], 它是一个 [CompletableJob]. 它会成为 [CoroutineScope] 的一个 [子任务][Job]
* ```
* runBlocking { // this: CoroutineScope
* subscribe<Event> { /* 一些处理 */ } // 返回 Listener, 即 CompletableJob
* }
* // runBlocking 不会完结, 直到监听时创建的 `Listener` 被停止.
* // 它可能通过 Listener.cancel() 停止, 也可能自行返回 ListeningStatus.Stopped 停止.
* ```
* #### 在监听器内部停止后续监听
* 当 [handler] 返回 [ListeningStatus.STOPPED] 时停止监听.
* 或 [Listener.complete] 后结束.
* ### 子类监听
* 监听父类事件, 也会同时监听其子类. 因此监听 [Event] 即可监听所有类型的事件.
* ### 异常处理
* - 当参数 [handler] 处理抛出异常时, 将会按如下顺序寻找 [CoroutineExceptionHandler] 处理异常:
* 1. 参数 [coroutineContext]
* 2. [EventChannel.defaultCoroutineContext]
* 3. [Event.broadcast] 调用者的 [coroutineContext]
* 4. 若事件为 [BotEvent], 则从 [BotEvent.bot] 获取到 [Bot], 进而在 [Bot.coroutineContext] 中寻找
* 5. 若以上四个步骤均无法获取 [CoroutineExceptionHandler], 则使用 [MiraiLogger.Companion] 通过日志记录. 但这种情况理论上不应发生.
* - 事件处理时抛出异常不会停止监听器.
* - 建议在事件处理中 (即 [handler] 里) 处理异常,
* 或在参数 [coroutineContext] 中添加 [CoroutineExceptionHandler].
* @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext].
* @param concurrency 并发类型. 查看 [Listener.ConcurrencyKind]
* @param priority 监听优先级,优先级越高越先执行
* @param handler 事件处理器. 在接收到事件时会调用这个处理器. 其返回值意义参考 [ListeningStatus]. 其异常处理参考上文
* @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
* @see syncFromEvent 挂起当前协程, 监听一个事件, 并尝试从这个事件中**同步**一个值
* @see asyncFromEvent 异步监听一个事件, 并尝试从这个事件中获取一个值.
* @see nextEvent 挂起当前协程, 直到监听到事件 [E] 的广播, 返回这个事件实例.
* @see selectMessages 以 `when` 的语法 '选择' 即将到来的一条消息.
* @see whileSelectMessages 以 `when` 的语法 '选择' 即将到来的所有消息, 直到不满足筛选结果.
* @see subscribeAlways 一直监听
* @see subscribeOnce 只监听一次
* @see subscribeMessages 监听消息 DSL
* @see subscribeGroupMessages 监听群消息 DSL
* @see subscribeFriendMessages 监听好友消息 DSL
* @see subscribeTempMessages 监听临时会话消息 DSL
public inline fun <reified E : Event> subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = LOCKED,
priority: EventPriority = EventPriority.NORMAL,
noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority, handler)
* 与 [subscribe] 的区别是接受 [eventClass] 参数, 而不使用 `reified` 泛型
* @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
* @see subscribe
public fun <E : Event> subscribe(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = LOCKED,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribeInternal(
createListener(coroutineContext, concurrency, priority) { it.handler(it); }
* 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件.
* 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
* 可在任意时候通过 [Listener.complete] 来主动停止监听.
* [CoroutineScope] 被关闭后事件监听会被 [取消][Listener.cancel].
* @param concurrency 并发类型默认为 [CONCURRENT]
* @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
* @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
* @see subscribe 获取更多说明
public inline fun <reified E : Event> subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
priority: EventPriority = EventPriority.NORMAL,
noinline handler: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority, handler)
* @see subscribe
* @see subscribeAlways
public fun <E : Event> subscribeAlways(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend E.(E) -> Unit
): Listener<E> = subscribeInternal(
createListener(coroutineContext, concurrency, priority) { it.handler(it); ListeningStatus.LISTENING }
* 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件.
* 仅在第一次 [事件广播][Event.broadcast] 时, [handler] 会被执行.
* 可在任意时候通过 [Listener.complete] 来主动停止监听.
* [CoroutineScope] 被关闭后事件监听会被 [取消][Listener.cancel].
* @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
* @see subscribe 获取更多说明
public inline fun <reified E : Event> subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: EventPriority = EventPriority.NORMAL,
noinline handler: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(E::class, coroutineContext, priority, handler)
* @see subscribeOnce
public fun <E : Event> subscribeOnce(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend E.(E) -> Unit
): Listener<E> = subscribeInternal(
createListener(coroutineContext, LOCKED, priority) { it.handler(it); ListeningStatus.STOPPED }
// endregion
// region subscribe with Kotlin function reference
* 支持 Kotlin 带接收者的挂起函数的函数引用的监听方式.
* ```
* fun onMessage(event: GroupMessageEvent): ListeningStatus {
* return ListeningStatus.LISTENING
* }
* scope.subscribe(::onMessage)
* ```
* @see subscribe
public inline fun <reified E : Event> subscribe(
crossinline handler: (E) -> ListeningStatus,
priority: EventPriority = EventPriority.NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 带接收者的函数的函数引用的监听方式.
* ```
* fun GroupMessageEvent.onMessage(event: GroupMessageEvent): ListeningStatus {
* return ListeningStatus.LISTENING
* }
* scope.subscribe(GroupMessageEvent::onMessage)
* ```
* @see subscribe
public inline fun <reified E : Event> subscribe(
crossinline handler: E.(E) -> ListeningStatus,
priority: EventPriority = EventPriority.NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 挂起函数的函数引用的监听方式.
* ```
* suspend fun onMessage(event: GroupMessageEvent): ListeningStatus {
* return ListeningStatus.LISTENING
* }
* scope.subscribe(::onMessage)
* ```
* @see subscribe
public inline fun <reified E : Event> subscribe(
crossinline handler: suspend (E) -> ListeningStatus,
priority: EventPriority = EventPriority.NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 带接收者的挂起函数的函数引用的监听方式.
* ```
* suspend fun GroupMessageEvent.onMessage(event: GroupMessageEvent): ListeningStatus {
* return ListeningStatus.LISTENING
* }
* scope.subscribe(GroupMessageEvent::onMessage)
* ```
* @see subscribe
public inline fun <reified E : Event> subscribe(
crossinline handler: suspend E.(E) -> ListeningStatus,
priority: EventPriority = EventPriority.NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
// endregion
// region subscribeAlways with Kotlin function references
* 支持 Kotlin 带接收者的挂起函数的函数引用的监听方式.
* ```
* fun onMessage(event: GroupMessageEvent) {
* }
* scope.subscribeAlways(::onMessage)
* ```
* @see subscribeAlways
public inline fun <reified E : Event> subscribeAlways(
crossinline handler: (E) -> Unit,
priority: EventPriority = EventPriority.NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 带接收者的函数的函数引用的监听方式.
* ```
* fun GroupMessageEvent.onMessage(event: GroupMessageEvent) {
* }
* scope.subscribeAlways(GroupMessageEvent::onMessage)
* ```
* @see subscribeAlways
public inline fun <reified E : Event> subscribeAlways(
crossinline handler: E.(E) -> Unit,
priority: EventPriority = EventPriority.NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 挂起函数的函数引用的监听方式.
* ```
* suspend fun onMessage(event: GroupMessageEvent) {
* }
* scope.subscribeAlways(::onMessage)
* ```
* @see subscribeAlways
public inline fun <reified E : Event> subscribeAlways(
crossinline handler: suspend (E) -> Unit,
priority: EventPriority = EventPriority.NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 带接收者的挂起函数的函数引用的监听方式.
* ```
* suspend fun GroupMessageEvent.onMessage(event: GroupMessageEvent) {
* }
* scope.subscribeAlways(GroupMessageEvent::onMessage)
* ```
* @see subscribeAlways
public inline fun <reified E : Event> subscribeAlways(
crossinline handler: suspend E.(E) -> Unit,
priority: EventPriority = EventPriority.NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
// endregion
// region impl
* 由子类实现,可以为 handler 包装一个过滤器等. 每个 handler 都会经过此函数处理.
protected open fun <E : Event> (suspend (E) -> ListeningStatus).intercepted(): (suspend (E) -> ListeningStatus) {
return this
internal fun <L : Listener<E>, E : Event> subscribeInternal(eventClass: KClass<out E>, listener: L): L {
with(GlobalEventListeners[listener.priority]) {
val node = ListenerRegistry(listener as Listener<Event>, eventClass)
listener.invokeOnCompletion {
return listener
private fun <E : Event> createListener(
coroutineContext: CoroutineContext,
concurrencyKind: Listener.ConcurrencyKind,
priority: Listener.EventPriority = EventPriority.NORMAL,
handler: suspend (E) -> ListeningStatus
): Listener<E> {
val context = this.defaultCoroutineContext + coroutineContext
return Handler(
parentJob = context[Job],
subscriberContext = context,
handler = handler.intercepted(),
concurrencyKind = concurrencyKind,
priority = priority
// endregion
@ -348,11 +348,11 @@ private fun Method.registerEvent(
when (kotlinFunction.returnType.classifier) {
Unit::class, Nothing::class -> {
param[1].type.classifier as KClass<out Event>,
priority = annotation.priority,
concurrency = annotation.concurrency,
coroutineContext = coroutineContext
) {
if (annotation.ignoreCancelled) {
if ((this as? CancellableEvent)?.isCancelled != true) {
}.also { listener = it }
            ListeningStatus::class -> {
}.also { listener = it }
ListeningStatus::class -> {
param[1].type.classifier as KClass<out Event>,
priority = annotation.priority,
concurrency = annotation.concurrency,
coroutineContext = coroutineContext
) {
if (annotation.ignoreCancelled) {
if ((this as? CancellableEvent)?.isCancelled != true) {
@ -410,21 +410,21 @@ private fun Method.registerEvent(
when (this.returnType) {
Void::class.java, Void.TYPE, Nothing::class.java -> {
paramType.kotlin as KClass<out Event>,
priority = annotation.priority,
concurrency = annotation.concurrency,
coroutineContext = coroutineContext
) {
ListeningStatus::class.java -> {
paramType.kotlin as KClass<out Event>,
priority = annotation.priority,
concurrency = annotation.concurrency,
coroutineContext = coroutineContext
) {
callMethod(this) as ListeningStatus?
?: error("Java method EventHandler cannot return `null`: $this")
@ -19,9 +19,6 @@ import net.mamoe.mirai.contact.Group
import net.mamoe.mirai.contact.Member
import net.mamoe.mirai.event.Event
import kotlin.internal.HidesMembers
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName
import kotlin.jvm.JvmSynthetic
* 有关一个 [Bot] 的事件
import kotlin.reflect.KClass
import kotlin.reflect.KClass
internal fun <L : Listener<E>, E : Event> KClass<out E>.subscribeInternal(listener: L): L {
with(GlobalEventListeners[listener.priority]) {
val node = ListenerRegistry(listener as Listener<Event>, this@subscribeInternal)
listener.invokeOnCompletion {
return listener
internal fun <E : Event> CoroutineScope.Handler(
coroutineContext: CoroutineContext,
concurrencyKind: Listener.ConcurrencyKind,
priority: Listener.EventPriority = EventPriority.NORMAL,
handler: suspend (E) -> ListeningStatus
): Handler<E> {
@OptIn(ExperimentalCoroutinesApi::class) // don't remove
val context = this.newCoroutineContext(coroutineContext)
return Handler(context[Job], context, handler, concurrencyKind, priority)
* 事件处理器.
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmSynthetic
import kotlin.reflect.KClass
@ -148,7 +147,7 @@ internal suspend inline fun <E : Event, R> syncFromEventImpl(
priority: Listener.EventPriority,
crossinline mapper: suspend E.(E) -> R?
): R = suspendCancellableCoroutine { cont ->
coroutineScope.subscribe(eventClass, priority = priority) {
coroutineScope.globalEventChannel().subscribe(eventClass, priority = priority) {
try {
cont.resumeWith(kotlin.runCatching {
mapper.invoke(this, it) ?: return@subscribe ListeningStatus.LISTENING
* Copyright 2019-2020 Mamoe Technologies and contributors.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* https://github.com/mamoe/mirai/blob/master/LICENSE
package net.mamoe.mirai.event
import kotlinx.coroutines.CoroutineScope
import net.mamoe.mirai.event.Listener.ConcurrencyKind
import net.mamoe.mirai.event.Listener.EventPriority
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.internal.LowPriorityInOverloadResolution
import kotlin.reflect.KClass
// region subscribe / subscribeAlways / subscribeOnce
CoroutineScope.subscribe 已被弃用.
CoroutineScope.subscribe 设计为在指定协程作用域下创建事件监听器, 监听所有事件 E.
但由于 Bot 也实现接口 CoroutineScope, 就可以调用 Bot.subscribe<MessageEvent>,
直观语义上应该是监听来自 Bot 的事件, 但实际是监听来自所有 Bot 的事件.
请以 Bot.eventChannel 或 GlobalEventChannel 替代. 可在 EventChannel 获取更详细的帮助.
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribe(coroutineContext, concurrency, priority, handler)",
public inline fun <reified E : Event> CoroutineScope.subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: ConcurrencyKind = ConcurrencyKind.LOCKED,
priority: EventPriority = EventPriority.NORMAL,
noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = this.globalEventChannel().subscribe(coroutineContext, concurrency, priority, handler)
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribe(eventClass, coroutineContext, concurrency, priority, handler)",
public fun <E : Event> CoroutineScope.subscribe(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: ConcurrencyKind = ConcurrencyKind.LOCKED,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend E.(E) -> ListeningStatus
): Listener<E> =
this.globalEventChannel().subscribe(eventClass, coroutineContext, concurrency, priority, handler)
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribeAlways(E::class, coroutineContext, concurrency, priority, handler)",
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
priority: EventPriority = EventPriority.NORMAL,
noinline handler: suspend E.(E) -> Unit
): Listener<E> =
this.globalEventChannel().subscribeAlways(E::class, coroutineContext, concurrency, priority, handler)
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribeAlways(eventClass, coroutineContext, concurrency, priority, handler)",
public fun <E : Event> CoroutineScope.subscribeAlways(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend E.(E) -> Unit
): Listener<E> =
this.globalEventChannel().subscribeAlways(eventClass, coroutineContext, concurrency, priority, handler)
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribeOnce(coroutineContext, priority, handler)",
public inline fun <reified E : Event> CoroutineScope.subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: EventPriority = EventPriority.NORMAL,
noinline handler: suspend E.(E) -> Unit
): Listener<E> = this.globalEventChannel().subscribeOnce(coroutineContext, priority, handler)
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribeOnce(eventClass, coroutineContext, priority, handler)",
public fun <E : Event> CoroutineScope.subscribeOnce(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend E.(E) -> Unit
): Listener<E> = this.globalEventChannel().subscribeOnce(eventClass, coroutineContext, priority, handler)
// endregion
// region subscribe for Kotlin functional reference
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }",
public inline fun <reified E : Event> CoroutineScope.subscribe(
crossinline handler: (E) -> ListeningStatus,
priority: EventPriority = EventPriority.NORMAL,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> =
this.globalEventChannel().subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }",
public inline fun <reified E : Event> CoroutineScope.subscribe(
crossinline handler: E.(E) -> ListeningStatus,
priority: EventPriority = EventPriority.NORMAL,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> =
this.globalEventChannel().subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }",
public inline fun <reified E : Event> CoroutineScope.subscribe(
crossinline handler: suspend (E) -> ListeningStatus,
priority: EventPriority = EventPriority.NORMAL,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> =
this.globalEventChannel().subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith(
"this.globalEventChannel().subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }",
public inline fun <reified E : Event> CoroutineScope.subscribe(
crossinline handler: suspend E.(E) -> ListeningStatus,
priority: EventPriority = EventPriority.NORMAL,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> =
this.globalEventChannel().subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
// endregion
// region subscribeAlways for Kotlin functional references
level = DeprecationLevel.ERROR, replaceWith =
"this.globalEventChannel().subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }",
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
crossinline handler: (E) -> Unit,
priority: EventPriority = EventPriority.NORMAL,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = this.globalEventChannel()
.subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
level = DeprecationLevel.ERROR, replaceWith =
"this.globalEventChannel().subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }",
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
crossinline handler: E.(E) -> Unit,
priority: EventPriority = EventPriority.NORMAL,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = this.globalEventChannel()
.subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
level = DeprecationLevel.ERROR, replaceWith =
"this.globalEventChannel().subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }",
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
crossinline handler: suspend (E) -> Unit,
priority: EventPriority = EventPriority.NORMAL,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = this.globalEventChannel()
.subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
level = DeprecationLevel.ERROR, replaceWith =
"this.globalEventChannel().subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }",
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
crossinline handler: suspend E.(E) -> Unit,
priority: EventPriority = EventPriority.NORMAL,
concurrency: ConcurrencyKind = ConcurrencyKind.CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = this.globalEventChannel()
.subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
// endregion
@ -73,7 +73,7 @@ internal suspend inline fun <E : Event> nextEventImpl(
priority: Listener.EventPriority,
crossinline filter: (E) -> Boolean
): E = suspendCancellableCoroutine { cont ->
coroutineScope.subscribe(eventClass, priority = priority) {
coroutineScope.globalEventChannel().subscribe(eventClass, priority = priority) {
if (!filter(this)) return@subscribe ListeningStatus.LISTENING
try {
@ -92,7 +92,7 @@ internal suspend inline fun <E : BotEvent> nextBotEventImpl(
coroutineScope: CoroutineScope,
priority: Listener.EventPriority
): E = suspendCancellableCoroutine { cont ->
coroutineScope.subscribe(eventClass, priority = priority) {
coroutineScope.globalEventChannel().subscribe(eventClass, priority = priority) {
try {
if (this.bot == bot) cont.resume(this)
} catch (e: Exception) {
@ -518,7 +518,7 @@ internal suspend inline fun <reified T : MessageEvent, R> T.selectMessagesImpl(
// we don't have any way to reduce duplication yet,
// until local functions are supported in inline functions
@Suppress("DuplicatedCode") val subscribeAlways = subscribeAlways<T>(
@Suppress("DuplicatedCode") val subscribeAlways = globalEventChannel().subscribeAlways<T>(
concurrency = Listener.ConcurrencyKind.LOCKED,
priority = priority
) { event ->
@ -600,7 +600,7 @@ internal suspend inline fun <reified T : MessageEvent> T.whileSelectMessagesImpl
// ensure atomic completing
val subscribeAlways = subscribeAlways<T>(
val subscribeAlways = globalEventChannel().subscribeAlways<T>(
concurrency = Listener.ConcurrencyKind.LOCKED,
priority = priority
) { event ->
@ -13,12 +13,14 @@
package net.mamoe.mirai.event
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import net.mamoe.mirai.Bot
import net.mamoe.mirai.event.events.*
import net.mamoe.mirai.event.Listener.ConcurrencyKind.CONCURRENT
import net.mamoe.mirai.event.events.FriendMessageEvent
import net.mamoe.mirai.event.events.GroupMessageEvent
import net.mamoe.mirai.event.events.MessageEvent
import net.mamoe.mirai.event.events.TempMessageEvent
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
@ -30,13 +32,11 @@ public typealias MessagePacketSubscribersBuilder = MessageSubscribersBuilder<Mes
* 订阅来自所有 [Bot] 的所有联系人的消息事件. 联系人可以是任意群或任意好友或临时会话.
* @see subscribe 事件监听基础
* @see CoroutineScope.incoming 打开一个指定事件的接收通道
* @see EventChannel 事件通道
public fun <R> CoroutineScope.subscribeMessages(
public fun <R> EventChannel<*>.subscribeMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
concurrencyKind: Listener.ConcurrencyKind = CONCURRENT,
priority: Listener.EventPriority = EventPriority.MONITOR,
listeners: MessagePacketSubscribersBuilder.() -> R
): R {
@ -45,11 +45,11 @@ public fun <R> CoroutineScope.subscribeMessages(
callsInPlace(listeners, InvocationKind.EXACTLY_ONCE)
return MessagePacketSubscribersBuilder(Unit)
return MessageSubscribersBuilder(Unit)
{ filter, messageListener: MessageListener<MessageEvent, Unit> ->
// subscribeAlways 即注册一个监听器. 这个监听器收到消息后就传递给 [messageListener]
// messageListener 即为 DSL 里 `contains(...) { }`, `startsWith(...) { }` 的代码块.
subscribeAlways(coroutineContext, concurrencyKind, priority) {
subscribeAlways<MessageEvent>(coroutineContext, concurrencyKind, priority) {
// this.message.contentToString() 即为 messageListener 中 it 接收到的值
val toString = this.message.contentToString()
if (filter.invoke(this, toString))
@ -64,12 +64,11 @@ public typealias GroupMessageSubscribersBuilder = MessageSubscribersBuilder<Grou
* 订阅来自所有 [Bot] 的所有群消息事件
* @see subscribe 事件监听基础
* @see CoroutineScope.incoming 打开一个指定事件的接收通道
* @see EventChannel 事件通道
public fun <R> CoroutineScope.subscribeGroupMessages(
public fun <R> EventChannel<*>.subscribeGroupMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
concurrencyKind: Listener.ConcurrencyKind = CONCURRENT,
priority: Listener.EventPriority = EventPriority.MONITOR,
listeners: GroupMessageSubscribersBuilder.() -> R
): R {
@ -91,12 +90,11 @@ public typealias FriendMessageSubscribersBuilder = MessageSubscribersBuilder<Fri
* 订阅来自所有 [Bot] 的所有好友消息事件
* @see subscribe 事件监听基础
* @see CoroutineScope.incoming 打开一个指定事件的接收通道
* @see EventChannel 事件通道
public fun <R> CoroutineScope.subscribeFriendMessages(
public fun <R> EventChannel<*>.subscribeFriendMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
concurrencyKind: Listener.ConcurrencyKind = CONCURRENT,
priority: Listener.EventPriority = EventPriority.MONITOR,
listeners: FriendMessageSubscribersBuilder.() -> R
): R {
@ -118,12 +116,11 @@ public typealias TempMessageSubscribersBuilder = MessageSubscribersBuilder<TempM
* 订阅来自所有 [Bot] 的所有临时会话消息事件
* @see subscribe 事件监听基础
* @see CoroutineScope.incoming 打开一个指定事件的接收通道
* @see EventChannel 事件通道
public fun <R> CoroutineScope.subscribeTempMessages(
public fun <R> EventChannel<*>.subscribeTempMessages(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
concurrencyKind: Listener.ConcurrencyKind = CONCURRENT,
priority: Listener.EventPriority = EventPriority.MONITOR,
listeners: TempMessageSubscribersBuilder.() -> R
): R {
@ -154,19 +151,13 @@ public fun <R> CoroutineScope.subscribeTempMessages(
* @see subscribeMessages
* @see subscribeGroupMessages
public inline fun <reified E : Event> CoroutineScope.incoming(
@Deprecated("Use asChannel", ReplaceWith("asChannel(capacity, coroutineContext, concurrencyKind, priority)"))
public fun <E : Event> EventChannel<E>.incoming(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrencyKind: Listener.ConcurrencyKind = Listener.ConcurrencyKind.CONCURRENT,
concurrencyKind: Listener.ConcurrencyKind = CONCURRENT,
priority: Listener.EventPriority = EventPriority.MONITOR,
capacity: Int = Channel.UNLIMITED
): ReceiveChannel<E> {
return Channel<E>(capacity).apply {
val listener = this@incoming.subscribeAlways<E>(coroutineContext, concurrencyKind, priority) {
this.invokeOnClose {
listener.cancel(CancellationException("ReceiveChannel closed", it))
return asChannel(capacity, coroutineContext, concurrencyKind, priority)
@ -14,30 +14,9 @@
package net.mamoe.mirai.event
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.sync.Mutex
import net.mamoe.mirai.Bot
import net.mamoe.mirai.event.Listener.ConcurrencyKind.CONCURRENT
import net.mamoe.mirai.event.Listener.ConcurrencyKind.LOCKED
import net.mamoe.mirai.event.Listener.EventPriority.*
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.event.internal.Handler
import net.mamoe.mirai.event.internal.subscribeInternal
import net.mamoe.mirai.utils.MiraiLogger
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.internal.LowPriorityInOverloadResolution
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName
import kotlin.jvm.JvmStatic
import kotlin.jvm.JvmSynthetic
import kotlin.reflect.KClass
* 该文件为所有的订阅事件的方法.
* 订阅者的状态
@ -130,350 +109,4 @@ public interface Listener<in E : Event> : CompletableJob {
public suspend fun onEvent(event: E): ListeningStatus
public typealias EventPriority = Listener.EventPriority
// region subscribe / subscribeAlways / subscribeOnce
* 在指定的 [协程作用域][CoroutineScope] 下创建一个事件监听器, 监听所有 [E] 及其子类事件.
* 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
* ### 创建监听
* 调用本函数:
* ```
* coroutineScope.subscribe<Event> { /* 会收到来自全部 Bot 的事件和与 Bot 不相关的事件 */ }
* ```
* ### 生命周期
* #### 通过协程作用域管理监听器
* 本函数将会创建一个 [Job], 成为 [this] 中的子任务. 可创建一个 [CoroutineScope] 来管理所有的监听器:
* ```
* val scope = CoroutineScope(SupervisorJob())
* scope.subscribeAlways<MemberJoinEvent> { /* ... */ }
* scope.subscribeAlways<MemberMuteEvent> { /* ... */ }
* scope.cancel() // 停止上文两个监听
* ```
* **注意**, 这个函数返回 [Listener], 它是一个 [CompletableJob]. 它会成为 [CoroutineScope] 的一个 [子任务][Job]
* ```
* runBlocking { // this: CoroutineScope
* subscribe<Event> { /* 一些处理 */ } // 返回 Listener, 即 CompletableJob
* }
* // runBlocking 不会完结, 直到监听时创建的 `Listener` 被停止.
* // 它可能通过 Listener.cancel() 停止, 也可能自行返回 ListeningStatus.Stopped 停止.
* ```
* #### 在监听器内部停止后续监听
* 当 [handler] 返回 [ListeningStatus.STOPPED] 时停止监听.
* 或 [Listener.complete] 后结束.
* ### 子类监听
* 监听父类事件, 也会同时监听其子类. 因此监听 [Event] 即可监听所有类型的事件.
* ### 异常处理
* 事件处理时的 [CoroutineContext] 为调用本函数时的 [receiver][this] 的 [CoroutineScope.coroutineContext].
* 因此:
* - 当参数 [handler] 处理抛出异常时, 将会按如下顺序寻找 [CoroutineExceptionHandler] 处理异常:
* 1. 参数 [coroutineContext]
* 2. 接收者 [this] 的 [CoroutineScope.coroutineContext]
* 3. [Event.broadcast] 调用者的 [coroutineContext]
* 4. 若事件为 [BotEvent], 则从 [BotEvent.bot] 获取到 [Bot], 进而在 [Bot.coroutineContext] 中寻找
* 5. 若以上四个步骤均无法获取 [CoroutineExceptionHandler], 则使用 [MiraiLogger.Companion] 通过日志记录. 但这种情况理论上不应发生.
* - 事件处理时抛出异常不会停止监听器.
* - 建议在事件处理中 (即 [handler] 里) 处理异常,
* 或在参数 [coroutineContext] 中添加 [CoroutineExceptionHandler].
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext].
* @param concurrency 并发类型. 查看 [Listener.ConcurrencyKind]
* @param priority 监听优先级,优先级越高越先执行
* @param handler 事件处理器. 在接收到事件时会调用这个处理器. 其返回值意义参考 [ListeningStatus]. 其异常处理参考上文
* @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
* @see syncFromEvent 挂起当前协程, 监听一个事件, 并尝试从这个事件中**同步**一个值
* @see asyncFromEvent 异步监听一个事件, 并尝试从这个事件中获取一个值.
* @see nextEvent 挂起当前协程, 直到监听到事件 [E] 的广播, 返回这个事件实例.
* @see selectMessages 以 `when` 的语法 '选择' 即将到来的一条消息.
* @see whileSelectMessages 以 `when` 的语法 '选择' 即将到来的所有消息, 直到不满足筛选结果.
* @see subscribeAlways 一直监听
* @see subscribeOnce 只监听一次
* @see subscribeMessages 监听消息 DSL
* @see subscribeGroupMessages 监听群消息 DSL
* @see subscribeFriendMessages 监听好友消息 DSL
* @see subscribeTempMessages 监听临时会话消息 DSL
public inline fun <reified E : Event> CoroutineScope.subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = LOCKED,
priority: Listener.EventPriority = NORMAL,
noinline handler: suspend E.(E) -> ListeningStatus
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority, handler)
* 与 [CoroutineScope.subscribe] 的区别是接受 [eventClass] 参数, 而不使用 `reified` 泛型
* @see CoroutineScope.subscribe
* @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
public fun <E : Event> CoroutineScope.subscribe(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = LOCKED,
priority: Listener.EventPriority = NORMAL,
handler: suspend E.(E) -> ListeningStatus
): Listener<E> = eventClass.subscribeInternal(Handler(coroutineContext, concurrency, priority) { it.handler(it); })
* 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件.
* 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
* 可在任意时候通过 [Listener.complete] 来主动停止监听.
* [CoroutineScope] 被关闭后事件监听会被 [取消][Listener.cancel].
* @param concurrency 并发类型默认为 [CONCURRENT]
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
* @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
* @see CoroutineScope.subscribe 获取更多说明
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
priority: Listener.EventPriority = NORMAL,
noinline handler: suspend E.(E) -> Unit
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority, handler)
* @see CoroutineScope.subscribe
* @see CoroutineScope.subscribeAlways
public fun <E : Event> CoroutineScope.subscribeAlways(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
priority: Listener.EventPriority = NORMAL,
handler: suspend E.(E) -> Unit
): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, concurrency, priority) { it.handler(it); ListeningStatus.LISTENING }
* 在指定的 [CoroutineScope] 下订阅所有 [E] 及其子类事件.
* 仅在第一次 [事件广播][Event.broadcast] 时, [handler] 会被执行.
* 可在任意时候通过 [Listener.complete] 来主动停止监听.
* [CoroutineScope] 被关闭后事件监听会被 [取消][Listener.cancel].
* @param coroutineContext 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
* @see CoroutineScope.subscribe 获取更多说明
public inline fun <reified E : Event> CoroutineScope.subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = NORMAL,
noinline handler: suspend E.(E) -> Unit
): Listener<E> = subscribeOnce(E::class, coroutineContext, priority, handler)
* @see CoroutineScope.subscribeOnce
public fun <E : Event> CoroutineScope.subscribeOnce(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: Listener.EventPriority = NORMAL,
handler: suspend E.(E) -> Unit
): Listener<E> = eventClass.subscribeInternal(
Handler(coroutineContext, LOCKED, priority) { it.handler(it); ListeningStatus.STOPPED }
// endregion
// region subscribe for Kotlin functional reference
* 支持 Kotlin 带接收者的挂起函数的函数引用的监听方式.
* ```
* fun onMessage(event: GroupMessageEvent): ListeningStatus {
* return ListeningStatus.LISTENING
* }
* scope.subscribe(::onMessage)
* ```
public inline fun <reified E : Event> CoroutineScope.subscribe(
crossinline handler: (E) -> ListeningStatus,
priority: Listener.EventPriority = NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 带接收者的函数的函数引用的监听方式.
* ```
* fun GroupMessageEvent.onMessage(event: GroupMessageEvent): ListeningStatus {
* return ListeningStatus.LISTENING
* }
* scope.subscribe(GroupMessageEvent::onMessage)
* ```
public inline fun <reified E : Event> CoroutineScope.subscribe(
crossinline handler: E.(E) -> ListeningStatus,
priority: Listener.EventPriority = NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 挂起函数的函数引用的监听方式.
* ```
* suspend fun onMessage(event: GroupMessageEvent): ListeningStatus {
* return ListeningStatus.LISTENING
* }
* scope.subscribe(::onMessage)
* ```
public inline fun <reified E : Event> CoroutineScope.subscribe(
crossinline handler: suspend (E) -> ListeningStatus,
priority: Listener.EventPriority = NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 带接收者的挂起函数的函数引用的监听方式.
* ```
* suspend fun GroupMessageEvent.onMessage(event: GroupMessageEvent): ListeningStatus {
* return ListeningStatus.LISTENING
* }
* scope.subscribe(GroupMessageEvent::onMessage)
* ```
public inline fun <reified E : Event> CoroutineScope.subscribe(
crossinline handler: suspend E.(E) -> ListeningStatus,
priority: Listener.EventPriority = NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority) { handler(this) }
// endregion
// region subscribeAlways for Kotlin functional references
* 支持 Kotlin 带接收者的挂起函数的函数引用的监听方式.
* ```
* fun onMessage(event: GroupMessageEvent) {
* }
* scope.subscribeAlways(::onMessage)
* ```
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
crossinline handler: (E) -> Unit,
priority: Listener.EventPriority = NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 带接收者的函数的函数引用的监听方式.
* ```
* fun GroupMessageEvent.onMessage(event: GroupMessageEvent) {
* }
* scope.subscribeAlways(GroupMessageEvent::onMessage)
* ```
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
crossinline handler: E.(E) -> Unit,
priority: Listener.EventPriority = NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 挂起函数的函数引用的监听方式.
* ```
* suspend fun onMessage(event: GroupMessageEvent) {
* }
* scope.subscribeAlways(::onMessage)
* ```
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
crossinline handler: suspend (E) -> Unit,
priority: Listener.EventPriority = NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
* 支持 Kotlin 带接收者的挂起函数的函数引用的监听方式.
* ```
* suspend fun GroupMessageEvent.onMessage(event: GroupMessageEvent) {
* }
* scope.subscribeAlways(GroupMessageEvent::onMessage)
* ```
public inline fun <reified E : Event> CoroutineScope.subscribeAlways(
crossinline handler: suspend E.(E) -> Unit,
priority: Listener.EventPriority = NORMAL,
concurrency: Listener.ConcurrencyKind = CONCURRENT,
coroutineContext: CoroutineContext = EmptyCoroutineContext
): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority) { handler(this) }
// endregion
public typealias EventPriority = Listener.EventPriority
val scope = CoroutineScope(SupervisorJob())
        var got = false
val scope = CoroutineScope(SupervisorJob())
var got = false
scope.subscribeAlways<TestEvent> {
scope.globalEventChannel().subscribeAlways<TestEvent> {
got = true
* Copyright 2019-2020 Mamoe Technologies and contributors.
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
* https://github.com/mamoe/mirai/blob/master/LICENSE
package net.mamoe.mirai.event
import net.mamoe.mirai.event.events.FriendEvent
import net.mamoe.mirai.event.events.GroupEvent
import net.mamoe.mirai.event.events.GroupMessageEvent
import net.mamoe.mirai.event.events.MessageEvent
import org.junit.jupiter.api.Test
internal class EventChannelTest {
fun testVariance() {
var global: EventChannel<Event> = GlobalEventChannel
var a: EventChannel<MessageEvent> = global.filterIsInstance<MessageEvent>()
a.filter {
// it: Event
val messageEventChannel = a.filterIsInstance<MessageEvent>()
// group.asChannel<GroupMessageEvent>()
val listener: Listener<GroupMessageEvent> = messageEventChannel.subscribeAlways<GroupEvent>() {
global = a
global.subscribeMessages {
messageEventChannel.subscribeMessages {
global.subscribeAlways<FriendEvent> {
// inappliable: out cannot passed as in
// val b: EventChannel<in FriendMessageEvent> = global.filterIsInstance<FriendMessageEvent>()
fun testSubscribeInplace() {
        runBlocking {
fun testSubscribeInplace() {
runBlocking {
val subscriber = subscribeAlways<TestEvent> {
val subscriber = globalEventChannel().subscribeAlways<TestEvent> {
triggered = true
assertTrue { subscriber.complete() }
@ -41,7 +41,7 @@ class EventTests {
fun testSubscribeGlobalScope() {
runBlocking {
GlobalScope.subscribeAlways<TestEvent> {
GlobalScope.globalEventChannel().subscribeAlways<TestEvent> {
triggered = true
@ -57,7 +57,7 @@ class EventTests {
for (p in Listener.EventPriority.values()) {
repeat(2333) {
GlobalScope.subscribeAlways<ParentEvent> {
GlobalScope.globalEventChannel().subscribeAlways<ParentEvent> {
launch {
                repeat(5000) {
launch {
repeat(5000) {
priority = priority
) {
repeat(444) {
repeat(444) {
supervisor.subscribeAlways<ParentEvent> {
supervisor.globalEventChannel().subscribeAlways<ParentEvent> {
runBlocking {
            val job: CompletableJob
runBlocking {
val job: CompletableJob
job = subscribeAlways<ParentEvent> {
job = globalEventChannel().subscribeAlways<ParentEvent> {
triggered = true
runBlocking {
            val job: CompletableJob
runBlocking {
val job: CompletableJob
job = subscribeAlways<ParentEvent> {
job = globalEventChannel().subscribeAlways<ParentEvent> {
triggered = true
@ -181,11 +181,11 @@ class EventTests {
open class PriorityTestEvent : AbstractEvent() {}
fun singleThreaded(step: StepUtil, invoke: suspend CoroutineScope.() -> Unit) {
fun singleThreaded(step: StepUtil, invoke: suspend EventChannel<Event>.() -> Unit) {
// runBlocking 会完全堵死, 没法退出
val scope = CoroutineScope(Executor { it.run() }.asCoroutineDispatcher())
val job = scope.launch {
kotlinx.coroutines.runBlocking {
private val called = AtomicInteger(0)
private val called = AtomicInteger(0)
fun ev(event: TestEvent?) {
public fun ev(event: TestEvent?) {
fun ev2(event: TestEvent?): Void? {
public fun ev2(event: TestEvent?): Void? {
return null
fun ev3(event: TestEvent?): ListeningStatus? {
public fun ev3(event: TestEvent?): ListeningStatus? {
return ListeningStatus.LISTENING
@ -21,11 +21,10 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import net.mamoe.mirai.Bot
import net.mamoe.mirai.contact.OtherClientList
import net.mamoe.mirai.event.Listener
import net.mamoe.mirai.event.broadcast
import net.mamoe.mirai.event.*
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.event.events.BotOfflineEvent
import net.mamoe.mirai.event.events.BotReloginEvent
import net.mamoe.mirai.event.subscribeAlways
import net.mamoe.mirai.internal.network.BotNetworkHandler
import net.mamoe.mirai.internal.network.DefaultServerList
import net.mamoe.mirai.internal.network.closeAndJoin
internal var _isConnecting: Boolean = false
    override val isOnline: Boolean get() = _network.areYouOk()
internal var _isConnecting: Boolean = false
override val isOnline: Boolean get() = _network.areYouOk()
override val eventChannel: EventChannel<BotEvent> =
GlobalEventChannel.filterIsInstance<BotEvent>().filter { it.bot === this@AbstractBot }
val otherClientsLock = Mutex() // lock sync
override val otherClients: OtherClientList = OtherClientList()
init {
init {
val listener = bot.subscribeAlways<BotReloginEvent>(priority = Listener.EventPriority.MONITOR) {
if (bot != this.bot) return@subscribeAlways
val listener = bot.eventChannel.subscribeAlways<BotReloginEvent>(priority = Listener.EventPriority.MONITOR) {
this@QQAndroidBotNetworkHandler.launch { syncMessageSvc() }
supervisor.invokeOnCompletion { listener.cancel() }
@ -14,6 +14,8 @@ pluginManagement {
maven(url = "https://dl.bintray.com/kotlin/kotlin-eap")
maven(url = "https://dl.bintray.com/kotlin/kotlin-dev")
maven(url = "https://dl.bintray.com/jetbrains/kotlin-native-dependencies")
maven(url = "https://kotlin.bintray.com/kotlinx")
