mirror of
https://github.com/xfgryujk/blivedm.git
synced 2025-01-13 13:50:25 +08:00
整理日志、修复心跳包解析错误的问题
This commit is contained in:
parent
89b540dddd
commit
afa04d557b
@ -180,7 +180,7 @@ class BLiveClient:
|
|||||||
启动本客户端
|
启动本客户端
|
||||||
"""
|
"""
|
||||||
if self.is_running:
|
if self.is_running:
|
||||||
logger.warning('room %s 已经在运行中,不能再次start', self.room_id)
|
logger.warning('room=%s client is running, cannot start() again', self.room_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
self._network_future = asyncio.ensure_future(self._network_coroutine_wrapper(), loop=self._loop)
|
self._network_future = asyncio.ensure_future(self._network_coroutine_wrapper(), loop=self._loop)
|
||||||
@ -190,7 +190,7 @@ class BLiveClient:
|
|||||||
停止本客户端
|
停止本客户端
|
||||||
"""
|
"""
|
||||||
if not self.is_running:
|
if not self.is_running:
|
||||||
logger.warning('room %s 已经停止,不能再次stop', self.room_id)
|
logger.warning('room=%s client is stopped, cannot stop() again', self.room_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
self._network_future.cancel()
|
self._network_future.cancel()
|
||||||
@ -208,7 +208,7 @@ class BLiveClient:
|
|||||||
等待本客户端停止
|
等待本客户端停止
|
||||||
"""
|
"""
|
||||||
if not self.is_running:
|
if not self.is_running:
|
||||||
logger.warning('room %s 已经停止,不能join', self.room_id)
|
logger.warning('room=%s client is stopped, cannot join()', self.room_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
await self._network_future
|
await self._network_future
|
||||||
@ -218,7 +218,7 @@ class BLiveClient:
|
|||||||
释放本客户端的资源,调用后本客户端将不可用
|
释放本客户端的资源,调用后本客户端将不可用
|
||||||
"""
|
"""
|
||||||
if self.is_running:
|
if self.is_running:
|
||||||
logger.warning('room %s 在运行状态中调用了close', self.room_id)
|
logger.warning('room=%s is calling close(), but client is running', self.room_id)
|
||||||
|
|
||||||
# 如果session是自己创建的则关闭session
|
# 如果session是自己创建的则关闭session
|
||||||
if self._own_session:
|
if self._own_session:
|
||||||
@ -249,17 +249,18 @@ class BLiveClient:
|
|||||||
async with self._session.get(ROOM_INIT_URL, params={'room_id': self._tmp_room_id},
|
async with self._session.get(ROOM_INIT_URL, params={'room_id': self._tmp_room_id},
|
||||||
ssl=self._ssl) as res:
|
ssl=self._ssl) as res:
|
||||||
if res.status != 200:
|
if res.status != 200:
|
||||||
logger.warning('room %d init_room失败:%d %s', self._tmp_room_id,
|
logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id,
|
||||||
res.status, res.reason)
|
res.status, res.reason)
|
||||||
return False
|
return False
|
||||||
data = await res.json()
|
data = await res.json()
|
||||||
if data['code'] != 0:
|
if data['code'] != 0:
|
||||||
logger.warning('room %d init_room失败:%s', self._tmp_room_id, data['message'])
|
logger.warning('room=%d _init_room_id_and_owner() failed, message=%s', self._tmp_room_id,
|
||||||
|
data['message'])
|
||||||
return False
|
return False
|
||||||
if not self._parse_room_init(data['data']):
|
if not self._parse_room_init(data['data']):
|
||||||
return False
|
return False
|
||||||
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
||||||
logger.exception('room %d init_room失败:', self._tmp_room_id)
|
logger.exception('room=%d _init_room_id_and_owner() failed:', self._tmp_room_id)
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -275,17 +276,17 @@ class BLiveClient:
|
|||||||
async with self._session.get(DANMAKU_SERVER_CONF_URL, params={'id': self._room_id, 'type': 0},
|
async with self._session.get(DANMAKU_SERVER_CONF_URL, params={'id': self._room_id, 'type': 0},
|
||||||
ssl=self._ssl) as res:
|
ssl=self._ssl) as res:
|
||||||
if res.status != 200:
|
if res.status != 200:
|
||||||
logger.warning('room %d getConf失败:%d %s', self._room_id,
|
logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id,
|
||||||
res.status, res.reason)
|
res.status, res.reason)
|
||||||
return False
|
return False
|
||||||
data = await res.json()
|
data = await res.json()
|
||||||
if data['code'] != 0:
|
if data['code'] != 0:
|
||||||
logger.warning('room %d getConf失败:%s', self._room_id, data['message'])
|
logger.warning('room=%d _init_host_server() failed, message=%s', self._room_id, data['message'])
|
||||||
return False
|
return False
|
||||||
if not self._parse_danmaku_server_conf(data['data']):
|
if not self._parse_danmaku_server_conf(data['data']):
|
||||||
return False
|
return False
|
||||||
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
||||||
logger.exception('room %d getConf失败:', self._room_id)
|
logger.exception('room=%d _init_host_server() failed:', self._room_id)
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -293,7 +294,7 @@ class BLiveClient:
|
|||||||
self._host_server_list = data['host_list']
|
self._host_server_list = data['host_list']
|
||||||
self._host_server_token = data['token']
|
self._host_server_token = data['token']
|
||||||
if not self._host_server_list:
|
if not self._host_server_list:
|
||||||
logger.warning('room %d getConf失败:host_server_list为空', self._room_id)
|
logger.warning('room=%d _parse_danmaku_server_conf() failed: host_server_list is empty', self._room_id)
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -326,9 +327,9 @@ class BLiveClient:
|
|||||||
# 正常停止
|
# 正常停止
|
||||||
pass
|
pass
|
||||||
except Exception as e: # noqa
|
except Exception as e: # noqa
|
||||||
logger.exception('room %s 网络协程异常结束:', self.room_id)
|
logger.exception('room=%s _network_coroutine() finished with exception:', self.room_id)
|
||||||
finally:
|
finally:
|
||||||
logger.debug('room %s 网络协程结束', self.room_id)
|
logger.debug('room=%s _network_coroutine() finished', self.room_id)
|
||||||
self._network_future = None
|
self._network_future = None
|
||||||
|
|
||||||
async def _network_coroutine(self):
|
async def _network_coroutine(self):
|
||||||
@ -363,7 +364,7 @@ class BLiveClient:
|
|||||||
# 掉线重连
|
# 掉线重连
|
||||||
pass
|
pass
|
||||||
except ssl_.SSLError:
|
except ssl_.SSLError:
|
||||||
logger.error('room %d 发生SSL错误,无法重连', self.room_id)
|
logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
self._websocket = None
|
self._websocket = None
|
||||||
@ -371,7 +372,7 @@ class BLiveClient:
|
|||||||
|
|
||||||
# 准备重连
|
# 准备重连
|
||||||
retry_count += 1
|
retry_count += 1
|
||||||
logger.warning('room %d 掉线重连中%d', self.room_id, retry_count)
|
logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count)
|
||||||
await asyncio.sleep(1, loop=self._loop)
|
await asyncio.sleep(1, loop=self._loop)
|
||||||
|
|
||||||
async def _on_ws_connect(self):
|
async def _on_ws_connect(self):
|
||||||
@ -420,7 +421,7 @@ class BLiveClient:
|
|||||||
:param message: websocket消息
|
:param message: websocket消息
|
||||||
"""
|
"""
|
||||||
if message.type != aiohttp.WSMsgType.BINARY:
|
if message.type != aiohttp.WSMsgType.BINARY:
|
||||||
logger.warning('room %d 未知的websocket消息:type=%s %s', self.room_id,
|
logger.warning('room=%d unknown websocket message type=%s, data=%s', self.room_id,
|
||||||
message.type, message.data)
|
message.type, message.data)
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -430,7 +431,7 @@ class BLiveClient:
|
|||||||
# 正常停止,让外层处理
|
# 正常停止,让外层处理
|
||||||
raise
|
raise
|
||||||
except Exception: # noqa
|
except Exception: # noqa
|
||||||
logger.exception('room %d 处理websocket消息时发生错误:', self.room_id)
|
logger.exception('room=%d _parse_ws_message() error:', self.room_id)
|
||||||
|
|
||||||
async def _parse_ws_message(self, data: bytes):
|
async def _parse_ws_message(self, data: bytes):
|
||||||
"""
|
"""
|
||||||
@ -439,71 +440,89 @@ class BLiveClient:
|
|||||||
:param data: websocket消息数据
|
:param data: websocket消息数据
|
||||||
"""
|
"""
|
||||||
offset = 0
|
offset = 0
|
||||||
while offset < len(data):
|
try:
|
||||||
try:
|
header = HeaderTuple(*HEADER_STRUCT.unpack_from(data, offset))
|
||||||
header = HeaderTuple(*HEADER_STRUCT.unpack_from(data, offset))
|
except struct.error:
|
||||||
except struct.error:
|
logger.exception('room=%d parsing header failed, offset=%d, data=%s', self.room_id, offset, data)
|
||||||
break
|
return
|
||||||
|
|
||||||
if header.operation == Operation.HEARTBEAT_REPLY:
|
if header.operation in (Operation.SEND_MSG_REPLY, Operation.AUTH_REPLY):
|
||||||
# 心跳包,自己造个消息当成业务消息处理
|
# 业务消息,可能有多个包一起发,需要分包
|
||||||
popularity = int.from_bytes(
|
while True:
|
||||||
data[offset + HEADER_STRUCT.size: offset + HEADER_STRUCT.size + 4],
|
if header.operation == Operation.SEND_MSG_REPLY:
|
||||||
'big'
|
# 业务消息
|
||||||
)
|
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
|
||||||
body = {
|
if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE:
|
||||||
'cmd': '_HEARTBEAT',
|
# 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行
|
||||||
'data': {
|
body = await self._loop.run_in_executor(None, zlib.decompress, body)
|
||||||
'popularity': popularity
|
await self._parse_ws_message(body)
|
||||||
}
|
else:
|
||||||
}
|
# 没压缩过的直接反序列化,因为有万恶的GIL,这里不能并行避免阻塞
|
||||||
await self._handle_command(body)
|
if len(body) != 0:
|
||||||
|
try:
|
||||||
|
body = json.loads(body.decode('utf-8'))
|
||||||
|
await self._handle_command(body)
|
||||||
|
except Exception:
|
||||||
|
logger.error('room=%d, body=%s', self.room_id, body)
|
||||||
|
raise
|
||||||
|
|
||||||
|
elif header.operation == Operation.AUTH_REPLY:
|
||||||
|
# 认证响应 TODO 判断是否成功
|
||||||
|
await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
|
||||||
|
|
||||||
elif header.operation == Operation.SEND_MSG_REPLY:
|
|
||||||
# 业务消息
|
|
||||||
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
|
|
||||||
if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE:
|
|
||||||
# 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行
|
|
||||||
body = await self._loop.run_in_executor(None, zlib.decompress, body)
|
|
||||||
await self._parse_ws_message(body)
|
|
||||||
else:
|
else:
|
||||||
# 没压缩过的
|
# 未知消息
|
||||||
try:
|
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
|
||||||
body = json.loads(body.decode('utf-8'))
|
logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id,
|
||||||
await self._handle_command(body)
|
header.operation, header, body)
|
||||||
except Exception:
|
|
||||||
logger.error('room %d body=%s', self.room_id, body)
|
|
||||||
raise
|
|
||||||
|
|
||||||
elif header.operation == Operation.AUTH_REPLY:
|
offset += header.pack_len
|
||||||
# 认证响应
|
if offset >= len(data):
|
||||||
await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
|
break
|
||||||
|
|
||||||
else:
|
try:
|
||||||
# 未知消息
|
header = HeaderTuple(*HEADER_STRUCT.unpack_from(data, offset))
|
||||||
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
|
except struct.error:
|
||||||
logger.warning('room %d 未知包类型:operation=%d %s%s', self.room_id,
|
logger.exception('room=%d parsing header failed, offset=%d, data=%s', self.room_id, offset, data)
|
||||||
header.operation, header, body)
|
break
|
||||||
|
|
||||||
offset += header.pack_len
|
elif header.operation == Operation.HEARTBEAT_REPLY:
|
||||||
|
# 服务器心跳包,前4字节是人气值,后面是客户端发的心跳包内容
|
||||||
|
# pack_len不包括客户端发的心跳包内容,不知道是不是服务器BUG
|
||||||
|
popularity = int.from_bytes(
|
||||||
|
data[offset + HEADER_STRUCT.size: offset + HEADER_STRUCT.size + 4],
|
||||||
|
'big'
|
||||||
|
)
|
||||||
|
# 自己造个消息当成业务消息处理
|
||||||
|
body = {
|
||||||
|
'cmd': '_HEARTBEAT',
|
||||||
|
'data': {
|
||||||
|
'popularity': popularity
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await self._handle_command(body)
|
||||||
|
|
||||||
async def _handle_command(self, command: Union[list, dict]):
|
else:
|
||||||
|
# 未知消息
|
||||||
|
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
|
||||||
|
logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id,
|
||||||
|
header.operation, header, body)
|
||||||
|
|
||||||
|
async def _handle_command(self, command: dict):
|
||||||
"""
|
"""
|
||||||
解析并处理业务消息
|
解析并处理业务消息
|
||||||
|
|
||||||
:param command: 业务消息
|
:param command: 业务消息
|
||||||
"""
|
"""
|
||||||
# 这里可能会多个消息一起发
|
# 外部代码可能不能正常处理取消,所以这里加shield
|
||||||
if isinstance(command, list):
|
results = await asyncio.shield(
|
||||||
for one_command in command:
|
asyncio.gather(
|
||||||
await self._handle_command(one_command)
|
*(handler.handle(self, command) for handler in self._handlers),
|
||||||
return
|
loop=self._loop,
|
||||||
|
return_exceptions=True
|
||||||
results = await asyncio.gather(
|
),
|
||||||
*(handler.handle(self, command) for handler in self._handlers),
|
loop=self._loop
|
||||||
loop=self._loop,
|
|
||||||
return_exceptions=True
|
|
||||||
)
|
)
|
||||||
for res in results:
|
for res in results:
|
||||||
if isinstance(res, Exception):
|
if isinstance(res, Exception):
|
||||||
logger.exception('room %d 处理消息时发生错误,command=%s', self.room_id, command, exc_info=res)
|
logger.exception('room=%d _handle_command() failed, command=%s', self.room_id, command, exc_info=res)
|
||||||
|
@ -108,7 +108,7 @@ class BaseHandler(HandlerInterface):
|
|||||||
if cmd not in self._CMD_CALLBACK_DICT:
|
if cmd not in self._CMD_CALLBACK_DICT:
|
||||||
# 只有第一次遇到未知cmd时打日志
|
# 只有第一次遇到未知cmd时打日志
|
||||||
if cmd not in logged_unknown_cmds:
|
if cmd not in logged_unknown_cmds:
|
||||||
logger.warning('room %d 未知cmd:cmd=%s %s', client.room_id, cmd, command)
|
logger.warning('room=%d unknown cmd=%s, command=%s', client.room_id, cmd, command)
|
||||||
logged_unknown_cmds.add(cmd)
|
logged_unknown_cmds.add(cmd)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user