使用官方名称

This commit is contained in:
John Smith 2019-04-22 19:47:05 +08:00
parent af714aa1f7
commit f755468908

View File

@ -13,24 +13,42 @@ import aiohttp
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init'
WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub'
HEADER_STRUCT = struct.Struct('>I2H2I')
HeaderTuple = namedtuple('HeaderTuple', ('pack_len', 'raw_header_size', 'ver', 'operation', 'seq_id'))
# go-common\app\service\main\broadcast\model\operation.go
class Operation(IntEnum): class Operation(IntEnum):
SEND_HEARTBEAT = 2 HANDSHAKE = 0
POPULARITY = 3 HANDSHAKE_REPLY = 1
COMMAND = 5 HEARTBEAT = 2
HEARTBEAT_REPLY = 3
SEND_MSG = 4
SEND_MSG_REPLY = 5
DISCONNECT_REPLY = 6
AUTH = 7 AUTH = 7
RECV_HEARTBEAT = 8 AUTH_REPLY = 8
RAW = 9
PROTO_READY = 10
PROTO_FINISH = 11
CHANGE_ROOM = 12
CHANGE_ROOM_REPLY = 13
REGISTER = 14
REGISTER_REPLY = 15
UNREGISTER = 16
UNREGISTER_REPLY = 17
# B站业务自定义OP
# MinBusinessOp = 1000
# MaxBusinessOp = 10000
class BLiveClient: class BLiveClient:
ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init'
WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub'
HEADER_STRUCT = struct.Struct('>I2H2I')
HeaderTuple = namedtuple('HeaderTuple', ('total_len', 'header_len', 'proto_ver', 'operation', 'sequence'))
_COMMAND_HANDLERS = { _COMMAND_HANDLERS = {
# 收到弹幕 # 收到弹幕
# go-common\app\service\live\live-dm\service\v1\send.go
'DANMU_MSG': lambda client, command: client._on_get_danmaku( 'DANMU_MSG': lambda client, command: client._on_get_danmaku(
command['info'][1], command['info'][2][1] command['info'][1], command['info'][2][1]
), ),
@ -115,7 +133,7 @@ class BLiveClient:
return asyncio.ensure_future(self._message_loop(), loop=self._loop) return asyncio.ensure_future(self._message_loop(), loop=self._loop)
async def _get_room_id(self): async def _get_room_id(self):
async with self._session.get(self.ROOM_INIT_URL, async with self._session.get(ROOM_INIT_URL,
params={'id': self._short_id}, params={'id': self._short_id},
ssl=self._ssl) as res: ssl=self._ssl) as res:
if res.status == 200: if res.status == 200:
@ -129,9 +147,9 @@ class BLiveClient:
def _make_packet(self, data, operation): def _make_packet(self, data, operation):
body = json.dumps(data).encode('utf-8') body = json.dumps(data).encode('utf-8')
header = self.HEADER_STRUCT.pack( header = HEADER_STRUCT.pack(
self.HEADER_STRUCT.size + len(body), HEADER_STRUCT.size + len(body),
self.HEADER_STRUCT.size, HEADER_STRUCT.size,
1, 1,
operation, operation,
1 1
@ -157,7 +175,7 @@ class BLiveClient:
heartbeat_future = None heartbeat_future = None
try: try:
# 连接 # 连接
async with self._session.ws_connect(self.WEBSOCKET_URL, async with self._session.ws_connect(WEBSOCKET_URL,
ssl=self._ssl) as websocket: ssl=self._ssl) as websocket:
self._websocket = websocket self._websocket = websocket
await self._send_auth() await self._send_auth()
@ -193,7 +211,7 @@ class BLiveClient:
async def _heartbeat_loop(self): async def _heartbeat_loop(self):
while True: while True:
try: try:
await self._websocket.send_bytes(self._make_packet({}, Operation.SEND_HEARTBEAT)) await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
await asyncio.sleep(30) await asyncio.sleep(30)
except (asyncio.CancelledError, aiohttp.ClientConnectorError): except (asyncio.CancelledError, aiohttp.ClientConnectorError):
@ -203,29 +221,29 @@ class BLiveClient:
offset = 0 offset = 0
while offset < len(message): while offset < len(message):
try: try:
header = self.HeaderTuple(*self.HEADER_STRUCT.unpack_from(message, offset)) header = HeaderTuple(*HEADER_STRUCT.unpack_from(message, offset))
except struct.error: except struct.error:
break break
if header.operation == Operation.POPULARITY: if header.operation == Operation.HEARTBEAT_REPLY:
popularity = int.from_bytes(message[offset + self.HEADER_STRUCT.size: popularity = int.from_bytes(message[offset + HEADER_STRUCT.size:
offset + self.HEADER_STRUCT.size + 4], offset + HEADER_STRUCT.size + 4],
'big') 'big')
await self._on_get_popularity(popularity) await self._on_get_popularity(popularity)
elif header.operation == Operation.COMMAND: elif header.operation == Operation.SEND_MSG_REPLY:
body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] body = message[offset + HEADER_STRUCT.size: offset + header.pack_len]
body = json.loads(body.decode('utf-8')) body = json.loads(body.decode('utf-8'))
await self._handle_command(body) await self._handle_command(body)
elif header.operation == Operation.RECV_HEARTBEAT: elif header.operation == Operation.AUTH_REPLY:
await self._websocket.send_bytes(self._make_packet({}, Operation.SEND_HEARTBEAT)) await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
else: else:
body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] body = message[offset + HEADER_STRUCT.size: offset + header.pack_len]
logger.warning('未知包类型operation=%d %s%s', header.operation, header, body) logger.warning('未知包类型operation=%d %s%s', header.operation, header, body)
offset += header.total_len offset += header.pack_len
async def _handle_command(self, command): async def _handle_command(self, command):
if isinstance(command, list): if isinstance(command, list):