From be92ac57975c2f48ad2d84aec0feee6f4b8c4991 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 15 Sep 2019 18:46:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=87=E7=BA=A7=E5=8D=8F=E8=AE=AE=E7=89=88?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm.py | 119 ++++++++++++++++++++++++++++++++++++++--------------- sample.py | 4 +- 2 files changed, 87 insertions(+), 36 deletions(-) diff --git a/blivedm.py b/blivedm.py index b875676..579e65a 100644 --- a/blivedm.py +++ b/blivedm.py @@ -5,6 +5,7 @@ import json import logging import ssl as ssl_ import struct +import zlib from collections import namedtuple from enum import IntEnum from typing import * @@ -14,10 +15,13 @@ import aiohttp logger = logging.getLogger(__name__) ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init' -WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub' +DANMAKU_SERVER_CONF_URL = 'https://api.live.bilibili.com/room/v1/Danmu/getConf' HEADER_STRUCT = struct.Struct('>I2H2I') HeaderTuple = namedtuple('HeaderTuple', ('pack_len', 'raw_header_size', 'ver', 'operation', 'seq_id')) +WS_BODY_PROTOCOL_VERSION_NORMAL = 0 +WS_BODY_PROTOCOL_VERSION_INT = 1 # 用于心跳包 +WS_BODY_PROTOCOL_VERSION_DEFLATE = 2 # go-common\app\service\main\broadcast\model\operation.go @@ -45,6 +49,10 @@ class Operation(IntEnum): # MaxBusinessOp = 10000 +class InitError(Exception): + """初始化失败""" + + class DanmakuMessage: def __init__(self, mode, font_size, color, timestamp, rnd, uid_crc32, msg_type, bubble, msg, @@ -245,19 +253,23 @@ class BLiveClient: ): _COMMAND_HANDLERS[cmd] = None - def __init__(self, room_id, ssl=True, loop=None, session: aiohttp.ClientSession=None, - uid=0): + def __init__(self, room_id, uid=0, session: aiohttp.ClientSession=None, + heartbeat_interval=30, ssl=True, loop=None): """ :param room_id: URL中的房间ID,可以为短ID + :param uid: B站用户ID,0表示未登录 + :param session: cookie、连接池 + :param heartbeat_interval: 发送心跳包的间隔时间(秒) :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext :param loop: 协程事件循环 - :param session: cookie、连接池 - :param uid: B站用户ID,0表示未登录 """ # 用来init_room的临时房间ID self._tmp_room_id = room_id # 调用init_room后初始化 self._room_id = self._room_short_id = self._room_owner_uid = None + # [{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...] + self._host_server_list = None + self._host_server_token = None self._uid = uid if loop is not None: @@ -278,6 +290,8 @@ class BLiveClient: # noinspection PyDeprecation if self._session.loop is not self._loop: raise RuntimeError('BLiveClient and session has to use same event loop') + + self._heartbeat_interval = heartbeat_interval # noinspection PyProtectedMember self._ssl = ssl if ssl else ssl_._create_unverified_context() self._websocket = None @@ -344,22 +358,49 @@ class BLiveClient: return self._future async def init_room(self): - async with self._session.get(ROOM_INIT_URL, - params={'id': self._tmp_room_id}, - ssl=self._ssl) as res: - if res.status != 200: - raise ConnectionAbortedError(f'room {self._tmp_room_id} init_room失败:' - f'{res.status} {res.reason}') - data = await res.json() - if data['code'] != 0: - raise ConnectionAbortedError(f'room {self._tmp_room_id} init_room失败:' - f'{data["msg"]}') - self._parse_room_init(data['data']) + try: + async with self._session.get(ROOM_INIT_URL, params={'id': self._tmp_room_id}, + ssl=self._ssl) as res: + if res.status != 200: + logger.warning('room %d room_init失败:%d %s', self._tmp_room_id, + res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('room %d room_init失败:%s', self._tmp_room_id, data['msg']) + return False + if not self._parse_room_init(data['data']): + return False + except aiohttp.ClientConnectionError: + logger.exception('room %d room_init失败:', self._tmp_room_id) + return False + + try: + async with self._session.get(DANMAKU_SERVER_CONF_URL, params={'id': self._tmp_room_id}, + ssl=self._ssl) as res: + if res.status != 200: + logger.warning('room %d getConf失败:%d %s', self._tmp_room_id, + res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('room %d getConf失败:%s', self._tmp_room_id, data['msg']) + return False + self._host_server_list = data['data']['host_server_list'] + self._host_server_token = data['data']['token'] + if not self._host_server_list: + logger.warning('room %d getConf失败:host_server_list为空') + return False + except aiohttp.ClientConnectionError: + logger.exception('room %d getConf失败:', self._tmp_room_id) + return False + return True def _parse_room_init(self, data): self._room_id = data['room_id'] self._room_short_id = data['short_id'] self._room_owner_uid = data['uid'] + return True def _make_packet(self, data, operation): body = json.dumps(data).encode('utf-8') @@ -376,24 +417,30 @@ class BLiveClient: auth_params = { 'uid': self._uid, 'roomid': self._room_id, - 'protover': 1, + 'protover': 2, 'platform': 'web', - 'clientver': '1.4.0' + 'clientver': '1.8.2', + 'type': 2, + 'key': self._host_server_token } await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH)) async def _message_loop(self): - # 获取房间ID - if self._room_id is None: - await self.init_room() + # 如果之前未初始化则初始化 + if self._host_server_token is None: + if not await self.init_room(): + raise InitError('初始化失败') retry_count = 0 while True: heartbeat_future = None try: # 连接 - async with self._session.ws_connect(WEBSOCKET_URL, - ssl=self._ssl) as websocket: + host_server = self._host_server_list[retry_count % len(self._host_server_list)] + async with self._session.ws_connect( + f'wss://{host_server["host"]}:{host_server["wss_port"]}/sub', + ssl=self._ssl + ) as websocket: retry_count = 0 self._websocket = websocket await self._send_auth() @@ -447,35 +494,39 @@ class BLiveClient: while True: try: await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) - await asyncio.sleep(30) + await asyncio.sleep(self._heartbeat_interval) - except (asyncio.CancelledError, aiohttp.ClientConnectorError): + except (asyncio.CancelledError, aiohttp.ClientConnectionError): break - async def _handle_message(self, message): + async def _handle_message(self, data): offset = 0 - while offset < len(message): + while offset < len(data): try: - header = HeaderTuple(*HEADER_STRUCT.unpack_from(message, offset)) + header = HeaderTuple(*HEADER_STRUCT.unpack_from(data, offset)) except struct.error: break if header.operation == Operation.HEARTBEAT_REPLY: - popularity = int.from_bytes(message[offset + HEADER_STRUCT.size: - offset + HEADER_STRUCT.size + 4], + popularity = int.from_bytes(data[offset + HEADER_STRUCT.size: + offset + HEADER_STRUCT.size + 4], 'big') await self._on_receive_popularity(popularity) elif header.operation == Operation.SEND_MSG_REPLY: - body = message[offset + HEADER_STRUCT.size: offset + header.pack_len] - body = json.loads(body.decode('utf-8')) - await self._handle_command(body) + body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] + if header.ver == 2: # WS_BODY_PROTOCOL_VERSION_DEFLATE + body = zlib.decompress(body) + await self._handle_message(body) + else: + body = json.loads(body.decode('utf-8')) + await self._handle_command(body) elif header.operation == Operation.AUTH_REPLY: await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT)) else: - body = message[offset + HEADER_STRUCT.size: offset + header.pack_len] + body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] logger.warning('room %d 未知包类型:operation=%d %s%s', self.room_id, header.operation, header, body) diff --git a/sample.py b/sample.py index 99b3348..08b4c21 100644 --- a/sample.py +++ b/sample.py @@ -28,8 +28,8 @@ class MyBLiveClient(blivedm.BLiveClient): async def main(): # 139是黑桐谷歌的直播间 - # 如果SSL验证失败就把第二个参数设为False - client = MyBLiveClient(139, True) + # 如果SSL验证失败就把ssl设为False + client = MyBLiveClient(139, ssl=True) future = client.start() try: # 5秒后停止,测试用