整理解包代码

This commit is contained in:
John Smith 2021-12-18 14:42:11 +08:00
parent 60c60e69c5
commit 62df6345c4

View File

@ -13,6 +13,10 @@ import aiohttp
from . import handlers
__all__ = (
'BLiveClient',
)
logger = logging.getLogger('blivedm')
ROOM_INIT_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom'
@ -23,9 +27,14 @@ DEFAULT_DANMAKU_SERVER_LIST = [
HEADER_STRUCT = struct.Struct('>I2H2I')
HeaderTuple = collections.namedtuple('HeaderTuple', ('pack_len', 'raw_header_size', 'ver', 'operation', 'seq_id'))
WS_BODY_PROTOCOL_VERSION_INFLATE = 0
WS_BODY_PROTOCOL_VERSION_NORMAL = 1
WS_BODY_PROTOCOL_VERSION_DEFLATE = 2
# WS_BODY_PROTOCOL_VERSION
class ProtoVer(enum.IntEnum):
NORMAL = 0
HEARTBEAT = 1
DEFLATE = 2
BROTLI = 3
# go-common\app\service\main\broadcast\model\operation.go
@ -308,13 +317,13 @@ class BLiveClient:
:return: 整个包的数据
"""
body = json.dumps(data).encode('utf-8')
header = HEADER_STRUCT.pack(
HEADER_STRUCT.size + len(body), # pack_len
HEADER_STRUCT.size, # raw_header_size
1, # ver
operation, # operation
1 # seq_id
)
header = HEADER_STRUCT.pack(*HeaderTuple(
pack_len=HEADER_STRUCT.size + len(body),
raw_header_size=HEADER_STRUCT.size,
ver=1,
operation=operation,
seq_id=1
))
return header + body
async def _network_coroutine_wrapper(self):
@ -468,12 +477,12 @@ class BLiveClient:
while True:
if header.operation == Operation.SEND_MSG_REPLY:
# 业务消息
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE:
body = data[offset + header.raw_header_size: offset + header.pack_len]
if header.ver == ProtoVer.DEFLATE:
# 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行
body = await self._loop.run_in_executor(None, zlib.decompress, body)
await self._parse_ws_message(body)
else:
elif header.ver == ProtoVer.NORMAL:
# 没压缩过的直接反序列化因为有万恶的GIL这里不能并行避免阻塞
if len(body) != 0:
try:
@ -482,6 +491,10 @@ class BLiveClient:
except Exception:
logger.error('room=%d, body=%s', self.room_id, body)
raise
else:
# 未知格式
logger.warning('room=%d unknown protocol version=%d, header=%s, body=%s', self.room_id,
header.ver, header, body)
elif header.operation == Operation.AUTH_REPLY:
# 认证响应 TODO 判断是否成功
@ -489,7 +502,7 @@ class BLiveClient:
else:
# 未知消息
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
body = data[offset + header.raw_header_size: offset + header.pack_len]
logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id,
header.operation, header, body)
@ -507,7 +520,7 @@ class BLiveClient:
# 服务器心跳包前4字节是人气值后面是客户端发的心跳包内容
# pack_len不包括客户端发的心跳包内容不知道是不是服务器BUG
popularity = int.from_bytes(
data[offset + HEADER_STRUCT.size: offset + HEADER_STRUCT.size + 4],
data[offset + header.raw_header_size: offset + header.raw_header_size + 4],
'big'
)
# 自己造个消息当成业务消息处理
@ -521,7 +534,7 @@ class BLiveClient:
else:
# 未知消息
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
body = data[offset + header.raw_header_size: offset + header.pack_len]
logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id,
header.operation, header, body)