diff --git a/README.md b/README.md index 7294b79..9b79f33 100644 --- a/README.md +++ b/README.md @@ -472,7 +472,7 @@ onClose = { liveClient, closeReason -> 如果数据包被中间人修改, 那么可能不会触发 `onConnect` 回调, 但是会触发 `onClose`. -如果手动取消了执行 `start()` 方法的协程将不会触发 `onClose`. +如果手动 `cancel` 了执行 `start()` 方法的协程将不会触发 `onClose`. ## 发送直播弹幕 在直播间里发送弹幕也非常简单(必须先登陆) diff --git a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt index c9292b7..a65c61e 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt @@ -1,5 +1,6 @@ package com.hiczp.bilibili.api.live.websocket +import com.github.salomonbrys.kotson.obj import com.google.gson.JsonObject import com.hiczp.bilibili.api.BilibiliClient import com.hiczp.bilibili.api.jsonParser @@ -13,14 +14,15 @@ import io.ktor.util.InternalAPI import io.ktor.util.KtorExperimentalAPI import io.ktor.util.decodeString import kotlinx.coroutines.ObsoleteCoroutinesApi -import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import kotlinx.io.core.Closeable import kotlinx.io.errors.IOException /** * 直播客户端 + * 注意该类是有状态的 * * @param maybeShortRoomId 可能为短房间号的房间号 * @param fetchRoomId 是否在连接前先获取房间号(长号) @@ -44,7 +46,7 @@ class LiveClient( private val onPopularityPacket: (LiveClient, Int) -> Unit, private val onCommandPacket: (LiveClient, JsonObject) -> Unit, private val onClose: (LiveClient, CloseReason?) -> Unit -) { +) : Closeable { private val liveAPI = bilibiliClient.liveAPI private var websocketSession: WebSocketSession? = null @@ -102,6 +104,7 @@ class LiveClient( } //发送 rest 心跳包 + //五分钟一次 val restHeartBeatJob = if (sendUserOnlineHeart && bilibiliClient.isLogin) { launch { val scale = bilibiliClient.billingClientProperties.scale @@ -116,6 +119,7 @@ class LiveClient( } //发送 websocket 心跳包 + //30 秒一次 val websocketHeartBeatJob = launch { while (true) { send(PresetPacket.heartbeatPacket()) @@ -123,23 +127,34 @@ class LiveClient( } } - incoming.consumeEach { frame -> - frame.toPackets().forEach { - try { - @Suppress("NON_EXHAUSTIVE_WHEN") - when (it.packetType) { - PacketType.POPULARITY -> onPopularityPacket(this@LiveClient, it.content.int) - PacketType.COMMAND -> onCommandPacket(this@LiveClient, jsonParser.parse(it.content.decodeString()).asJsonObject) + //如果被 cancel, 那么这里将抛出异常 + try { + incoming.consumeEach { frame -> + frame.toPackets().forEach { + try { + @Suppress("NON_EXHAUSTIVE_WHEN") + when (it.packetType) { + PacketType.POPULARITY -> onPopularityPacket( + this@LiveClient, + it.content.int + ) + PacketType.COMMAND -> onCommandPacket( + this@LiveClient, + jsonParser.parse(it.content.decodeString()).obj + ) + } + } catch (e: Exception) { + e.printStackTrace() } - } catch (e: Exception) { - e.printStackTrace() } } + } finally { + //无论是连接关闭还是抛出异常, 都要清理掉两个子任务 + restHeartBeatJob?.cancel() + websocketHeartBeatJob.cancel() } - restHeartBeatJob?.cancelAndJoin() - websocketHeartBeatJob.cancelAndJoin() - + //如果上面抛出了异常, 那么这里就不会被执行 launch { val closeReason = closeReason.await() try { @@ -154,11 +169,13 @@ class LiveClient( /** * 关闭连接 */ - fun close() = websocketSession?.run { - websocketSession = null - //client 不能使用 close(), 因为 WebsocketSession 本体执行完毕时会自动执行一次 close(), 这会导致多次关闭 - incoming.cancel() - } ?: Unit + override fun close() { + websocketSession?.run { + websocketSession = null + //client 不能使用 close(), 因为 WebsocketSession 本体执行完毕时会自动执行一次 close(), 这会导致多次关闭 + incoming.cancel() + } + } /** * 发送弹幕 diff --git a/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt index 5fe336b..f4a4d58 100644 --- a/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt +++ b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt @@ -3,6 +3,7 @@ package com.hiczp.bilibili.api.test import com.github.salomonbrys.kotson.byString import com.hiczp.bilibili.api.isNotEmpty import com.hiczp.bilibili.api.live.websocket.DanmakuMessage +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import java.nio.file.Paths @@ -15,7 +16,7 @@ class LiveClientTest { } runBlocking { - bilibiliClient.liveClient( + val liveClient = bilibiliClient.liveClient( roomId = 3, sendUserOnlineHeart = true, onConnect = { @@ -49,7 +50,11 @@ class LiveClientTest { onClose = { _, closeReason -> println(closeReason) } - ).start() + ) + val job = launch { + liveClient.start() + } + job.join() } } }