升级协议版本

This commit is contained in:
John Smith 2019-09-15 18:46:45 +08:00
parent 9eb953720c
commit be92ac5797
2 changed files with 87 additions and 36 deletions

View File

@ -5,6 +5,7 @@ import json
import logging
import ssl as ssl_
import struct
import zlib
from collections import namedtuple
from enum import IntEnum
from typing import *
@ -14,10 +15,13 @@ import aiohttp
logger = logging.getLogger(__name__)
ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init'
WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub'
DANMAKU_SERVER_CONF_URL = 'https://api.live.bilibili.com/room/v1/Danmu/getConf'
HEADER_STRUCT = struct.Struct('>I2H2I')
HeaderTuple = namedtuple('HeaderTuple', ('pack_len', 'raw_header_size', 'ver', 'operation', 'seq_id'))
WS_BODY_PROTOCOL_VERSION_NORMAL = 0
WS_BODY_PROTOCOL_VERSION_INT = 1 # 用于心跳包
WS_BODY_PROTOCOL_VERSION_DEFLATE = 2
# go-common\app\service\main\broadcast\model\operation.go
@ -45,6 +49,10 @@ class Operation(IntEnum):
# MaxBusinessOp = 10000
class InitError(Exception):
"""初始化失败"""
class DanmakuMessage:
def __init__(self, mode, font_size, color, timestamp, rnd, uid_crc32, msg_type, bubble,
msg,
@ -245,19 +253,23 @@ class BLiveClient:
):
_COMMAND_HANDLERS[cmd] = None
def __init__(self, room_id, ssl=True, loop=None, session: aiohttp.ClientSession=None,
uid=0):
def __init__(self, room_id, uid=0, session: aiohttp.ClientSession=None,
heartbeat_interval=30, ssl=True, loop=None):
"""
:param room_id: URL中的房间ID可以为短ID
:param uid: B站用户ID0表示未登录
:param session: cookie连接池
:param heartbeat_interval: 发送心跳包的间隔时间
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
:param loop: 协程事件循环
:param session: cookie连接池
:param uid: B站用户ID0表示未登录
"""
# 用来init_room的临时房间ID
self._tmp_room_id = room_id
# 调用init_room后初始化
self._room_id = self._room_short_id = self._room_owner_uid = None
# [{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...]
self._host_server_list = None
self._host_server_token = None
self._uid = uid
if loop is not None:
@ -278,6 +290,8 @@ class BLiveClient:
# noinspection PyDeprecation
if self._session.loop is not self._loop:
raise RuntimeError('BLiveClient and session has to use same event loop')
self._heartbeat_interval = heartbeat_interval
# noinspection PyProtectedMember
self._ssl = ssl if ssl else ssl_._create_unverified_context()
self._websocket = None
@ -344,22 +358,49 @@ class BLiveClient:
return self._future
async def init_room(self):
async with self._session.get(ROOM_INIT_URL,
params={'id': self._tmp_room_id},
ssl=self._ssl) as res:
if res.status != 200:
raise ConnectionAbortedError(f'room {self._tmp_room_id} init_room失败'
f'{res.status} {res.reason}')
data = await res.json()
if data['code'] != 0:
raise ConnectionAbortedError(f'room {self._tmp_room_id} init_room失败'
f'{data["msg"]}')
self._parse_room_init(data['data'])
try:
async with self._session.get(ROOM_INIT_URL, params={'id': self._tmp_room_id},
ssl=self._ssl) as res:
if res.status != 200:
logger.warning('room %d room_init失败%d %s', self._tmp_room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room %d room_init失败%s', self._tmp_room_id, data['msg'])
return False
if not self._parse_room_init(data['data']):
return False
except aiohttp.ClientConnectionError:
logger.exception('room %d room_init失败', self._tmp_room_id)
return False
try:
async with self._session.get(DANMAKU_SERVER_CONF_URL, params={'id': self._tmp_room_id},
ssl=self._ssl) as res:
if res.status != 200:
logger.warning('room %d getConf失败%d %s', self._tmp_room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room %d getConf失败%s', self._tmp_room_id, data['msg'])
return False
self._host_server_list = data['data']['host_server_list']
self._host_server_token = data['data']['token']
if not self._host_server_list:
logger.warning('room %d getConf失败host_server_list为空')
return False
except aiohttp.ClientConnectionError:
logger.exception('room %d getConf失败', self._tmp_room_id)
return False
return True
def _parse_room_init(self, data):
self._room_id = data['room_id']
self._room_short_id = data['short_id']
self._room_owner_uid = data['uid']
return True
def _make_packet(self, data, operation):
body = json.dumps(data).encode('utf-8')
@ -376,24 +417,30 @@ class BLiveClient:
auth_params = {
'uid': self._uid,
'roomid': self._room_id,
'protover': 1,
'protover': 2,
'platform': 'web',
'clientver': '1.4.0'
'clientver': '1.8.2',
'type': 2,
'key': self._host_server_token
}
await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH))
async def _message_loop(self):
# 获取房间ID
if self._room_id is None:
await self.init_room()
# 如果之前未初始化则初始化
if self._host_server_token is None:
if not await self.init_room():
raise InitError('初始化失败')
retry_count = 0
while True:
heartbeat_future = None
try:
# 连接
async with self._session.ws_connect(WEBSOCKET_URL,
ssl=self._ssl) as websocket:
host_server = self._host_server_list[retry_count % len(self._host_server_list)]
async with self._session.ws_connect(
f'wss://{host_server["host"]}:{host_server["wss_port"]}/sub',
ssl=self._ssl
) as websocket:
retry_count = 0
self._websocket = websocket
await self._send_auth()
@ -447,35 +494,39 @@ class BLiveClient:
while True:
try:
await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
await asyncio.sleep(30)
await asyncio.sleep(self._heartbeat_interval)
except (asyncio.CancelledError, aiohttp.ClientConnectorError):
except (asyncio.CancelledError, aiohttp.ClientConnectionError):
break
async def _handle_message(self, message):
async def _handle_message(self, data):
offset = 0
while offset < len(message):
while offset < len(data):
try:
header = HeaderTuple(*HEADER_STRUCT.unpack_from(message, offset))
header = HeaderTuple(*HEADER_STRUCT.unpack_from(data, offset))
except struct.error:
break
if header.operation == Operation.HEARTBEAT_REPLY:
popularity = int.from_bytes(message[offset + HEADER_STRUCT.size:
offset + HEADER_STRUCT.size + 4],
popularity = int.from_bytes(data[offset + HEADER_STRUCT.size:
offset + HEADER_STRUCT.size + 4],
'big')
await self._on_receive_popularity(popularity)
elif header.operation == Operation.SEND_MSG_REPLY:
body = message[offset + HEADER_STRUCT.size: offset + header.pack_len]
body = json.loads(body.decode('utf-8'))
await self._handle_command(body)
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
if header.ver == 2: # WS_BODY_PROTOCOL_VERSION_DEFLATE
body = zlib.decompress(body)
await self._handle_message(body)
else:
body = json.loads(body.decode('utf-8'))
await self._handle_command(body)
elif header.operation == Operation.AUTH_REPLY:
await self._websocket.send_bytes(self._make_packet({}, Operation.HEARTBEAT))
else:
body = message[offset + HEADER_STRUCT.size: offset + header.pack_len]
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
logger.warning('room %d 未知包类型operation=%d %s%s', self.room_id,
header.operation, header, body)

View File

@ -28,8 +28,8 @@ class MyBLiveClient(blivedm.BLiveClient):
async def main():
# 139是黑桐谷歌的直播间
# 如果SSL验证失败就把第二个参数设为False
client = MyBLiveClient(139, True)
# 如果SSL验证失败就把ssl设为False
client = MyBLiveClient(139, ssl=True)
future = client.start()
try:
# 5秒后停止测试用