Merge branch 'pr/35' into dev

This commit is contained in:
John Smith 2023-09-03 10:46:31 +08:00
commit 92e5cf56db
5 changed files with 389 additions and 0 deletions

View File

@ -2,3 +2,4 @@
from .models import *
from .handlers import *
from .client import *
from .open_live_client import *

View File

@ -5,6 +5,7 @@ import json
import logging
import ssl as ssl_
import struct
import zlib
from typing import *
import aiohttp
@ -123,6 +124,7 @@ class BLiveClient:
self._heartbeat_interval = heartbeat_interval
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
# TODO 没必要支持多个handler改成单个吧
self._handlers: List[handlers.HandlerInterface] = []
"""消息处理器,可动态增删"""
@ -627,6 +629,10 @@ class BLiveClient:
# 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行
body = await asyncio.get_running_loop().run_in_executor(None, brotli.decompress, body)
await self._parse_ws_message(body)
elif header.ver == ProtoVer.DEFLATE:
# web端已经不用zlib压缩了但是开放平台会用
body = await asyncio.get_running_loop().run_in_executor(None, zlib.decompress, body)
await self._parse_ws_message(body)
elif header.ver == ProtoVer.NORMAL:
# 没压缩过的直接反序列化因为有万恶的GIL这里不能并行避免阻塞
if len(body) != 0:
@ -661,6 +667,7 @@ class BLiveClient:
:param command: 业务消息
"""
# TODO 考虑解析完整个WS包后再一次处理所有消息。另外用call_soon就不会阻塞网络协程了也不用加shield
# 外部代码可能不能正常处理取消所以这里加shield
results = await asyncio.shield(
asyncio.gather(

View File

@ -51,6 +51,8 @@ class HandlerInterface:
async def handle(self, client: client_.BLiveClient, command: dict):
raise NotImplementedError
# TODO 加个异常停止的回调
class BaseHandler(HandlerInterface):
"""

332
blivedm/open_live_client.py Normal file
View File

@ -0,0 +1,332 @@
# -*- coding: utf-8 -*-
import asyncio
import hashlib
import hmac
import json
import logging
import random
import ssl as ssl_
import datetime
from typing import *
import aiohttp
from . import client, handlers
logger = logging.getLogger('blivedm')
START_URL = 'https://live-open.biliapi.com/v2/app/start'
HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
END_URL = 'https://live-open.biliapi.com/v2/app/end'
# TODO 抽出公共基类现在BLiveClient和OpenLiveClient还有不重合的代码
class OpenLiveClient(client.BLiveClient):
"""
B站直播开放平台客户端负责连接房间
文档参考https://open-live.bilibili.com/document/
:param access_key: 在开放平台申请的access_key
:param access_secret: 在开放平台申请的access_secret
:param app_id: 在开放平台创建的项目ID
:param room_owner_auth_code: 主播身份码
:param session: cookie连接池
:param heartbeat_interval: 发送连接心跳包的间隔时间
:param game_heartbeat_interval: 发送项目心跳包的间隔时间
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
"""
def __init__(
self,
access_key: str,
access_secret: str,
app_id: int,
room_owner_auth_code: str,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30,
game_heartbeat_interval=20,
ssl: Union[bool, ssl_.SSLContext] = True,
):
self._access_key = access_key
self._access_secret = access_secret
self._app_id = app_id
self._room_owner_auth_code = room_owner_auth_code
if session is None:
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
self._own_session = True
else:
self._session = session
self._own_session = False
assert self._session.loop is asyncio.get_event_loop() # noqa
self._heartbeat_interval = heartbeat_interval
self._game_heartbeat_interval = game_heartbeat_interval
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
self._handlers: List[handlers.HandlerInterface] = []
"""消息处理器,可动态增删"""
# 在调用init_room后初始化的字段
self._room_id = None
"""真实房间ID"""
self._room_owner_uid = None
"""主播用户ID"""
self._host_server_list: Optional[List[str]] = []
"""弹幕服务器URL列表"""
self._auth_body = None
"""连接弹幕服务器用的认证包内容"""
self._game_id = None
"""项目场次ID仅用于互动玩法类项目其他项目为空字符串"""
# 在运行时初始化的字段
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
"""WebSocket连接"""
self._network_future: Optional[asyncio.Future] = None
"""网络协程的future"""
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
"""发连接心跳包定时器的handle"""
self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
"""发项目心跳包定时器的handle"""
@property
def room_id(self) -> Optional[int]:
"""
房间ID调用init_room后初始化
"""
return self._room_id
@property
def room_owner_uid(self) -> Optional[int]:
"""
主播用户ID调用init_room后初始化
"""
return self._room_owner_uid
@property
def room_owner_auth_code(self):
"""
主播身份码
"""
return self._room_owner_auth_code
@property
def app_id(self):
"""
在开放平台创建的项目ID
"""
return self._app_id
@property
def game_id(self) -> Optional[str]:
"""
项目场次ID仅用于互动玩法类项目其他项目为空字符串调用init_room后初始化
"""
return self._game_id
async def close(self):
"""
释放本客户端的资源调用后本客户端将不可用
"""
if self.is_running:
logger.warning('room=%s is calling close(), but client is running', self.room_id)
if self._game_heartbeat_timer_handle is not None:
self._game_heartbeat_timer_handle.cancel()
self._game_heartbeat_timer_handle = None
await self._end_game()
await super().close()
def _request_open_live(self, url, body: dict):
body_bytes = json.dumps(body).encode('utf-8')
headers = {
'x-bili-accesskeyid': self._access_key,
'x-bili-content-md5': hashlib.md5(body_bytes).hexdigest(),
'x-bili-signature-method': 'HMAC-SHA256',
'x-bili-signature-nonce': str(random.randint(0, 999999999)),
'x-bili-signature-version': '1.0',
'x-bili-timestamp': str(int(datetime.datetime.now().timestamp())),
}
str_to_sign = '\n'.join(
f'{key}:{value}'
for key, value in headers.items()
)
signature = hmac.new(
self._access_secret.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256
).hexdigest()
headers['Authorization'] = signature
headers['Content-Type'] = 'application/json'
headers['Accept'] = 'application/json'
return self._session.post(url, headers=headers, data=body_bytes, ssl=self._ssl)
async def init_room(self):
"""
开启项目并初始化连接房间需要的字段
:return: 是否成功
"""
if not await self._start_game():
return False
if self._game_id != '' and self._game_heartbeat_timer_handle is None:
self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later(
self._game_heartbeat_interval, self._on_send_game_heartbeat
)
return True
async def _start_game(self):
try:
async with self._request_open_live(
START_URL,
{'code': self._room_owner_auth_code, 'app_id': self._app_id}
) as res:
if res.status != 200:
logger.warning('init_room() failed, status=%d, reason=%s', res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('init_room() failed, code=%d, message=%s, request_id=%s',
data['code'], data['message'], data['request_id'])
return False
if not self._parse_start_game(data['data']):
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('init_room() failed:')
return False
return True
def _parse_start_game(self, data):
self._game_id = data['game_info']['game_id']
websocket_info = data['websocket_info']
self._auth_body = websocket_info['auth_body']
self._host_server_list = websocket_info['wss_link']
anchor_info = data['anchor_info']
self._room_id = anchor_info['room_id']
self._room_owner_uid = anchor_info['uid']
return True
async def _end_game(self):
"""
关闭项目互动玩法类项目建议断开连接时保证调用到这个函数close会调用否则短时间内无法重复连接同一个房间
"""
if self._game_id in (None, ''):
return True
try:
async with self._request_open_live(
END_URL,
{'app_id': self._app_id, 'game_id': self._game_id}
) as res:
if res.status != 200:
logger.warning('room=%d _end_game() failed, status=%d, reason=%s',
self._room_id, res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room=%d _end_game() failed, code=%d, message=%s, request_id=%s',
self._room_id, data['code'], data['message'], data['request_id'])
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _end_game() failed:', self._room_id)
return False
return True
def _on_send_game_heartbeat(self):
"""
定时发送项目心跳包的回调
"""
if not self.is_running:
self._game_heartbeat_timer_handle = None
return
self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later(
self._game_heartbeat_interval, self._on_send_game_heartbeat
)
asyncio.create_task(self._send_game_heartbeat())
async def _send_game_heartbeat(self):
"""
发送项目心跳包仅用于互动玩法类项目
"""
if self._game_id in (None, ''):
logger.warning('game=%d heartbeat failed, game_id not found', self._game_id)
return False
try:
async with self._request_open_live(
HEARTBEAT_URL,
{'game_id': self._game_id}
) as res:
if res.status != 200:
logger.warning('room=%d _send_game_heartbeat() failed, status=%d, reason=%s',
self._room_id, res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room=%d _send_game_heartbeat() failed, code=%d, message=%s, request_id=%s',
self._room_id, data['code'], data['message'], data['request_id'])
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _send_game_heartbeat() failed:', self._room_id)
return False
return True
async def _network_coroutine(self):
"""
网络协程负责连接服务器接收消息解包
"""
# 如果之前未初始化则初始化
if self._auth_body is None:
if not await self.init_room():
raise client.InitError('init_room() failed')
retry_count = 0
while True:
try:
# 连接
host_server_url = self._host_server_list[retry_count % len(self._host_server_list)]
async with self._session.ws_connect(
host_server_url,
receive_timeout=self._heartbeat_interval + 5,
ssl=self._ssl
) as websocket:
self._websocket = websocket
await self._on_ws_connect()
# 处理消息
message: aiohttp.WSMessage
async for message in websocket:
await self._on_ws_message(message)
# 至少成功处理1条消息
retry_count = 0
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
# 掉线重连
pass
except client.AuthError:
# 认证失败了应该重新获取auth_body再重连
logger.exception('room=%d auth failed, trying init_room() again', self.room_id)
if not await self.init_room():
raise client.InitError('init_room() failed')
except ssl_.SSLError:
logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id)
raise
finally:
self._websocket = None
await self._on_ws_close()
# 准备重连
retry_count += 1
logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count)
await asyncio.sleep(1)
async def _send_auth(self):
"""
发送认证包
"""
auth_body = json.loads(self._auth_body)
await self._websocket.send_bytes(self._make_packet(auth_body, client.Operation.AUTH))

47
open_live_sample.py Normal file
View File

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
import asyncio
import blivedm
import blivedm.open_live_client as open_live_client
ACCESS_KEY = ''
ACCESS_SECRET = ''
APP_ID = 0
ROOM_OWNER_AUTH_CODE = ''
async def main():
await run_single_client()
async def run_single_client():
"""
演示监听一个直播间
"""
client = open_live_client.OpenLiveClient(
access_key=ACCESS_KEY,
access_secret=ACCESS_SECRET,
app_id=APP_ID,
room_owner_auth_code=ROOM_OWNER_AUTH_CODE,
)
handler = MyHandler()
client.add_handler(handler)
client.start()
try:
# 演示70秒后停止
await asyncio.sleep(70)
client.stop()
await client.join()
finally:
await client.stop_and_close()
class MyHandler(blivedm.HandlerInterface):
async def handle(self, client: open_live_client.OpenLiveClient, command: dict):
print(command)
if __name__ == '__main__':
asyncio.run(main())