添加 websocket 超时机制

This commit is contained in:
czp3009 2019-03-29 14:45:33 +08:00
parent bf1cf9c3a3
commit 5faac14b40
4 changed files with 61 additions and 58 deletions

View File

@ -15,7 +15,6 @@ import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.decodeString
import io.ktor.util.error
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consumeEach
import mu.KotlinLogging
private val logger = KotlinLogging.logger { }
@ -82,9 +81,6 @@ class LiveClient(
//开启 websocket
HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") {
pingIntervalMillis = 30_000
timeoutMillis = 10_000
//发送进房数据包
send(PresetPacket.enterRoomPacket(anchorUserId, roomId))
val enterRoomResponsePacket = incoming.receive().toPackets()[0]
@ -107,8 +103,9 @@ class LiveClient(
launch {
val scale = bilibiliClient.billingClientProperties.scale
while (true) {
@Suppress("DeferredResultUnused")
liveAPI.userOnlineHeart(roomId, scale)
liveAPI.userOnlineHeart(roomId, scale).invokeOnCompletion {
if (it != null) logger.error(it)
}
delay(300_000)
}
}
@ -132,8 +129,10 @@ class LiveClient(
}
try {
incoming.consumeEach { frame ->
frame.toPackets().forEach {
while (true) {
withTimeout(40_000) {
incoming.receive()
}.toPackets().forEach {
try {
@Suppress("NON_EXHAUSTIVE_WHEN")
when (it.packetType) {
@ -151,12 +150,14 @@ class LiveClient(
}
}
}
} catch (e: TimeoutCancellationException) {
throw e
} catch (e: CancellationException) {
close()
} finally {
restHeartBeatJob?.cancel()
websocketHeartBeatJob.cancel()
launch(NonCancellable) {
launch {
val closeReason = closeReason.await()
try {
callback.onClose?.invoke(this@LiveClient, closeReason)
@ -179,7 +180,7 @@ class LiveClientCallbackDSL {
/**
* 成功进入房间时触发
*/
var onConnect: (suspend (LiveClient) -> Unit)? = null
var onConnect: ((LiveClient) -> Unit)? = null
/**
* 抛出异常时触发
@ -189,17 +190,17 @@ class LiveClientCallbackDSL {
/**
* 收到人气值数据包
*/
var onPopularityPacket: (suspend (LiveClient, Int) -> Unit)? = null
var onPopularityPacket: ((LiveClient, Int) -> Unit)? = null
/**
* 收到 command 数据包
*/
var onCommandPacket: (suspend (LiveClient, JsonObject) -> Unit)? = null
var onCommandPacket: ((LiveClient, JsonObject) -> Unit)? = null
/**
* 连接关闭时触发
*/
var onClose: (suspend (LiveClient, CloseReason?) -> Unit)? = null
var onClose: ((LiveClient, CloseReason?) -> Unit)? = null
}
/**

View File

@ -1,9 +1,7 @@
package com.hiczp.bilibili.api.live.websocket
import com.hiczp.bilibili.api.toPrettyPrintString
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.WebSocketSession
import io.ktor.http.cio.websocket.readBytes
import java.nio.ByteBuffer
/**
@ -12,14 +10,14 @@ import java.nio.ByteBuffer
* 数据包头部结构 00 00 00 65 00 10 00 01 00 00 00 07 00 00 00 01
* |数据包总长度| |头长| |tag| |数据包类型 | | tag |
*
* @param tagShort 一种 tag, 如果是非 command 数据包则为 1, 否则为 0, short 类型
* @param shortTag 一种 tag, 如果是非 command 数据包则为 1, 否则为 0, short 类型
* @param packetType 数据包类型
* @param tag tagShort, 但是为 int 类型
* @param content 正文内容
*/
@Suppress("MemberVisibilityCanBePrivate")
class Packet(
val tagShort: Short = 1,
data class Packet(
val shortTag: Short = 1,
val packetType: PacketType,
val tag: Int = 1,
val content: ByteBuffer
@ -29,24 +27,18 @@ class Packet(
val headerLength: Short = 0x10
fun toByteBuffer() =
fun toFrame() = Frame.Binary(
true,
ByteBuffer.allocate(totalLength)
.putInt(totalLength)
.putShort(headerLength)
.putShort(tagShort)
.putShort(shortTag)
.putInt(packetType.value)
.putInt(tag)
.put(content).apply {
flip()
}!!
fun toFrame() = Frame.Binary(
true,
toByteBuffer()
)
//for debug
override fun toString() = toFrame().readBytes().toPrettyPrintString()
}
/**
@ -59,14 +51,14 @@ internal fun Frame.toPackets(): List<Packet> {
val startPosition = buffer.position()
val totalLength = buffer.int
buffer.position(buffer.position() + 2) //skip headerLength
val tagShort = buffer.short
val shortTag = buffer.short
val packetType = PacketType.getByValue(buffer.int)
val tag = buffer.int
buffer.limit(startPosition + totalLength)
val content = buffer.slice()
buffer.position(buffer.limit())
buffer.limit(bufferLength)
list.add(Packet(tagShort, packetType, tag, content))
list.add(Packet(shortTag, packetType, tag, content))
}
return list
}

View File

@ -28,7 +28,7 @@ object PresetPacket {
/**
* 心跳包
* 心跳包的正文内容可能是故意的, 为固定值 [object Object]
* 心跳包的正文内容可能是故意的, 为固定值 "[object Object]"
*/
fun heartbeatPacket(content: ByteBuffer = ByteBuffer.wrap("[object Object]".toByteArray())) = Packet(
packetType = PacketType.HEARTBEAT,

View File

@ -1,13 +1,17 @@
package com.hiczp.bilibili.api.test
import com.github.salomonbrys.kotson.byString
import com.hiczp.bilibili.api.BilibiliClient
import com.hiczp.bilibili.api.isNotEmpty
import com.hiczp.bilibili.api.live.websocket.DanmakuMessage
import com.hiczp.bilibili.api.live.websocket.liveClient
import io.ktor.http.cio.websocket.CloseReason
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import okhttp3.logging.HttpLoggingInterceptor
import org.junit.jupiter.api.Test
import java.nio.file.Paths
import java.time.Instant
class LiveClientTest {
@Test
@ -16,39 +20,45 @@ class LiveClientTest {
it.toFile().mkdirs()
}
val job = bilibiliClient.liveClient(roomId = 3, sendUserOnlineHeart = true) {
onConnect = {
println("Connected")
}
onPopularityPacket = { _, popularity ->
println("Current popularity: $popularity")
}
onCommandPacket = { _, jsonObject ->
val json = jsonObject.toString()
val cmd by jsonObject.byString
path.resolve("$cmd.json").toFile().run {
if (!exists()) writeText(json)
BilibiliClient(logLevel = HttpLoggingInterceptor.Level.BASIC)
.apply {
loginResponse = bilibiliClient.loginResponse
}
.liveClient(roomId = 3, sendUserOnlineHeart = true) {
onConnect = {
println("Connected ${Instant.now()}")
}
println(
if (cmd == "DANMU_MSG") {
with(DanmakuMessage(jsonObject)) {
"${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message"
}
} else {
json
onPopularityPacket = { _, popularity ->
println("Current popularity: $popularity ${Instant.now()}")
}
onCommandPacket = { _, jsonObject ->
val json = jsonObject.toString()
val cmd by jsonObject.byString
path.resolve("$cmd.json").toFile().run {
if (!exists()) writeText(json)
}
)
}
onClose = { _, closeReason ->
println(closeReason)
}
}.launch()
println(
if (cmd == "DANMU_MSG") {
with(DanmakuMessage(jsonObject)) {
"${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message"
}
} else {
json
}
)
}
onClose = { liveClient, closeReason ->
println("$closeReason ${Instant.now()}")
if (closeReason?.code != CloseReason.Codes.NORMAL.code) liveClient.launch()
}
}.launch()
runBlocking {
delay(9999999)
delay(99999999999999999)
}
}
}