From ad1fd62f2936cba2c0cd7d42206f9e19f9bc0afb Mon Sep 17 00:00:00 2001 From: kinori Date: Sat, 2 Sep 2023 11:52:04 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=AF=B9=E7=9B=B4?= =?UTF-8?q?=E6=92=AD=E5=BC=80=E6=94=BE=E5=B9=B3=E5=8F=B0=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E5=8F=8AWS=E7=9A=84=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/__init__.py | 1 + blivedm/client.py | 43 +++++++- blivedm/open_live_client.py | 193 ++++++++++++++++++++++++++++++++++++ open_live_sample.py | 37 +++++++ 4 files changed, 272 insertions(+), 2 deletions(-) create mode 100644 blivedm/open_live_client.py create mode 100644 open_live_sample.py diff --git a/blivedm/__init__.py b/blivedm/__init__.py index 6de332d..b8d89d8 100644 --- a/blivedm/__init__.py +++ b/blivedm/__init__.py @@ -2,3 +2,4 @@ from .models import * from .handlers import * from .client import * +from .open_live_client import * \ No newline at end of file diff --git a/blivedm/client.py b/blivedm/client.py index 66b63d3..7df3e03 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -10,8 +10,11 @@ from typing import * import aiohttp import brotli +from . import open_live_client from . import handlers +OpenLiveClient = open_live_client.OpenLiveClient + __all__ = ( 'BLiveClient', ) @@ -95,11 +98,16 @@ class BLiveClient: def __init__( self, - room_id, + room_id=0, uid=0, session: Optional[aiohttp.ClientSession] = None, heartbeat_interval=30, ssl: Union[bool, ssl_.SSLContext] = True, + + open_live_app_id: Optional[int] = None, + open_live_access_key: Optional[str] = None, + open_live_access_secret: Optional[str] = None, + open_live_code: Optional[str] = None, ): self._tmp_room_id = room_id """用来init_room的临时房间ID,可以用短ID""" @@ -142,6 +150,13 @@ class BLiveClient: self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None """发心跳包定时器的handle""" + self._host_server_auth_body: Dict = None + """开放平台的完整鉴权body""" + + if open_live_app_id and open_live_access_key and open_live_access_secret and open_live_code: + self._open_live_client = OpenLiveClient(open_live_app_id, open_live_access_key, open_live_access_secret, self._session, self._ssl) + self._open_live_auth_code = open_live_code + @property def is_running(self) -> bool: """ @@ -249,6 +264,9 @@ class BLiveClient: :return: True代表没有降级,如果需要降级后还可用,重载这个函数返回True """ res = True + if self._open_live_client and await self._init_room_by_open_live(): + return res + if not await self._init_room_id_and_owner(): res = False # 失败了则降级 @@ -261,6 +279,22 @@ class BLiveClient: self._host_server_list = DEFAULT_DANMAKU_SERVER_LIST self._host_server_token = None return res + + async def _init_room_by_open_live(self): + """ + 通过开放平台初始化房间 + """ + if not self._open_live_client: + logger.warning('_init_room_by_open_live() failed, open_live_client is None') + return False + if not await self._open_live_client.start(self._open_live_auth_code): + logger.warning('app=%d _init_room_by_open_live() failed, open_live_client.start() failed', self._open_live_client.app_id) + return False + self._room_id = self._open_live_client.anchor_room_id + self._room_owner_uid = self._open_live_client.anchor_uid + self._host_server_auth_body = self._open_live_client.ws_auth_body + self._host_server_list = self._open_live_client.wss_link + return True async def _init_room_id_and_owner(self): try: @@ -374,7 +408,7 @@ class BLiveClient: 网络协程,负责连接服务器、接收消息、解包 """ # 如果之前未初始化则初始化 - if self._host_server_token is None: + if self._host_server_auth_body is None and self._host_server_token is None: if not await self.init_room(): raise InitError('init_room() failed') @@ -384,6 +418,7 @@ class BLiveClient: # 连接 host_server = self._host_server_list[retry_count % len(self._host_server_list)] async with self._session.ws_connect( + host_server if isinstance(host_server, str) else f"wss://{host_server['host']}:{host_server['wss_port']}/sub", headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)' @@ -452,6 +487,10 @@ class BLiveClient: } if self._host_server_token is not None: auth_params['key'] = self._host_server_token + + # 开放平台连接则直接替换认证包 + if self._host_server_auth_body is not None: + auth_params = self._host_server_auth_body await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH)) def _on_send_heartbeat(self): diff --git a/blivedm/open_live_client.py b/blivedm/open_live_client.py new file mode 100644 index 0000000..456038a --- /dev/null +++ b/blivedm/open_live_client.py @@ -0,0 +1,193 @@ +# -*- coding: utf-8 -*- +import aiohttp +import asyncio +import hashlib +import hmac +import logging +import random +import ssl as ssl_ +import time +import json +from hashlib import sha256 +from typing import * + +logger = logging.getLogger('open-live-client') + +OPEN_LIVE_START_URL = 'https://live-open.biliapi.com/v2/app/start' +OPEN_LIVE_HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat' +OPEN_LIVE_END_URL = 'https://live-open.biliapi.com/v2/app/end' + +class OpenLiveClient: + def __init__( + self, + app_id: int, + access_key: str, + access_secret: str, + session: Optional[aiohttp.ClientSession] = None, + ssl: Union[bool, ssl_.SSLContext] = True, + ): + self.app_id = app_id + self.access_key = access_key + self.access_secret = access_secret + self.session = session + + if session is None: + self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) + self._own_session = True + else: + self._session = session + self._own_session = False + assert self._session.loop is asyncio.get_event_loop() # noqa + self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa + + @property + def game_id(self) -> Optional[int]: + return self._game_id + + @property + def ws_auth_body(self) -> Optional[Dict]: + return self._ws_auth_body + + @property + def wss_link(self) -> Optional[List[str]]: + return self._wss_link + + @property + def anchor_room_id(self) -> Optional[int]: + return self._anchor_room_id + + @property + def anchor_uname(self) -> Optional[str]: + return self._anchor_uname + + @property + def anchor_uface(self) -> Optional[str]: + return self._anchor_uface + + @property + def anchor_uid(self) -> Optional[int]: + return self._anchor_uid + + def _sign_request_header( + self, + body: str, + ): + md5 = hashlib.md5() + md5.update(body.encode()) + ts = time.time() + nonce = random.randint(1,100000)+time.time() + md5data = md5.hexdigest() + headerMap = { + "x-bili-timestamp": str(int(ts)), + "x-bili-signature-method": "HMAC-SHA256", + "x-bili-signature-nonce": str(nonce), + "x-bili-accesskeyid": self.access_key, + "x-bili-signature-version": "1.0", + "x-bili-content-md5": md5data, + } + headerList = sorted(headerMap) + headerStr = '' + + for key in headerList: + headerStr = headerStr+ key+":"+str(headerMap[key])+"\n" + headerStr = headerStr.rstrip("\n") + + appsecret = self.access_secret.encode() + data = headerStr.encode() + + signature = hmac.new(appsecret, data, digestmod=sha256).hexdigest() + headerMap["Authorization"] = signature + headerMap["Content-Type"] = "application/json" + headerMap["Accept"] = "application/json" + return headerMap + + # 通过身份码获取直播间及wss连接信息 + async def start( + self, + code: str + ): + try: + params = f'{{"code":"{code}","app_id":{self.app_id}}}' + headers = self._sign_request_header(params) + async with self._session.post( + OPEN_LIVE_START_URL, headers=headers, data=params, ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('app=%d start failed, status=%d, reason=%s', self.app_id, res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('app=%d start failed, code=%d, message=%s', self.app_id, data['code'], data['message']) + return False + if not self._parse_start_data( + data + ): + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('app=%d start failed', self.app_id) + return False + return True + + def _parse_start_data( + self, + data: dict + ): + self._game_id = data['data']['game_info']['game_id'] + self._ws_auth_body = json.loads(data['data']['websocket_info']['auth_body']) + self._wss_link = data['data']['websocket_info']['wss_link'] + self._anchor_room_id = data['data']['anchor_info']['room_id'] + self._anchor_uname = data['data']['anchor_info']['uname'] + self._anchor_uface = data['data']['anchor_info']['uface'] + self._anchor_uid = data['data']['anchor_info']['uid'] + return True + + async def end( + self + ): + if not self._game_id: + logger.warning('app=%d end failed, game_id not found', self.app_id) + return False + + try: + params = f'{{"app_id":"{self.app_id}","game_id":{self._game_id}}}' + headers = self._sign_request_header(params) + async with self._session.post( + OPEN_LIVE_END_URL, headers=headers, data=params, ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('app=%d end failed, status=%d, reason=%s', self.app_id, res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('app=%d end failed, code=%d, message=%s', self.app_id, data['code'], data['message']) + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('app=%d end failed', self.app_id) + return False + return True + + # 开放平台互动玩法心跳, 用于维持直播间内定制礼物及统计使用数据, 非互动玩法类暂时不需要 + async def heartbeat( + self + ): + if not self._game_id: + logger.warning('game=%d heartbeat failed, game_id not found', self._game_id) + return False + + try: + params = f'{{""game_id":{self._game_id}}}' + headers = self._sign_request_header(params) + async with self._session.post( + OPEN_LIVE_HEARTBEAT_URL, headers=headers, data=params, ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('game=%d heartbeat failed, status=%d, reason=%s', self._game_id, res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('game=%d heartbeat failed, code=%d, message=%s', self._game_id, data['code'], data['message']) + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('game=%d heartbeat failed', self._game_id) + return False + return True \ No newline at end of file diff --git a/open_live_sample.py b/open_live_sample.py new file mode 100644 index 0000000..832d123 --- /dev/null +++ b/open_live_sample.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +import asyncio +import blivedm + +TEST_AUTH_CODE = '' +APP_ID = '' +ACCESS_KEY = '' +ACCESS_KEY_SECRET = '' + +class OpenLiveHandlerInterface: + """ + 开放平台直播消息处理器接口 + """ + + async def handle(self, client: blivedm.BLiveClient, command: dict): + print(f'{command}') + +async def main(): + await run_start() + +async def run_start(): + client = blivedm.BLiveClient(open_live_app_id=APP_ID, open_live_access_key=ACCESS_KEY, open_live_access_secret=ACCESS_KEY_SECRET, open_live_code=TEST_AUTH_CODE, ssl=True) + handler = OpenLiveHandlerInterface() + client.add_handler(handler) + + client.start() + try: + # 演示60秒后停止 + await asyncio.sleep(60) + client.stop() + + await client.join() + finally: + await client.stop_and_close() + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file From 20867a2135de368f0cbd15bcf943a4e0a63edbb2 Mon Sep 17 00:00:00 2001 From: kinori Date: Sat, 2 Sep 2023 12:42:40 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=BC=80=E6=94=BE?= =?UTF-8?q?=E5=B9=B3=E5=8F=B0=E6=8E=A5=E5=8F=A3=E5=90=AF=E7=94=A8=E7=9A=84?= =?UTF-8?q?=E5=88=A4=E6=96=AD=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/client.py | 4 +++- blivedm/open_live_client.py | 5 +++-- open_live_sample.py | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/blivedm/client.py b/blivedm/client.py index 7df3e03..e05d5c9 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -104,6 +104,7 @@ class BLiveClient: heartbeat_interval=30, ssl: Union[bool, ssl_.SSLContext] = True, + use_open_live: bool = False, open_live_app_id: Optional[int] = None, open_live_access_key: Optional[str] = None, open_live_access_secret: Optional[str] = None, @@ -150,10 +151,11 @@ class BLiveClient: self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None """发心跳包定时器的handle""" + self._open_live_client = None self._host_server_auth_body: Dict = None """开放平台的完整鉴权body""" - if open_live_app_id and open_live_access_key and open_live_access_secret and open_live_code: + if use_open_live: self._open_live_client = OpenLiveClient(open_live_app_id, open_live_access_key, open_live_access_secret, self._session, self._ssl) self._open_live_auth_code = open_live_code diff --git a/blivedm/open_live_client.py b/blivedm/open_live_client.py index 456038a..e010bbe 100644 --- a/blivedm/open_live_client.py +++ b/blivedm/open_live_client.py @@ -149,7 +149,7 @@ class OpenLiveClient: return False try: - params = f'{{"app_id":"{self.app_id}","game_id":{self._game_id}}}' + params = f'{{"app_id":{self.app_id},"game_id":{self._game_id}}}' headers = self._sign_request_header(params) async with self._session.post( OPEN_LIVE_END_URL, headers=headers, data=params, ssl=self._ssl @@ -175,7 +175,8 @@ class OpenLiveClient: return False try: - params = f'{{""game_id":{self._game_id}}}' + params = f'{{"game_id":{self._game_id}}}' + print(params) headers = self._sign_request_header(params) async with self._session.post( OPEN_LIVE_HEARTBEAT_URL, headers=headers, data=params, ssl=self._ssl diff --git a/open_live_sample.py b/open_live_sample.py index 832d123..bf21222 100644 --- a/open_live_sample.py +++ b/open_live_sample.py @@ -19,14 +19,14 @@ async def main(): await run_start() async def run_start(): - client = blivedm.BLiveClient(open_live_app_id=APP_ID, open_live_access_key=ACCESS_KEY, open_live_access_secret=ACCESS_KEY_SECRET, open_live_code=TEST_AUTH_CODE, ssl=True) + client = blivedm.BLiveClient(use_open_live=True, open_live_app_id=APP_ID, open_live_access_key=ACCESS_KEY, open_live_access_secret=ACCESS_KEY_SECRET, open_live_code=TEST_AUTH_CODE, ssl=True) handler = OpenLiveHandlerInterface() client.add_handler(handler) client.start() try: # 演示60秒后停止 - await asyncio.sleep(60) + await asyncio.sleep(600) client.stop() await client.join() From 0bdec42f5007cb1373eb39fafa8dfdd9070bcacf Mon Sep 17 00:00:00 2001 From: kinori Date: Sat, 2 Sep 2023 19:36:04 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E8=A1=A5=E5=85=85=E5=BC=80=E6=94=BE?= =?UTF-8?q?=E5=B9=B3=E5=8F=B0=E6=8E=A5=E5=8F=A3=E7=9A=84=E7=BB=93=E6=9D=9F?= =?UTF-8?q?=E5=8F=8A=E5=BF=83=E8=B7=B3=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/client.py | 11 +++++++++++ blivedm/open_live_client.py | 5 ++--- open_live_sample.py | 4 ++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/blivedm/client.py b/blivedm/client.py index e05d5c9..174b635 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -233,6 +233,10 @@ class BLiveClient: """ 便利函数,停止本客户端并释放本客户端的资源,调用后本客户端将不可用 """ + + if self._open_live_client: + await self._open_live_client.end() + if self.is_running: self.stop() await self.join() @@ -522,6 +526,13 @@ class BLiveClient: except Exception: # noqa logger.exception('room=%d _send_heartbeat() failed:', self.room_id) + try: + await self._open_live_client.heartbeat() + except (ConnectionResetError, aiohttp.ClientConnectionError) as e: + logger.warning('room=%d _send_heartbeat() failed: %r', self.room_id, e) + except Exception: # noqa + logger.exception('room=%d _send_heartbeat() failed:', self.room_id) + async def _on_ws_message(self, message: aiohttp.WSMessage): """ 收到WebSocket消息 diff --git a/blivedm/open_live_client.py b/blivedm/open_live_client.py index e010bbe..f938f35 100644 --- a/blivedm/open_live_client.py +++ b/blivedm/open_live_client.py @@ -149,7 +149,7 @@ class OpenLiveClient: return False try: - params = f'{{"app_id":{self.app_id},"game_id":{self._game_id}}}' + params = f'{{"game_id":"{self._game_id}", "app_id":{self.app_id}}}' headers = self._sign_request_header(params) async with self._session.post( OPEN_LIVE_END_URL, headers=headers, data=params, ssl=self._ssl @@ -175,8 +175,7 @@ class OpenLiveClient: return False try: - params = f'{{"game_id":{self._game_id}}}' - print(params) + params = f'{{"game_id":"{self._game_id}"}}' headers = self._sign_request_header(params) async with self._session.post( OPEN_LIVE_HEARTBEAT_URL, headers=headers, data=params, ssl=self._ssl diff --git a/open_live_sample.py b/open_live_sample.py index bf21222..f705b20 100644 --- a/open_live_sample.py +++ b/open_live_sample.py @@ -25,8 +25,8 @@ async def run_start(): client.start() try: - # 演示60秒后停止 - await asyncio.sleep(600) + # 演示20秒后停止 + await asyncio.sleep(60) client.stop() await client.join() From c99909be1303105643f0c0ec4b1b874dee483415 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 3 Sep 2023 10:46:01 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E6=95=B4=E7=90=86=E5=BC=80=E6=94=BE?= =?UTF-8?q?=E5=B9=B3=E5=8F=B0=E6=8E=A5=E5=8F=A3=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/__init__.py | 2 +- blivedm/client.py | 63 +----- blivedm/handlers.py | 2 + blivedm/open_live_client.py | 401 ++++++++++++++++++++++++------------ open_live_sample.py | 44 ++-- 5 files changed, 309 insertions(+), 203 deletions(-) diff --git a/blivedm/__init__.py b/blivedm/__init__.py index b8d89d8..92170b5 100644 --- a/blivedm/__init__.py +++ b/blivedm/__init__.py @@ -2,4 +2,4 @@ from .models import * from .handlers import * from .client import * -from .open_live_client import * \ No newline at end of file +from .open_live_client import * diff --git a/blivedm/client.py b/blivedm/client.py index 174b635..1e33838 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -5,16 +5,14 @@ import json import logging import ssl as ssl_ import struct +import zlib from typing import * import aiohttp import brotli -from . import open_live_client from . import handlers -OpenLiveClient = open_live_client.OpenLiveClient - __all__ = ( 'BLiveClient', ) @@ -98,17 +96,11 @@ class BLiveClient: def __init__( self, - room_id=0, + room_id, uid=0, session: Optional[aiohttp.ClientSession] = None, heartbeat_interval=30, ssl: Union[bool, ssl_.SSLContext] = True, - - use_open_live: bool = False, - open_live_app_id: Optional[int] = None, - open_live_access_key: Optional[str] = None, - open_live_access_secret: Optional[str] = None, - open_live_code: Optional[str] = None, ): self._tmp_room_id = room_id """用来init_room的临时房间ID,可以用短ID""" @@ -125,6 +117,7 @@ class BLiveClient: self._heartbeat_interval = heartbeat_interval self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa + # TODO 没必要支持多个handler,改成单个吧 self._handlers: List[handlers.HandlerInterface] = [] """消息处理器,可动态增删""" @@ -151,14 +144,6 @@ class BLiveClient: self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None """发心跳包定时器的handle""" - self._open_live_client = None - self._host_server_auth_body: Dict = None - """开放平台的完整鉴权body""" - - if use_open_live: - self._open_live_client = OpenLiveClient(open_live_app_id, open_live_access_key, open_live_access_secret, self._session, self._ssl) - self._open_live_auth_code = open_live_code - @property def is_running(self) -> bool: """ @@ -233,10 +218,6 @@ class BLiveClient: """ 便利函数,停止本客户端并释放本客户端的资源,调用后本客户端将不可用 """ - - if self._open_live_client: - await self._open_live_client.end() - if self.is_running: self.stop() await self.join() @@ -270,9 +251,6 @@ class BLiveClient: :return: True代表没有降级,如果需要降级后还可用,重载这个函数返回True """ res = True - if self._open_live_client and await self._init_room_by_open_live(): - return res - if not await self._init_room_id_and_owner(): res = False # 失败了则降级 @@ -285,22 +263,6 @@ class BLiveClient: self._host_server_list = DEFAULT_DANMAKU_SERVER_LIST self._host_server_token = None return res - - async def _init_room_by_open_live(self): - """ - 通过开放平台初始化房间 - """ - if not self._open_live_client: - logger.warning('_init_room_by_open_live() failed, open_live_client is None') - return False - if not await self._open_live_client.start(self._open_live_auth_code): - logger.warning('app=%d _init_room_by_open_live() failed, open_live_client.start() failed', self._open_live_client.app_id) - return False - self._room_id = self._open_live_client.anchor_room_id - self._room_owner_uid = self._open_live_client.anchor_uid - self._host_server_auth_body = self._open_live_client.ws_auth_body - self._host_server_list = self._open_live_client.wss_link - return True async def _init_room_id_and_owner(self): try: @@ -414,7 +376,7 @@ class BLiveClient: 网络协程,负责连接服务器、接收消息、解包 """ # 如果之前未初始化则初始化 - if self._host_server_auth_body is None and self._host_server_token is None: + if self._host_server_token is None: if not await self.init_room(): raise InitError('init_room() failed') @@ -424,7 +386,6 @@ class BLiveClient: # 连接 host_server = self._host_server_list[retry_count % len(self._host_server_list)] async with self._session.ws_connect( - host_server if isinstance(host_server, str) else f"wss://{host_server['host']}:{host_server['wss_port']}/sub", headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)' @@ -493,10 +454,6 @@ class BLiveClient: } if self._host_server_token is not None: auth_params['key'] = self._host_server_token - - # 开放平台连接则直接替换认证包 - if self._host_server_auth_body is not None: - auth_params = self._host_server_auth_body await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH)) def _on_send_heartbeat(self): @@ -526,13 +483,6 @@ class BLiveClient: except Exception: # noqa logger.exception('room=%d _send_heartbeat() failed:', self.room_id) - try: - await self._open_live_client.heartbeat() - except (ConnectionResetError, aiohttp.ClientConnectionError) as e: - logger.warning('room=%d _send_heartbeat() failed: %r', self.room_id, e) - except Exception: # noqa - logger.exception('room=%d _send_heartbeat() failed:', self.room_id) - async def _on_ws_message(self, message: aiohttp.WSMessage): """ 收到WebSocket消息 @@ -611,6 +561,10 @@ class BLiveClient: # 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行 body = await asyncio.get_running_loop().run_in_executor(None, brotli.decompress, body) await self._parse_ws_message(body) + elif header.ver == ProtoVer.DEFLATE: + # web端已经不用zlib压缩了,但是开放平台会用 + body = await asyncio.get_running_loop().run_in_executor(None, zlib.decompress, body) + await self._parse_ws_message(body) elif header.ver == ProtoVer.NORMAL: # 没压缩过的直接反序列化,因为有万恶的GIL,这里不能并行避免阻塞 if len(body) != 0: @@ -645,6 +599,7 @@ class BLiveClient: :param command: 业务消息 """ + # TODO 考虑解析完整个WS包后再一次处理所有消息。另外用call_soon就不会阻塞网络协程了,也不用加shield # 外部代码可能不能正常处理取消,所以这里加shield results = await asyncio.shield( asyncio.gather( diff --git a/blivedm/handlers.py b/blivedm/handlers.py index 43b1852..614aecb 100644 --- a/blivedm/handlers.py +++ b/blivedm/handlers.py @@ -51,6 +51,8 @@ class HandlerInterface: async def handle(self, client: client_.BLiveClient, command: dict): raise NotImplementedError + # TODO 加个异常停止的回调 + class BaseHandler(HandlerInterface): """ diff --git a/blivedm/open_live_client.py b/blivedm/open_live_client.py index f938f35..bd50087 100644 --- a/blivedm/open_live_client.py +++ b/blivedm/open_live_client.py @@ -1,193 +1,332 @@ # -*- coding: utf-8 -*- -import aiohttp import asyncio import hashlib import hmac +import json import logging import random import ssl as ssl_ -import time -import json -from hashlib import sha256 +import datetime from typing import * -logger = logging.getLogger('open-live-client') +import aiohttp -OPEN_LIVE_START_URL = 'https://live-open.biliapi.com/v2/app/start' -OPEN_LIVE_HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat' -OPEN_LIVE_END_URL = 'https://live-open.biliapi.com/v2/app/end' +from . import client, handlers + +logger = logging.getLogger('blivedm') + +START_URL = 'https://live-open.biliapi.com/v2/app/start' +HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat' +END_URL = 'https://live-open.biliapi.com/v2/app/end' + + +# TODO 抽出公共基类,现在BLiveClient和OpenLiveClient还有不重合的代码 +class OpenLiveClient(client.BLiveClient): + """ + B站直播开放平台客户端,负责连接房间 + + 文档参考:https://open-live.bilibili.com/document/ + + :param access_key: 在开放平台申请的access_key + :param access_secret: 在开放平台申请的access_secret + :param app_id: 在开放平台创建的项目ID + :param room_owner_auth_code: 主播身份码 + :param session: cookie、连接池 + :param heartbeat_interval: 发送连接心跳包的间隔时间(秒) + :param game_heartbeat_interval: 发送项目心跳包的间隔时间(秒) + :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext + """ -class OpenLiveClient: def __init__( self, - app_id: int, access_key: str, access_secret: str, + app_id: int, + room_owner_auth_code: str, session: Optional[aiohttp.ClientSession] = None, + heartbeat_interval=30, + game_heartbeat_interval=20, ssl: Union[bool, ssl_.SSLContext] = True, ): - self.app_id = app_id - self.access_key = access_key - self.access_secret = access_secret - self.session = session - + self._access_key = access_key + self._access_secret = access_secret + self._app_id = app_id + self._room_owner_auth_code = room_owner_auth_code + if session is None: - self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) - self._own_session = True + self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) + self._own_session = True else: - self._session = session - self._own_session = False - assert self._session.loop is asyncio.get_event_loop() # noqa + self._session = session + self._own_session = False + assert self._session.loop is asyncio.get_event_loop() # noqa + + self._heartbeat_interval = heartbeat_interval + self._game_heartbeat_interval = game_heartbeat_interval self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa + self._handlers: List[handlers.HandlerInterface] = [] + """消息处理器,可动态增删""" + + # 在调用init_room后初始化的字段 + self._room_id = None + """真实房间ID""" + self._room_owner_uid = None + """主播用户ID""" + self._host_server_list: Optional[List[str]] = [] + """弹幕服务器URL列表""" + self._auth_body = None + """连接弹幕服务器用的认证包内容""" + self._game_id = None + """项目场次ID,仅用于互动玩法类项目,其他项目为空字符串""" + + # 在运行时初始化的字段 + self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None + """WebSocket连接""" + self._network_future: Optional[asyncio.Future] = None + """网络协程的future""" + self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None + """发连接心跳包定时器的handle""" + self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None + """发项目心跳包定时器的handle""" + @property - def game_id(self) -> Optional[int]: + def room_id(self) -> Optional[int]: + """ + 房间ID,调用init_room后初始化 + """ + return self._room_id + + @property + def room_owner_uid(self) -> Optional[int]: + """ + 主播用户ID,调用init_room后初始化 + """ + return self._room_owner_uid + + @property + def room_owner_auth_code(self): + """ + 主播身份码 + """ + return self._room_owner_auth_code + + @property + def app_id(self): + """ + 在开放平台创建的项目ID + """ + return self._app_id + + @property + def game_id(self) -> Optional[str]: + """ + 项目场次ID,仅用于互动玩法类项目,其他项目为空字符串,调用init_room后初始化 + """ return self._game_id - - @property - def ws_auth_body(self) -> Optional[Dict]: - return self._ws_auth_body - - @property - def wss_link(self) -> Optional[List[str]]: - return self._wss_link - - @property - def anchor_room_id(self) -> Optional[int]: - return self._anchor_room_id - - @property - def anchor_uname(self) -> Optional[str]: - return self._anchor_uname - - @property - def anchor_uface(self) -> Optional[str]: - return self._anchor_uface - - @property - def anchor_uid(self) -> Optional[int]: - return self._anchor_uid - - def _sign_request_header( - self, - body: str, - ): - md5 = hashlib.md5() - md5.update(body.encode()) - ts = time.time() - nonce = random.randint(1,100000)+time.time() - md5data = md5.hexdigest() - headerMap = { - "x-bili-timestamp": str(int(ts)), - "x-bili-signature-method": "HMAC-SHA256", - "x-bili-signature-nonce": str(nonce), - "x-bili-accesskeyid": self.access_key, - "x-bili-signature-version": "1.0", - "x-bili-content-md5": md5data, + + async def close(self): + """ + 释放本客户端的资源,调用后本客户端将不可用 + """ + if self.is_running: + logger.warning('room=%s is calling close(), but client is running', self.room_id) + + if self._game_heartbeat_timer_handle is not None: + self._game_heartbeat_timer_handle.cancel() + self._game_heartbeat_timer_handle = None + await self._end_game() + + await super().close() + + def _request_open_live(self, url, body: dict): + body_bytes = json.dumps(body).encode('utf-8') + headers = { + 'x-bili-accesskeyid': self._access_key, + 'x-bili-content-md5': hashlib.md5(body_bytes).hexdigest(), + 'x-bili-signature-method': 'HMAC-SHA256', + 'x-bili-signature-nonce': str(random.randint(0, 999999999)), + 'x-bili-signature-version': '1.0', + 'x-bili-timestamp': str(int(datetime.datetime.now().timestamp())), } - headerList = sorted(headerMap) - headerStr = '' - for key in headerList: - headerStr = headerStr+ key+":"+str(headerMap[key])+"\n" - headerStr = headerStr.rstrip("\n") + str_to_sign = '\n'.join( + f'{key}:{value}' + for key, value in headers.items() + ) + signature = hmac.new( + self._access_secret.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256 + ).hexdigest() + headers['Authorization'] = signature - appsecret = self.access_secret.encode() - data = headerStr.encode() + headers['Content-Type'] = 'application/json' + headers['Accept'] = 'application/json' + return self._session.post(url, headers=headers, data=body_bytes, ssl=self._ssl) - signature = hmac.new(appsecret, data, digestmod=sha256).hexdigest() - headerMap["Authorization"] = signature - headerMap["Content-Type"] = "application/json" - headerMap["Accept"] = "application/json" - return headerMap - - # 通过身份码获取直播间及wss连接信息 - async def start( - self, - code: str - ): + async def init_room(self): + """ + 开启项目,并初始化连接房间需要的字段 + + :return: 是否成功 + """ + if not await self._start_game(): + return False + + if self._game_id != '' and self._game_heartbeat_timer_handle is None: + self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later( + self._game_heartbeat_interval, self._on_send_game_heartbeat + ) + return True + + async def _start_game(self): try: - params = f'{{"code":"{code}","app_id":{self.app_id}}}' - headers = self._sign_request_header(params) - async with self._session.post( - OPEN_LIVE_START_URL, headers=headers, data=params, ssl=self._ssl + async with self._request_open_live( + START_URL, + {'code': self._room_owner_auth_code, 'app_id': self._app_id} ) as res: if res.status != 200: - logger.warning('app=%d start failed, status=%d, reason=%s', self.app_id, res.status, res.reason) + logger.warning('init_room() failed, status=%d, reason=%s', res.status, res.reason) return False data = await res.json() if data['code'] != 0: - logger.warning('app=%d start failed, code=%d, message=%s', self.app_id, data['code'], data['message']) + logger.warning('init_room() failed, code=%d, message=%s, request_id=%s', + data['code'], data['message'], data['request_id']) return False - if not self._parse_start_data( - data - ): + if not self._parse_start_game(data['data']): return False except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('app=%d start failed', self.app_id) + logger.exception('init_room() failed:') return False return True - - def _parse_start_data( - self, - data: dict - ): - self._game_id = data['data']['game_info']['game_id'] - self._ws_auth_body = json.loads(data['data']['websocket_info']['auth_body']) - self._wss_link = data['data']['websocket_info']['wss_link'] - self._anchor_room_id = data['data']['anchor_info']['room_id'] - self._anchor_uname = data['data']['anchor_info']['uname'] - self._anchor_uface = data['data']['anchor_info']['uface'] - self._anchor_uid = data['data']['anchor_info']['uid'] - return True - async def end( - self - ): - if not self._game_id: - logger.warning('app=%d end failed, game_id not found', self.app_id) - return False + def _parse_start_game(self, data): + self._game_id = data['game_info']['game_id'] + websocket_info = data['websocket_info'] + self._auth_body = websocket_info['auth_body'] + self._host_server_list = websocket_info['wss_link'] + anchor_info = data['anchor_info'] + self._room_id = anchor_info['room_id'] + self._room_owner_uid = anchor_info['uid'] + return True + + async def _end_game(self): + """ + 关闭项目。互动玩法类项目建议断开连接时保证调用到这个函数(close会调用),否则短时间内无法重复连接同一个房间 + """ + if self._game_id in (None, ''): + return True try: - params = f'{{"game_id":"{self._game_id}", "app_id":{self.app_id}}}' - headers = self._sign_request_header(params) - async with self._session.post( - OPEN_LIVE_END_URL, headers=headers, data=params, ssl=self._ssl + async with self._request_open_live( + END_URL, + {'app_id': self._app_id, 'game_id': self._game_id} ) as res: if res.status != 200: - logger.warning('app=%d end failed, status=%d, reason=%s', self.app_id, res.status, res.reason) + logger.warning('room=%d _end_game() failed, status=%d, reason=%s', + self._room_id, res.status, res.reason) return False data = await res.json() if data['code'] != 0: - logger.warning('app=%d end failed, code=%d, message=%s', self.app_id, data['code'], data['message']) + logger.warning('room=%d _end_game() failed, code=%d, message=%s, request_id=%s', + self._room_id, data['code'], data['message'], data['request_id']) return False except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('app=%d end failed', self.app_id) + logger.exception('room=%d _end_game() failed:', self._room_id) return False return True - - # 开放平台互动玩法心跳, 用于维持直播间内定制礼物及统计使用数据, 非互动玩法类暂时不需要 - async def heartbeat( - self - ): - if not self._game_id: + + def _on_send_game_heartbeat(self): + """ + 定时发送项目心跳包的回调 + """ + if not self.is_running: + self._game_heartbeat_timer_handle = None + return + + self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later( + self._game_heartbeat_interval, self._on_send_game_heartbeat + ) + asyncio.create_task(self._send_game_heartbeat()) + + async def _send_game_heartbeat(self): + """ + 发送项目心跳包,仅用于互动玩法类项目 + """ + if self._game_id in (None, ''): logger.warning('game=%d heartbeat failed, game_id not found', self._game_id) return False - + try: - params = f'{{"game_id":"{self._game_id}"}}' - headers = self._sign_request_header(params) - async with self._session.post( - OPEN_LIVE_HEARTBEAT_URL, headers=headers, data=params, ssl=self._ssl + async with self._request_open_live( + HEARTBEAT_URL, + {'game_id': self._game_id} ) as res: if res.status != 200: - logger.warning('game=%d heartbeat failed, status=%d, reason=%s', self._game_id, res.status, res.reason) + logger.warning('room=%d _send_game_heartbeat() failed, status=%d, reason=%s', + self._room_id, res.status, res.reason) return False data = await res.json() if data['code'] != 0: - logger.warning('game=%d heartbeat failed, code=%d, message=%s', self._game_id, data['code'], data['message']) + logger.warning('room=%d _send_game_heartbeat() failed, code=%d, message=%s, request_id=%s', + self._room_id, data['code'], data['message'], data['request_id']) return False except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('game=%d heartbeat failed', self._game_id) + logger.exception('room=%d _send_game_heartbeat() failed:', self._room_id) return False - return True \ No newline at end of file + return True + + async def _network_coroutine(self): + """ + 网络协程,负责连接服务器、接收消息、解包 + """ + # 如果之前未初始化则初始化 + if self._auth_body is None: + if not await self.init_room(): + raise client.InitError('init_room() failed') + + retry_count = 0 + while True: + try: + # 连接 + host_server_url = self._host_server_list[retry_count % len(self._host_server_list)] + async with self._session.ws_connect( + host_server_url, + receive_timeout=self._heartbeat_interval + 5, + ssl=self._ssl + ) as websocket: + self._websocket = websocket + await self._on_ws_connect() + + # 处理消息 + message: aiohttp.WSMessage + async for message in websocket: + await self._on_ws_message(message) + # 至少成功处理1条消息 + retry_count = 0 + + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + # 掉线重连 + pass + except client.AuthError: + # 认证失败了,应该重新获取auth_body再重连 + logger.exception('room=%d auth failed, trying init_room() again', self.room_id) + if not await self.init_room(): + raise client.InitError('init_room() failed') + except ssl_.SSLError: + logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id) + raise + finally: + self._websocket = None + await self._on_ws_close() + + # 准备重连 + retry_count += 1 + logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count) + await asyncio.sleep(1) + + async def _send_auth(self): + """ + 发送认证包 + """ + auth_body = json.loads(self._auth_body) + await self._websocket.send_bytes(self._make_packet(auth_body, client.Operation.AUTH)) diff --git a/open_live_sample.py b/open_live_sample.py index f705b20..dce2a99 100644 --- a/open_live_sample.py +++ b/open_live_sample.py @@ -1,37 +1,47 @@ # -*- coding: utf-8 -*- import asyncio + import blivedm +import blivedm.open_live_client as open_live_client -TEST_AUTH_CODE = '' -APP_ID = '' ACCESS_KEY = '' -ACCESS_KEY_SECRET = '' +ACCESS_SECRET = '' +APP_ID = 0 +ROOM_OWNER_AUTH_CODE = '' -class OpenLiveHandlerInterface: - """ - 开放平台直播消息处理器接口 - """ - - async def handle(self, client: blivedm.BLiveClient, command: dict): - print(f'{command}') async def main(): - await run_start() + await run_single_client() -async def run_start(): - client = blivedm.BLiveClient(use_open_live=True, open_live_app_id=APP_ID, open_live_access_key=ACCESS_KEY, open_live_access_secret=ACCESS_KEY_SECRET, open_live_code=TEST_AUTH_CODE, ssl=True) - handler = OpenLiveHandlerInterface() + +async def run_single_client(): + """ + 演示监听一个直播间 + """ + client = open_live_client.OpenLiveClient( + access_key=ACCESS_KEY, + access_secret=ACCESS_SECRET, + app_id=APP_ID, + room_owner_auth_code=ROOM_OWNER_AUTH_CODE, + ) + handler = MyHandler() client.add_handler(handler) client.start() try: - # 演示20秒后停止 - await asyncio.sleep(60) + # 演示70秒后停止 + await asyncio.sleep(70) client.stop() await client.join() finally: await client.stop_and_close() + +class MyHandler(blivedm.HandlerInterface): + async def handle(self, client: open_live_client.OpenLiveClient, command: dict): + print(command) + + if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file + asyncio.run(main())