diff --git a/blivedm/__init__.py b/blivedm/__init__.py index 6de332d..b8d89d8 100644 --- a/blivedm/__init__.py +++ b/blivedm/__init__.py @@ -2,3 +2,4 @@ from .models import * from .handlers import * from .client import * +from .open_live_client import * \ No newline at end of file diff --git a/blivedm/client.py b/blivedm/client.py index 66b63d3..7df3e03 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -10,8 +10,11 @@ from typing import * import aiohttp import brotli +from . import open_live_client from . import handlers +OpenLiveClient = open_live_client.OpenLiveClient + __all__ = ( 'BLiveClient', ) @@ -95,11 +98,16 @@ class BLiveClient: def __init__( self, - room_id, + room_id=0, uid=0, session: Optional[aiohttp.ClientSession] = None, heartbeat_interval=30, ssl: Union[bool, ssl_.SSLContext] = True, + + open_live_app_id: Optional[int] = None, + open_live_access_key: Optional[str] = None, + open_live_access_secret: Optional[str] = None, + open_live_code: Optional[str] = None, ): self._tmp_room_id = room_id """用来init_room的临时房间ID,可以用短ID""" @@ -142,6 +150,13 @@ class BLiveClient: self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None """发心跳包定时器的handle""" + self._host_server_auth_body: Dict = None + """开放平台的完整鉴权body""" + + if open_live_app_id and open_live_access_key and open_live_access_secret and open_live_code: + self._open_live_client = OpenLiveClient(open_live_app_id, open_live_access_key, open_live_access_secret, self._session, self._ssl) + self._open_live_auth_code = open_live_code + @property def is_running(self) -> bool: """ @@ -249,6 +264,9 @@ class BLiveClient: :return: True代表没有降级,如果需要降级后还可用,重载这个函数返回True """ res = True + if self._open_live_client and await self._init_room_by_open_live(): + return res + if not await self._init_room_id_and_owner(): res = False # 失败了则降级 @@ -261,6 +279,22 @@ class BLiveClient: self._host_server_list = DEFAULT_DANMAKU_SERVER_LIST self._host_server_token = None return res + + async def _init_room_by_open_live(self): + """ + 通过开放平台初始化房间 + """ + if not self._open_live_client: + logger.warning('_init_room_by_open_live() failed, open_live_client is None') + return False + if not await self._open_live_client.start(self._open_live_auth_code): + logger.warning('app=%d _init_room_by_open_live() failed, open_live_client.start() failed', self._open_live_client.app_id) + return False + self._room_id = self._open_live_client.anchor_room_id + self._room_owner_uid = self._open_live_client.anchor_uid + self._host_server_auth_body = self._open_live_client.ws_auth_body + self._host_server_list = self._open_live_client.wss_link + return True async def _init_room_id_and_owner(self): try: @@ -374,7 +408,7 @@ class BLiveClient: 网络协程,负责连接服务器、接收消息、解包 """ # 如果之前未初始化则初始化 - if self._host_server_token is None: + if self._host_server_auth_body is None and self._host_server_token is None: if not await self.init_room(): raise InitError('init_room() failed') @@ -384,6 +418,7 @@ class BLiveClient: # 连接 host_server = self._host_server_list[retry_count % len(self._host_server_list)] async with self._session.ws_connect( + host_server if isinstance(host_server, str) else f"wss://{host_server['host']}:{host_server['wss_port']}/sub", headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)' @@ -452,6 +487,10 @@ class BLiveClient: } if self._host_server_token is not None: auth_params['key'] = self._host_server_token + + # 开放平台连接则直接替换认证包 + if self._host_server_auth_body is not None: + auth_params = self._host_server_auth_body await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH)) def _on_send_heartbeat(self): diff --git a/blivedm/open_live_client.py b/blivedm/open_live_client.py new file mode 100644 index 0000000..456038a --- /dev/null +++ b/blivedm/open_live_client.py @@ -0,0 +1,193 @@ +# -*- coding: utf-8 -*- +import aiohttp +import asyncio +import hashlib +import hmac +import logging +import random +import ssl as ssl_ +import time +import json +from hashlib import sha256 +from typing import * + +logger = logging.getLogger('open-live-client') + +OPEN_LIVE_START_URL = 'https://live-open.biliapi.com/v2/app/start' +OPEN_LIVE_HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat' +OPEN_LIVE_END_URL = 'https://live-open.biliapi.com/v2/app/end' + +class OpenLiveClient: + def __init__( + self, + app_id: int, + access_key: str, + access_secret: str, + session: Optional[aiohttp.ClientSession] = None, + ssl: Union[bool, ssl_.SSLContext] = True, + ): + self.app_id = app_id + self.access_key = access_key + self.access_secret = access_secret + self.session = session + + if session is None: + self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) + self._own_session = True + else: + self._session = session + self._own_session = False + assert self._session.loop is asyncio.get_event_loop() # noqa + self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa + + @property + def game_id(self) -> Optional[int]: + return self._game_id + + @property + def ws_auth_body(self) -> Optional[Dict]: + return self._ws_auth_body + + @property + def wss_link(self) -> Optional[List[str]]: + return self._wss_link + + @property + def anchor_room_id(self) -> Optional[int]: + return self._anchor_room_id + + @property + def anchor_uname(self) -> Optional[str]: + return self._anchor_uname + + @property + def anchor_uface(self) -> Optional[str]: + return self._anchor_uface + + @property + def anchor_uid(self) -> Optional[int]: + return self._anchor_uid + + def _sign_request_header( + self, + body: str, + ): + md5 = hashlib.md5() + md5.update(body.encode()) + ts = time.time() + nonce = random.randint(1,100000)+time.time() + md5data = md5.hexdigest() + headerMap = { + "x-bili-timestamp": str(int(ts)), + "x-bili-signature-method": "HMAC-SHA256", + "x-bili-signature-nonce": str(nonce), + "x-bili-accesskeyid": self.access_key, + "x-bili-signature-version": "1.0", + "x-bili-content-md5": md5data, + } + headerList = sorted(headerMap) + headerStr = '' + + for key in headerList: + headerStr = headerStr+ key+":"+str(headerMap[key])+"\n" + headerStr = headerStr.rstrip("\n") + + appsecret = self.access_secret.encode() + data = headerStr.encode() + + signature = hmac.new(appsecret, data, digestmod=sha256).hexdigest() + headerMap["Authorization"] = signature + headerMap["Content-Type"] = "application/json" + headerMap["Accept"] = "application/json" + return headerMap + + # 通过身份码获取直播间及wss连接信息 + async def start( + self, + code: str + ): + try: + params = f'{{"code":"{code}","app_id":{self.app_id}}}' + headers = self._sign_request_header(params) + async with self._session.post( + OPEN_LIVE_START_URL, headers=headers, data=params, ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('app=%d start failed, status=%d, reason=%s', self.app_id, res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('app=%d start failed, code=%d, message=%s', self.app_id, data['code'], data['message']) + return False + if not self._parse_start_data( + data + ): + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('app=%d start failed', self.app_id) + return False + return True + + def _parse_start_data( + self, + data: dict + ): + self._game_id = data['data']['game_info']['game_id'] + self._ws_auth_body = json.loads(data['data']['websocket_info']['auth_body']) + self._wss_link = data['data']['websocket_info']['wss_link'] + self._anchor_room_id = data['data']['anchor_info']['room_id'] + self._anchor_uname = data['data']['anchor_info']['uname'] + self._anchor_uface = data['data']['anchor_info']['uface'] + self._anchor_uid = data['data']['anchor_info']['uid'] + return True + + async def end( + self + ): + if not self._game_id: + logger.warning('app=%d end failed, game_id not found', self.app_id) + return False + + try: + params = f'{{"app_id":"{self.app_id}","game_id":{self._game_id}}}' + headers = self._sign_request_header(params) + async with self._session.post( + OPEN_LIVE_END_URL, headers=headers, data=params, ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('app=%d end failed, status=%d, reason=%s', self.app_id, res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('app=%d end failed, code=%d, message=%s', self.app_id, data['code'], data['message']) + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('app=%d end failed', self.app_id) + return False + return True + + # 开放平台互动玩法心跳, 用于维持直播间内定制礼物及统计使用数据, 非互动玩法类暂时不需要 + async def heartbeat( + self + ): + if not self._game_id: + logger.warning('game=%d heartbeat failed, game_id not found', self._game_id) + return False + + try: + params = f'{{""game_id":{self._game_id}}}' + headers = self._sign_request_header(params) + async with self._session.post( + OPEN_LIVE_HEARTBEAT_URL, headers=headers, data=params, ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('game=%d heartbeat failed, status=%d, reason=%s', self._game_id, res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('game=%d heartbeat failed, code=%d, message=%s', self._game_id, data['code'], data['message']) + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('game=%d heartbeat failed', self._game_id) + return False + return True \ No newline at end of file diff --git a/open_live_sample.py b/open_live_sample.py new file mode 100644 index 0000000..832d123 --- /dev/null +++ b/open_live_sample.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +import asyncio +import blivedm + +TEST_AUTH_CODE = '' +APP_ID = '' +ACCESS_KEY = '' +ACCESS_KEY_SECRET = '' + +class OpenLiveHandlerInterface: + """ + 开放平台直播消息处理器接口 + """ + + async def handle(self, client: blivedm.BLiveClient, command: dict): + print(f'{command}') + +async def main(): + await run_start() + +async def run_start(): + client = blivedm.BLiveClient(open_live_app_id=APP_ID, open_live_access_key=ACCESS_KEY, open_live_access_secret=ACCESS_KEY_SECRET, open_live_code=TEST_AUTH_CODE, ssl=True) + handler = OpenLiveHandlerInterface() + client.add_handler(handler) + + client.start() + try: + # 演示60秒后停止 + await asyncio.sleep(60) + client.stop() + + await client.join() + finally: + await client.stop_and_close() + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file