diff --git a/blivedm/__init__.py b/blivedm/__init__.py index 6de332d..92170b5 100644 --- a/blivedm/__init__.py +++ b/blivedm/__init__.py @@ -2,3 +2,4 @@ from .models import * from .handlers import * from .client import * +from .open_live_client import * diff --git a/blivedm/client.py b/blivedm/client.py index b066754..1699569 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -5,6 +5,7 @@ import json import logging import ssl as ssl_ import struct +import zlib from typing import * import aiohttp @@ -123,6 +124,7 @@ class BLiveClient: self._heartbeat_interval = heartbeat_interval self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa + # TODO 没必要支持多个handler,改成单个吧 self._handlers: List[handlers.HandlerInterface] = [] """消息处理器,可动态增删""" @@ -627,6 +629,10 @@ class BLiveClient: # 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行 body = await asyncio.get_running_loop().run_in_executor(None, brotli.decompress, body) await self._parse_ws_message(body) + elif header.ver == ProtoVer.DEFLATE: + # web端已经不用zlib压缩了,但是开放平台会用 + body = await asyncio.get_running_loop().run_in_executor(None, zlib.decompress, body) + await self._parse_ws_message(body) elif header.ver == ProtoVer.NORMAL: # 没压缩过的直接反序列化,因为有万恶的GIL,这里不能并行避免阻塞 if len(body) != 0: @@ -661,6 +667,7 @@ class BLiveClient: :param command: 业务消息 """ + # TODO 考虑解析完整个WS包后再一次处理所有消息。另外用call_soon就不会阻塞网络协程了,也不用加shield # 外部代码可能不能正常处理取消,所以这里加shield results = await asyncio.shield( asyncio.gather( diff --git a/blivedm/handlers.py b/blivedm/handlers.py index 43b1852..614aecb 100644 --- a/blivedm/handlers.py +++ b/blivedm/handlers.py @@ -51,6 +51,8 @@ class HandlerInterface: async def handle(self, client: client_.BLiveClient, command: dict): raise NotImplementedError + # TODO 加个异常停止的回调 + class BaseHandler(HandlerInterface): """ diff --git a/blivedm/open_live_client.py b/blivedm/open_live_client.py new file mode 100644 index 0000000..bd50087 --- /dev/null +++ b/blivedm/open_live_client.py @@ -0,0 +1,332 @@ +# -*- coding: utf-8 -*- +import asyncio +import hashlib +import hmac +import json +import logging +import random +import ssl as ssl_ +import datetime +from typing import * + +import aiohttp + +from . import client, handlers + +logger = logging.getLogger('blivedm') + +START_URL = 'https://live-open.biliapi.com/v2/app/start' +HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat' +END_URL = 'https://live-open.biliapi.com/v2/app/end' + + +# TODO 抽出公共基类,现在BLiveClient和OpenLiveClient还有不重合的代码 +class OpenLiveClient(client.BLiveClient): + """ + B站直播开放平台客户端,负责连接房间 + + 文档参考:https://open-live.bilibili.com/document/ + + :param access_key: 在开放平台申请的access_key + :param access_secret: 在开放平台申请的access_secret + :param app_id: 在开放平台创建的项目ID + :param room_owner_auth_code: 主播身份码 + :param session: cookie、连接池 + :param heartbeat_interval: 发送连接心跳包的间隔时间(秒) + :param game_heartbeat_interval: 发送项目心跳包的间隔时间(秒) + :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext + """ + + def __init__( + self, + access_key: str, + access_secret: str, + app_id: int, + room_owner_auth_code: str, + session: Optional[aiohttp.ClientSession] = None, + heartbeat_interval=30, + game_heartbeat_interval=20, + ssl: Union[bool, ssl_.SSLContext] = True, + ): + self._access_key = access_key + self._access_secret = access_secret + self._app_id = app_id + self._room_owner_auth_code = room_owner_auth_code + + if session is None: + self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) + self._own_session = True + else: + self._session = session + self._own_session = False + assert self._session.loop is asyncio.get_event_loop() # noqa + + self._heartbeat_interval = heartbeat_interval + self._game_heartbeat_interval = game_heartbeat_interval + self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa + + self._handlers: List[handlers.HandlerInterface] = [] + """消息处理器,可动态增删""" + + # 在调用init_room后初始化的字段 + self._room_id = None + """真实房间ID""" + self._room_owner_uid = None + """主播用户ID""" + self._host_server_list: Optional[List[str]] = [] + """弹幕服务器URL列表""" + self._auth_body = None + """连接弹幕服务器用的认证包内容""" + self._game_id = None + """项目场次ID,仅用于互动玩法类项目,其他项目为空字符串""" + + # 在运行时初始化的字段 + self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None + """WebSocket连接""" + self._network_future: Optional[asyncio.Future] = None + """网络协程的future""" + self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None + """发连接心跳包定时器的handle""" + self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None + """发项目心跳包定时器的handle""" + + @property + def room_id(self) -> Optional[int]: + """ + 房间ID,调用init_room后初始化 + """ + return self._room_id + + @property + def room_owner_uid(self) -> Optional[int]: + """ + 主播用户ID,调用init_room后初始化 + """ + return self._room_owner_uid + + @property + def room_owner_auth_code(self): + """ + 主播身份码 + """ + return self._room_owner_auth_code + + @property + def app_id(self): + """ + 在开放平台创建的项目ID + """ + return self._app_id + + @property + def game_id(self) -> Optional[str]: + """ + 项目场次ID,仅用于互动玩法类项目,其他项目为空字符串,调用init_room后初始化 + """ + return self._game_id + + async def close(self): + """ + 释放本客户端的资源,调用后本客户端将不可用 + """ + if self.is_running: + logger.warning('room=%s is calling close(), but client is running', self.room_id) + + if self._game_heartbeat_timer_handle is not None: + self._game_heartbeat_timer_handle.cancel() + self._game_heartbeat_timer_handle = None + await self._end_game() + + await super().close() + + def _request_open_live(self, url, body: dict): + body_bytes = json.dumps(body).encode('utf-8') + headers = { + 'x-bili-accesskeyid': self._access_key, + 'x-bili-content-md5': hashlib.md5(body_bytes).hexdigest(), + 'x-bili-signature-method': 'HMAC-SHA256', + 'x-bili-signature-nonce': str(random.randint(0, 999999999)), + 'x-bili-signature-version': '1.0', + 'x-bili-timestamp': str(int(datetime.datetime.now().timestamp())), + } + + str_to_sign = '\n'.join( + f'{key}:{value}' + for key, value in headers.items() + ) + signature = hmac.new( + self._access_secret.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256 + ).hexdigest() + headers['Authorization'] = signature + + headers['Content-Type'] = 'application/json' + headers['Accept'] = 'application/json' + return self._session.post(url, headers=headers, data=body_bytes, ssl=self._ssl) + + async def init_room(self): + """ + 开启项目,并初始化连接房间需要的字段 + + :return: 是否成功 + """ + if not await self._start_game(): + return False + + if self._game_id != '' and self._game_heartbeat_timer_handle is None: + self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later( + self._game_heartbeat_interval, self._on_send_game_heartbeat + ) + return True + + async def _start_game(self): + try: + async with self._request_open_live( + START_URL, + {'code': self._room_owner_auth_code, 'app_id': self._app_id} + ) as res: + if res.status != 200: + logger.warning('init_room() failed, status=%d, reason=%s', res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('init_room() failed, code=%d, message=%s, request_id=%s', + data['code'], data['message'], data['request_id']) + return False + if not self._parse_start_game(data['data']): + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('init_room() failed:') + return False + return True + + def _parse_start_game(self, data): + self._game_id = data['game_info']['game_id'] + websocket_info = data['websocket_info'] + self._auth_body = websocket_info['auth_body'] + self._host_server_list = websocket_info['wss_link'] + anchor_info = data['anchor_info'] + self._room_id = anchor_info['room_id'] + self._room_owner_uid = anchor_info['uid'] + return True + + async def _end_game(self): + """ + 关闭项目。互动玩法类项目建议断开连接时保证调用到这个函数(close会调用),否则短时间内无法重复连接同一个房间 + """ + if self._game_id in (None, ''): + return True + + try: + async with self._request_open_live( + END_URL, + {'app_id': self._app_id, 'game_id': self._game_id} + ) as res: + if res.status != 200: + logger.warning('room=%d _end_game() failed, status=%d, reason=%s', + self._room_id, res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('room=%d _end_game() failed, code=%d, message=%s, request_id=%s', + self._room_id, data['code'], data['message'], data['request_id']) + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('room=%d _end_game() failed:', self._room_id) + return False + return True + + def _on_send_game_heartbeat(self): + """ + 定时发送项目心跳包的回调 + """ + if not self.is_running: + self._game_heartbeat_timer_handle = None + return + + self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later( + self._game_heartbeat_interval, self._on_send_game_heartbeat + ) + asyncio.create_task(self._send_game_heartbeat()) + + async def _send_game_heartbeat(self): + """ + 发送项目心跳包,仅用于互动玩法类项目 + """ + if self._game_id in (None, ''): + logger.warning('game=%d heartbeat failed, game_id not found', self._game_id) + return False + + try: + async with self._request_open_live( + HEARTBEAT_URL, + {'game_id': self._game_id} + ) as res: + if res.status != 200: + logger.warning('room=%d _send_game_heartbeat() failed, status=%d, reason=%s', + self._room_id, res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('room=%d _send_game_heartbeat() failed, code=%d, message=%s, request_id=%s', + self._room_id, data['code'], data['message'], data['request_id']) + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('room=%d _send_game_heartbeat() failed:', self._room_id) + return False + return True + + async def _network_coroutine(self): + """ + 网络协程,负责连接服务器、接收消息、解包 + """ + # 如果之前未初始化则初始化 + if self._auth_body is None: + if not await self.init_room(): + raise client.InitError('init_room() failed') + + retry_count = 0 + while True: + try: + # 连接 + host_server_url = self._host_server_list[retry_count % len(self._host_server_list)] + async with self._session.ws_connect( + host_server_url, + receive_timeout=self._heartbeat_interval + 5, + ssl=self._ssl + ) as websocket: + self._websocket = websocket + await self._on_ws_connect() + + # 处理消息 + message: aiohttp.WSMessage + async for message in websocket: + await self._on_ws_message(message) + # 至少成功处理1条消息 + retry_count = 0 + + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + # 掉线重连 + pass + except client.AuthError: + # 认证失败了,应该重新获取auth_body再重连 + logger.exception('room=%d auth failed, trying init_room() again', self.room_id) + if not await self.init_room(): + raise client.InitError('init_room() failed') + except ssl_.SSLError: + logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id) + raise + finally: + self._websocket = None + await self._on_ws_close() + + # 准备重连 + retry_count += 1 + logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count) + await asyncio.sleep(1) + + async def _send_auth(self): + """ + 发送认证包 + """ + auth_body = json.loads(self._auth_body) + await self._websocket.send_bytes(self._make_packet(auth_body, client.Operation.AUTH)) diff --git a/open_live_sample.py b/open_live_sample.py new file mode 100644 index 0000000..dce2a99 --- /dev/null +++ b/open_live_sample.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +import asyncio + +import blivedm +import blivedm.open_live_client as open_live_client + +ACCESS_KEY = '' +ACCESS_SECRET = '' +APP_ID = 0 +ROOM_OWNER_AUTH_CODE = '' + + +async def main(): + await run_single_client() + + +async def run_single_client(): + """ + 演示监听一个直播间 + """ + client = open_live_client.OpenLiveClient( + access_key=ACCESS_KEY, + access_secret=ACCESS_SECRET, + app_id=APP_ID, + room_owner_auth_code=ROOM_OWNER_AUTH_CODE, + ) + handler = MyHandler() + client.add_handler(handler) + + client.start() + try: + # 演示70秒后停止 + await asyncio.sleep(70) + client.stop() + + await client.join() + finally: + await client.stop_and_close() + + +class MyHandler(blivedm.HandlerInterface): + async def handle(self, client: open_live_client.OpenLiveClient, command: dict): + print(command) + + +if __name__ == '__main__': + asyncio.run(main())