diff --git a/README.md b/README.md index 50c8edd..588adc3 100644 --- a/README.md +++ b/README.md @@ -389,21 +389,23 @@ bilibiliClient.liveAPI.roomMessage(roomId).await() 接下来的弹幕都是实时弹幕, 直播间实时弹幕通过 `Websocket` 来推送. ```kotlin -bilibiliClient.liveClient( - roomId = 3, - onConnect = { - println("Connected") - }, - onPopularityPacket = { _, popularity -> - println("Current popularity: $popularity") - }, - onCommandPacket = { _, jsonObject -> - println(jsonObject) - }, - onClose = { _, closeReason -> - println(closeReason) - } -).start() +val job = bilibiliClient.liveClient(roomId = 3) { + onConnect = { + println("Connected") + } + + onPopularityPacket = { _, popularity -> + println("Current popularity: $popularity") + } + + onCommandPacket = { _, jsonObject -> + println(jsonObject) + } + + onClose = { _, closeReason -> + println(closeReason) + } +}.launch() ``` 服务器推送的 `Message` 有两种, 一种是 `人气值` 数据, 另一种是 `Command` 数据. @@ -431,13 +433,13 @@ bilibiliClient.liveClient( onCommandPacket = { _, jsonObject -> val cmd by jsonObject.byString println( - if (cmd == "DANMU_MSG") { - with(DanmakuMessage(jsonObject)) { - "${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message" - } - } else { - jsonObject.toString() + if (cmd == "DANMU_MSG") { + with(DanmakuMessage(jsonObject)) { + "${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message" } + } else { + jsonObject.toString() + } ) } ``` @@ -450,32 +452,14 @@ onCommandPacket = { _, jsonObject -> 更多 `Command` 数据包的数据结构详见本项目的 [/record/直播弹幕](record/直播弹幕) 文件夹. -注意, `start()` 方法会 suspend 当前协程直到连接关闭, 如果当前协程还需要执行更多逻辑则如下所示 +注意, `onPopularityPacket`, `onCommandPacket` 这些回调不能进行耗时操作. + +关闭连接 ```kotlin -val liveClient = bilibiliClient.liveClient(args) -launch { liveClient.start() } -println("We do more thing here") -delay(100_000) -liveClient.close() +job.cancel() ``` -如果要实现断线重连, 需要在额外的协程中进行连接操作, 例如 `onClose` 回调如下所示(手动调用 `close()` 方法也会触发 `onClose` 回调, 请用额外变量来记录该次关闭是否是最终用户的行为) - -```kotlin -onClose = { liveClient, closeReason -> - launch { - liveClient.start() - } -} -``` - -如果网络不可达, 那么 `start()` 方法会抛出异常, 且不会触发 `onConnect` 以及 `onClose` 回调. - -如果数据包被中间人修改, 那么可能不会触发 `onConnect` 回调, 但是会触发 `onClose`. - -如果手动 `cancel` 了执行 `start()` 方法的协程将不会触发 `onClose`. - ## 发送直播弹幕 在直播间里发送弹幕也非常简单(必须先登陆) diff --git a/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt b/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt index 8ee0c4f..c6b7a5b 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt @@ -3,8 +3,6 @@ package com.hiczp.bilibili.api import com.hiczp.bilibili.api.app.AppAPI import com.hiczp.bilibili.api.danmaku.DanmakuAPI import com.hiczp.bilibili.api.live.LiveAPI -import com.hiczp.bilibili.api.live.websocket.LiveClient -import com.hiczp.bilibili.api.live.websocket.LiveClientCallbackDSL import com.hiczp.bilibili.api.main.MainAPI import com.hiczp.bilibili.api.member.MemberAPI import com.hiczp.bilibili.api.message.MessageAPI @@ -237,21 +235,6 @@ class BilibiliClient( ) } - /** - * 打开一个直播客户端 - */ - fun liveClient( - roomId: Long, - fetchRoomId: Boolean = true, - fetchDanmakuConfig: Boolean = true, - doEntryRoomAction: Boolean = false, - sendUserOnlineHeart: Boolean = false, - callback: LiveClientCallbackDSL.() -> Unit - ) = LiveClient( - this, roomId, fetchRoomId, fetchDanmakuConfig, doEntryRoomAction, sendUserOnlineHeart, - callback - ) - /** * 登陆 * v3 登陆接口会同时返回 cookies 和 token diff --git a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt index 999af53..edff518 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt @@ -9,13 +9,16 @@ import io.ktor.client.engine.cio.CIO import io.ktor.client.features.websocket.WebSockets import io.ktor.client.features.websocket.wss import io.ktor.http.cio.websocket.CloseReason -import io.ktor.http.cio.websocket.WebSocketSession +import io.ktor.http.cio.websocket.close import io.ktor.util.InternalAPI import io.ktor.util.KtorExperimentalAPI import io.ktor.util.decodeString +import io.ktor.util.error import kotlinx.coroutines.* import kotlinx.coroutines.channels.consumeEach -import kotlinx.io.errors.IOException +import mu.KotlinLogging + +private val logger = KotlinLogging.logger { } /** * 直播客户端 @@ -31,7 +34,7 @@ import kotlinx.io.errors.IOException @Suppress("CanBeParameter") class LiveClient( private val bilibiliClient: BilibiliClient, - private val maybeShortRoomId: Long, + maybeShortRoomId: Long, private val fetchRoomId: Boolean = true, private val fetchDanmakuConfig: Boolean = true, private val doEntryRoomAction: Boolean = false, @@ -40,7 +43,6 @@ class LiveClient( ) { private val callback = LiveClientCallbackDSL().apply { callback() } private val liveAPI = bilibiliClient.liveAPI - private var websocketSession: WebSocketSession? = null var roomId = maybeShortRoomId private set @@ -49,127 +51,123 @@ class LiveClient( * 开启连接 */ @UseExperimental(KtorExperimentalAPI::class, ObsoleteCoroutinesApi::class, InternalAPI::class) - fun start() { - GlobalScope.launch(CoroutineExceptionHandler { _, throwable -> - callback.onError?.invoke(this, throwable) ?: throwable.printStackTrace() - }) { - //得到原始房间号和主播的用户ID - var anchorUserId = 0L - if (fetchRoomId) { - liveAPI.mobileRoomInit(maybeShortRoomId).await().data.also { - roomId = it.roomId - anchorUserId = it.uid + fun launch() = GlobalScope.launch(CoroutineExceptionHandler { _, throwable -> + callback.onError?.invoke(this, throwable) ?: logger.error(throwable) + }) { + //得到原始房间号和主播的用户ID + var anchorUserId = 0L + if (fetchRoomId) { + liveAPI.mobileRoomInit(roomId).await().data.also { + roomId = it.roomId + anchorUserId = it.uid + } + } + + //获得 wss 地址和端口(推荐服务器) + @Suppress("SpellCheckingInspection") + var host = "broadcastlv.chat.bilibili.com" + var port = 443 + if (fetchDanmakuConfig) { + liveAPI.getDanmakuConfig(roomId).await().data.also { data -> + host = data.host + data.hostServerList.find { it.host == host }?.wssPort?.also { + port = it } } + } - //获得 wss 地址和端口(推荐服务器) - @Suppress("SpellCheckingInspection") - var host = "broadcastlv.chat.bilibili.com" - var port = 443 - if (fetchDanmakuConfig) { - liveAPI.getDanmakuConfig(roomId).await().data.also { data -> - host = data.host - data.hostServerList.find { it.host == host }?.wssPort?.also { - port = it - } + //产生历史记录 + @Suppress("DeferredResultUnused") + if (doEntryRoomAction && bilibiliClient.isLogin) liveAPI.roomEntryAction(roomId) + + //开启 websocket + HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") { + pingIntervalMillis = 30_000 + timeoutMillis = 10_000 + + //发送进房数据包 + send(PresetPacket.enterRoomPacket(anchorUserId, roomId)) + val enterRoomResponsePacket = incoming.receive().toPackets()[0] + if (enterRoomResponsePacket.packetType == PacketType.ENTER_ROOM_RESPONSE) { + try { + callback.onConnect?.invoke(this@LiveClient) + } catch (e: Exception) { + logger.error(e) } + } else { + //impossible + logger.error { "Receive unreadable server response: $enterRoomResponsePacket" } + close(CloseReason(CloseReason.Codes.NOT_CONSISTENT, "")) + return@wss } - //产生历史记录 - @Suppress("DeferredResultUnused") - if (doEntryRoomAction && bilibiliClient.isLogin) liveAPI.roomEntryAction(roomId) - - //开启 websocket - HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") { - websocketSession = this - pingIntervalMillis = -1 - - //发送进房数据包 - send(PresetPacket.enterRoomPacket(anchorUserId, roomId)) - val enterRoomResponsePacket = incoming.receive().toPackets()[0] - if (enterRoomResponsePacket.packetType == PacketType.ENTER_ROOM_RESPONSE) { - try { - callback.onConnect?.invoke(this@LiveClient) - } catch (e: Exception) { - e.printStackTrace() + //发送 rest 心跳包 + //五分钟一次 + val restHeartBeatJob = if (sendUserOnlineHeart && bilibiliClient.isLogin) { + launch { + val scale = bilibiliClient.billingClientProperties.scale + while (true) { + @Suppress("DeferredResultUnused") + liveAPI.userOnlineHeart(roomId, scale) + delay(300_000) } - } else { - //impossible - close(IOException("Receive unreadable server response: $enterRoomResponsePacket")) } + } else { + null + } - //发送 rest 心跳包 - //五分钟一次 - val restHeartBeatJob = if (sendUserOnlineHeart && bilibiliClient.isLogin) { - launch { - val scale = bilibiliClient.billingClientProperties.scale - while (true) { - @Suppress("DeferredResultUnused") - liveAPI.userOnlineHeart(roomId, scale) - delay(300_000) - } - } - } else { - null - } - - //发送 websocket 心跳包 - //30 秒一次 - val websocketHeartBeatJob = launch { - //TODO 阻止异常传播 + //发送 websocket 心跳包 + //30 秒一次 + val websocketHeartBeatJob = launch { + try { while (true) { send(PresetPacket.heartbeatPacket()) delay(30_000) } + } catch (ignore: CancellationException) { + //ignore + } catch (e: Exception) { + logger.error(e) } + } - try { - incoming.consumeEach { frame -> - frame.toPackets().forEach { - try { - @Suppress("NON_EXHAUSTIVE_WHEN") - when (it.packetType) { - PacketType.POPULARITY -> callback.onPopularityPacket?.invoke( - this@LiveClient, - it.content.int - ) - PacketType.COMMAND -> callback.onCommandPacket?.invoke( - this@LiveClient, - jsonParser.parse(it.content.decodeString()).obj - ) - } - } catch (e: Exception) { - e.printStackTrace() + try { + incoming.consumeEach { frame -> + frame.toPackets().forEach { + try { + @Suppress("NON_EXHAUSTIVE_WHEN") + when (it.packetType) { + PacketType.POPULARITY -> callback.onPopularityPacket?.invoke( + this@LiveClient, + it.content.int + ) + PacketType.COMMAND -> callback.onCommandPacket?.invoke( + this@LiveClient, + jsonParser.parse(it.content.decodeString()).obj + ) } + } catch (e: Exception) { + logger.error(e) } } - } finally { - restHeartBeatJob?.cancel() - websocketHeartBeatJob.cancel() - launch { - val closeReason = closeReason.await() - try { - callback.onClose?.invoke(this@LiveClient, closeReason) - } catch (e: Exception) { - e.printStackTrace() - } + } + } catch (e: CancellationException) { + close() + } finally { + restHeartBeatJob?.cancel() + websocketHeartBeatJob.cancel() + launch(NonCancellable) { + val closeReason = closeReason.await() + try { + callback.onClose?.invoke(this@LiveClient, closeReason) + } catch (e: Exception) { + logger.error(e) } } } } } - /** - * 关闭连接 - */ - fun close() { - websocketSession?.run { - websocketSession = null - //client 不能使用 close(), 因为 WebsocketSession 本体执行完毕时会自动执行一次 close(), 这会导致多次关闭 - incoming.cancel() - } - } - /** * 发送弹幕 */ @@ -203,3 +201,18 @@ class LiveClientCallbackDSL { */ var onClose: (suspend (LiveClient, CloseReason?) -> Unit)? = null } + +/** + * 打开一个直播客户端 + */ +fun BilibiliClient.liveClient( + roomId: Long, + fetchRoomId: Boolean = true, + fetchDanmakuConfig: Boolean = true, + doEntryRoomAction: Boolean = false, + sendUserOnlineHeart: Boolean = false, + callback: LiveClientCallbackDSL.() -> Unit +) = LiveClient( + this, roomId, fetchRoomId, fetchDanmakuConfig, doEntryRoomAction, sendUserOnlineHeart, + callback +) diff --git a/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt index 2320af2..5cffe5d 100644 --- a/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt +++ b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt @@ -3,6 +3,7 @@ package com.hiczp.bilibili.api.test import com.github.salomonbrys.kotson.byString import com.hiczp.bilibili.api.isNotEmpty import com.hiczp.bilibili.api.live.websocket.DanmakuMessage +import com.hiczp.bilibili.api.live.websocket.liveClient import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test @@ -15,15 +16,11 @@ class LiveClientTest { it.toFile().mkdirs() } - bilibiliClient.liveClient(roomId = 3, sendUserOnlineHeart = true) { + val job = bilibiliClient.liveClient(roomId = 3, sendUserOnlineHeart = true) { onConnect = { println("Connected") } - onError = { _, throwable -> - throwable.printStackTrace() - } - onPopularityPacket = { _, popularity -> println("Current popularity: $popularity") } @@ -49,9 +46,9 @@ class LiveClientTest { onClose = { _, closeReason -> println(closeReason) } - }.start() + }.launch() runBlocking { - delay(99999999) + delay(9999999) } } }