From b6e9578cb23a8ba4d82aeb5c17626e776b9f4404 Mon Sep 17 00:00:00 2001 From: John Smith Date: Tue, 5 Sep 2023 22:00:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=84=E7=90=86=E5=BC=80=E6=94=BE=E5=B9=B3?= =?UTF-8?q?=E5=8F=B0=E9=A1=B9=E7=9B=AE=E5=BC=82=E5=B8=B8=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E7=9A=84=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/clients/open_live.py | 51 ++++++++++++++++++------------------ blivedm/clients/web.py | 9 ------- blivedm/clients/ws_base.py | 18 ++++++++----- 3 files changed, 37 insertions(+), 41 deletions(-) diff --git a/blivedm/clients/open_live.py b/blivedm/clients/open_live.py index 031dc8a..dbb76f6 100644 --- a/blivedm/clients/open_live.py +++ b/blivedm/clients/open_live.py @@ -65,15 +65,9 @@ class OpenLiveClient(ws_base.WebSocketClientBase): self._auth_body: Optional[str] = None """连接弹幕服务器用的认证包内容""" self._game_id: Optional[str] = None - """项目场次ID,仅用于互动玩法类项目,其他项目为空字符串""" + """项目场次ID""" # 在运行时初始化的字段 - self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None - """WebSocket连接""" - self._network_future: Optional[asyncio.Future] = None - """网络协程的future""" - self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None - """发连接心跳包定时器的handle""" self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None """发项目心跳包定时器的handle""" @@ -101,7 +95,7 @@ class OpenLiveClient(ws_base.WebSocketClientBase): @property def game_id(self) -> Optional[str]: """ - 项目场次ID,仅用于互动玩法类项目,其他项目为空字符串,调用init_room后初始化 + 项目场次ID,调用init_room后初始化 """ return self._game_id @@ -191,7 +185,7 @@ class OpenLiveClient(ws_base.WebSocketClientBase): async def _end_game(self): """ - 关闭项目。互动玩法类项目建议断开连接时保证调用到这个函数(close会调用),否则可能短时间内无法重复连接同一个房间 + 关闭项目。建议关闭客户端时保证调用到这个函数(close会调用),否则可能短时间内无法重复连接同一个房间 """ if self._game_id in (None, ''): return True @@ -206,9 +200,14 @@ class OpenLiveClient(ws_base.WebSocketClientBase): self._room_id, res.status, res.reason) return False data = await res.json() - if data['code'] != 0: + code = data['code'] + if code != 0: + if code in (7000, 7003): + # 项目已经关闭了也算成功 + return True + logger.warning('room=%d _end_game() failed, code=%d, message=%s, request_id=%s', - self._room_id, data['code'], data['message'], data['request_id']) + self._room_id, code, data['message'], data['request_id']) return False except (aiohttp.ClientConnectionError, asyncio.TimeoutError): logger.exception('room=%d _end_game() failed:', self._room_id) @@ -230,41 +229,41 @@ class OpenLiveClient(ws_base.WebSocketClientBase): async def _send_game_heartbeat(self): """ - 发送项目心跳包,仅用于互动玩法类项目 + 发送项目心跳包 """ if self._game_id in (None, ''): - logger.warning('game=%d heartbeat failed, game_id not found', self._game_id) + logger.warning('game=%d _send_game_heartbeat() failed, game_id not found', self._game_id) return False try: + # 保存一下,防止await之后game_id改变 + game_id = self._game_id async with self._request_open_live( HEARTBEAT_URL, - {'game_id': self._game_id} + {'game_id': game_id} ) as res: if res.status != 200: logger.warning('room=%d _send_game_heartbeat() failed, status=%d, reason=%s', self._room_id, res.status, res.reason) return False data = await res.json() - if data['code'] != 0: - # TODO 遇到7003则重新init_room + code = data['code'] + if code != 0: logger.warning('room=%d _send_game_heartbeat() failed, code=%d, message=%s, request_id=%s', - self._room_id, data['code'], data['message'], data['request_id']) + self._room_id, code, data['message'], data['request_id']) + + if code == 7003 and self._game_id == game_id: + # 项目异常关闭,可能是心跳超时,需要重新开启项目 + self._need_init_room = True + if self._websocket is not None and not self._websocket.closed: + await self._websocket.close() + return False except (aiohttp.ClientConnectionError, asyncio.TimeoutError): logger.exception('room=%d _send_game_heartbeat() failed:', self._room_id) return False return True - async def _on_network_coroutine_start(self): - """ - 在_network_coroutine开头运行,可以用来初始化房间 - """ - # 如果之前未初始化则初始化 - if self._auth_body is None: - if not await self.init_room(): - raise ws_base.InitError('init_room() failed') - def _get_ws_url(self, retry_count) -> str: """ 返回WebSocket连接的URL,可以在这里做故障转移和负载均衡 diff --git a/blivedm/clients/web.py b/blivedm/clients/web.py index 48400ae..cbe4211 100644 --- a/blivedm/clients/web.py +++ b/blivedm/clients/web.py @@ -232,15 +232,6 @@ class BLiveClient(ws_base.WebSocketClientBase): return False return True - async def _on_network_coroutine_start(self): - """ - 在_network_coroutine开头运行,可以用来初始化房间 - """ - # 如果之前未初始化则初始化 - if self._host_server_token is None: - if not await self.init_room(): - raise ws_base.InitError('init_room() failed') - def _get_ws_url(self, retry_count) -> str: """ 返回WebSocket连接的URL,可以在这里做故障转移和负载均衡 diff --git a/blivedm/clients/ws_base.py b/blivedm/clients/ws_base.py index ddbec51..ad08c23 100644 --- a/blivedm/clients/ws_base.py +++ b/blivedm/clients/ws_base.py @@ -99,6 +99,7 @@ class WebSocketClientBase: self._heartbeat_interval = heartbeat_interval + self._need_init_room = True self._handler: Optional[handlers.HandlerInterface] = None """消息处理器""" @@ -244,11 +245,11 @@ class WebSocketClientBase: """ 网络协程,负责连接服务器、接收消息、解包 """ - await self._on_network_coroutine_start() - retry_count = 0 while True: try: + await self._on_before_ws_connect() + # 连接 async with self._session.ws_connect( self._get_ws_url(retry_count), @@ -271,8 +272,7 @@ class WebSocketClientBase: except AuthError: # 认证失败了,应该重新获取token再重连 logger.exception('room=%d auth failed, trying init_room() again', self.room_id) - if not await self.init_room(): - raise InitError('init_room() failed') + self._need_init_room = True finally: self._websocket = None await self._on_ws_close() @@ -282,10 +282,16 @@ class WebSocketClientBase: logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count) await asyncio.sleep(1) - async def _on_network_coroutine_start(self): + async def _on_before_ws_connect(self): """ - 在_network_coroutine开头运行,可以用来初始化房间 + 在每次建立连接之前调用,可以用来初始化房间 """ + if not self._need_init_room: + return + + if not await self.init_room(): + raise InitError('init_room() failed') + self._need_init_room = False def _get_ws_url(self, retry_count) -> str: """