Implement lifecycle management for network

This commit is contained in:
Him188 2021-04-25 13:01:02 +08:00
parent f9ddf74d8e
commit 8db8e2fb03
11 changed files with 262 additions and 157 deletions

View File

@ -22,10 +22,13 @@ import java.util.concurrent.ConcurrentLinkedQueue
*/
@Suppress("unused")
public class ContactList<C : Contact>
internal constructor(@JvmField @MiraiInternalApi public val delegate: ConcurrentLinkedQueue<C>) :
@MiraiInternalApi public constructor(@JvmField @MiraiInternalApi public val delegate: ConcurrentLinkedQueue<C>) :
Collection<C> by delegate {
internal constructor(collection: Collection<C>) : this(ConcurrentLinkedQueue(collection))
internal constructor() : this(ConcurrentLinkedQueue())
@MiraiInternalApi
public constructor(collection: Collection<C>) : this(ConcurrentLinkedQueue(collection))
@MiraiInternalApi
public constructor() : this(ConcurrentLinkedQueue())
/**
* 获取一个 [Contact.id] [id] 的元素. 在不存在时返回 `null`.

View File

@ -52,19 +52,14 @@ public inline fun CoroutineScope.launchWithPermit(
*/
public fun CoroutineScope.childScope(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
): CoroutineScope {
val ctx = this.coroutineContext + coroutineContext
return CoroutineScope(ctx + SupervisorJob(ctx.job))
}
): CoroutineScope = this.coroutineContext.childScope(coroutineContext)
/**
* Creates a child scope of the receiver context scope.
*/
public fun CoroutineContext.childScope(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
): CoroutineScope {
return CoroutineScope(this.childScopeContext(coroutineContext))
}
): CoroutineScope = CoroutineScope(this.childScopeContext(coroutineContext))
/**
* Creates a child scope of the receiver context scope.
@ -73,12 +68,11 @@ public fun CoroutineContext.childScopeContext(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
): CoroutineContext {
val ctx = this + coroutineContext
return ctx + SupervisorJob(ctx.job)
val job = ctx[Job] ?: return ctx + SupervisorJob()
return ctx + SupervisorJob(job)
}
public inline fun <E : U, U : CoroutineContext.Element> CoroutineContext.getOrElse(
key: CoroutineContext.Key<E>,
default: () -> U
): U {
return this[key] ?: default()
}
): U = this[key] ?: default()

View File

@ -7,13 +7,6 @@
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
@file:Suppress(
"EXPERIMENTAL_API_USAGE",
"DEPRECATION_ERROR",
"OverridingDeprecatedMember",
"INVISIBLE_REFERENCE",
"INVISIBLE_MEMBER"
)
package net.mamoe.mirai.internal
@ -29,8 +22,13 @@ import net.mamoe.mirai.internal.contact.info.StrangerInfoImpl
import net.mamoe.mirai.internal.contact.uin
import net.mamoe.mirai.internal.network.component.ConcurrentComponentStorage
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.impl.netty.asCoroutineExceptionHandler
import net.mamoe.mirai.supervisorJob
import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.BotConfiguration
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.childScopeContext
import net.mamoe.mirai.utils.info
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
/**
@ -45,30 +43,45 @@ internal abstract class AbstractBot constructor(
///////////////////////////////////////////////////////////////////////////
// FASTEST INIT
final override val logger: MiraiLogger by lazy { configuration.botLoggerSupplier(this) }
@Suppress("LeakingThis")
final override val logger: MiraiLogger = configuration.botLoggerSupplier(this)
final override val coroutineContext: CoroutineContext = configuration.parentCoroutineContext.childScopeContext(
configuration.parentCoroutineContext.getOrElse(CoroutineExceptionHandler) {
CoroutineExceptionHandler { _, e ->
logger.error("An exception was thrown under a coroutine of Bot", e)
final override val coroutineContext: CoroutineContext =
CoroutineName("Bot.$id")
.plus(logger.asCoroutineExceptionHandler())
.childScopeContext(configuration.parentCoroutineContext)
.apply {
job.invokeOnCompletion { throwable ->
logger.info { "Bot cancelled" + throwable?.message?.let { ": $it" }.orEmpty() }
kotlin.runCatching {
network.close(throwable)
}.onFailure {
if (it !is CancellationException) logger.error(it)
}
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
Bot._instances.remove(id)
// help GC release instances
groups.forEach { it.members.delegate.clear() }
groups.delegate.clear() // job is cancelled, so child jobs are to be cancelled
friends.delegate.clear()
strangers.delegate.clear()
}
}
} + CoroutineName("Mirai Bot")
)
abstract val components: ConcurrentComponentStorage
init {
@Suppress("LeakingThis")
@Suppress("LeakingThis", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
Bot._instances[this.id] = this
supervisorJob.invokeOnCompletion {
Bot._instances.remove(id)
}
}
///////////////////////////////////////////////////////////////////////////
// overrides
///////////////////////////////////////////////////////////////////////////
abstract val components: ConcurrentComponentStorage
final override val isOnline: Boolean get() = network.isOk()
final override val eventChannel: EventChannel<BotEvent> =
GlobalEventChannel.filterIsInstance<BotEvent>().filter { it.bot === this@AbstractBot }
@ -79,7 +92,19 @@ internal abstract class AbstractBot constructor(
final override val strangers: ContactList<Stranger> = ContactList()
final override val asFriend: Friend by lazy { Mirai.newFriend(this, FriendInfoImpl(uin, nick, "")) }
final override val asStranger: Stranger by lazy { Mirai.newStranger(bot, StrangerInfoImpl(bot.id, bot.nick)) }
final override val asStranger: Stranger by lazy { Mirai.newStranger(this, StrangerInfoImpl(bot.id, bot.nick)) }
override fun close(cause: Throwable?) {
if (!this.isActive) return
if (cause == null) {
supervisorJob.cancel()
} else {
supervisorJob.cancel(CancellationException("Bot closed", cause))
}
}
final override fun toString(): String = "Bot($id)"
///////////////////////////////////////////////////////////////////////////
// network
@ -93,56 +118,4 @@ internal abstract class AbstractBot constructor(
}
protected abstract fun createNetworkHandler(): NetworkHandler
protected abstract suspend fun sendLogout()
// endregion
init {
coroutineContext[Job]!!.invokeOnCompletion { throwable ->
logger.info { "Bot cancelled" + throwable?.message?.let { ": $it" }.orEmpty() }
kotlin.runCatching {
network.close(throwable)
}
// help GC release instances
groups.forEach {
it.members.delegate.clear()
}
groups.delegate.clear() // job is cancelled, so child jobs are to be cancelled
friends.delegate.clear()
strangers.delegate.clear()
}
}
override fun close(cause: Throwable?) {
if (!this.isActive) {
// already cancelled
return
}
this.network.close(cause)
if (supervisorJob.isActive) {
if (cause == null) {
supervisorJob.cancel()
} else {
supervisorJob.cancel(CancellationException("Bot closed", cause))
}
}
}
final override fun toString(): String = "Bot($id)"
}
private val Throwable.rootCause: Throwable
get() {
var depth = 0
var rootCause: Throwable? = this
while (rootCause?.cause != null) {
rootCause = rootCause.cause
if (depth++ == 20) break
}
return rootCause ?: this
}
}

View File

@ -11,6 +11,7 @@
package net.mamoe.mirai.internal
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import net.mamoe.mirai.Bot
import net.mamoe.mirai.Mirai
@ -29,7 +30,6 @@ import net.mamoe.mirai.internal.network.context.SsoProcessorContextImpl
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContextImpl
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
import net.mamoe.mirai.internal.network.handler.selector.FactoryKeepAliveNetworkHandlerSelector
import net.mamoe.mirai.internal.network.handler.selector.SelectorNetworkHandler
import net.mamoe.mirai.internal.network.handler.state.*
@ -77,12 +77,12 @@ internal open class QQAndroidBot constructor(
// TODO: 2021/4/14 bdhSyncer.loadFromCache() when login
// IDE error, don't move into lazy
// also called by tests.
fun ComponentStorage.stateObserverChain(): StateObserver {
val components = this
return StateObserver.chainOfNotNull(
components[BotInitProcessor].asObserver(),
StateChangedObserver(State.OK) { new ->
StateChangedObserver(to = State.OK) { new ->
bot.launch(logger.asCoroutineExceptionHandler()) {
BotOnlineEvent(bot).broadcast()
if (bot.firstLoginSucceed) { // TODO: 2021/4/21 actually no use
@ -90,31 +90,27 @@ internal open class QQAndroidBot constructor(
}
}
},
object : StateObserver {
override fun stateChanged(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
val p = previous.correspondingState
val n = new.correspondingState
when {
p == State.OK && n == State.CONNECTING -> {
bot.launch(logger.asCoroutineExceptionHandler()) {
BotOfflineEvent.Dropped(bot, new.getCause()).broadcast()
}
}
p == State.OK && n == State.CLOSED -> {
bot.launch(logger.asCoroutineExceptionHandler()) {
BotOfflineEvent.Active(bot, new.getCause()).broadcast()
}
}
}
StateChangedObserver(State.OK, State.CONNECTING) { new ->
bot.launch(logger.asCoroutineExceptionHandler()) {
BotOfflineEvent.Dropped(bot, new.getCause()).broadcast()
}
},
StateChangedObserver(State.OK) { new ->
StateChangedObserver(State.OK, State.CLOSED) { new ->
bot.launch(logger.asCoroutineExceptionHandler()) {
BotOfflineEvent.Active(bot, new.getCause()).broadcast()
}
},
StateChangedObserver(to = State.OK) { new ->
components[BotOfflineEventMonitor].attachJob(bot, new)
},
StateChangedObserver(State.OK, State.CLOSED) {
runBlocking {
try {
components[SsoProcessor].logout(network)
} catch (ignored: Exception) {
}
}
},
debugConfiguration.stateObserver
).safe(logger)
}
@ -156,10 +152,6 @@ internal open class QQAndroidBot constructor(
val client get() = components[SsoProcessor].client
override suspend fun sendLogout() {
components[SsoProcessor].logout(network)
}
override fun createNetworkHandler(): NetworkHandler {
val context = NetworkHandlerContextImpl(
this,

View File

@ -12,11 +12,12 @@ package net.mamoe.mirai.internal.network.handler.state
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerSupport
@Suppress("FunctionName")
internal fun StateChangedObserver(
state: State,
to: State,
action: (new: NetworkHandlerSupport.BaseStateImpl) -> Unit
): StateChangedObserver {
return object : StateChangedObserver(state) {
): StateObserver {
return object : StateChangedObserver(to) {
override fun stateChanged0(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
@ -27,6 +28,25 @@ internal fun StateChangedObserver(
}
}
@Suppress("FunctionName")
internal fun StateChangedObserver(
from: State,
to: State,
action: (new: NetworkHandlerSupport.BaseStateImpl) -> Unit
): StateObserver {
return object : StateObserver {
override fun stateChanged(
networkHandler: NetworkHandlerSupport,
previous: NetworkHandlerSupport.BaseStateImpl,
new: NetworkHandlerSupport.BaseStateImpl
) {
if (previous.correspondingState == from && new.correspondingState == to) {
action(new)
}
}
}
}
internal abstract class StateChangedObserver(
val state: State,
) : StateObserver {

View File

@ -42,6 +42,7 @@ internal open class NettyNetworkHandler(
) : NetworkHandlerSupport(context) {
override fun close(cause: Throwable?) {
setState { StateClosed(CancellationException("Closed manually.", cause)) }
super.close(cause)
// wrap an exception, more stacktrace information
}

View File

@ -9,6 +9,7 @@
package net.mamoe.mirai.internal.network.framework
import kotlinx.coroutines.CoroutineScope
import net.mamoe.mirai.internal.MockBot
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.QQAndroidClient
@ -20,17 +21,23 @@ import net.mamoe.mirai.internal.network.context.SsoProcessorContext
import net.mamoe.mirai.internal.network.context.SsoProcessorContextImpl
import net.mamoe.mirai.internal.network.context.SsoSession
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContextImpl
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
import net.mamoe.mirai.internal.network.handler.state.StateObserver
import net.mamoe.mirai.internal.test.AbstractTest
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.debug
import net.mamoe.mirai.utils.lateinitMutableProperty
import org.junit.jupiter.api.TestInstance
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.test.assertEquals
/**
* With real factory and components as in [QQAndroidBot.components].
*/
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : AbstractTest() {
init {
System.setProperty("mirai.debug.network.state.observer.logging", "true")
@ -38,14 +45,25 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
}
protected abstract val factory: NetworkHandlerFactory<H>
protected abstract val network: NetworkHandler
protected open val bot: QQAndroidBot by lazy {
protected open var bot: QQAndroidBot by lateinitMutableProperty {
MockBot {
networkHandlerProvider { createHandler() }
}
}
protected open val networkLogger = MiraiLogger.TopLevel
protected sealed class NHEvent {
object Login : NHEvent()
object Logout : NHEvent()
object DoHeartbeatNow : NHEvent()
object Init : NHEvent()
}
protected val nhEvents = ConcurrentLinkedQueue<NHEvent>()
protected open val defaultComponents = ConcurrentComponentStorage().apply {
val components = this
val configuration = bot.configuration
@ -55,15 +73,18 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
override val ssoSession: SsoSession get() = bot.client
override fun createObserverChain(): StateObserver = get(StateObserver)
override suspend fun login(handler: NetworkHandler) {
nhEvents.add(NHEvent.Login)
networkLogger.debug { "SsoProcessor.login" }
}
override suspend fun logout(handler: NetworkHandler) {
nhEvents.add(NHEvent.Logout)
networkLogger.debug { "SsoProcessor.logout" }
}
})
set(HeartbeatProcessor, object : HeartbeatProcessor {
override suspend fun doHeartbeatNow(networkHandler: NetworkHandler) {
nhEvents.add(NHEvent.DoHeartbeatNow)
networkLogger.debug { "HeartbeatProcessor.doHeartbeatNow" }
}
})
@ -77,6 +98,7 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
set(BotInitProcessor, object : BotInitProcessor {
override suspend fun init() {
nhEvents.add(NHEvent.Init)
networkLogger.debug { "BotInitProcessor.init" }
}
})
@ -89,6 +111,10 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
set(OtherClientUpdater, OtherClientUpdaterImpl(bot, components, bot.logger))
set(ConfigPushSyncer, ConfigPushSyncerImpl())
set(BotOfflineEventMonitor, object : BotOfflineEventMonitor {
override fun attachJob(bot: QQAndroidBot, scope: CoroutineScope) {
}
})
set(StateObserver, bot.run { stateObserverChain() })
}
@ -102,4 +128,13 @@ internal abstract class AbstractRealNetworkHandlerTest<H : NetworkHandler> : Abs
InetSocketAddress.createUnresolved("localhost", 123)
)
}
///////////////////////////////////////////////////////////////////////////
// Assertions
///////////////////////////////////////////////////////////////////////////
protected fun assertState(state: State) {
assertEquals(state, network.state)
}
}

View File

@ -67,6 +67,21 @@ internal class StateObserverTest : AbstractMockNetworkHandlerTest() {
assertEquals(CONNECTING, called[0].second.correspondingState)
}
@Test
fun `test StateChangedObserver2`() {
val called = ArrayList<NetworkHandlerSupport.BaseStateImpl>()
components[StateObserver] = StateChangedObserver(INITIALIZED, CONNECTING) { new ->
called.add(new)
}
val handler = createNetworkHandler()
assertEquals(0, called.size)
handler.setState(INITIALIZED)
assertEquals(0, called.size)
handler.setState(CONNECTING)
assertEquals(1, called.size)
assertEquals(CONNECTING, called[0].correspondingState)
}
@Test
fun `can combine`() {
val called = ArrayList<Pair<NetworkHandlerSupport.BaseStateImpl, NetworkHandlerSupport.BaseStateImpl>>()

View File

@ -0,0 +1,48 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.impl.netty
import io.netty.channel.Channel
import io.netty.channel.embedded.EmbeddedChannel
import net.mamoe.mirai.internal.network.framework.AbstractRealNetworkHandlerTest
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
import net.mamoe.mirai.utils.ExceptionCollector
import java.net.SocketAddress
internal open class TestNettyNH(
context: NetworkHandlerContext,
address: SocketAddress
) : NettyNetworkHandler(context, address) {
fun setStateClosed(exception: Throwable? = null) {
setState { StateClosed(exception) }
}
fun setStateConnecting(exception: Throwable? = null) {
setState { StateConnecting(ExceptionCollector(exception), false) }
}
}
internal abstract class AbstractNettyNHTest : AbstractRealNetworkHandlerTest<TestNettyNH>() {
val channel = EmbeddedChannel()
override val network: TestNettyNH get() = bot.network as TestNettyNH
override val factory: NetworkHandlerFactory<TestNettyNH> =
object : NetworkHandlerFactory<TestNettyNH> {
override fun create(context: NetworkHandlerContext, address: SocketAddress): TestNettyNH {
return object : TestNettyNH(context, address) {
override suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel =
channel.apply { setupChannelPipeline(pipeline(), decodePipeline) }
}
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.impl.netty
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import net.mamoe.mirai.internal.MockBot
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State.*
import net.mamoe.mirai.internal.test.runBlockingUnit
import net.mamoe.mirai.supervisorJob
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
internal class NettyBotLifecycleTest : AbstractNettyNHTest() {
@Test
fun `send logout on exit`() = runBlockingUnit {
assertState(INITIALIZED)
bot.login()
assertState(OK)
bot.close() // send logout blocking
delay(1000)
assertState(CLOSED)
assertTrue { nhEvents.any { it is NHEvent.Logout } }
}
@Test
fun `can override context`() = runBlockingUnit {
bot = MockBot {
conf {
parentCoroutineContext = CoroutineName("Overrode")
}
networkHandlerProvider { createHandler() }
}
assertEquals("Overrode", bot.coroutineContext[CoroutineName]!!.name)
}
@Test
fun `job attached`() = runBlockingUnit {
val parentJob = SupervisorJob()
bot = MockBot {
conf {
parentCoroutineContext = parentJob
}
networkHandlerProvider { createHandler() }
}
assertEquals(1, parentJob.children.count())
assertEquals(bot.supervisorJob, parentJob.children.first())
}
}

View File

@ -9,54 +9,19 @@
package net.mamoe.mirai.internal.network.impl.netty
import io.netty.channel.Channel
import io.netty.channel.embedded.EmbeddedChannel
import kotlinx.coroutines.delay
import net.mamoe.mirai.event.events.BotOfflineEvent
import net.mamoe.mirai.event.events.BotOnlineEvent
import net.mamoe.mirai.event.events.BotReloginEvent
import net.mamoe.mirai.internal.network.framework.AbstractRealNetworkHandlerTest
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
import net.mamoe.mirai.internal.network.handler.NetworkHandlerFactory
import net.mamoe.mirai.internal.test.assertEventBroadcasts
import net.mamoe.mirai.internal.test.runBlockingUnit
import net.mamoe.mirai.utils.ExceptionCollector
import java.net.SocketAddress
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.seconds
internal open class TestNettyNH(
context: NetworkHandlerContext,
address: SocketAddress
) : NettyNetworkHandler(context, address) {
fun setStateClosed(exception: Throwable? = null) {
setState { StateClosed(exception) }
}
fun setStateConnecting(exception: Throwable? = null) {
setState { StateConnecting(ExceptionCollector(exception), false) }
}
}
internal class NettyHandlerEventTest : AbstractRealNetworkHandlerTest<TestNettyNH>() {
val channel = EmbeddedChannel()
val network get() = bot.network as TestNettyNH
override val factory: NetworkHandlerFactory<TestNettyNH> =
object : NetworkHandlerFactory<TestNettyNH> {
override fun create(context: NetworkHandlerContext, address: SocketAddress): TestNettyNH {
return object : TestNettyNH(context, address) {
override suspend fun createConnection(decodePipeline: PacketDecodePipeline): Channel =
channel.apply { setupChannelPipeline(pipeline(), decodePipeline) }
}
}
}
internal class NettyHandlerEventTest : AbstractNettyNHTest() {
@Test
fun `BotOnlineEvent after successful logon`() = runBlockingUnit {
assertEventBroadcasts<BotOnlineEvent> {