From 89b540ddddbb51d74d91dbecc75b0a3897209243 Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 15 Dec 2021 23:44:44 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=9A=E4=B8=AA=E6=B6=88=E6=81=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=99=A8=E5=B9=B6=E5=8F=91=E5=A4=84=E7=90=86=E3=80=81?= =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E6=95=B4=E7=90=86=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/client.py | 127 +++++++++++++++++++++++----------------------- 1 file changed, 64 insertions(+), 63 deletions(-) diff --git a/blivedm/client.py b/blivedm/client.py index 81d7ee1..fce9bf5 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -156,6 +156,8 @@ class BLiveClient: def add_handler(self, handler: 'handlers.HandlerInterface'): """ 添加消息处理器 + 注意多个处理器是并发处理的,不要依赖处理的顺序 + 消息处理器和接收消息运行在同一协程,如果处理消息耗时太长会阻塞接收消息,这种情况建议将消息推到队列,让另一个协程处理 :param handler: 消息处理器 """ @@ -177,27 +179,17 @@ class BLiveClient: """ 启动本客户端 """ - if self._network_future is not None: + if self.is_running: logger.warning('room %s 已经在运行中,不能再次start', self.room_id) return - self._network_future = asyncio.ensure_future(self._network_coroutine(), loop=self._loop) - self._network_future.add_done_callback(self.__on_network_coroutine_done) - - def __on_network_coroutine_done(self, future): - self._network_future = None - - logger.debug('room %s 网络协程结束', self.room_id) - exception = future.exception() - if exception is not None: - exc_info = (type(exception), exception, exception.__traceback__) - logger.exception('room %s 网络协程异常结束:', self.room_id, exc_info=exc_info) + self._network_future = asyncio.ensure_future(self._network_coroutine_wrapper(), loop=self._loop) def stop(self): """ 停止本客户端 """ - if self._network_future is None: + if not self.is_running: logger.warning('room %s 已经停止,不能再次stop', self.room_id) return @@ -215,7 +207,7 @@ class BLiveClient: """ 等待本客户端停止 """ - if self._network_future is None: + if not self.is_running: logger.warning('room %s 已经停止,不能join', self.room_id) return @@ -225,7 +217,7 @@ class BLiveClient: """ 释放本客户端的资源,调用后本客户端将不可用 """ - if self._network_future is not None: + if self.is_running: logger.warning('room %s 在运行状态中调用了close', self.room_id) # 如果session是自己创建的则关闭session @@ -324,21 +316,20 @@ class BLiveClient: ) return header + body - async def _send_auth(self): + async def _network_coroutine_wrapper(self): """ - 发送认证包 + 负责处理网络协程的异常,网络协程具体逻辑在_network_coroutine里 """ - auth_params = { - 'uid': self._uid, - 'roomid': self._room_id, - 'protover': 2, - 'platform': 'web', - 'clientver': '1.14.3', - 'type': 2 - } - if self._host_server_token is not None: - auth_params['key'] = self._host_server_token - await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH)) + try: + await self._network_coroutine() + except asyncio.CancelledError: + # 正常停止 + pass + except Exception as e: # noqa + logger.exception('room %s 网络协程异常结束:', self.room_id) + finally: + logger.debug('room %s 网络协程结束', self.room_id) + self._network_future = None async def _network_coroutine(self): """ @@ -346,11 +337,8 @@ class BLiveClient: """ # 如果之前未初始化则初始化 if self._host_server_token is None: - try: - if not await self.init_room(): - raise InitError('初始化失败') - except asyncio.CancelledError: - return + if not await self.init_room(): + raise InitError('初始化失败') retry_count = 0 while True: @@ -372,15 +360,11 @@ class BLiveClient: await self._on_ws_message(message) except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - # 重连 + # 掉线重连 pass - except asyncio.CancelledError: - # 正常停止 - break except ssl_.SSLError: - logger.exception('SSL错误:') - # 证书错误时无法重连 - break + logger.error('room %d 发生SSL错误,无法重连', self.room_id) + raise finally: self._websocket = None await self._on_ws_close() @@ -388,10 +372,7 @@ class BLiveClient: # 准备重连 retry_count += 1 logger.warning('room %d 掉线重连中%d', self.room_id, retry_count) - try: - await asyncio.sleep(1) - except asyncio.CancelledError: - break + await asyncio.sleep(1, loop=self._loop) async def _on_ws_connect(self): """ @@ -400,6 +381,30 @@ class BLiveClient: await self._send_auth() self._heartbeat_timer_handle = self._loop.call_later(self._heartbeat_interval, self._on_send_heartbeat) + async def _on_ws_close(self): + """ + websocket连接断开 + """ + if self._heartbeat_timer_handle is not None: + self._heartbeat_timer_handle.cancel() + self._heartbeat_timer_handle = None + + async def _send_auth(self): + """ + 发送认证包 + """ + auth_params = { + 'uid': self._uid, + 'roomid': self._room_id, + 'protover': 2, + 'platform': 'web', + 'clientver': '1.14.3', + 'type': 2 + } + if self._host_server_token is not None: + auth_params['key'] = self._host_server_token + await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH)) + def _on_send_heartbeat(self): """ 定时发送心跳包的回调 @@ -422,18 +427,11 @@ class BLiveClient: try: await self._parse_ws_message(message.data) except asyncio.CancelledError: + # 正常停止,让外层处理 raise except Exception: # noqa logger.exception('room %d 处理websocket消息时发生错误:', self.room_id) - async def _on_ws_close(self): - """ - websocket连接断开 - """ - if self._heartbeat_timer_handle is not None: - self._heartbeat_timer_handle.cancel() - self._heartbeat_timer_handle = None - async def _parse_ws_message(self, data: bytes): """ 解析websocket消息 @@ -459,7 +457,7 @@ class BLiveClient: 'popularity': popularity } } - await self._parse_command(body) + await self._handle_command(body) elif header.operation == Operation.SEND_MSG_REPLY: # 业务消息 @@ -472,9 +470,9 @@ class BLiveClient: # 没压缩过的 try: body = json.loads(body.decode('utf-8')) - await self._parse_command(body) + await self._handle_command(body) except Exception: - logger.error('body=%s', body) + logger.error('room %d body=%s', self.room_id, body) raise elif header.operation == Operation.AUTH_REPLY: @@ -489,20 +487,23 @@ class BLiveClient: offset += header.pack_len - async def _parse_command(self, command: Union[list, dict]): + async def _handle_command(self, command: Union[list, dict]): """ - 解析业务消息 + 解析并处理业务消息 :param command: 业务消息 """ # 这里可能会多个消息一起发 if isinstance(command, list): for one_command in command: - await self._parse_command(one_command) + await self._handle_command(one_command) return - for handler in self._handlers: - try: - await handler.handle(self, command) - except Exception: # noqa - logger.exception('room %d 处理消息时发生错误,command=%s', self.room_id, command) + results = await asyncio.gather( + *(handler.handle(self, command) for handler in self._handlers), + loop=self._loop, + return_exceptions=True + ) + for res in results: + if isinstance(res, Exception): + logger.exception('room %d 处理消息时发生错误,command=%s', self.room_id, command, exc_info=res)