处理认证失败的情况

This commit is contained in:
John Smith 2021-12-18 15:46:30 +08:00
parent 62df6345c4
commit 28dfc12a90

View File

@ -62,10 +62,20 @@ class Operation(enum.IntEnum):
# MaxBusinessOp = 10000 # MaxBusinessOp = 10000
# WS_AUTH
class AuthReplyCode(enum.IntEnum):
OK = 0
TOKEN_ERROR = -101
class InitError(Exception): class InitError(Exception):
"""初始化失败""" """初始化失败"""
class AuthError(Exception):
"""认证失败"""
class BLiveClient: class BLiveClient:
""" """
B站直播弹幕客户端负责连接房间 B站直播弹幕客户端负责连接房间
@ -348,7 +358,7 @@ class BLiveClient:
# 如果之前未初始化则初始化 # 如果之前未初始化则初始化
if self._host_server_token is None: if self._host_server_token is None:
if not await self.init_room(): if not await self.init_room():
raise InitError('初始化失败') raise InitError('init_room() failed')
retry_count = 0 retry_count = 0
while True: while True:
@ -366,12 +376,18 @@ class BLiveClient:
# 处理消息 # 处理消息
message: aiohttp.WSMessage message: aiohttp.WSMessage
async for message in websocket: async for message in websocket:
retry_count = 0
await self._on_ws_message(message) await self._on_ws_message(message)
# 至少成功处理1条消息
retry_count = 0
except (aiohttp.ClientConnectionError, asyncio.TimeoutError): except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
# 掉线重连 # 掉线重连
pass pass
except AuthError:
# 认证失败了应该重新获取token再重连
logger.exception('room=%d auth failed, trying init_room() again', self.room_id)
if not await self.init_room():
raise InitError('init_room() failed')
except ssl_.SSLError: except ssl_.SSLError:
logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id) logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id)
raise raise
@ -435,7 +451,7 @@ class BLiveClient:
try: try:
await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
except ConnectionResetError as e: except (ConnectionResetError, aiohttp.ClientConnectionError) as e:
logger.warning('room=%d _send_heartbeat() failed: %r', self.room_id, e) logger.warning('room=%d _send_heartbeat() failed: %r', self.room_id, e)
except Exception: # noqa except Exception: # noqa
logger.exception('room=%d _send_heartbeat() failed:', self.room_id) logger.exception('room=%d _send_heartbeat() failed:', self.room_id)
@ -453,8 +469,8 @@ class BLiveClient:
try: try:
await self._parse_ws_message(message.data) await self._parse_ws_message(message.data)
except asyncio.CancelledError: except (asyncio.CancelledError, AuthError):
# 正常停止,让外层处理 # 正常停止、认证失败,让外层处理
raise raise
except Exception: # noqa except Exception: # noqa
logger.exception('room=%d _parse_ws_message() error:', self.room_id) logger.exception('room=%d _parse_ws_message() error:', self.room_id)
@ -475,36 +491,8 @@ class BLiveClient:
if header.operation in (Operation.SEND_MSG_REPLY, Operation.AUTH_REPLY): if header.operation in (Operation.SEND_MSG_REPLY, Operation.AUTH_REPLY):
# 业务消息,可能有多个包一起发,需要分包 # 业务消息,可能有多个包一起发,需要分包
while True: while True:
if header.operation == Operation.SEND_MSG_REPLY: body = data[offset + header.raw_header_size: offset + header.pack_len]
# 业务消息 await self.__parse_business_message(header, body)
body = data[offset + header.raw_header_size: offset + header.pack_len]
if header.ver == ProtoVer.DEFLATE:
# 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行
body = await self._loop.run_in_executor(None, zlib.decompress, body)
await self._parse_ws_message(body)
elif header.ver == ProtoVer.NORMAL:
# 没压缩过的直接反序列化因为有万恶的GIL这里不能并行避免阻塞
if len(body) != 0:
try:
body = json.loads(body.decode('utf-8'))
await self._handle_command(body)
except Exception:
logger.error('room=%d, body=%s', self.room_id, body)
raise
else:
# 未知格式
logger.warning('room=%d unknown protocol version=%d, header=%s, body=%s', self.room_id,
header.ver, header, body)
elif header.operation == Operation.AUTH_REPLY:
# 认证响应 TODO 判断是否成功
await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
else:
# 未知消息
body = data[offset + header.raw_header_size: offset + header.pack_len]
logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id,
header.operation, header, body)
offset += header.pack_len offset += header.pack_len
if offset >= len(data): if offset >= len(data):
@ -519,10 +507,8 @@ class BLiveClient:
elif header.operation == Operation.HEARTBEAT_REPLY: elif header.operation == Operation.HEARTBEAT_REPLY:
# 服务器心跳包前4字节是人气值后面是客户端发的心跳包内容 # 服务器心跳包前4字节是人气值后面是客户端发的心跳包内容
# pack_len不包括客户端发的心跳包内容不知道是不是服务器BUG # pack_len不包括客户端发的心跳包内容不知道是不是服务器BUG
popularity = int.from_bytes( body = data[offset + header.raw_header_size: offset + header.raw_header_size + 4]
data[offset + header.raw_header_size: offset + header.raw_header_size + 4], popularity = int.from_bytes(body, 'big')
'big'
)
# 自己造个消息当成业务消息处理 # 自己造个消息当成业务消息处理
body = { body = {
'cmd': '_HEARTBEAT', 'cmd': '_HEARTBEAT',
@ -538,6 +524,42 @@ class BLiveClient:
logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id, logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id,
header.operation, header, body) header.operation, header, body)
async def __parse_business_message(self, header: HeaderTuple, body: bytes):
"""
解析业务消息
"""
if header.operation == Operation.SEND_MSG_REPLY:
# 业务消息
if header.ver == ProtoVer.DEFLATE:
# 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行
body = await self._loop.run_in_executor(None, zlib.decompress, body)
await self._parse_ws_message(body)
elif header.ver == ProtoVer.NORMAL:
# 没压缩过的直接反序列化因为有万恶的GIL这里不能并行避免阻塞
if len(body) != 0:
try:
body = json.loads(body.decode('utf-8'))
await self._handle_command(body)
except Exception:
logger.error('room=%d, body=%s', self.room_id, body)
raise
else:
# 未知格式
logger.warning('room=%d unknown protocol version=%d, header=%s, body=%s', self.room_id,
header.ver, header, body)
elif header.operation == Operation.AUTH_REPLY:
# 认证响应
body = json.loads(body.decode('utf-8'))
if body['code'] != AuthReplyCode.OK:
raise AuthError(f"auth reply error, code={body['code']}, body={body}")
await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
else:
# 未知消息
logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id,
header.operation, header, body)
async def _handle_command(self, command: dict): async def _handle_command(self, command: dict):
""" """
解析并处理业务消息 解析并处理业务消息