diff --git a/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt b/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt index e0ab659..8ee0c4f 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt @@ -1,10 +1,10 @@ package com.hiczp.bilibili.api -import com.google.gson.JsonObject import com.hiczp.bilibili.api.app.AppAPI import com.hiczp.bilibili.api.danmaku.DanmakuAPI import com.hiczp.bilibili.api.live.LiveAPI import com.hiczp.bilibili.api.live.websocket.LiveClient +import com.hiczp.bilibili.api.live.websocket.LiveClientCallbackDSL import com.hiczp.bilibili.api.main.MainAPI import com.hiczp.bilibili.api.member.MemberAPI import com.hiczp.bilibili.api.message.MessageAPI @@ -21,7 +21,6 @@ import com.hiczp.bilibili.api.retrofit.interceptor.FailureResponseInterceptor import com.hiczp.bilibili.api.retrofit.interceptor.SortAndSignInterceptor import com.hiczp.bilibili.api.vc.VcAPI import com.jakewharton.retrofit2.adapter.kotlin.coroutines.CoroutineCallAdapterFactory -import io.ktor.http.cio.websocket.CloseReason import okhttp3.Interceptor import okhttp3.OkHttpClient import okhttp3.logging.HttpLoggingInterceptor @@ -247,13 +246,10 @@ class BilibiliClient( fetchDanmakuConfig: Boolean = true, doEntryRoomAction: Boolean = false, sendUserOnlineHeart: Boolean = false, - onConnect: (LiveClient) -> Unit, - onPopularityPacket: (LiveClient, Int) -> Unit, - onCommandPacket: (LiveClient, JsonObject) -> Unit, - onClose: (LiveClient, CloseReason?) -> Unit + callback: LiveClientCallbackDSL.() -> Unit ) = LiveClient( this, roomId, fetchRoomId, fetchDanmakuConfig, doEntryRoomAction, sendUserOnlineHeart, - onConnect, onPopularityPacket, onCommandPacket, onClose + callback ) /** diff --git a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt index a65c61e..999af53 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt @@ -13,11 +13,8 @@ import io.ktor.http.cio.websocket.WebSocketSession import io.ktor.util.InternalAPI import io.ktor.util.KtorExperimentalAPI import io.ktor.util.decodeString -import kotlinx.coroutines.ObsoleteCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.consumeEach -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.io.core.Closeable import kotlinx.io.errors.IOException /** @@ -29,10 +26,7 @@ import kotlinx.io.errors.IOException * @param fetchDanmakuConfig 是否在连接前先获取弹幕推送服务器地址 * @param doEntryRoomAction 是否产生直播间观看历史记录 * @param sendUserOnlineHeart 是否发送 rest 心跳包, 这会增加观看直播的时长, 用于服务端统计(与弹幕推送无关) - * @param onConnect 回调函数, 连接成功时触发 - * @param onPopularityPacket 回调函数, 接收到人气值数据包时触发 - * @param onCommandPacket 回调函数, 接收到 Command 数据包时触发 - * @param onClose 回调函数, 连接断开时触发 + * @param callback 回调 */ @Suppress("CanBeParameter") class LiveClient( @@ -42,11 +36,9 @@ class LiveClient( private val fetchDanmakuConfig: Boolean = true, private val doEntryRoomAction: Boolean = false, private val sendUserOnlineHeart: Boolean = false, - private val onConnect: (LiveClient) -> Unit, - private val onPopularityPacket: (LiveClient, Int) -> Unit, - private val onCommandPacket: (LiveClient, JsonObject) -> Unit, - private val onClose: (LiveClient, CloseReason?) -> Unit -) : Closeable { + callback: LiveClientCallbackDSL.() -> Unit +) { + private val callback = LiveClientCallbackDSL().apply { callback() } private val liveAPI = bilibiliClient.liveAPI private var websocketSession: WebSocketSession? = null @@ -55,113 +47,114 @@ class LiveClient( /** * 开启连接 - * 注意此方法将 suspend 所在协程直到连接关闭 */ @UseExperimental(KtorExperimentalAPI::class, ObsoleteCoroutinesApi::class, InternalAPI::class) - suspend fun start() { - //得到原始房间号和主播的用户ID - var anchorUserId = 0L - if (fetchRoomId) { - liveAPI.mobileRoomInit(maybeShortRoomId).await().data.also { - roomId = it.roomId - anchorUserId = it.uid - } - } - - //获得 wss 地址和端口(推荐服务器) - @Suppress("SpellCheckingInspection") - var host = "broadcastlv.chat.bilibili.com" - var port = 443 - if (fetchDanmakuConfig) { - liveAPI.getDanmakuConfig(roomId).await().data.also { data -> - host = data.host - data.hostServerList.find { it.host == host }?.wssPort?.also { - port = it + fun start() { + GlobalScope.launch(CoroutineExceptionHandler { _, throwable -> + callback.onError?.invoke(this, throwable) ?: throwable.printStackTrace() + }) { + //得到原始房间号和主播的用户ID + var anchorUserId = 0L + if (fetchRoomId) { + liveAPI.mobileRoomInit(maybeShortRoomId).await().data.also { + roomId = it.roomId + anchorUserId = it.uid } } - } - //产生历史记录 - @Suppress("DeferredResultUnused") - if (doEntryRoomAction && bilibiliClient.isLogin) liveAPI.roomEntryAction(roomId) - - //开启 websocket - HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") { - websocketSession = this - pingIntervalMillis = -1 - - //发送进房数据包 - send(PresetPacket.enterRoomPacket(anchorUserId, roomId)) - if (incoming.receive().toPackets()[0].packetType == PacketType.ENTER_ROOM_RESPONSE) { - try { - onConnect(this@LiveClient) - } catch (e: Exception) { - e.printStackTrace() - } - } else { - //impossible - close(IOException("Receive unreadable server response")) - } - - //发送 rest 心跳包 - //五分钟一次 - val restHeartBeatJob = if (sendUserOnlineHeart && bilibiliClient.isLogin) { - launch { - val scale = bilibiliClient.billingClientProperties.scale - while (true) { - @Suppress("DeferredResultUnused") - liveAPI.userOnlineHeart(roomId, scale) - delay(300_000) + //获得 wss 地址和端口(推荐服务器) + @Suppress("SpellCheckingInspection") + var host = "broadcastlv.chat.bilibili.com" + var port = 443 + if (fetchDanmakuConfig) { + liveAPI.getDanmakuConfig(roomId).await().data.also { data -> + host = data.host + data.hostServerList.find { it.host == host }?.wssPort?.also { + port = it } } - } else { - null } - //发送 websocket 心跳包 - //30 秒一次 - val websocketHeartBeatJob = launch { - while (true) { - send(PresetPacket.heartbeatPacket()) - delay(30_000) + //产生历史记录 + @Suppress("DeferredResultUnused") + if (doEntryRoomAction && bilibiliClient.isLogin) liveAPI.roomEntryAction(roomId) + + //开启 websocket + HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") { + websocketSession = this + pingIntervalMillis = -1 + + //发送进房数据包 + send(PresetPacket.enterRoomPacket(anchorUserId, roomId)) + val enterRoomResponsePacket = incoming.receive().toPackets()[0] + if (enterRoomResponsePacket.packetType == PacketType.ENTER_ROOM_RESPONSE) { + try { + callback.onConnect?.invoke(this@LiveClient) + } catch (e: Exception) { + e.printStackTrace() + } + } else { + //impossible + close(IOException("Receive unreadable server response: $enterRoomResponsePacket")) } - } - //如果被 cancel, 那么这里将抛出异常 - try { - incoming.consumeEach { frame -> - frame.toPackets().forEach { - try { - @Suppress("NON_EXHAUSTIVE_WHEN") - when (it.packetType) { - PacketType.POPULARITY -> onPopularityPacket( - this@LiveClient, - it.content.int - ) - PacketType.COMMAND -> onCommandPacket( - this@LiveClient, - jsonParser.parse(it.content.decodeString()).obj - ) + //发送 rest 心跳包 + //五分钟一次 + val restHeartBeatJob = if (sendUserOnlineHeart && bilibiliClient.isLogin) { + launch { + val scale = bilibiliClient.billingClientProperties.scale + while (true) { + @Suppress("DeferredResultUnused") + liveAPI.userOnlineHeart(roomId, scale) + delay(300_000) + } + } + } else { + null + } + + //发送 websocket 心跳包 + //30 秒一次 + val websocketHeartBeatJob = launch { + //TODO 阻止异常传播 + while (true) { + send(PresetPacket.heartbeatPacket()) + delay(30_000) + } + } + + try { + incoming.consumeEach { frame -> + frame.toPackets().forEach { + try { + @Suppress("NON_EXHAUSTIVE_WHEN") + when (it.packetType) { + PacketType.POPULARITY -> callback.onPopularityPacket?.invoke( + this@LiveClient, + it.content.int + ) + PacketType.COMMAND -> callback.onCommandPacket?.invoke( + this@LiveClient, + jsonParser.parse(it.content.decodeString()).obj + ) + } + } catch (e: Exception) { + e.printStackTrace() } + } + } + } finally { + restHeartBeatJob?.cancel() + websocketHeartBeatJob.cancel() + launch { + val closeReason = closeReason.await() + try { + callback.onClose?.invoke(this@LiveClient, closeReason) } catch (e: Exception) { e.printStackTrace() } } } - } finally { - //无论是连接关闭还是抛出异常, 都要清理掉两个子任务 - restHeartBeatJob?.cancel() - websocketHeartBeatJob.cancel() - } - - //如果上面抛出了异常, 那么这里就不会被执行 - launch { - val closeReason = closeReason.await() - try { - onClose(this@LiveClient, closeReason) - } catch (e: Exception) { - e.printStackTrace() - } } } } @@ -169,7 +162,7 @@ class LiveClient( /** * 关闭连接 */ - override fun close() { + fun close() { websocketSession?.run { websocketSession = null //client 不能使用 close(), 因为 WebsocketSession 本体执行完毕时会自动执行一次 close(), 这会导致多次关闭 @@ -183,3 +176,30 @@ class LiveClient( fun sendMessage(message: String) = liveAPI.sendMessage(cid = roomId, mid = bilibiliClient.userId ?: 0, message = message) } + +class LiveClientCallbackDSL { + /** + * 成功进入房间时触发 + */ + var onConnect: (suspend (LiveClient) -> Unit)? = null + + /** + * 抛出异常时触发 + */ + var onError: ((LiveClient, Throwable) -> Unit)? = null + + /** + * 收到人气值数据包 + */ + var onPopularityPacket: (suspend (LiveClient, Int) -> Unit)? = null + + /** + * 收到 command 数据包 + */ + var onCommandPacket: (suspend (LiveClient, JsonObject) -> Unit)? = null + + /** + * 连接关闭时触发 + */ + var onClose: (suspend (LiveClient, CloseReason?) -> Unit)? = null +} diff --git a/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt index f4a4d58..2320af2 100644 --- a/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt +++ b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt @@ -3,7 +3,7 @@ package com.hiczp.bilibili.api.test import com.github.salomonbrys.kotson.byString import com.hiczp.bilibili.api.isNotEmpty import com.hiczp.bilibili.api.live.websocket.DanmakuMessage -import kotlinx.coroutines.launch +import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import java.nio.file.Paths @@ -15,46 +15,43 @@ class LiveClientTest { it.toFile().mkdirs() } - runBlocking { - val liveClient = bilibiliClient.liveClient( - roomId = 3, - sendUserOnlineHeart = true, - onConnect = { - println("Connected") - //想要这么做的人一定逻辑学有问题 -// launch { -// delay(5_000) -// it.close() -// } - }, - onPopularityPacket = { _, popularity -> - println("Current popularity: $popularity") - }, - onCommandPacket = { _, jsonObject -> - val json = jsonObject.toString() - val cmd by jsonObject.byString - path.resolve("$cmd.json").toFile().run { - if (!exists()) writeText(json) - } - - println( - if (cmd == "DANMU_MSG") { - with(DanmakuMessage(jsonObject)) { - "${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message" - } - } else { - json - } - ) - }, - onClose = { _, closeReason -> - println(closeReason) - } - ) - val job = launch { - liveClient.start() + bilibiliClient.liveClient(roomId = 3, sendUserOnlineHeart = true) { + onConnect = { + println("Connected") } - job.join() + + onError = { _, throwable -> + throwable.printStackTrace() + } + + onPopularityPacket = { _, popularity -> + println("Current popularity: $popularity") + } + + onCommandPacket = { _, jsonObject -> + val json = jsonObject.toString() + val cmd by jsonObject.byString + path.resolve("$cmd.json").toFile().run { + if (!exists()) writeText(json) + } + + println( + if (cmd == "DANMU_MSG") { + with(DanmakuMessage(jsonObject)) { + "${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message" + } + } else { + json + } + ) + } + + onClose = { _, closeReason -> + println(closeReason) + } + }.start() + runBlocking { + delay(99999999) } } }