Add SupervisorJob

This commit is contained in:
Him188 2019-11-20 19:11:04 +08:00
parent 8cb29df51b
commit 4ea98e1a8a
2 changed files with 22 additions and 13 deletions

View File

@ -2,7 +2,7 @@ package net.mamoe.mirai.network
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import net.mamoe.mirai.Bot
import net.mamoe.mirai.network.protocol.tim.TIMBotNetworkHandler.BotSocketAdapter
@ -48,6 +48,8 @@ interface BotNetworkHandler<Socket : DataPacketSocketAdapter> : CoroutineScope {
val socket: Socket
val bot: Bot
val supervisor get() = SupervisorJob()
/**
* 得到 [PacketHandler].
* `get(EventPacketHandler)` 返回 [EventPacketHandler]
@ -85,8 +87,6 @@ interface BotNetworkHandler<Socket : DataPacketSocketAdapter> : CoroutineScope {
* 关闭网络接口, 停止所有有关协程和任务
*/
suspend fun close(cause: Throwable? = null) {
val job = coroutineContext[Job]
checkNotNull(job) { "Job should not be null because there will always be a SupervisorJob. There may be a internal mistake" }
job.cancelChildren(CancellationException("handler closed", cause))
supervisor.cancelChildren(CancellationException("handler closed", cause))
}
}

View File

@ -41,10 +41,11 @@ internal expect val NetworkDispatcher: CoroutineDispatcher
internal class TIMBotNetworkHandler internal constructor(coroutineContext: CoroutineContext, override inline val bot: Bot) :
BotNetworkHandler<TIMBotNetworkHandler.BotSocketAdapter>, PacketHandlerList() {
override val coroutineContext: CoroutineContext =
coroutineContext + NetworkDispatcher + CoroutineExceptionHandler { _, e ->
bot.logger.error("An exception was thrown in a coroutine under TIMBotNetworkHandler", e)
} + SupervisorJob()
} + supervisor
override lateinit var socket: BotSocketAdapter
@ -70,7 +71,11 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
return withContext(this.coroutineContext) {
TIMProtocol.SERVER_IP.forEach { ip ->
bot.logger.info("Connecting server $ip")
socket = BotSocketAdapter(ip, configuration)
try {
socket = BotSocketAdapter(ip, configuration)
} catch (e: Exception) {
return@withContext LoginResult.NETWORK_UNAVAILABLE
}
loginResult = CompletableDeferred()
@ -139,9 +144,7 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
try {
channel.read(buffer)// JVM: withContext(IO)
} catch (e: ClosedChannelException) {
withContext(userContext) {
close()
}
close()
return
} catch (e: ReadPacketInternalException) {
bot.logger.error("Socket channel read failed: ${e.message}")
@ -278,11 +281,17 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
try {
built.readAvailable(buffer)
val shouldBeSent = buffer.readRemaining
check(channel.send(buffer) == shouldBeSent) { "Buffer is not entirely sent. Required sent length=$shouldBeSent, but after channel.send, buffer remains ${buffer.readBytes().toUHexString()}" }//JVM: withContext(IO)
check(channel.send(buffer) == shouldBeSent) {
"Buffer is not entirely sent. " +
"Required sent length=$shouldBeSent, but after channel.send, " +
"buffer remains ${buffer.readBytes().toUHexString()}"
}//JVM: withContext(IO)
} catch (e: SendPacketInternalException) {
bot.logger.error("Caught SendPacketInternalException: ${e.cause?.message}")
if (e.cause !is CancellationException) {
bot.logger.error("Caught SendPacketInternalException: ${e.cause?.message}")
}
withContext(userContext) {
GlobalScope.launch(userContext) {
bot.reinitializeNetworkHandler(configuration, e)
}
return@withContext
@ -513,7 +522,7 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
HeartbeatPacket(
bot.qqAccount,
sessionKey
).sendAndExpectAsync<HeartbeatPacketResponse>().join()
).sendAndExpect<HeartbeatPacketResponse>()
} == null) {
bot.logger.warning("Heartbeat timed out")
bot.reinitializeNetworkHandler(configuration, HeartbeatTimeoutException())