处理开放平台项目异常关闭的情况

This commit is contained in:
John Smith 2023-09-05 22:00:33 +08:00
parent 1db652246a
commit b6e9578cb2
3 changed files with 37 additions and 41 deletions

View File

@ -65,15 +65,9 @@ class OpenLiveClient(ws_base.WebSocketClientBase):
self._auth_body: Optional[str] = None self._auth_body: Optional[str] = None
"""连接弹幕服务器用的认证包内容""" """连接弹幕服务器用的认证包内容"""
self._game_id: Optional[str] = None self._game_id: Optional[str] = None
"""项目场次ID,仅用于互动玩法类项目,其他项目为空字符串""" """项目场次ID"""
# 在运行时初始化的字段 # 在运行时初始化的字段
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
"""WebSocket连接"""
self._network_future: Optional[asyncio.Future] = None
"""网络协程的future"""
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
"""发连接心跳包定时器的handle"""
self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
"""发项目心跳包定时器的handle""" """发项目心跳包定时器的handle"""
@ -101,7 +95,7 @@ class OpenLiveClient(ws_base.WebSocketClientBase):
@property @property
def game_id(self) -> Optional[str]: def game_id(self) -> Optional[str]:
""" """
项目场次ID仅用于互动玩法类项目其他项目为空字符串调用init_room后初始化 项目场次ID调用init_room后初始化
""" """
return self._game_id return self._game_id
@ -191,7 +185,7 @@ class OpenLiveClient(ws_base.WebSocketClientBase):
async def _end_game(self): async def _end_game(self):
""" """
关闭项目互动玩法类项目建议断开连接时保证调用到这个函数close会调用否则可能短时间内无法重复连接同一个房间 关闭项目建议关闭客户端时保证调用到这个函数close会调用否则可能短时间内无法重复连接同一个房间
""" """
if self._game_id in (None, ''): if self._game_id in (None, ''):
return True return True
@ -206,9 +200,14 @@ class OpenLiveClient(ws_base.WebSocketClientBase):
self._room_id, res.status, res.reason) self._room_id, res.status, res.reason)
return False return False
data = await res.json() data = await res.json()
if data['code'] != 0: code = data['code']
if code != 0:
if code in (7000, 7003):
# 项目已经关闭了也算成功
return True
logger.warning('room=%d _end_game() failed, code=%d, message=%s, request_id=%s', logger.warning('room=%d _end_game() failed, code=%d, message=%s, request_id=%s',
self._room_id, data['code'], data['message'], data['request_id']) self._room_id, code, data['message'], data['request_id'])
return False return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError): except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _end_game() failed:', self._room_id) logger.exception('room=%d _end_game() failed:', self._room_id)
@ -230,41 +229,41 @@ class OpenLiveClient(ws_base.WebSocketClientBase):
async def _send_game_heartbeat(self): async def _send_game_heartbeat(self):
""" """
发送项目心跳包仅用于互动玩法类项目 发送项目心跳包
""" """
if self._game_id in (None, ''): if self._game_id in (None, ''):
logger.warning('game=%d heartbeat failed, game_id not found', self._game_id) logger.warning('game=%d _send_game_heartbeat() failed, game_id not found', self._game_id)
return False return False
try: try:
# 保存一下防止await之后game_id改变
game_id = self._game_id
async with self._request_open_live( async with self._request_open_live(
HEARTBEAT_URL, HEARTBEAT_URL,
{'game_id': self._game_id} {'game_id': game_id}
) as res: ) as res:
if res.status != 200: if res.status != 200:
logger.warning('room=%d _send_game_heartbeat() failed, status=%d, reason=%s', logger.warning('room=%d _send_game_heartbeat() failed, status=%d, reason=%s',
self._room_id, res.status, res.reason) self._room_id, res.status, res.reason)
return False return False
data = await res.json() data = await res.json()
if data['code'] != 0: code = data['code']
# TODO 遇到7003则重新init_room if code != 0:
logger.warning('room=%d _send_game_heartbeat() failed, code=%d, message=%s, request_id=%s', logger.warning('room=%d _send_game_heartbeat() failed, code=%d, message=%s, request_id=%s',
self._room_id, data['code'], data['message'], data['request_id']) self._room_id, code, data['message'], data['request_id'])
if code == 7003 and self._game_id == game_id:
# 项目异常关闭,可能是心跳超时,需要重新开启项目
self._need_init_room = True
if self._websocket is not None and not self._websocket.closed:
await self._websocket.close()
return False return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError): except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _send_game_heartbeat() failed:', self._room_id) logger.exception('room=%d _send_game_heartbeat() failed:', self._room_id)
return False return False
return True return True
async def _on_network_coroutine_start(self):
"""
在_network_coroutine开头运行可以用来初始化房间
"""
# 如果之前未初始化则初始化
if self._auth_body is None:
if not await self.init_room():
raise ws_base.InitError('init_room() failed')
def _get_ws_url(self, retry_count) -> str: def _get_ws_url(self, retry_count) -> str:
""" """
返回WebSocket连接的URL可以在这里做故障转移和负载均衡 返回WebSocket连接的URL可以在这里做故障转移和负载均衡

View File

@ -232,15 +232,6 @@ class BLiveClient(ws_base.WebSocketClientBase):
return False return False
return True return True
async def _on_network_coroutine_start(self):
"""
在_network_coroutine开头运行可以用来初始化房间
"""
# 如果之前未初始化则初始化
if self._host_server_token is None:
if not await self.init_room():
raise ws_base.InitError('init_room() failed')
def _get_ws_url(self, retry_count) -> str: def _get_ws_url(self, retry_count) -> str:
""" """
返回WebSocket连接的URL可以在这里做故障转移和负载均衡 返回WebSocket连接的URL可以在这里做故障转移和负载均衡

View File

@ -99,6 +99,7 @@ class WebSocketClientBase:
self._heartbeat_interval = heartbeat_interval self._heartbeat_interval = heartbeat_interval
self._need_init_room = True
self._handler: Optional[handlers.HandlerInterface] = None self._handler: Optional[handlers.HandlerInterface] = None
"""消息处理器""" """消息处理器"""
@ -244,11 +245,11 @@ class WebSocketClientBase:
""" """
网络协程负责连接服务器接收消息解包 网络协程负责连接服务器接收消息解包
""" """
await self._on_network_coroutine_start()
retry_count = 0 retry_count = 0
while True: while True:
try: try:
await self._on_before_ws_connect()
# 连接 # 连接
async with self._session.ws_connect( async with self._session.ws_connect(
self._get_ws_url(retry_count), self._get_ws_url(retry_count),
@ -271,8 +272,7 @@ class WebSocketClientBase:
except AuthError: except AuthError:
# 认证失败了应该重新获取token再重连 # 认证失败了应该重新获取token再重连
logger.exception('room=%d auth failed, trying init_room() again', self.room_id) logger.exception('room=%d auth failed, trying init_room() again', self.room_id)
if not await self.init_room(): self._need_init_room = True
raise InitError('init_room() failed')
finally: finally:
self._websocket = None self._websocket = None
await self._on_ws_close() await self._on_ws_close()
@ -282,10 +282,16 @@ class WebSocketClientBase:
logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count) logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count)
await asyncio.sleep(1) await asyncio.sleep(1)
async def _on_network_coroutine_start(self): async def _on_before_ws_connect(self):
""" """
_network_coroutine开头运行可以用来初始化房间 每次建立连接之前调用可以用来初始化房间
""" """
if not self._need_init_room:
return
if not await self.init_room():
raise InitError('init_room() failed')
self._need_init_room = False
def _get_ws_url(self, retry_count) -> str: def _get_ws_url(self, retry_count) -> str:
""" """