From dca7f1499fa08944c67def6a25c2b590d867476b Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 31 Jan 2021 13:11:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm.py | 59 +++++++++++++++++++++++------------------------------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/blivedm.py b/blivedm.py index 8769a23..1446319 100644 --- a/blivedm.py +++ b/blivedm.py @@ -22,8 +22,8 @@ DEFAULT_DANMAKU_SERVER_LIST = [ HEADER_STRUCT = struct.Struct('>I2H2I') HeaderTuple = namedtuple('HeaderTuple', ('pack_len', 'raw_header_size', 'ver', 'operation', 'seq_id')) -WS_BODY_PROTOCOL_VERSION_NORMAL = 0 -WS_BODY_PROTOCOL_VERSION_INT = 1 # 用于心跳包 +WS_BODY_PROTOCOL_VERSION_INFLATE = 0 +WS_BODY_PROTOCOL_VERSION_NORMAL = 1 WS_BODY_PROTOCOL_VERSION_DEFLATE = 2 @@ -374,6 +374,7 @@ class BLiveClient: # noinspection PyProtectedMember self._ssl = ssl if ssl else ssl_._create_unverified_context() self._websocket = None + self._heartbeat_timer_handle = None @property def is_running(self): @@ -507,7 +508,8 @@ class BLiveClient: return False return True - def _make_packet(self, data, operation): + @staticmethod + def _make_packet(data, operation): body = json.dumps(data).encode('utf-8') header = HEADER_STRUCT.pack( HEADER_STRUCT.size + len(body), @@ -539,7 +541,6 @@ class BLiveClient: retry_count = 0 while True: - heartbeat_future = None try: # 连接 host_server = self._host_server_list[retry_count % len(self._host_server_list)] @@ -549,27 +550,24 @@ class BLiveClient: ) as websocket: self._websocket = websocket await self._send_auth() - heartbeat_future = asyncio.ensure_future(self._heartbeat_loop(), loop=self._loop) - heartbeat_future.add_done_callback( - lambda _future: logger.debug('room %d 心跳循环结束', self.room_id) + self._heartbeat_timer_handle = self._loop.call_later( + self._heartbeat_interval, self._on_send_heartbeat ) # 处理消息 async for message in websocket: # type: aiohttp.WSMessage retry_count = 0 - if message.type == aiohttp.WSMsgType.BINARY: - try: - await self._handle_message(message.data) - except BaseException as e: - if type(e) in ( - asyncio.CancelledError, aiohttp.ClientConnectionError, - asyncio.TimeoutError, ssl_.SSLError - ): - raise - logger.exception('room %d 处理消息时发生错误:', self.room_id) - else: + if message.type != aiohttp.WSMsgType.BINARY: logger.warning('room %d 未知的websocket消息:type=%s %s', self.room_id, message.type, message.data) + continue + + try: + await self._handle_message(message.data) + except asyncio.CancelledError: + raise + except Exception: + logger.exception('room %d 处理消息时发生错误:', self.room_id) except asyncio.CancelledError: break @@ -581,13 +579,10 @@ class BLiveClient: # 证书错误时无法重连 break finally: - if heartbeat_future is not None: - heartbeat_future.cancel() - try: - await heartbeat_future - except asyncio.CancelledError: - break self._websocket = None + if self._heartbeat_timer_handle is not None: + self._heartbeat_timer_handle.cancel() + self._heartbeat_timer_handle = None retry_count += 1 logger.warning('room %d 掉线重连中%d', self.room_id, retry_count) @@ -596,14 +591,10 @@ class BLiveClient: except asyncio.CancelledError: break - async def _heartbeat_loop(self): - while True: - try: - await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) - await asyncio.sleep(self._heartbeat_interval) - - except (asyncio.CancelledError, aiohttp.ClientConnectionError): - break + def _on_send_heartbeat(self): + coro = self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) + asyncio.ensure_future(coro, loop=self._loop) + self._heartbeat_timer_handle = self._loop.call_later(self._heartbeat_interval, self._on_send_heartbeat) async def _handle_message(self, data): offset = 0 @@ -622,13 +613,13 @@ class BLiveClient: elif header.operation == Operation.SEND_MSG_REPLY: body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE: - body = zlib.decompress(body) + body = await self._loop.run_in_executor(None, zlib.decompress, body) await self._handle_message(body) else: try: body = json.loads(body.decode('utf-8')) await self._handle_command(body) - except BaseException: + except Exception: logger.error('body: %s', body) raise