优化代码

This commit is contained in:
czp3009 2019-03-29 00:24:32 +08:00
parent 0e2a9a0db0
commit bf1cf9c3a3
4 changed files with 145 additions and 168 deletions

View File

@ -389,21 +389,23 @@ bilibiliClient.liveAPI.roomMessage(roomId).await()
接下来的弹幕都是实时弹幕, 直播间实时弹幕通过 `Websocket` 来推送.
```kotlin
bilibiliClient.liveClient(
roomId = 3,
onConnect = {
println("Connected")
},
onPopularityPacket = { _, popularity ->
println("Current popularity: $popularity")
},
onCommandPacket = { _, jsonObject ->
println(jsonObject)
},
onClose = { _, closeReason ->
println(closeReason)
}
).start()
val job = bilibiliClient.liveClient(roomId = 3) {
onConnect = {
println("Connected")
}
onPopularityPacket = { _, popularity ->
println("Current popularity: $popularity")
}
onCommandPacket = { _, jsonObject ->
println(jsonObject)
}
onClose = { _, closeReason ->
println(closeReason)
}
}.launch()
```
服务器推送的 `Message` 有两种, 一种是 `人气值` 数据, 另一种是 `Command` 数据.
@ -431,13 +433,13 @@ bilibiliClient.liveClient(
onCommandPacket = { _, jsonObject ->
val cmd by jsonObject.byString
println(
if (cmd == "DANMU_MSG") {
with(DanmakuMessage(jsonObject)) {
"${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message"
}
} else {
jsonObject.toString()
if (cmd == "DANMU_MSG") {
with(DanmakuMessage(jsonObject)) {
"${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message"
}
} else {
jsonObject.toString()
}
)
}
```
@ -450,32 +452,14 @@ onCommandPacket = { _, jsonObject ->
更多 `Command` 数据包的数据结构详见本项目的 [/record/直播弹幕](record/直播弹幕) 文件夹.
注意, `start()` 方法会 suspend 当前协程直到连接关闭, 如果当前协程还需要执行更多逻辑则如下所示
注意, `onPopularityPacket`, `onCommandPacket` 这些回调不能进行耗时操作.
关闭连接
```kotlin
val liveClient = bilibiliClient.liveClient(args)
launch { liveClient.start() }
println("We do more thing here")
delay(100_000)
liveClient.close()
job.cancel()
```
如果要实现断线重连, 需要在额外的协程中进行连接操作, 例如 `onClose` 回调如下所示(手动调用 `close()` 方法也会触发 `onClose` 回调, 请用额外变量来记录该次关闭是否是最终用户的行为)
```kotlin
onClose = { liveClient, closeReason ->
launch {
liveClient.start()
}
}
```
如果网络不可达, 那么 `start()` 方法会抛出异常, 且不会触发 `onConnect` 以及 `onClose` 回调.
如果数据包被中间人修改, 那么可能不会触发 `onConnect` 回调, 但是会触发 `onClose`.
如果手动 `cancel` 了执行 `start()` 方法的协程将不会触发 `onClose`.
## 发送直播弹幕
在直播间里发送弹幕也非常简单(必须先登陆)

View File

@ -3,8 +3,6 @@ package com.hiczp.bilibili.api
import com.hiczp.bilibili.api.app.AppAPI
import com.hiczp.bilibili.api.danmaku.DanmakuAPI
import com.hiczp.bilibili.api.live.LiveAPI
import com.hiczp.bilibili.api.live.websocket.LiveClient
import com.hiczp.bilibili.api.live.websocket.LiveClientCallbackDSL
import com.hiczp.bilibili.api.main.MainAPI
import com.hiczp.bilibili.api.member.MemberAPI
import com.hiczp.bilibili.api.message.MessageAPI
@ -237,21 +235,6 @@ class BilibiliClient(
)
}
/**
* 打开一个直播客户端
*/
fun liveClient(
roomId: Long,
fetchRoomId: Boolean = true,
fetchDanmakuConfig: Boolean = true,
doEntryRoomAction: Boolean = false,
sendUserOnlineHeart: Boolean = false,
callback: LiveClientCallbackDSL.() -> Unit
) = LiveClient(
this, roomId, fetchRoomId, fetchDanmakuConfig, doEntryRoomAction, sendUserOnlineHeart,
callback
)
/**
* 登陆
* v3 登陆接口会同时返回 cookies token

View File

@ -9,13 +9,16 @@ import io.ktor.client.engine.cio.CIO
import io.ktor.client.features.websocket.WebSockets
import io.ktor.client.features.websocket.wss
import io.ktor.http.cio.websocket.CloseReason
import io.ktor.http.cio.websocket.WebSocketSession
import io.ktor.http.cio.websocket.close
import io.ktor.util.InternalAPI
import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.decodeString
import io.ktor.util.error
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consumeEach
import kotlinx.io.errors.IOException
import mu.KotlinLogging
private val logger = KotlinLogging.logger { }
/**
* 直播客户端
@ -31,7 +34,7 @@ import kotlinx.io.errors.IOException
@Suppress("CanBeParameter")
class LiveClient(
private val bilibiliClient: BilibiliClient,
private val maybeShortRoomId: Long,
maybeShortRoomId: Long,
private val fetchRoomId: Boolean = true,
private val fetchDanmakuConfig: Boolean = true,
private val doEntryRoomAction: Boolean = false,
@ -40,7 +43,6 @@ class LiveClient(
) {
private val callback = LiveClientCallbackDSL().apply { callback() }
private val liveAPI = bilibiliClient.liveAPI
private var websocketSession: WebSocketSession? = null
var roomId = maybeShortRoomId
private set
@ -49,127 +51,123 @@ class LiveClient(
* 开启连接
*/
@UseExperimental(KtorExperimentalAPI::class, ObsoleteCoroutinesApi::class, InternalAPI::class)
fun start() {
GlobalScope.launch(CoroutineExceptionHandler { _, throwable ->
callback.onError?.invoke(this, throwable) ?: throwable.printStackTrace()
}) {
//得到原始房间号和主播的用户ID
var anchorUserId = 0L
if (fetchRoomId) {
liveAPI.mobileRoomInit(maybeShortRoomId).await().data.also {
roomId = it.roomId
anchorUserId = it.uid
fun launch() = GlobalScope.launch(CoroutineExceptionHandler { _, throwable ->
callback.onError?.invoke(this, throwable) ?: logger.error(throwable)
}) {
//得到原始房间号和主播的用户ID
var anchorUserId = 0L
if (fetchRoomId) {
liveAPI.mobileRoomInit(roomId).await().data.also {
roomId = it.roomId
anchorUserId = it.uid
}
}
//获得 wss 地址和端口(推荐服务器)
@Suppress("SpellCheckingInspection")
var host = "broadcastlv.chat.bilibili.com"
var port = 443
if (fetchDanmakuConfig) {
liveAPI.getDanmakuConfig(roomId).await().data.also { data ->
host = data.host
data.hostServerList.find { it.host == host }?.wssPort?.also {
port = it
}
}
}
//获得 wss 地址和端口(推荐服务器)
@Suppress("SpellCheckingInspection")
var host = "broadcastlv.chat.bilibili.com"
var port = 443
if (fetchDanmakuConfig) {
liveAPI.getDanmakuConfig(roomId).await().data.also { data ->
host = data.host
data.hostServerList.find { it.host == host }?.wssPort?.also {
port = it
}
//产生历史记录
@Suppress("DeferredResultUnused")
if (doEntryRoomAction && bilibiliClient.isLogin) liveAPI.roomEntryAction(roomId)
//开启 websocket
HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") {
pingIntervalMillis = 30_000
timeoutMillis = 10_000
//发送进房数据包
send(PresetPacket.enterRoomPacket(anchorUserId, roomId))
val enterRoomResponsePacket = incoming.receive().toPackets()[0]
if (enterRoomResponsePacket.packetType == PacketType.ENTER_ROOM_RESPONSE) {
try {
callback.onConnect?.invoke(this@LiveClient)
} catch (e: Exception) {
logger.error(e)
}
} else {
//impossible
logger.error { "Receive unreadable server response: $enterRoomResponsePacket" }
close(CloseReason(CloseReason.Codes.NOT_CONSISTENT, ""))
return@wss
}
//产生历史记录
@Suppress("DeferredResultUnused")
if (doEntryRoomAction && bilibiliClient.isLogin) liveAPI.roomEntryAction(roomId)
//开启 websocket
HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") {
websocketSession = this
pingIntervalMillis = -1
//发送进房数据包
send(PresetPacket.enterRoomPacket(anchorUserId, roomId))
val enterRoomResponsePacket = incoming.receive().toPackets()[0]
if (enterRoomResponsePacket.packetType == PacketType.ENTER_ROOM_RESPONSE) {
try {
callback.onConnect?.invoke(this@LiveClient)
} catch (e: Exception) {
e.printStackTrace()
//发送 rest 心跳包
//五分钟一次
val restHeartBeatJob = if (sendUserOnlineHeart && bilibiliClient.isLogin) {
launch {
val scale = bilibiliClient.billingClientProperties.scale
while (true) {
@Suppress("DeferredResultUnused")
liveAPI.userOnlineHeart(roomId, scale)
delay(300_000)
}
} else {
//impossible
close(IOException("Receive unreadable server response: $enterRoomResponsePacket"))
}
} else {
null
}
//发送 rest 心跳包
//五分钟一次
val restHeartBeatJob = if (sendUserOnlineHeart && bilibiliClient.isLogin) {
launch {
val scale = bilibiliClient.billingClientProperties.scale
while (true) {
@Suppress("DeferredResultUnused")
liveAPI.userOnlineHeart(roomId, scale)
delay(300_000)
}
}
} else {
null
}
//发送 websocket 心跳包
//30 秒一次
val websocketHeartBeatJob = launch {
//TODO 阻止异常传播
//发送 websocket 心跳包
//30 秒一次
val websocketHeartBeatJob = launch {
try {
while (true) {
send(PresetPacket.heartbeatPacket())
delay(30_000)
}
} catch (ignore: CancellationException) {
//ignore
} catch (e: Exception) {
logger.error(e)
}
}
try {
incoming.consumeEach { frame ->
frame.toPackets().forEach {
try {
@Suppress("NON_EXHAUSTIVE_WHEN")
when (it.packetType) {
PacketType.POPULARITY -> callback.onPopularityPacket?.invoke(
this@LiveClient,
it.content.int
)
PacketType.COMMAND -> callback.onCommandPacket?.invoke(
this@LiveClient,
jsonParser.parse(it.content.decodeString()).obj
)
}
} catch (e: Exception) {
e.printStackTrace()
try {
incoming.consumeEach { frame ->
frame.toPackets().forEach {
try {
@Suppress("NON_EXHAUSTIVE_WHEN")
when (it.packetType) {
PacketType.POPULARITY -> callback.onPopularityPacket?.invoke(
this@LiveClient,
it.content.int
)
PacketType.COMMAND -> callback.onCommandPacket?.invoke(
this@LiveClient,
jsonParser.parse(it.content.decodeString()).obj
)
}
} catch (e: Exception) {
logger.error(e)
}
}
} finally {
restHeartBeatJob?.cancel()
websocketHeartBeatJob.cancel()
launch {
val closeReason = closeReason.await()
try {
callback.onClose?.invoke(this@LiveClient, closeReason)
} catch (e: Exception) {
e.printStackTrace()
}
}
} catch (e: CancellationException) {
close()
} finally {
restHeartBeatJob?.cancel()
websocketHeartBeatJob.cancel()
launch(NonCancellable) {
val closeReason = closeReason.await()
try {
callback.onClose?.invoke(this@LiveClient, closeReason)
} catch (e: Exception) {
logger.error(e)
}
}
}
}
}
/**
* 关闭连接
*/
fun close() {
websocketSession?.run {
websocketSession = null
//client 不能使用 close(), 因为 WebsocketSession 本体执行完毕时会自动执行一次 close(), 这会导致多次关闭
incoming.cancel()
}
}
/**
* 发送弹幕
*/
@ -203,3 +201,18 @@ class LiveClientCallbackDSL {
*/
var onClose: (suspend (LiveClient, CloseReason?) -> Unit)? = null
}
/**
* 打开一个直播客户端
*/
fun BilibiliClient.liveClient(
roomId: Long,
fetchRoomId: Boolean = true,
fetchDanmakuConfig: Boolean = true,
doEntryRoomAction: Boolean = false,
sendUserOnlineHeart: Boolean = false,
callback: LiveClientCallbackDSL.() -> Unit
) = LiveClient(
this, roomId, fetchRoomId, fetchDanmakuConfig, doEntryRoomAction, sendUserOnlineHeart,
callback
)

View File

@ -3,6 +3,7 @@ package com.hiczp.bilibili.api.test
import com.github.salomonbrys.kotson.byString
import com.hiczp.bilibili.api.isNotEmpty
import com.hiczp.bilibili.api.live.websocket.DanmakuMessage
import com.hiczp.bilibili.api.live.websocket.liveClient
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
@ -15,15 +16,11 @@ class LiveClientTest {
it.toFile().mkdirs()
}
bilibiliClient.liveClient(roomId = 3, sendUserOnlineHeart = true) {
val job = bilibiliClient.liveClient(roomId = 3, sendUserOnlineHeart = true) {
onConnect = {
println("Connected")
}
onError = { _, throwable ->
throwable.printStackTrace()
}
onPopularityPacket = { _, popularity ->
println("Current popularity: $popularity")
}
@ -49,9 +46,9 @@ class LiveClientTest {
onClose = { _, closeReason ->
println(closeReason)
}
}.start()
}.launch()
runBlocking {
delay(99999999)
delay(9999999)
}
}
}