添加接收消息超时

This commit is contained in:
John Smith 2021-02-01 23:49:31 +08:00
parent ddc13a6327
commit b395886535
4 changed files with 117 additions and 38 deletions

View File

@ -43,6 +43,8 @@ def init():
class Room(blivedm.BLiveClient):
HEARTBEAT_INTERVAL = 10
# 重新定义parse_XXX是为了减少对字段名的依赖防止B站改字段名
def __parse_danmaku(self, command):
info = command['info']
@ -97,7 +99,7 @@ class Room(blivedm.BLiveClient):
}
def __init__(self, room_id):
super().__init__(room_id, session=_http_session, heartbeat_interval=10)
super().__init__(room_id, session=_http_session, heartbeat_interval=self.HEARTBEAT_INTERVAL)
self.clients: List['ChatHandler'] = []
self.auto_translate_count = 0
@ -365,34 +367,68 @@ class RoomManager:
# noinspection PyAbstractClass
class ChatHandler(tornado.websocket.WebSocketHandler):
HEARTBEAT_INTERVAL = 10
RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._close_on_timeout_future = None
self._heartbeat_timer_handle = None
self._receive_timeout_timer_handle = None
self.room_id = None
self.auto_translate = False
def open(self):
logger.info('Websocket connected %s', self.request.remote_ip)
self._close_on_timeout_future = asyncio.ensure_future(self._close_on_timeout())
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
)
self._refresh_receive_timeout_timer()
async def _close_on_timeout(self):
try:
# 超过一定时间还没加入房间则断开
await asyncio.sleep(10)
logger.warning('Client %s joining room timed out', self.request.remote_ip)
self.close()
except (asyncio.CancelledError, tornado.websocket.WebSocketClosedError):
pass
def _on_send_heartbeat(self):
self.send_message(Command.HEARTBEAT, {})
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
)
def _refresh_receive_timeout_timer(self):
if self._receive_timeout_timer_handle is not None:
self._receive_timeout_timer_handle.cancel()
self._receive_timeout_timer_handle = asyncio.get_event_loop().call_later(
self.RECEIVE_TIMEOUT, self._on_receive_timeout
)
def _on_receive_timeout(self):
logger.warning('Client %s timed out', self.request.remote_ip)
self._receive_timeout_timer_handle = None
self.close()
def on_close(self):
logger.info('Websocket disconnected %s room: %s', self.request.remote_ip, str(self.room_id))
if self.has_joined_room:
room_manager.del_client(self.room_id, self)
if self._heartbeat_timer_handle is not None:
self._heartbeat_timer_handle.cancel()
self._heartbeat_timer_handle = None
if self._receive_timeout_timer_handle is not None:
self._receive_timeout_timer_handle.cancel()
self._receive_timeout_timer_handle = None
def on_message(self, message):
try:
# 超时没有加入房间也断开
if self.has_joined_room:
self._refresh_receive_timeout_timer()
body = json.loads(message)
cmd = body['cmd']
if cmd == Command.HEARTBEAT:
return
pass
elif cmd == Command.JOIN_ROOM:
if self.has_joined_room:
return
self._refresh_receive_timeout_timer()
self.room_id = int(body['data']['roomId'])
logger.info('Client %s is joining room %d', self.request.remote_ip, self.room_id)
try:
@ -402,21 +438,11 @@ class ChatHandler(tornado.websocket.WebSocketHandler):
pass
asyncio.ensure_future(room_manager.add_client(self.room_id, self))
self._close_on_timeout_future.cancel()
self._close_on_timeout_future = None
else:
logger.warning('Unknown cmd, client: %s, cmd: %d, body: %s', self.request.remote_ip, cmd, body)
except Exception:
logger.exception('on_message error, client: %s, message: %s', self.request.remote_ip, message)
def on_close(self):
logger.info('Websocket disconnected %s room: %s', self.request.remote_ip, str(self.room_id))
if self.has_joined_room:
room_manager.del_client(self.room_id, self)
if self._close_on_timeout_future is not None:
self._close_on_timeout_future.cancel()
self._close_on_timeout_future = None
# 跨域测试用
def check_origin(self, origin):
if self.application.settings['debug']:
@ -432,7 +458,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler):
try:
self.write_message(body)
except tornado.websocket.WebSocketClosedError:
self.on_close()
self.close()
async def on_join_room(self):
if self.application.settings['debug']:

@ -1 +1 @@
Subproject commit 8d8cc8c2706d62bbfa74cbc36f536b9717fe8f36
Subproject commit 13712f89ebb13b9ff9a2cf50c9d6922200538113

View File

@ -6,8 +6,8 @@ import * as avatar from './avatar'
const HEADER_SIZE = 16
// const WS_BODY_PROTOCOL_VERSION_NORMAL = 0
// const WS_BODY_PROTOCOL_VERSION_INT = 1 // 用于心跳包
// const WS_BODY_PROTOCOL_VERSION_INFLATE = 0
// const WS_BODY_PROTOCOL_VERSION_NORMAL = 1
const WS_BODY_PROTOCOL_VERSION_DEFLATE = 2
// const OP_HANDSHAKE = 0
@ -32,6 +32,9 @@ const OP_AUTH_REPLY = 8
// const MinBusinessOp = 1000
// const MaxBusinessOp = 10000
const HEARTBEAT_INTERVAL = 10 * 1000
const RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5 * 1000
let textEncoder = new TextEncoder()
let textDecoder = new TextDecoder()
@ -55,6 +58,7 @@ export default class ChatClientDirect {
this.retryCount = 0
this.isDestroying = false
this.heartbeatTimerId = null
this.receiveTimeoutTimerId = null
}
async start () {
@ -120,15 +124,29 @@ export default class ChatClientDirect {
this.websocket.onopen = this.onWsOpen.bind(this)
this.websocket.onclose = this.onWsClose.bind(this)
this.websocket.onmessage = this.onWsMessage.bind(this)
this.heartbeatTimerId = window.setInterval(this.sendHeartbeat.bind(this), 10 * 1000)
}
onWsOpen () {
this.sendAuth()
this.heartbeatTimerId = window.setInterval(this.sendHeartbeat.bind(this), HEARTBEAT_INTERVAL)
this.refreshReceiveTimeoutTimer()
}
sendHeartbeat () {
this.websocket.send(this.makePacket({}, OP_HEARTBEAT))
}
onWsOpen () {
this.sendAuth()
refreshReceiveTimeoutTimer() {
if (this.receiveTimeoutTimerId) {
window.clearTimeout(this.receiveTimeoutTimerId)
}
this.receiveTimeoutTimerId = window.setTimeout(this.onReceiveTimeout.bind(this), RECEIVE_TIMEOUT)
}
onReceiveTimeout() {
window.console.warn('接收消息超时')
this.receiveTimeoutTimerId = null
this.websocket.close()
}
onWsClose () {
@ -137,19 +155,26 @@ export default class ChatClientDirect {
window.clearInterval(this.heartbeatTimerId)
this.heartbeatTimerId = null
}
if (this.receiveTimeoutTimerId) {
window.clearTimeout(this.receiveTimeoutTimerId)
this.receiveTimeoutTimerId = null
}
if (this.isDestroying) {
return
}
window.console.log(`掉线重连中${++this.retryCount}`)
window.console.warn(`掉线重连中${++this.retryCount}`)
window.setTimeout(this.wsConnect.bind(this), 1000)
}
onWsMessage (event) {
this.refreshReceiveTimeoutTimer()
this.retryCount = 0
if (!(event.data instanceof ArrayBuffer)) {
window.console.warn('未知的websocket消息', event.data)
return
}
let data = new Uint8Array(event.data)
this.handlerMessage(data)
}

View File

@ -7,6 +7,9 @@ const COMMAND_ADD_SUPER_CHAT = 5
const COMMAND_DEL_SUPER_CHAT = 6
const COMMAND_UPDATE_TRANSLATION = 7
const HEARTBEAT_INTERVAL = 10 * 1000
const RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5 * 1000
export default class ChatClientRelay {
constructor (roomId, autoTranslate) {
this.roomId = roomId
@ -23,6 +26,7 @@ export default class ChatClientRelay {
this.retryCount = 0
this.isDestroying = false
this.heartbeatTimerId = null
this.receiveTimeoutTimerId = null
}
start () {
@ -48,13 +52,6 @@ export default class ChatClientRelay {
this.websocket.onopen = this.onWsOpen.bind(this)
this.websocket.onclose = this.onWsClose.bind(this)
this.websocket.onmessage = this.onWsMessage.bind(this)
this.heartbeatTimerId = window.setInterval(this.sendHeartbeat.bind(this), 10 * 1000)
}
sendHeartbeat () {
this.websocket.send(JSON.stringify({
cmd: COMMAND_HEARTBEAT
}))
}
onWsOpen () {
@ -68,6 +65,27 @@ export default class ChatClientRelay {
}
}
}))
this.heartbeatTimerId = window.setInterval(this.sendHeartbeat.bind(this), HEARTBEAT_INTERVAL)
this.refreshReceiveTimeoutTimer()
}
sendHeartbeat () {
this.websocket.send(JSON.stringify({
cmd: COMMAND_HEARTBEAT
}))
}
refreshReceiveTimeoutTimer() {
if (this.receiveTimeoutTimerId) {
window.clearTimeout(this.receiveTimeoutTimerId)
}
this.receiveTimeoutTimerId = window.setTimeout(this.onReceiveTimeout.bind(this), RECEIVE_TIMEOUT)
}
onReceiveTimeout() {
window.console.warn('接收消息超时')
this.receiveTimeoutTimerId = null
this.websocket.close()
}
onWsClose () {
@ -76,16 +94,26 @@ export default class ChatClientRelay {
window.clearInterval(this.heartbeatTimerId)
this.heartbeatTimerId = null
}
if (this.receiveTimeoutTimerId) {
window.clearTimeout(this.receiveTimeoutTimerId)
this.receiveTimeoutTimerId = null
}
if (this.isDestroying) {
return
}
window.console.log(`掉线重连中${++this.retryCount}`)
window.console.warn(`掉线重连中${++this.retryCount}`)
window.setTimeout(this.wsConnect.bind(this), 1000)
}
onWsMessage (event) {
this.refreshReceiveTimeoutTimer()
let {cmd, data} = JSON.parse(event.data)
switch (cmd) {
case COMMAND_HEARTBEAT: {
break
}
case COMMAND_ADD_TEXT: {
if (!this.onAddText) {
break