添加对直播开放平台接口及WS的支持

This commit is contained in:
kinori 2023-09-02 11:52:04 +08:00
parent 28645d5e37
commit ad1fd62f29
4 changed files with 272 additions and 2 deletions

View File

@ -2,3 +2,4 @@
from .models import *
from .handlers import *
from .client import *
from .open_live_client import *

View File

@ -10,8 +10,11 @@ from typing import *
import aiohttp
import brotli
from . import open_live_client
from . import handlers
OpenLiveClient = open_live_client.OpenLiveClient
__all__ = (
'BLiveClient',
)
@ -95,11 +98,16 @@ class BLiveClient:
def __init__(
self,
room_id,
room_id=0,
uid=0,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30,
ssl: Union[bool, ssl_.SSLContext] = True,
open_live_app_id: Optional[int] = None,
open_live_access_key: Optional[str] = None,
open_live_access_secret: Optional[str] = None,
open_live_code: Optional[str] = None,
):
self._tmp_room_id = room_id
"""用来init_room的临时房间ID可以用短ID"""
@ -142,6 +150,13 @@ class BLiveClient:
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
"""发心跳包定时器的handle"""
self._host_server_auth_body: Dict = None
"""开放平台的完整鉴权body"""
if open_live_app_id and open_live_access_key and open_live_access_secret and open_live_code:
self._open_live_client = OpenLiveClient(open_live_app_id, open_live_access_key, open_live_access_secret, self._session, self._ssl)
self._open_live_auth_code = open_live_code
@property
def is_running(self) -> bool:
"""
@ -249,6 +264,9 @@ class BLiveClient:
:return: True代表没有降级如果需要降级后还可用重载这个函数返回True
"""
res = True
if self._open_live_client and await self._init_room_by_open_live():
return res
if not await self._init_room_id_and_owner():
res = False
# 失败了则降级
@ -261,6 +279,22 @@ class BLiveClient:
self._host_server_list = DEFAULT_DANMAKU_SERVER_LIST
self._host_server_token = None
return res
async def _init_room_by_open_live(self):
"""
通过开放平台初始化房间
"""
if not self._open_live_client:
logger.warning('_init_room_by_open_live() failed, open_live_client is None')
return False
if not await self._open_live_client.start(self._open_live_auth_code):
logger.warning('app=%d _init_room_by_open_live() failed, open_live_client.start() failed', self._open_live_client.app_id)
return False
self._room_id = self._open_live_client.anchor_room_id
self._room_owner_uid = self._open_live_client.anchor_uid
self._host_server_auth_body = self._open_live_client.ws_auth_body
self._host_server_list = self._open_live_client.wss_link
return True
async def _init_room_id_and_owner(self):
try:
@ -374,7 +408,7 @@ class BLiveClient:
网络协程负责连接服务器接收消息解包
"""
# 如果之前未初始化则初始化
if self._host_server_token is None:
if self._host_server_auth_body is None and self._host_server_token is None:
if not await self.init_room():
raise InitError('init_room() failed')
@ -384,6 +418,7 @@ class BLiveClient:
# 连接
host_server = self._host_server_list[retry_count % len(self._host_server_list)]
async with self._session.ws_connect(
host_server if isinstance(host_server, str) else
f"wss://{host_server['host']}:{host_server['wss_port']}/sub",
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
@ -452,6 +487,10 @@ class BLiveClient:
}
if self._host_server_token is not None:
auth_params['key'] = self._host_server_token
# 开放平台连接则直接替换认证包
if self._host_server_auth_body is not None:
auth_params = self._host_server_auth_body
await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH))
def _on_send_heartbeat(self):

193
blivedm/open_live_client.py Normal file
View File

@ -0,0 +1,193 @@
# -*- coding: utf-8 -*-
import aiohttp
import asyncio
import hashlib
import hmac
import logging
import random
import ssl as ssl_
import time
import json
from hashlib import sha256
from typing import *
logger = logging.getLogger('open-live-client')
OPEN_LIVE_START_URL = 'https://live-open.biliapi.com/v2/app/start'
OPEN_LIVE_HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
OPEN_LIVE_END_URL = 'https://live-open.biliapi.com/v2/app/end'
class OpenLiveClient:
def __init__(
self,
app_id: int,
access_key: str,
access_secret: str,
session: Optional[aiohttp.ClientSession] = None,
ssl: Union[bool, ssl_.SSLContext] = True,
):
self.app_id = app_id
self.access_key = access_key
self.access_secret = access_secret
self.session = session
if session is None:
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
self._own_session = True
else:
self._session = session
self._own_session = False
assert self._session.loop is asyncio.get_event_loop() # noqa
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
@property
def game_id(self) -> Optional[int]:
return self._game_id
@property
def ws_auth_body(self) -> Optional[Dict]:
return self._ws_auth_body
@property
def wss_link(self) -> Optional[List[str]]:
return self._wss_link
@property
def anchor_room_id(self) -> Optional[int]:
return self._anchor_room_id
@property
def anchor_uname(self) -> Optional[str]:
return self._anchor_uname
@property
def anchor_uface(self) -> Optional[str]:
return self._anchor_uface
@property
def anchor_uid(self) -> Optional[int]:
return self._anchor_uid
def _sign_request_header(
self,
body: str,
):
md5 = hashlib.md5()
md5.update(body.encode())
ts = time.time()
nonce = random.randint(1,100000)+time.time()
md5data = md5.hexdigest()
headerMap = {
"x-bili-timestamp": str(int(ts)),
"x-bili-signature-method": "HMAC-SHA256",
"x-bili-signature-nonce": str(nonce),
"x-bili-accesskeyid": self.access_key,
"x-bili-signature-version": "1.0",
"x-bili-content-md5": md5data,
}
headerList = sorted(headerMap)
headerStr = ''
for key in headerList:
headerStr = headerStr+ key+":"+str(headerMap[key])+"\n"
headerStr = headerStr.rstrip("\n")
appsecret = self.access_secret.encode()
data = headerStr.encode()
signature = hmac.new(appsecret, data, digestmod=sha256).hexdigest()
headerMap["Authorization"] = signature
headerMap["Content-Type"] = "application/json"
headerMap["Accept"] = "application/json"
return headerMap
# 通过身份码获取直播间及wss连接信息
async def start(
self,
code: str
):
try:
params = f'{{"code":"{code}","app_id":{self.app_id}}}'
headers = self._sign_request_header(params)
async with self._session.post(
OPEN_LIVE_START_URL, headers=headers, data=params, ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('app=%d start failed, status=%d, reason=%s', self.app_id, res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('app=%d start failed, code=%d, message=%s', self.app_id, data['code'], data['message'])
return False
if not self._parse_start_data(
data
):
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('app=%d start failed', self.app_id)
return False
return True
def _parse_start_data(
self,
data: dict
):
self._game_id = data['data']['game_info']['game_id']
self._ws_auth_body = json.loads(data['data']['websocket_info']['auth_body'])
self._wss_link = data['data']['websocket_info']['wss_link']
self._anchor_room_id = data['data']['anchor_info']['room_id']
self._anchor_uname = data['data']['anchor_info']['uname']
self._anchor_uface = data['data']['anchor_info']['uface']
self._anchor_uid = data['data']['anchor_info']['uid']
return True
async def end(
self
):
if not self._game_id:
logger.warning('app=%d end failed, game_id not found', self.app_id)
return False
try:
params = f'{{"app_id":"{self.app_id}","game_id":{self._game_id}}}'
headers = self._sign_request_header(params)
async with self._session.post(
OPEN_LIVE_END_URL, headers=headers, data=params, ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('app=%d end failed, status=%d, reason=%s', self.app_id, res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('app=%d end failed, code=%d, message=%s', self.app_id, data['code'], data['message'])
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('app=%d end failed', self.app_id)
return False
return True
# 开放平台互动玩法心跳, 用于维持直播间内定制礼物及统计使用数据, 非互动玩法类暂时不需要
async def heartbeat(
self
):
if not self._game_id:
logger.warning('game=%d heartbeat failed, game_id not found', self._game_id)
return False
try:
params = f'{{""game_id":{self._game_id}}}'
headers = self._sign_request_header(params)
async with self._session.post(
OPEN_LIVE_HEARTBEAT_URL, headers=headers, data=params, ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('game=%d heartbeat failed, status=%d, reason=%s', self._game_id, res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('game=%d heartbeat failed, code=%d, message=%s', self._game_id, data['code'], data['message'])
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('game=%d heartbeat failed', self._game_id)
return False
return True

37
open_live_sample.py Normal file
View File

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
import asyncio
import blivedm
TEST_AUTH_CODE = ''
APP_ID = ''
ACCESS_KEY = ''
ACCESS_KEY_SECRET = ''
class OpenLiveHandlerInterface:
"""
开放平台直播消息处理器接口
"""
async def handle(self, client: blivedm.BLiveClient, command: dict):
print(f'{command}')
async def main():
await run_start()
async def run_start():
client = blivedm.BLiveClient(open_live_app_id=APP_ID, open_live_access_key=ACCESS_KEY, open_live_access_secret=ACCESS_KEY_SECRET, open_live_code=TEST_AUTH_CODE, ssl=True)
handler = OpenLiveHandlerInterface()
client.add_handler(handler)
client.start()
try:
# 演示60秒后停止
await asyncio.sleep(60)
client.stop()
await client.join()
finally:
await client.stop_and_close()
if __name__ == '__main__':
asyncio.run(main())