重构 LiveClient

This commit is contained in:
czp3009 2019-03-28 19:16:00 +08:00
parent c546606a66
commit 0e2a9a0db0
3 changed files with 164 additions and 151 deletions

View File

@ -1,10 +1,10 @@
package com.hiczp.bilibili.api
import com.google.gson.JsonObject
import com.hiczp.bilibili.api.app.AppAPI
import com.hiczp.bilibili.api.danmaku.DanmakuAPI
import com.hiczp.bilibili.api.live.LiveAPI
import com.hiczp.bilibili.api.live.websocket.LiveClient
import com.hiczp.bilibili.api.live.websocket.LiveClientCallbackDSL
import com.hiczp.bilibili.api.main.MainAPI
import com.hiczp.bilibili.api.member.MemberAPI
import com.hiczp.bilibili.api.message.MessageAPI
@ -21,7 +21,6 @@ import com.hiczp.bilibili.api.retrofit.interceptor.FailureResponseInterceptor
import com.hiczp.bilibili.api.retrofit.interceptor.SortAndSignInterceptor
import com.hiczp.bilibili.api.vc.VcAPI
import com.jakewharton.retrofit2.adapter.kotlin.coroutines.CoroutineCallAdapterFactory
import io.ktor.http.cio.websocket.CloseReason
import okhttp3.Interceptor
import okhttp3.OkHttpClient
import okhttp3.logging.HttpLoggingInterceptor
@ -247,13 +246,10 @@ class BilibiliClient(
fetchDanmakuConfig: Boolean = true,
doEntryRoomAction: Boolean = false,
sendUserOnlineHeart: Boolean = false,
onConnect: (LiveClient) -> Unit,
onPopularityPacket: (LiveClient, Int) -> Unit,
onCommandPacket: (LiveClient, JsonObject) -> Unit,
onClose: (LiveClient, CloseReason?) -> Unit
callback: LiveClientCallbackDSL.() -> Unit
) = LiveClient(
this, roomId, fetchRoomId, fetchDanmakuConfig, doEntryRoomAction, sendUserOnlineHeart,
onConnect, onPopularityPacket, onCommandPacket, onClose
callback
)
/**

View File

@ -13,11 +13,8 @@ import io.ktor.http.cio.websocket.WebSocketSession
import io.ktor.util.InternalAPI
import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.decodeString
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.io.core.Closeable
import kotlinx.io.errors.IOException
/**
@ -29,10 +26,7 @@ import kotlinx.io.errors.IOException
* @param fetchDanmakuConfig 是否在连接前先获取弹幕推送服务器地址
* @param doEntryRoomAction 是否产生直播间观看历史记录
* @param sendUserOnlineHeart 是否发送 rest 心跳包, 这会增加观看直播的时长, 用于服务端统计(与弹幕推送无关)
* @param onConnect 回调函数, 连接成功时触发
* @param onPopularityPacket 回调函数, 接收到人气值数据包时触发
* @param onCommandPacket 回调函数, 接收到 Command 数据包时触发
* @param onClose 回调函数, 连接断开时触发
* @param callback 回调
*/
@Suppress("CanBeParameter")
class LiveClient(
@ -42,11 +36,9 @@ class LiveClient(
private val fetchDanmakuConfig: Boolean = true,
private val doEntryRoomAction: Boolean = false,
private val sendUserOnlineHeart: Boolean = false,
private val onConnect: (LiveClient) -> Unit,
private val onPopularityPacket: (LiveClient, Int) -> Unit,
private val onCommandPacket: (LiveClient, JsonObject) -> Unit,
private val onClose: (LiveClient, CloseReason?) -> Unit
) : Closeable {
callback: LiveClientCallbackDSL.() -> Unit
) {
private val callback = LiveClientCallbackDSL().apply { callback() }
private val liveAPI = bilibiliClient.liveAPI
private var websocketSession: WebSocketSession? = null
@ -55,113 +47,114 @@ class LiveClient(
/**
* 开启连接
* 注意此方法将 suspend 所在协程直到连接关闭
*/
@UseExperimental(KtorExperimentalAPI::class, ObsoleteCoroutinesApi::class, InternalAPI::class)
suspend fun start() {
//得到原始房间号和主播的用户ID
var anchorUserId = 0L
if (fetchRoomId) {
liveAPI.mobileRoomInit(maybeShortRoomId).await().data.also {
roomId = it.roomId
anchorUserId = it.uid
}
}
//获得 wss 地址和端口(推荐服务器)
@Suppress("SpellCheckingInspection")
var host = "broadcastlv.chat.bilibili.com"
var port = 443
if (fetchDanmakuConfig) {
liveAPI.getDanmakuConfig(roomId).await().data.also { data ->
host = data.host
data.hostServerList.find { it.host == host }?.wssPort?.also {
port = it
fun start() {
GlobalScope.launch(CoroutineExceptionHandler { _, throwable ->
callback.onError?.invoke(this, throwable) ?: throwable.printStackTrace()
}) {
//得到原始房间号和主播的用户ID
var anchorUserId = 0L
if (fetchRoomId) {
liveAPI.mobileRoomInit(maybeShortRoomId).await().data.also {
roomId = it.roomId
anchorUserId = it.uid
}
}
}
//产生历史记录
@Suppress("DeferredResultUnused")
if (doEntryRoomAction && bilibiliClient.isLogin) liveAPI.roomEntryAction(roomId)
//开启 websocket
HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") {
websocketSession = this
pingIntervalMillis = -1
//发送进房数据包
send(PresetPacket.enterRoomPacket(anchorUserId, roomId))
if (incoming.receive().toPackets()[0].packetType == PacketType.ENTER_ROOM_RESPONSE) {
try {
onConnect(this@LiveClient)
} catch (e: Exception) {
e.printStackTrace()
}
} else {
//impossible
close(IOException("Receive unreadable server response"))
}
//发送 rest 心跳包
//五分钟一次
val restHeartBeatJob = if (sendUserOnlineHeart && bilibiliClient.isLogin) {
launch {
val scale = bilibiliClient.billingClientProperties.scale
while (true) {
@Suppress("DeferredResultUnused")
liveAPI.userOnlineHeart(roomId, scale)
delay(300_000)
//获得 wss 地址和端口(推荐服务器)
@Suppress("SpellCheckingInspection")
var host = "broadcastlv.chat.bilibili.com"
var port = 443
if (fetchDanmakuConfig) {
liveAPI.getDanmakuConfig(roomId).await().data.also { data ->
host = data.host
data.hostServerList.find { it.host == host }?.wssPort?.also {
port = it
}
}
} else {
null
}
//发送 websocket 心跳包
//30 秒一次
val websocketHeartBeatJob = launch {
while (true) {
send(PresetPacket.heartbeatPacket())
delay(30_000)
//产生历史记录
@Suppress("DeferredResultUnused")
if (doEntryRoomAction && bilibiliClient.isLogin) liveAPI.roomEntryAction(roomId)
//开启 websocket
HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") {
websocketSession = this
pingIntervalMillis = -1
//发送进房数据包
send(PresetPacket.enterRoomPacket(anchorUserId, roomId))
val enterRoomResponsePacket = incoming.receive().toPackets()[0]
if (enterRoomResponsePacket.packetType == PacketType.ENTER_ROOM_RESPONSE) {
try {
callback.onConnect?.invoke(this@LiveClient)
} catch (e: Exception) {
e.printStackTrace()
}
} else {
//impossible
close(IOException("Receive unreadable server response: $enterRoomResponsePacket"))
}
}
//如果被 cancel, 那么这里将抛出异常
try {
incoming.consumeEach { frame ->
frame.toPackets().forEach {
try {
@Suppress("NON_EXHAUSTIVE_WHEN")
when (it.packetType) {
PacketType.POPULARITY -> onPopularityPacket(
this@LiveClient,
it.content.int
)
PacketType.COMMAND -> onCommandPacket(
this@LiveClient,
jsonParser.parse(it.content.decodeString()).obj
)
//发送 rest 心跳包
//五分钟一次
val restHeartBeatJob = if (sendUserOnlineHeart && bilibiliClient.isLogin) {
launch {
val scale = bilibiliClient.billingClientProperties.scale
while (true) {
@Suppress("DeferredResultUnused")
liveAPI.userOnlineHeart(roomId, scale)
delay(300_000)
}
}
} else {
null
}
//发送 websocket 心跳包
//30 秒一次
val websocketHeartBeatJob = launch {
//TODO 阻止异常传播
while (true) {
send(PresetPacket.heartbeatPacket())
delay(30_000)
}
}
try {
incoming.consumeEach { frame ->
frame.toPackets().forEach {
try {
@Suppress("NON_EXHAUSTIVE_WHEN")
when (it.packetType) {
PacketType.POPULARITY -> callback.onPopularityPacket?.invoke(
this@LiveClient,
it.content.int
)
PacketType.COMMAND -> callback.onCommandPacket?.invoke(
this@LiveClient,
jsonParser.parse(it.content.decodeString()).obj
)
}
} catch (e: Exception) {
e.printStackTrace()
}
}
}
} finally {
restHeartBeatJob?.cancel()
websocketHeartBeatJob.cancel()
launch {
val closeReason = closeReason.await()
try {
callback.onClose?.invoke(this@LiveClient, closeReason)
} catch (e: Exception) {
e.printStackTrace()
}
}
}
} finally {
//无论是连接关闭还是抛出异常, 都要清理掉两个子任务
restHeartBeatJob?.cancel()
websocketHeartBeatJob.cancel()
}
//如果上面抛出了异常, 那么这里就不会被执行
launch {
val closeReason = closeReason.await()
try {
onClose(this@LiveClient, closeReason)
} catch (e: Exception) {
e.printStackTrace()
}
}
}
}
@ -169,7 +162,7 @@ class LiveClient(
/**
* 关闭连接
*/
override fun close() {
fun close() {
websocketSession?.run {
websocketSession = null
//client 不能使用 close(), 因为 WebsocketSession 本体执行完毕时会自动执行一次 close(), 这会导致多次关闭
@ -183,3 +176,30 @@ class LiveClient(
fun sendMessage(message: String) =
liveAPI.sendMessage(cid = roomId, mid = bilibiliClient.userId ?: 0, message = message)
}
class LiveClientCallbackDSL {
/**
* 成功进入房间时触发
*/
var onConnect: (suspend (LiveClient) -> Unit)? = null
/**
* 抛出异常时触发
*/
var onError: ((LiveClient, Throwable) -> Unit)? = null
/**
* 收到人气值数据包
*/
var onPopularityPacket: (suspend (LiveClient, Int) -> Unit)? = null
/**
* 收到 command 数据包
*/
var onCommandPacket: (suspend (LiveClient, JsonObject) -> Unit)? = null
/**
* 连接关闭时触发
*/
var onClose: (suspend (LiveClient, CloseReason?) -> Unit)? = null
}

View File

@ -3,7 +3,7 @@ package com.hiczp.bilibili.api.test
import com.github.salomonbrys.kotson.byString
import com.hiczp.bilibili.api.isNotEmpty
import com.hiczp.bilibili.api.live.websocket.DanmakuMessage
import kotlinx.coroutines.launch
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import java.nio.file.Paths
@ -15,46 +15,43 @@ class LiveClientTest {
it.toFile().mkdirs()
}
runBlocking {
val liveClient = bilibiliClient.liveClient(
roomId = 3,
sendUserOnlineHeart = true,
onConnect = {
println("Connected")
//想要这么做的人一定逻辑学有问题
// launch {
// delay(5_000)
// it.close()
// }
},
onPopularityPacket = { _, popularity ->
println("Current popularity: $popularity")
},
onCommandPacket = { _, jsonObject ->
val json = jsonObject.toString()
val cmd by jsonObject.byString
path.resolve("$cmd.json").toFile().run {
if (!exists()) writeText(json)
}
println(
if (cmd == "DANMU_MSG") {
with(DanmakuMessage(jsonObject)) {
"${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message"
}
} else {
json
}
)
},
onClose = { _, closeReason ->
println(closeReason)
}
)
val job = launch {
liveClient.start()
bilibiliClient.liveClient(roomId = 3, sendUserOnlineHeart = true) {
onConnect = {
println("Connected")
}
job.join()
onError = { _, throwable ->
throwable.printStackTrace()
}
onPopularityPacket = { _, popularity ->
println("Current popularity: $popularity")
}
onCommandPacket = { _, jsonObject ->
val json = jsonObject.toString()
val cmd by jsonObject.byString
path.resolve("$cmd.json").toFile().run {
if (!exists()) writeText(json)
}
println(
if (cmd == "DANMU_MSG") {
with(DanmakuMessage(jsonObject)) {
"${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message"
}
} else {
json
}
)
}
onClose = { _, closeReason ->
println(closeReason)
}
}.start()
runBlocking {
delay(99999999)
}
}
}