diff --git a/blivedm.py b/blivedm.py index 431425c..928613c 100644 --- a/blivedm.py +++ b/blivedm.py @@ -13,24 +13,42 @@ import aiohttp logger = logging.getLogger(__name__) +ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init' +WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub' +HEADER_STRUCT = struct.Struct('>I2H2I') +HeaderTuple = namedtuple('HeaderTuple', ('pack_len', 'raw_header_size', 'ver', 'operation', 'seq_id')) + + +# go-common\app\service\main\broadcast\model\operation.go class Operation(IntEnum): - SEND_HEARTBEAT = 2 - POPULARITY = 3 - COMMAND = 5 + HANDSHAKE = 0 + HANDSHAKE_REPLY = 1 + HEARTBEAT = 2 + HEARTBEAT_REPLY = 3 + SEND_MSG = 4 + SEND_MSG_REPLY = 5 + DISCONNECT_REPLY = 6 AUTH = 7 - RECV_HEARTBEAT = 8 + AUTH_REPLY = 8 + RAW = 9 + PROTO_READY = 10 + PROTO_FINISH = 11 + CHANGE_ROOM = 12 + CHANGE_ROOM_REPLY = 13 + REGISTER = 14 + REGISTER_REPLY = 15 + UNREGISTER = 16 + UNREGISTER_REPLY = 17 + # B站业务自定义OP + # MinBusinessOp = 1000 + # MaxBusinessOp = 10000 class BLiveClient: - ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init' - WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub' - - HEADER_STRUCT = struct.Struct('>I2H2I') - HeaderTuple = namedtuple('HeaderTuple', ('total_len', 'header_len', 'proto_ver', 'operation', 'sequence')) - _COMMAND_HANDLERS = { # 收到弹幕 + # go-common\app\service\live\live-dm\service\v1\send.go 'DANMU_MSG': lambda client, command: client._on_get_danmaku( command['info'][1], command['info'][2][1] ), @@ -115,7 +133,7 @@ class BLiveClient: return asyncio.ensure_future(self._message_loop(), loop=self._loop) async def _get_room_id(self): - async with self._session.get(self.ROOM_INIT_URL, + async with self._session.get(ROOM_INIT_URL, params={'id': self._short_id}, ssl=self._ssl) as res: if res.status == 200: @@ -129,9 +147,9 @@ class BLiveClient: def _make_packet(self, data, operation): body = json.dumps(data).encode('utf-8') - header = self.HEADER_STRUCT.pack( - self.HEADER_STRUCT.size + len(body), - self.HEADER_STRUCT.size, + header = HEADER_STRUCT.pack( + HEADER_STRUCT.size + len(body), + HEADER_STRUCT.size, 1, operation, 1 @@ -157,7 +175,7 @@ class BLiveClient: heartbeat_future = None try: # 连接 - async with self._session.ws_connect(self.WEBSOCKET_URL, + async with self._session.ws_connect(WEBSOCKET_URL, ssl=self._ssl) as websocket: self._websocket = websocket await self._send_auth() @@ -193,7 +211,7 @@ class BLiveClient: async def _heartbeat_loop(self): while True: try: - await self._websocket.send_bytes(self._make_packet({}, Operation.SEND_HEARTBEAT)) + await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) await asyncio.sleep(30) except (asyncio.CancelledError, aiohttp.ClientConnectorError): @@ -203,29 +221,29 @@ class BLiveClient: offset = 0 while offset < len(message): try: - header = self.HeaderTuple(*self.HEADER_STRUCT.unpack_from(message, offset)) + header = HeaderTuple(*HEADER_STRUCT.unpack_from(message, offset)) except struct.error: break - if header.operation == Operation.POPULARITY: - popularity = int.from_bytes(message[offset + self.HEADER_STRUCT.size: - offset + self.HEADER_STRUCT.size + 4], + if header.operation == Operation.HEARTBEAT_REPLY: + popularity = int.from_bytes(message[offset + HEADER_STRUCT.size: + offset + HEADER_STRUCT.size + 4], 'big') await self._on_get_popularity(popularity) - elif header.operation == Operation.COMMAND: - body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] + elif header.operation == Operation.SEND_MSG_REPLY: + body = message[offset + HEADER_STRUCT.size: offset + header.pack_len] body = json.loads(body.decode('utf-8')) await self._handle_command(body) - elif header.operation == Operation.RECV_HEARTBEAT: - await self._websocket.send_bytes(self._make_packet({}, Operation.SEND_HEARTBEAT)) + elif header.operation == Operation.AUTH_REPLY: + await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) else: - body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] + body = message[offset + HEADER_STRUCT.size: offset + header.pack_len] logger.warning('未知包类型:operation=%d %s%s', header.operation, header, body) - offset += header.total_len + offset += header.pack_len async def _handle_command(self, command): if isinstance(command, list):