diff --git a/blivedm/client.py b/blivedm/client.py index fce9bf5..4f555b5 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -180,7 +180,7 @@ class BLiveClient: 启动本客户端 """ if self.is_running: - logger.warning('room %s 已经在运行中,不能再次start', self.room_id) + logger.warning('room=%s client is running, cannot start() again', self.room_id) return self._network_future = asyncio.ensure_future(self._network_coroutine_wrapper(), loop=self._loop) @@ -190,7 +190,7 @@ class BLiveClient: 停止本客户端 """ if not self.is_running: - logger.warning('room %s 已经停止,不能再次stop', self.room_id) + logger.warning('room=%s client is stopped, cannot stop() again', self.room_id) return self._network_future.cancel() @@ -208,7 +208,7 @@ class BLiveClient: 等待本客户端停止 """ if not self.is_running: - logger.warning('room %s 已经停止,不能join', self.room_id) + logger.warning('room=%s client is stopped, cannot join()', self.room_id) return await self._network_future @@ -218,7 +218,7 @@ class BLiveClient: 释放本客户端的资源,调用后本客户端将不可用 """ if self.is_running: - logger.warning('room %s 在运行状态中调用了close', self.room_id) + logger.warning('room=%s is calling close(), but client is running', self.room_id) # 如果session是自己创建的则关闭session if self._own_session: @@ -249,17 +249,18 @@ class BLiveClient: async with self._session.get(ROOM_INIT_URL, params={'room_id': self._tmp_room_id}, ssl=self._ssl) as res: if res.status != 200: - logger.warning('room %d init_room失败:%d %s', self._tmp_room_id, + logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id, res.status, res.reason) return False data = await res.json() if data['code'] != 0: - logger.warning('room %d init_room失败:%s', self._tmp_room_id, data['message']) + logger.warning('room=%d _init_room_id_and_owner() failed, message=%s', self._tmp_room_id, + data['message']) return False if not self._parse_room_init(data['data']): return False except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('room %d init_room失败:', self._tmp_room_id) + logger.exception('room=%d _init_room_id_and_owner() failed:', self._tmp_room_id) return False return True @@ -275,17 +276,17 @@ class BLiveClient: async with self._session.get(DANMAKU_SERVER_CONF_URL, params={'id': self._room_id, 'type': 0}, ssl=self._ssl) as res: if res.status != 200: - logger.warning('room %d getConf失败:%d %s', self._room_id, + logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id, res.status, res.reason) return False data = await res.json() if data['code'] != 0: - logger.warning('room %d getConf失败:%s', self._room_id, data['message']) + logger.warning('room=%d _init_host_server() failed, message=%s', self._room_id, data['message']) return False if not self._parse_danmaku_server_conf(data['data']): return False except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('room %d getConf失败:', self._room_id) + logger.exception('room=%d _init_host_server() failed:', self._room_id) return False return True @@ -293,7 +294,7 @@ class BLiveClient: self._host_server_list = data['host_list'] self._host_server_token = data['token'] if not self._host_server_list: - logger.warning('room %d getConf失败:host_server_list为空', self._room_id) + logger.warning('room=%d _parse_danmaku_server_conf() failed: host_server_list is empty', self._room_id) return False return True @@ -326,9 +327,9 @@ class BLiveClient: # 正常停止 pass except Exception as e: # noqa - logger.exception('room %s 网络协程异常结束:', self.room_id) + logger.exception('room=%s _network_coroutine() finished with exception:', self.room_id) finally: - logger.debug('room %s 网络协程结束', self.room_id) + logger.debug('room=%s _network_coroutine() finished', self.room_id) self._network_future = None async def _network_coroutine(self): @@ -363,7 +364,7 @@ class BLiveClient: # 掉线重连 pass except ssl_.SSLError: - logger.error('room %d 发生SSL错误,无法重连', self.room_id) + logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id) raise finally: self._websocket = None @@ -371,7 +372,7 @@ class BLiveClient: # 准备重连 retry_count += 1 - logger.warning('room %d 掉线重连中%d', self.room_id, retry_count) + logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count) await asyncio.sleep(1, loop=self._loop) async def _on_ws_connect(self): @@ -420,7 +421,7 @@ class BLiveClient: :param message: websocket消息 """ if message.type != aiohttp.WSMsgType.BINARY: - logger.warning('room %d 未知的websocket消息:type=%s %s', self.room_id, + logger.warning('room=%d unknown websocket message type=%s, data=%s', self.room_id, message.type, message.data) return @@ -430,7 +431,7 @@ class BLiveClient: # 正常停止,让外层处理 raise except Exception: # noqa - logger.exception('room %d 处理websocket消息时发生错误:', self.room_id) + logger.exception('room=%d _parse_ws_message() error:', self.room_id) async def _parse_ws_message(self, data: bytes): """ @@ -439,71 +440,89 @@ class BLiveClient: :param data: websocket消息数据 """ offset = 0 - while offset < len(data): - try: - header = HeaderTuple(*HEADER_STRUCT.unpack_from(data, offset)) - except struct.error: - break + try: + header = HeaderTuple(*HEADER_STRUCT.unpack_from(data, offset)) + except struct.error: + logger.exception('room=%d parsing header failed, offset=%d, data=%s', self.room_id, offset, data) + return - if header.operation == Operation.HEARTBEAT_REPLY: - # 心跳包,自己造个消息当成业务消息处理 - popularity = int.from_bytes( - data[offset + HEADER_STRUCT.size: offset + HEADER_STRUCT.size + 4], - 'big' - ) - body = { - 'cmd': '_HEARTBEAT', - 'data': { - 'popularity': popularity - } - } - await self._handle_command(body) + if header.operation in (Operation.SEND_MSG_REPLY, Operation.AUTH_REPLY): + # 业务消息,可能有多个包一起发,需要分包 + while True: + if header.operation == Operation.SEND_MSG_REPLY: + # 业务消息 + body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] + if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE: + # 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行 + body = await self._loop.run_in_executor(None, zlib.decompress, body) + await self._parse_ws_message(body) + else: + # 没压缩过的直接反序列化,因为有万恶的GIL,这里不能并行避免阻塞 + if len(body) != 0: + try: + body = json.loads(body.decode('utf-8')) + await self._handle_command(body) + except Exception: + logger.error('room=%d, body=%s', self.room_id, body) + raise + + elif header.operation == Operation.AUTH_REPLY: + # 认证响应 TODO 判断是否成功 + await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) - elif header.operation == Operation.SEND_MSG_REPLY: - # 业务消息 - body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] - if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE: - # 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行 - body = await self._loop.run_in_executor(None, zlib.decompress, body) - await self._parse_ws_message(body) else: - # 没压缩过的 - try: - body = json.loads(body.decode('utf-8')) - await self._handle_command(body) - except Exception: - logger.error('room %d body=%s', self.room_id, body) - raise + # 未知消息 + body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] + logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id, + header.operation, header, body) - elif header.operation == Operation.AUTH_REPLY: - # 认证响应 - await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) + offset += header.pack_len + if offset >= len(data): + break - else: - # 未知消息 - body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] - logger.warning('room %d 未知包类型:operation=%d %s%s', self.room_id, - header.operation, header, body) + try: + header = HeaderTuple(*HEADER_STRUCT.unpack_from(data, offset)) + except struct.error: + logger.exception('room=%d parsing header failed, offset=%d, data=%s', self.room_id, offset, data) + break - offset += header.pack_len + elif header.operation == Operation.HEARTBEAT_REPLY: + # 服务器心跳包,前4字节是人气值,后面是客户端发的心跳包内容 + # pack_len不包括客户端发的心跳包内容,不知道是不是服务器BUG + popularity = int.from_bytes( + data[offset + HEADER_STRUCT.size: offset + HEADER_STRUCT.size + 4], + 'big' + ) + # 自己造个消息当成业务消息处理 + body = { + 'cmd': '_HEARTBEAT', + 'data': { + 'popularity': popularity + } + } + await self._handle_command(body) - async def _handle_command(self, command: Union[list, dict]): + else: + # 未知消息 + body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] + logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id, + header.operation, header, body) + + async def _handle_command(self, command: dict): """ 解析并处理业务消息 :param command: 业务消息 """ - # 这里可能会多个消息一起发 - if isinstance(command, list): - for one_command in command: - await self._handle_command(one_command) - return - - results = await asyncio.gather( - *(handler.handle(self, command) for handler in self._handlers), - loop=self._loop, - return_exceptions=True + # 外部代码可能不能正常处理取消,所以这里加shield + results = await asyncio.shield( + asyncio.gather( + *(handler.handle(self, command) for handler in self._handlers), + loop=self._loop, + return_exceptions=True + ), + loop=self._loop ) for res in results: if isinstance(res, Exception): - logger.exception('room %d 处理消息时发生错误,command=%s', self.room_id, command, exc_info=res) + logger.exception('room=%d _handle_command() failed, command=%s', self.room_id, command, exc_info=res) diff --git a/blivedm/handlers.py b/blivedm/handlers.py index 3a27121..4aca61c 100644 --- a/blivedm/handlers.py +++ b/blivedm/handlers.py @@ -108,7 +108,7 @@ class BaseHandler(HandlerInterface): if cmd not in self._CMD_CALLBACK_DICT: # 只有第一次遇到未知cmd时打日志 if cmd not in logged_unknown_cmds: - logger.warning('room %d 未知cmd:cmd=%s %s', client.room_id, cmd, command) + logger.warning('room=%d unknown cmd=%s, command=%s', client.room_id, cmd, command) logged_unknown_cmds.add(cmd) return