diff --git a/api/chat.py b/api/chat.py index 9a282f4..355aece 100644 --- a/api/chat.py +++ b/api/chat.py @@ -43,6 +43,8 @@ def init(): class Room(blivedm.BLiveClient): + HEARTBEAT_INTERVAL = 10 + # 重新定义parse_XXX是为了减少对字段名的依赖,防止B站改字段名 def __parse_danmaku(self, command): info = command['info'] @@ -97,7 +99,7 @@ class Room(blivedm.BLiveClient): } def __init__(self, room_id): - super().__init__(room_id, session=_http_session, heartbeat_interval=10) + super().__init__(room_id, session=_http_session, heartbeat_interval=self.HEARTBEAT_INTERVAL) self.clients: List['ChatHandler'] = [] self.auto_translate_count = 0 @@ -365,34 +367,68 @@ class RoomManager: # noinspection PyAbstractClass class ChatHandler(tornado.websocket.WebSocketHandler): + HEARTBEAT_INTERVAL = 10 + RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5 + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._close_on_timeout_future = None + self._heartbeat_timer_handle = None + self._receive_timeout_timer_handle = None + self.room_id = None self.auto_translate = False def open(self): logger.info('Websocket connected %s', self.request.remote_ip) - self._close_on_timeout_future = asyncio.ensure_future(self._close_on_timeout()) + self._heartbeat_timer_handle = asyncio.get_event_loop().call_later( + self.HEARTBEAT_INTERVAL, self._on_send_heartbeat + ) + self._refresh_receive_timeout_timer() - async def _close_on_timeout(self): - try: - # 超过一定时间还没加入房间则断开 - await asyncio.sleep(10) - logger.warning('Client %s joining room timed out', self.request.remote_ip) - self.close() - except (asyncio.CancelledError, tornado.websocket.WebSocketClosedError): - pass + def _on_send_heartbeat(self): + self.send_message(Command.HEARTBEAT, {}) + self._heartbeat_timer_handle = asyncio.get_event_loop().call_later( + self.HEARTBEAT_INTERVAL, self._on_send_heartbeat + ) + + def _refresh_receive_timeout_timer(self): + if self._receive_timeout_timer_handle is not None: + self._receive_timeout_timer_handle.cancel() + self._receive_timeout_timer_handle = asyncio.get_event_loop().call_later( + self.RECEIVE_TIMEOUT, self._on_receive_timeout + ) + + def _on_receive_timeout(self): + logger.warning('Client %s timed out', self.request.remote_ip) + self._receive_timeout_timer_handle = None + self.close() + + def on_close(self): + logger.info('Websocket disconnected %s room: %s', self.request.remote_ip, str(self.room_id)) + if self.has_joined_room: + room_manager.del_client(self.room_id, self) + if self._heartbeat_timer_handle is not None: + self._heartbeat_timer_handle.cancel() + self._heartbeat_timer_handle = None + if self._receive_timeout_timer_handle is not None: + self._receive_timeout_timer_handle.cancel() + self._receive_timeout_timer_handle = None def on_message(self, message): try: + # 超时没有加入房间也断开 + if self.has_joined_room: + self._refresh_receive_timeout_timer() + body = json.loads(message) cmd = body['cmd'] if cmd == Command.HEARTBEAT: - return + pass elif cmd == Command.JOIN_ROOM: if self.has_joined_room: return + self._refresh_receive_timeout_timer() + self.room_id = int(body['data']['roomId']) logger.info('Client %s is joining room %d', self.request.remote_ip, self.room_id) try: @@ -402,21 +438,11 @@ class ChatHandler(tornado.websocket.WebSocketHandler): pass asyncio.ensure_future(room_manager.add_client(self.room_id, self)) - self._close_on_timeout_future.cancel() - self._close_on_timeout_future = None else: logger.warning('Unknown cmd, client: %s, cmd: %d, body: %s', self.request.remote_ip, cmd, body) except Exception: logger.exception('on_message error, client: %s, message: %s', self.request.remote_ip, message) - def on_close(self): - logger.info('Websocket disconnected %s room: %s', self.request.remote_ip, str(self.room_id)) - if self.has_joined_room: - room_manager.del_client(self.room_id, self) - if self._close_on_timeout_future is not None: - self._close_on_timeout_future.cancel() - self._close_on_timeout_future = None - # 跨域测试用 def check_origin(self, origin): if self.application.settings['debug']: @@ -432,7 +458,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): try: self.write_message(body) except tornado.websocket.WebSocketClosedError: - self.on_close() + self.close() async def on_join_room(self): if self.application.settings['debug']: diff --git a/blivedm b/blivedm index 8d8cc8c..13712f8 160000 --- a/blivedm +++ b/blivedm @@ -1 +1 @@ -Subproject commit 8d8cc8c2706d62bbfa74cbc36f536b9717fe8f36 +Subproject commit 13712f89ebb13b9ff9a2cf50c9d6922200538113 diff --git a/frontend/src/api/chat/ChatClientDirect.js b/frontend/src/api/chat/ChatClientDirect.js index f83174f..24fa5bb 100644 --- a/frontend/src/api/chat/ChatClientDirect.js +++ b/frontend/src/api/chat/ChatClientDirect.js @@ -6,8 +6,8 @@ import * as avatar from './avatar' const HEADER_SIZE = 16 -// const WS_BODY_PROTOCOL_VERSION_NORMAL = 0 -// const WS_BODY_PROTOCOL_VERSION_INT = 1 // 用于心跳包 +// const WS_BODY_PROTOCOL_VERSION_INFLATE = 0 +// const WS_BODY_PROTOCOL_VERSION_NORMAL = 1 const WS_BODY_PROTOCOL_VERSION_DEFLATE = 2 // const OP_HANDSHAKE = 0 @@ -32,6 +32,9 @@ const OP_AUTH_REPLY = 8 // const MinBusinessOp = 1000 // const MaxBusinessOp = 10000 +const HEARTBEAT_INTERVAL = 10 * 1000 +const RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5 * 1000 + let textEncoder = new TextEncoder() let textDecoder = new TextDecoder() @@ -55,6 +58,7 @@ export default class ChatClientDirect { this.retryCount = 0 this.isDestroying = false this.heartbeatTimerId = null + this.receiveTimeoutTimerId = null } async start () { @@ -120,15 +124,29 @@ export default class ChatClientDirect { this.websocket.onopen = this.onWsOpen.bind(this) this.websocket.onclose = this.onWsClose.bind(this) this.websocket.onmessage = this.onWsMessage.bind(this) - this.heartbeatTimerId = window.setInterval(this.sendHeartbeat.bind(this), 10 * 1000) + } + + onWsOpen () { + this.sendAuth() + this.heartbeatTimerId = window.setInterval(this.sendHeartbeat.bind(this), HEARTBEAT_INTERVAL) + this.refreshReceiveTimeoutTimer() } sendHeartbeat () { this.websocket.send(this.makePacket({}, OP_HEARTBEAT)) } - onWsOpen () { - this.sendAuth() + refreshReceiveTimeoutTimer() { + if (this.receiveTimeoutTimerId) { + window.clearTimeout(this.receiveTimeoutTimerId) + } + this.receiveTimeoutTimerId = window.setTimeout(this.onReceiveTimeout.bind(this), RECEIVE_TIMEOUT) + } + + onReceiveTimeout() { + window.console.warn('接收消息超时') + this.receiveTimeoutTimerId = null + this.websocket.close() } onWsClose () { @@ -137,19 +155,26 @@ export default class ChatClientDirect { window.clearInterval(this.heartbeatTimerId) this.heartbeatTimerId = null } + if (this.receiveTimeoutTimerId) { + window.clearTimeout(this.receiveTimeoutTimerId) + this.receiveTimeoutTimerId = null + } + if (this.isDestroying) { return } - window.console.log(`掉线重连中${++this.retryCount}`) + window.console.warn(`掉线重连中${++this.retryCount}`) window.setTimeout(this.wsConnect.bind(this), 1000) } onWsMessage (event) { + this.refreshReceiveTimeoutTimer() this.retryCount = 0 if (!(event.data instanceof ArrayBuffer)) { window.console.warn('未知的websocket消息:', event.data) return } + let data = new Uint8Array(event.data) this.handlerMessage(data) } diff --git a/frontend/src/api/chat/ChatClientRelay.js b/frontend/src/api/chat/ChatClientRelay.js index b16c726..bc9875d 100644 --- a/frontend/src/api/chat/ChatClientRelay.js +++ b/frontend/src/api/chat/ChatClientRelay.js @@ -7,6 +7,9 @@ const COMMAND_ADD_SUPER_CHAT = 5 const COMMAND_DEL_SUPER_CHAT = 6 const COMMAND_UPDATE_TRANSLATION = 7 +const HEARTBEAT_INTERVAL = 10 * 1000 +const RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5 * 1000 + export default class ChatClientRelay { constructor (roomId, autoTranslate) { this.roomId = roomId @@ -23,6 +26,7 @@ export default class ChatClientRelay { this.retryCount = 0 this.isDestroying = false this.heartbeatTimerId = null + this.receiveTimeoutTimerId = null } start () { @@ -48,13 +52,6 @@ export default class ChatClientRelay { this.websocket.onopen = this.onWsOpen.bind(this) this.websocket.onclose = this.onWsClose.bind(this) this.websocket.onmessage = this.onWsMessage.bind(this) - this.heartbeatTimerId = window.setInterval(this.sendHeartbeat.bind(this), 10 * 1000) - } - - sendHeartbeat () { - this.websocket.send(JSON.stringify({ - cmd: COMMAND_HEARTBEAT - })) } onWsOpen () { @@ -68,6 +65,27 @@ export default class ChatClientRelay { } } })) + this.heartbeatTimerId = window.setInterval(this.sendHeartbeat.bind(this), HEARTBEAT_INTERVAL) + this.refreshReceiveTimeoutTimer() + } + + sendHeartbeat () { + this.websocket.send(JSON.stringify({ + cmd: COMMAND_HEARTBEAT + })) + } + + refreshReceiveTimeoutTimer() { + if (this.receiveTimeoutTimerId) { + window.clearTimeout(this.receiveTimeoutTimerId) + } + this.receiveTimeoutTimerId = window.setTimeout(this.onReceiveTimeout.bind(this), RECEIVE_TIMEOUT) + } + + onReceiveTimeout() { + window.console.warn('接收消息超时') + this.receiveTimeoutTimerId = null + this.websocket.close() } onWsClose () { @@ -76,16 +94,26 @@ export default class ChatClientRelay { window.clearInterval(this.heartbeatTimerId) this.heartbeatTimerId = null } + if (this.receiveTimeoutTimerId) { + window.clearTimeout(this.receiveTimeoutTimerId) + this.receiveTimeoutTimerId = null + } + if (this.isDestroying) { return } - window.console.log(`掉线重连中${++this.retryCount}`) + window.console.warn(`掉线重连中${++this.retryCount}`) window.setTimeout(this.wsConnect.bind(this), 1000) } onWsMessage (event) { + this.refreshReceiveTimeoutTimer() + let {cmd, data} = JSON.parse(event.data) switch (cmd) { + case COMMAND_HEARTBEAT: { + break + } case COMMAND_ADD_TEXT: { if (!this.onAddText) { break