优化代码

This commit is contained in:
John Smith 2021-01-31 13:11:20 +08:00
parent 8d8cc8c270
commit dca7f1499f

View File

@ -22,8 +22,8 @@ DEFAULT_DANMAKU_SERVER_LIST = [
HEADER_STRUCT = struct.Struct('>I2H2I')
HeaderTuple = namedtuple('HeaderTuple', ('pack_len', 'raw_header_size', 'ver', 'operation', 'seq_id'))
WS_BODY_PROTOCOL_VERSION_NORMAL = 0
WS_BODY_PROTOCOL_VERSION_INT = 1 # 用于心跳包
WS_BODY_PROTOCOL_VERSION_INFLATE = 0
WS_BODY_PROTOCOL_VERSION_NORMAL = 1
WS_BODY_PROTOCOL_VERSION_DEFLATE = 2
@ -374,6 +374,7 @@ class BLiveClient:
# noinspection PyProtectedMember
self._ssl = ssl if ssl else ssl_._create_unverified_context()
self._websocket = None
self._heartbeat_timer_handle = None
@property
def is_running(self):
@ -507,7 +508,8 @@ class BLiveClient:
return False
return True
def _make_packet(self, data, operation):
@staticmethod
def _make_packet(data, operation):
body = json.dumps(data).encode('utf-8')
header = HEADER_STRUCT.pack(
HEADER_STRUCT.size + len(body),
@ -539,7 +541,6 @@ class BLiveClient:
retry_count = 0
while True:
heartbeat_future = None
try:
# 连接
host_server = self._host_server_list[retry_count % len(self._host_server_list)]
@ -549,27 +550,24 @@ class BLiveClient:
) as websocket:
self._websocket = websocket
await self._send_auth()
heartbeat_future = asyncio.ensure_future(self._heartbeat_loop(), loop=self._loop)
heartbeat_future.add_done_callback(
lambda _future: logger.debug('room %d 心跳循环结束', self.room_id)
self._heartbeat_timer_handle = self._loop.call_later(
self._heartbeat_interval, self._on_send_heartbeat
)
# 处理消息
async for message in websocket: # type: aiohttp.WSMessage
retry_count = 0
if message.type == aiohttp.WSMsgType.BINARY:
try:
await self._handle_message(message.data)
except BaseException as e:
if type(e) in (
asyncio.CancelledError, aiohttp.ClientConnectionError,
asyncio.TimeoutError, ssl_.SSLError
):
raise
logger.exception('room %d 处理消息时发生错误:', self.room_id)
else:
if message.type != aiohttp.WSMsgType.BINARY:
logger.warning('room %d 未知的websocket消息type=%s %s', self.room_id,
message.type, message.data)
continue
try:
await self._handle_message(message.data)
except asyncio.CancelledError:
raise
except Exception:
logger.exception('room %d 处理消息时发生错误:', self.room_id)
except asyncio.CancelledError:
break
@ -581,13 +579,10 @@ class BLiveClient:
# 证书错误时无法重连
break
finally:
if heartbeat_future is not None:
heartbeat_future.cancel()
try:
await heartbeat_future
except asyncio.CancelledError:
break
self._websocket = None
if self._heartbeat_timer_handle is not None:
self._heartbeat_timer_handle.cancel()
self._heartbeat_timer_handle = None
retry_count += 1
logger.warning('room %d 掉线重连中%d', self.room_id, retry_count)
@ -596,14 +591,10 @@ class BLiveClient:
except asyncio.CancelledError:
break
async def _heartbeat_loop(self):
while True:
try:
await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
await asyncio.sleep(self._heartbeat_interval)
except (asyncio.CancelledError, aiohttp.ClientConnectionError):
break
def _on_send_heartbeat(self):
coro = self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
asyncio.ensure_future(coro, loop=self._loop)
self._heartbeat_timer_handle = self._loop.call_later(self._heartbeat_interval, self._on_send_heartbeat)
async def _handle_message(self, data):
offset = 0
@ -622,13 +613,13 @@ class BLiveClient:
elif header.operation == Operation.SEND_MSG_REPLY:
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE:
body = zlib.decompress(body)
body = await self._loop.run_in_executor(None, zlib.decompress, body)
await self._handle_message(body)
else:
try:
body = json.loads(body.decode('utf-8'))
await self._handle_command(body)
except BaseException:
except Exception:
logger.error('body: %s', body)
raise