diff --git a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt index edff518..7405a8b 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt @@ -15,7 +15,6 @@ import io.ktor.util.KtorExperimentalAPI import io.ktor.util.decodeString import io.ktor.util.error import kotlinx.coroutines.* -import kotlinx.coroutines.channels.consumeEach import mu.KotlinLogging private val logger = KotlinLogging.logger { } @@ -82,9 +81,6 @@ class LiveClient( //开启 websocket HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") { - pingIntervalMillis = 30_000 - timeoutMillis = 10_000 - //发送进房数据包 send(PresetPacket.enterRoomPacket(anchorUserId, roomId)) val enterRoomResponsePacket = incoming.receive().toPackets()[0] @@ -107,8 +103,9 @@ class LiveClient( launch { val scale = bilibiliClient.billingClientProperties.scale while (true) { - @Suppress("DeferredResultUnused") - liveAPI.userOnlineHeart(roomId, scale) + liveAPI.userOnlineHeart(roomId, scale).invokeOnCompletion { + if (it != null) logger.error(it) + } delay(300_000) } } @@ -132,8 +129,10 @@ class LiveClient( } try { - incoming.consumeEach { frame -> - frame.toPackets().forEach { + while (true) { + withTimeout(40_000) { + incoming.receive() + }.toPackets().forEach { try { @Suppress("NON_EXHAUSTIVE_WHEN") when (it.packetType) { @@ -151,12 +150,14 @@ class LiveClient( } } } + } catch (e: TimeoutCancellationException) { + throw e } catch (e: CancellationException) { close() } finally { restHeartBeatJob?.cancel() websocketHeartBeatJob.cancel() - launch(NonCancellable) { + launch { val closeReason = closeReason.await() try { callback.onClose?.invoke(this@LiveClient, closeReason) @@ -179,7 +180,7 @@ class LiveClientCallbackDSL { /** * 成功进入房间时触发 */ - var onConnect: (suspend (LiveClient) -> Unit)? = null + var onConnect: ((LiveClient) -> Unit)? = null /** * 抛出异常时触发 @@ -189,17 +190,17 @@ class LiveClientCallbackDSL { /** * 收到人气值数据包 */ - var onPopularityPacket: (suspend (LiveClient, Int) -> Unit)? = null + var onPopularityPacket: ((LiveClient, Int) -> Unit)? = null /** * 收到 command 数据包 */ - var onCommandPacket: (suspend (LiveClient, JsonObject) -> Unit)? = null + var onCommandPacket: ((LiveClient, JsonObject) -> Unit)? = null /** * 连接关闭时触发 */ - var onClose: (suspend (LiveClient, CloseReason?) -> Unit)? = null + var onClose: ((LiveClient, CloseReason?) -> Unit)? = null } /** diff --git a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/Packet.kt b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/Packet.kt index 846e922..8202ddd 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/Packet.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/Packet.kt @@ -1,9 +1,7 @@ package com.hiczp.bilibili.api.live.websocket -import com.hiczp.bilibili.api.toPrettyPrintString import io.ktor.http.cio.websocket.Frame import io.ktor.http.cio.websocket.WebSocketSession -import io.ktor.http.cio.websocket.readBytes import java.nio.ByteBuffer /** @@ -12,14 +10,14 @@ import java.nio.ByteBuffer * 数据包头部结构 00 00 00 65 00 10 00 01 00 00 00 07 00 00 00 01 * |数据包总长度| |头长| |tag| |数据包类型 | | tag | * - * @param tagShort 一种 tag, 如果是非 command 数据包则为 1, 否则为 0, short 类型 + * @param shortTag 一种 tag, 如果是非 command 数据包则为 1, 否则为 0, short 类型 * @param packetType 数据包类型 * @param tag 同 tagShort, 但是为 int 类型 * @param content 正文内容 */ @Suppress("MemberVisibilityCanBePrivate") -class Packet( - val tagShort: Short = 1, +data class Packet( + val shortTag: Short = 1, val packetType: PacketType, val tag: Int = 1, val content: ByteBuffer @@ -29,24 +27,18 @@ class Packet( val headerLength: Short = 0x10 - fun toByteBuffer() = + fun toFrame() = Frame.Binary( + true, ByteBuffer.allocate(totalLength) .putInt(totalLength) .putShort(headerLength) - .putShort(tagShort) + .putShort(shortTag) .putInt(packetType.value) .putInt(tag) .put(content).apply { flip() }!! - - fun toFrame() = Frame.Binary( - true, - toByteBuffer() ) - - //for debug - override fun toString() = toFrame().readBytes().toPrettyPrintString() } /** @@ -59,14 +51,14 @@ internal fun Frame.toPackets(): List { val startPosition = buffer.position() val totalLength = buffer.int buffer.position(buffer.position() + 2) //skip headerLength - val tagShort = buffer.short + val shortTag = buffer.short val packetType = PacketType.getByValue(buffer.int) val tag = buffer.int buffer.limit(startPosition + totalLength) val content = buffer.slice() buffer.position(buffer.limit()) buffer.limit(bufferLength) - list.add(Packet(tagShort, packetType, tag, content)) + list.add(Packet(shortTag, packetType, tag, content)) } return list } diff --git a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/PresetPacket.kt b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/PresetPacket.kt index 0a6c316..e557b95 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/PresetPacket.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/PresetPacket.kt @@ -28,7 +28,7 @@ object PresetPacket { /** * 心跳包 - * 心跳包的正文内容可能是故意的, 为固定值 [object Object] + * 心跳包的正文内容可能是故意的, 为固定值 "[object Object]" */ fun heartbeatPacket(content: ByteBuffer = ByteBuffer.wrap("[object Object]".toByteArray())) = Packet( packetType = PacketType.HEARTBEAT, diff --git a/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt index 5cffe5d..1372972 100644 --- a/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt +++ b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt @@ -1,13 +1,17 @@ package com.hiczp.bilibili.api.test import com.github.salomonbrys.kotson.byString +import com.hiczp.bilibili.api.BilibiliClient import com.hiczp.bilibili.api.isNotEmpty import com.hiczp.bilibili.api.live.websocket.DanmakuMessage import com.hiczp.bilibili.api.live.websocket.liveClient +import io.ktor.http.cio.websocket.CloseReason import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking +import okhttp3.logging.HttpLoggingInterceptor import org.junit.jupiter.api.Test import java.nio.file.Paths +import java.time.Instant class LiveClientTest { @Test @@ -16,39 +20,45 @@ class LiveClientTest { it.toFile().mkdirs() } - val job = bilibiliClient.liveClient(roomId = 3, sendUserOnlineHeart = true) { - onConnect = { - println("Connected") - } - - onPopularityPacket = { _, popularity -> - println("Current popularity: $popularity") - } - - onCommandPacket = { _, jsonObject -> - val json = jsonObject.toString() - val cmd by jsonObject.byString - path.resolve("$cmd.json").toFile().run { - if (!exists()) writeText(json) + BilibiliClient(logLevel = HttpLoggingInterceptor.Level.BASIC) + .apply { + loginResponse = bilibiliClient.loginResponse } + .liveClient(roomId = 3, sendUserOnlineHeart = true) { + onConnect = { + println("Connected ${Instant.now()}") + } - println( - if (cmd == "DANMU_MSG") { - with(DanmakuMessage(jsonObject)) { - "${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message" - } - } else { - json + onPopularityPacket = { _, popularity -> + println("Current popularity: $popularity ${Instant.now()}") + } + + onCommandPacket = { _, jsonObject -> + val json = jsonObject.toString() + val cmd by jsonObject.byString + path.resolve("$cmd.json").toFile().run { + if (!exists()) writeText(json) } - ) - } - onClose = { _, closeReason -> - println(closeReason) - } - }.launch() + println( + if (cmd == "DANMU_MSG") { + with(DanmakuMessage(jsonObject)) { + "${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message" + } + } else { + json + } + ) + } + + onClose = { liveClient, closeReason -> + println("$closeReason ${Instant.now()}") + if (closeReason?.code != CloseReason.Codes.NORMAL.code) liveClient.launch() + } + }.launch() + runBlocking { - delay(9999999) + delay(99999999999999999) } } }