From fa3b1d974c3542aad201e267e2145df5c8162170 Mon Sep 17 00:00:00 2001 From: czp3009 Date: Fri, 22 Mar 2019 19:41:11 +0800 Subject: [PATCH] live client test --- .../com/hiczp/bilibili/api/BilibiliClient.kt | 11 ++ .../com/hiczp/bilibili/api/GlobalConstant.kt | 8 ++ .../{StreamExtension.kt => IOExtension.kt} | 4 + .../bilibili/api/live/websocket/LiveClient.kt | 99 ++++++++++++++ .../bilibili/api/live/websocket/Packet.kt | 125 ++++++++++++++++++ .../interceptor/FailureResponseInterceptor.kt | 8 +- .../com/hiczp/bilibili/api/test/Config.kt | 5 +- .../hiczp/bilibili/api/test/LiveClientTest.kt | 13 ++ 8 files changed, 263 insertions(+), 10 deletions(-) create mode 100644 src/main/kotlin/com/hiczp/bilibili/api/GlobalConstant.kt rename src/main/kotlin/com/hiczp/bilibili/api/{StreamExtension.kt => IOExtension.kt} (86%) create mode 100644 src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt create mode 100644 src/main/kotlin/com/hiczp/bilibili/api/live/websocket/Packet.kt create mode 100644 src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt diff --git a/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt b/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt index f5f0afb..f2f6078 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/BilibiliClient.kt @@ -3,6 +3,7 @@ package com.hiczp.bilibili.api import com.hiczp.bilibili.api.app.AppAPI import com.hiczp.bilibili.api.danmaku.DanmakuAPI import com.hiczp.bilibili.api.live.LiveAPI +import com.hiczp.bilibili.api.live.websocket.LiveClient import com.hiczp.bilibili.api.main.MainAPI import com.hiczp.bilibili.api.member.MemberAPI import com.hiczp.bilibili.api.message.MessageAPI @@ -237,6 +238,16 @@ class BilibiliClient( ) } + /** + * 打开一个直播客户端 + */ + fun liveClient( + roomId: Long, + fetchRoomId: Boolean = true, + fetchDanmakuConfig: Boolean = true, + doEntryRoomAction: Boolean = false + ) = LiveClient(this, roomId, fetchRoomId, fetchDanmakuConfig, doEntryRoomAction) + /** * 登陆 * v3 登陆接口会同时返回 cookies 和 token diff --git a/src/main/kotlin/com/hiczp/bilibili/api/GlobalConstant.kt b/src/main/kotlin/com/hiczp/bilibili/api/GlobalConstant.kt new file mode 100644 index 0000000..092a00d --- /dev/null +++ b/src/main/kotlin/com/hiczp/bilibili/api/GlobalConstant.kt @@ -0,0 +1,8 @@ +package com.hiczp.bilibili.api + +import com.google.gson.Gson +import com.google.gson.JsonParser + +internal val gson = Gson() + +internal val jsonParser = JsonParser() diff --git a/src/main/kotlin/com/hiczp/bilibili/api/StreamExtension.kt b/src/main/kotlin/com/hiczp/bilibili/api/IOExtension.kt similarity index 86% rename from src/main/kotlin/com/hiczp/bilibili/api/StreamExtension.kt rename to src/main/kotlin/com/hiczp/bilibili/api/IOExtension.kt index 08d87da..0bb4771 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/StreamExtension.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/IOExtension.kt @@ -1,5 +1,6 @@ package com.hiczp.bilibili.api +import io.ktor.util.InternalAPI import org.apache.commons.io.IOUtils import org.apache.commons.io.input.BoundedInputStream import org.apache.commons.io.input.BoundedReader @@ -33,3 +34,6 @@ fun InputStream.bounded(size: Long) = BoundedInputStream(this, size) @UseExperimental(ExperimentalUnsignedTypes::class) fun InputStream.bounded(size: UInt) = bounded(size.toLong()) + +@UseExperimental(InternalAPI::class) +internal fun ByteArray.toPrettyPrintString() = joinToString(prefix = "[", postfix = "]") { "%02x".format(it) } diff --git a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt new file mode 100644 index 0000000..8b764ff --- /dev/null +++ b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt @@ -0,0 +1,99 @@ +package com.hiczp.bilibili.api.live.websocket + +import com.hiczp.bilibili.api.BilibiliClient +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.features.websocket.WebSockets +import io.ktor.client.features.websocket.wss +import io.ktor.util.KtorExperimentalAPI +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.channels.map +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.io.errors.IOException +import java.math.BigInteger + +/** + * 直播客户端 + * + * @param maybeShortRoomId 可能为短房间号的房间号 + * @param fetchRoomId 是否在连接前先获取房间号(长号) + * @param fetchDanmakuConfig 是否在连接前先获取弹幕推送服务器地址 + * @param doEntryRoomAction 是否产生直播间观看历史记录 + */ +class LiveClient( + private val bilibiliClient: BilibiliClient, + private val maybeShortRoomId: Long, + private val fetchRoomId: Boolean = true, + private val fetchDanmakuConfig: Boolean = true, + private val doEntryRoomAction: Boolean = false +) { + var roomId = maybeShortRoomId + private set + + @UseExperimental(KtorExperimentalAPI::class, kotlinx.coroutines.ObsoleteCoroutinesApi::class) + suspend fun start() { + val liveAPI = bilibiliClient.liveAPI + + //得到原始房间号和房间主用户ID + var anchorUserId = 0L + if (fetchRoomId) { + liveAPI.mobileRoomInit(maybeShortRoomId).await().data.also { + roomId = it.roomId + anchorUserId = it.uid + } + } + + //获得 wss 地址和端口(推荐服务器) + @Suppress("SpellCheckingInspection") + var host = "tx-hk-live-comet-01.chat.bilibili.com" + var port = 443 + if (fetchDanmakuConfig) { + liveAPI.getDanmakuConfig(roomId).await().data.also { data -> + host = data.host + data.hostServerList.find { it.host == host }?.wssPort?.also { + port = it + } + } + } + + //产生历史记录 + @Suppress("DeferredResultUnused") + if (doEntryRoomAction) liveAPI.roomEntryAction(roomId) + + //开启 websocket + HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") { + pingIntervalMillis = -1 + + try { + //发送进房数据包 + send(PresetPacket.enterRoomPacket(anchorUserId, roomId)) + if (incoming.receive().toPacket().packetType != PacketType.ENTER_ROOM_RESPONSE) { + //impossible + close(IOException("Receive incorrect server response")) + } + + //发送心跳包 + launch { + while (true) { + send(PresetPacket.heartbeatPacket()) + delay(30_000) + } + } + + incoming.map { it.toPacket() }.consumeEach { + println( + when (it.packetType) { + PacketType.POPULARITY -> "Current popularity: ${BigInteger(it.content).longValueExact()}" + PacketType.COMMAND -> "${it.getJsonContent()}" + else -> "Other packet: $it" + } + ) + } + + } catch (e: Exception) { + e.printStackTrace() + } + } + } +} diff --git a/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/Packet.kt b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/Packet.kt new file mode 100644 index 0000000..0e25acd --- /dev/null +++ b/src/main/kotlin/com/hiczp/bilibili/api/live/websocket/Packet.kt @@ -0,0 +1,125 @@ +package com.hiczp.bilibili.api.live.websocket + +import com.github.salomonbrys.kotson.jsonObject +import com.google.gson.JsonElement +import com.hiczp.bilibili.api.jsonParser +import com.hiczp.bilibili.api.toPrettyPrintString +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.WebSocketSession +import io.ktor.http.cio.websocket.readBytes +import io.ktor.util.InternalAPI +import io.ktor.util.moveToByteArray +import java.nio.ByteBuffer + +/** + * 数据包模型 + * 由于 Android APP 并未全线换成 wss, 以下用的是移动版网页的协议 + * + * @param packetType 数据包类型 + * @param content 正文内容 + * @param protocolVersion 协议版本 + * @param sequence 序列号, 似乎总为 1 + */ +@Suppress("MemberVisibilityCanBePrivate") +class Packet( + val packetType: PacketType, + val content: ByteArray, + val protocolVersion: Short = 1, + val sequence: Int = 1 +) { + constructor( + packetType: PacketType, + content: JsonElement, + protocolVersion: Short = 1, + sequence: Int = 1 + ) : this(packetType, content.toString().toByteArray(), protocolVersion, sequence) + + val totalLength: Int + get() = headerLength + content.size + + val headerLength: Short = 16 + + fun getJsonContent() = jsonParser.parse(content.toString(Charsets.UTF_8))!! + + fun toFrame() = Frame.Binary( + true, + ByteBuffer.allocate(totalLength) + .putInt(totalLength) + .putShort(headerLength) + .putShort(protocolVersion) + .putInt(packetType.value) + .putInt(sequence) + .put(content) + .flip() + ) + + override fun toString() = toFrame().readBytes().toPrettyPrintString() + + companion object { + @UseExperimental(InternalAPI::class) + fun fromFrame(frame: Frame) = + with(frame.buffer) { + int + short + val protocolVersion = short + val packetType = PacketType.getByValue(int) + val sequence = int + val content = moveToByteArray() + Packet(packetType, content, protocolVersion, sequence) + } + } +} + +enum class PacketType(val value: Int) { + //impossible + UNKNOWN(0), + + HEARTBEAT(2), + + POPULARITY(3), + + COMMAND(5), + + ENTER_ROOM(7), + + ENTER_ROOM_RESPONSE(8); + + companion object { + fun getByValue(value: Int) = values().firstOrNull { it.value == value } ?: UNKNOWN + } +} + +/** + * 预设数据包 + */ +object PresetPacket { + /** + * 进房数据包 + * {"uid":50333369,"roomid":14073662,"protover":0} + * + * @param anchorUserId 房间主的用户 ID + * @param roomId 房间号 + */ + @Suppress("SpellCheckingInspection") + fun enterRoomPacket(anchorUserId: Long, roomId: Long) = Packet( + PacketType.ENTER_ROOM, + jsonObject( + "uid" to anchorUserId, + "roomid" to roomId, + "protover" to 0 //该值总为 0 + ) + ) + + /** + * 心跳包 + * 心跳包的正文内容可能是故意的, 为固定值 [object Object] + */ + fun heartbeatPacket(content: ByteArray = "[object Object]".toByteArray()) = Packet( + PacketType.HEARTBEAT, + content + ) +} + +internal fun Frame.toPacket() = Packet.fromFrame(this) + +internal suspend inline fun WebSocketSession.send(packet: Packet) = send(packet.toFrame()) diff --git a/src/main/kotlin/com/hiczp/bilibili/api/retrofit/interceptor/FailureResponseInterceptor.kt b/src/main/kotlin/com/hiczp/bilibili/api/retrofit/interceptor/FailureResponseInterceptor.kt index e2b3cc4..4eb0ed1 100644 --- a/src/main/kotlin/com/hiczp/bilibili/api/retrofit/interceptor/FailureResponseInterceptor.kt +++ b/src/main/kotlin/com/hiczp/bilibili/api/retrofit/interceptor/FailureResponseInterceptor.kt @@ -3,8 +3,8 @@ package com.hiczp.bilibili.api.retrofit.interceptor import com.github.salomonbrys.kotson.fromJson import com.github.salomonbrys.kotson.int import com.github.salomonbrys.kotson.obj -import com.google.gson.Gson -import com.google.gson.JsonParser +import com.hiczp.bilibili.api.gson +import com.hiczp.bilibili.api.jsonParser import com.hiczp.bilibili.api.retrofit.exception.BilibiliApiException import okhttp3.Interceptor import okhttp3.Response @@ -13,10 +13,6 @@ import okhttp3.Response * 如果服务器返回的 code 不为 0 则抛出异常 */ object FailureResponseInterceptor : Interceptor { - private val jsonParser = JsonParser() - @Suppress("SpellCheckingInspection") - private val gson = Gson() - override fun intercept(chain: Interceptor.Chain): Response { val response = chain.proceed(chain.request()) val body = response.body() diff --git a/src/test/kotlin/com/hiczp/bilibili/api/test/Config.kt b/src/test/kotlin/com/hiczp/bilibili/api/test/Config.kt index 009523b..fbec339 100644 --- a/src/test/kotlin/com/hiczp/bilibili/api/test/Config.kt +++ b/src/test/kotlin/com/hiczp/bilibili/api/test/Config.kt @@ -2,15 +2,12 @@ package com.hiczp.bilibili.api.test import com.github.salomonbrys.kotson.byString import com.github.salomonbrys.kotson.fromJson -import com.google.gson.Gson import com.google.gson.JsonObject import com.hiczp.bilibili.api.BilibiliClient +import com.hiczp.bilibili.api.gson import okhttp3.logging.HttpLoggingInterceptor //配置文件 -@Suppress("SpellCheckingInspection") -private val gson = Gson() - private val config = gson.fromJson( Config::class.java.getResourceAsStream("/config.json").reader() ) diff --git a/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt new file mode 100644 index 0000000..b4a062e --- /dev/null +++ b/src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt @@ -0,0 +1,13 @@ +package com.hiczp.bilibili.api.test + +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test + +class LiveClientTest { + @Test + fun liveClient() { + runBlocking { + bilibiliClient.liveClient(roomId = 3).start() + } + } +}