From 28dfc12a908aa40277a0d34a37fe0e77d75c7a74 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 18 Dec 2021 15:46:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=84=E7=90=86=E8=AE=A4=E8=AF=81=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E7=9A=84=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/client.py | 100 ++++++++++++++++++++++++++++------------------ 1 file changed, 61 insertions(+), 39 deletions(-) diff --git a/blivedm/client.py b/blivedm/client.py index f726f6d..49d0fa2 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -62,10 +62,20 @@ class Operation(enum.IntEnum): # MaxBusinessOp = 10000 +# WS_AUTH +class AuthReplyCode(enum.IntEnum): + OK = 0 + TOKEN_ERROR = -101 + + class InitError(Exception): """初始化失败""" +class AuthError(Exception): + """认证失败""" + + class BLiveClient: """ B站直播弹幕客户端,负责连接房间 @@ -348,7 +358,7 @@ class BLiveClient: # 如果之前未初始化则初始化 if self._host_server_token is None: if not await self.init_room(): - raise InitError('初始化失败') + raise InitError('init_room() failed') retry_count = 0 while True: @@ -366,12 +376,18 @@ class BLiveClient: # 处理消息 message: aiohttp.WSMessage async for message in websocket: - retry_count = 0 await self._on_ws_message(message) + # 至少成功处理1条消息 + retry_count = 0 except (aiohttp.ClientConnectionError, asyncio.TimeoutError): # 掉线重连 pass + except AuthError: + # 认证失败了,应该重新获取token再重连 + logger.exception('room=%d auth failed, trying init_room() again', self.room_id) + if not await self.init_room(): + raise InitError('init_room() failed') except ssl_.SSLError: logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id) raise @@ -435,7 +451,7 @@ class BLiveClient: try: await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) - except ConnectionResetError as e: + except (ConnectionResetError, aiohttp.ClientConnectionError) as e: logger.warning('room=%d _send_heartbeat() failed: %r', self.room_id, e) except Exception: # noqa logger.exception('room=%d _send_heartbeat() failed:', self.room_id) @@ -453,8 +469,8 @@ class BLiveClient: try: await self._parse_ws_message(message.data) - except asyncio.CancelledError: - # 正常停止,让外层处理 + except (asyncio.CancelledError, AuthError): + # 正常停止、认证失败,让外层处理 raise except Exception: # noqa logger.exception('room=%d _parse_ws_message() error:', self.room_id) @@ -475,36 +491,8 @@ class BLiveClient: if header.operation in (Operation.SEND_MSG_REPLY, Operation.AUTH_REPLY): # 业务消息,可能有多个包一起发,需要分包 while True: - if header.operation == Operation.SEND_MSG_REPLY: - # 业务消息 - body = data[offset + header.raw_header_size: offset + header.pack_len] - if header.ver == ProtoVer.DEFLATE: - # 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行 - body = await self._loop.run_in_executor(None, zlib.decompress, body) - await self._parse_ws_message(body) - elif header.ver == ProtoVer.NORMAL: - # 没压缩过的直接反序列化,因为有万恶的GIL,这里不能并行避免阻塞 - if len(body) != 0: - try: - body = json.loads(body.decode('utf-8')) - await self._handle_command(body) - except Exception: - logger.error('room=%d, body=%s', self.room_id, body) - raise - else: - # 未知格式 - logger.warning('room=%d unknown protocol version=%d, header=%s, body=%s', self.room_id, - header.ver, header, body) - - elif header.operation == Operation.AUTH_REPLY: - # 认证响应 TODO 判断是否成功 - await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) - - else: - # 未知消息 - body = data[offset + header.raw_header_size: offset + header.pack_len] - logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id, - header.operation, header, body) + body = data[offset + header.raw_header_size: offset + header.pack_len] + await self.__parse_business_message(header, body) offset += header.pack_len if offset >= len(data): @@ -519,10 +507,8 @@ class BLiveClient: elif header.operation == Operation.HEARTBEAT_REPLY: # 服务器心跳包,前4字节是人气值,后面是客户端发的心跳包内容 # pack_len不包括客户端发的心跳包内容,不知道是不是服务器BUG - popularity = int.from_bytes( - data[offset + header.raw_header_size: offset + header.raw_header_size + 4], - 'big' - ) + body = data[offset + header.raw_header_size: offset + header.raw_header_size + 4] + popularity = int.from_bytes(body, 'big') # 自己造个消息当成业务消息处理 body = { 'cmd': '_HEARTBEAT', @@ -538,6 +524,42 @@ class BLiveClient: logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id, header.operation, header, body) + async def __parse_business_message(self, header: HeaderTuple, body: bytes): + """ + 解析业务消息 + """ + if header.operation == Operation.SEND_MSG_REPLY: + # 业务消息 + if header.ver == ProtoVer.DEFLATE: + # 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行 + body = await self._loop.run_in_executor(None, zlib.decompress, body) + await self._parse_ws_message(body) + elif header.ver == ProtoVer.NORMAL: + # 没压缩过的直接反序列化,因为有万恶的GIL,这里不能并行避免阻塞 + if len(body) != 0: + try: + body = json.loads(body.decode('utf-8')) + await self._handle_command(body) + except Exception: + logger.error('room=%d, body=%s', self.room_id, body) + raise + else: + # 未知格式 + logger.warning('room=%d unknown protocol version=%d, header=%s, body=%s', self.room_id, + header.ver, header, body) + + elif header.operation == Operation.AUTH_REPLY: + # 认证响应 + body = json.loads(body.decode('utf-8')) + if body['code'] != AuthReplyCode.OK: + raise AuthError(f"auth reply error, code={body['code']}, body={body}") + await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) + + else: + # 未知消息 + logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id, + header.operation, header, body) + async def _handle_command(self, command: dict): """ 解析并处理业务消息