多个消息处理器并发处理、继续整理客户端代码

This commit is contained in:
John Smith 2021-12-15 23:44:44 +08:00
parent 2fdfc88fb2
commit 89b540dddd

View File

@ -156,6 +156,8 @@ class BLiveClient:
def add_handler(self, handler: 'handlers.HandlerInterface'):
"""
添加消息处理器
注意多个处理器是并发处理的不要依赖处理的顺序
消息处理器和接收消息运行在同一协程如果处理消息耗时太长会阻塞接收消息这种情况建议将消息推到队列让另一个协程处理
:param handler: 消息处理器
"""
@ -177,27 +179,17 @@ class BLiveClient:
"""
启动本客户端
"""
if self._network_future is not None:
if self.is_running:
logger.warning('room %s 已经在运行中不能再次start', self.room_id)
return
self._network_future = asyncio.ensure_future(self._network_coroutine(), loop=self._loop)
self._network_future.add_done_callback(self.__on_network_coroutine_done)
def __on_network_coroutine_done(self, future):
self._network_future = None
logger.debug('room %s 网络协程结束', self.room_id)
exception = future.exception()
if exception is not None:
exc_info = (type(exception), exception, exception.__traceback__)
logger.exception('room %s 网络协程异常结束:', self.room_id, exc_info=exc_info)
self._network_future = asyncio.ensure_future(self._network_coroutine_wrapper(), loop=self._loop)
def stop(self):
"""
停止本客户端
"""
if self._network_future is None:
if not self.is_running:
logger.warning('room %s 已经停止不能再次stop', self.room_id)
return
@ -215,7 +207,7 @@ class BLiveClient:
"""
等待本客户端停止
"""
if self._network_future is None:
if not self.is_running:
logger.warning('room %s 已经停止不能join', self.room_id)
return
@ -225,7 +217,7 @@ class BLiveClient:
"""
释放本客户端的资源调用后本客户端将不可用
"""
if self._network_future is not None:
if self.is_running:
logger.warning('room %s 在运行状态中调用了close', self.room_id)
# 如果session是自己创建的则关闭session
@ -324,21 +316,20 @@ class BLiveClient:
)
return header + body
async def _send_auth(self):
async def _network_coroutine_wrapper(self):
"""
发送认证包
负责处理网络协程的异常网络协程具体逻辑在_network_coroutine里
"""
auth_params = {
'uid': self._uid,
'roomid': self._room_id,
'protover': 2,
'platform': 'web',
'clientver': '1.14.3',
'type': 2
}
if self._host_server_token is not None:
auth_params['key'] = self._host_server_token
await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH))
try:
await self._network_coroutine()
except asyncio.CancelledError:
# 正常停止
pass
except Exception as e: # noqa
logger.exception('room %s 网络协程异常结束:', self.room_id)
finally:
logger.debug('room %s 网络协程结束', self.room_id)
self._network_future = None
async def _network_coroutine(self):
"""
@ -346,11 +337,8 @@ class BLiveClient:
"""
# 如果之前未初始化则初始化
if self._host_server_token is None:
try:
if not await self.init_room():
raise InitError('初始化失败')
except asyncio.CancelledError:
return
retry_count = 0
while True:
@ -372,15 +360,11 @@ class BLiveClient:
await self._on_ws_message(message)
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
# 重连
# 掉线重连
pass
except asyncio.CancelledError:
# 正常停止
break
except ssl_.SSLError:
logger.exception('SSL错误')
# 证书错误时无法重连
break
logger.error('room %d 发生SSL错误无法重连', self.room_id)
raise
finally:
self._websocket = None
await self._on_ws_close()
@ -388,10 +372,7 @@ class BLiveClient:
# 准备重连
retry_count += 1
logger.warning('room %d 掉线重连中%d', self.room_id, retry_count)
try:
await asyncio.sleep(1)
except asyncio.CancelledError:
break
await asyncio.sleep(1, loop=self._loop)
async def _on_ws_connect(self):
"""
@ -400,6 +381,30 @@ class BLiveClient:
await self._send_auth()
self._heartbeat_timer_handle = self._loop.call_later(self._heartbeat_interval, self._on_send_heartbeat)
async def _on_ws_close(self):
"""
websocket连接断开
"""
if self._heartbeat_timer_handle is not None:
self._heartbeat_timer_handle.cancel()
self._heartbeat_timer_handle = None
async def _send_auth(self):
"""
发送认证包
"""
auth_params = {
'uid': self._uid,
'roomid': self._room_id,
'protover': 2,
'platform': 'web',
'clientver': '1.14.3',
'type': 2
}
if self._host_server_token is not None:
auth_params['key'] = self._host_server_token
await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH))
def _on_send_heartbeat(self):
"""
定时发送心跳包的回调
@ -422,18 +427,11 @@ class BLiveClient:
try:
await self._parse_ws_message(message.data)
except asyncio.CancelledError:
# 正常停止,让外层处理
raise
except Exception: # noqa
logger.exception('room %d 处理websocket消息时发生错误', self.room_id)
async def _on_ws_close(self):
"""
websocket连接断开
"""
if self._heartbeat_timer_handle is not None:
self._heartbeat_timer_handle.cancel()
self._heartbeat_timer_handle = None
async def _parse_ws_message(self, data: bytes):
"""
解析websocket消息
@ -459,7 +457,7 @@ class BLiveClient:
'popularity': popularity
}
}
await self._parse_command(body)
await self._handle_command(body)
elif header.operation == Operation.SEND_MSG_REPLY:
# 业务消息
@ -472,9 +470,9 @@ class BLiveClient:
# 没压缩过的
try:
body = json.loads(body.decode('utf-8'))
await self._parse_command(body)
await self._handle_command(body)
except Exception:
logger.error('body=%s', body)
logger.error('room %d body=%s', self.room_id, body)
raise
elif header.operation == Operation.AUTH_REPLY:
@ -489,20 +487,23 @@ class BLiveClient:
offset += header.pack_len
async def _parse_command(self, command: Union[list, dict]):
async def _handle_command(self, command: Union[list, dict]):
"""
解析业务消息
解析并处理业务消息
:param command: 业务消息
"""
# 这里可能会多个消息一起发
if isinstance(command, list):
for one_command in command:
await self._parse_command(one_command)
await self._handle_command(one_command)
return
for handler in self._handlers:
try:
await handler.handle(self, command)
except Exception: # noqa
logger.exception('room %d 处理消息时发生错误command=%s', self.room_id, command)
results = await asyncio.gather(
*(handler.handle(self, command) for handler in self._handlers),
loop=self._loop,
return_exceptions=True
)
for res in results:
if isinstance(res, Exception):
logger.exception('room %d 处理消息时发生错误command=%s', self.room_id, command, exc_info=res)