mirror of
https://github.com/xfgryujk/blivedm.git
synced 2024-12-26 21:00:17 +08:00
整理开放平台接口代码
This commit is contained in:
parent
0bdec42f50
commit
c99909be13
@ -5,16 +5,14 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import ssl as ssl_
|
import ssl as ssl_
|
||||||
import struct
|
import struct
|
||||||
|
import zlib
|
||||||
from typing import *
|
from typing import *
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import brotli
|
import brotli
|
||||||
|
|
||||||
from . import open_live_client
|
|
||||||
from . import handlers
|
from . import handlers
|
||||||
|
|
||||||
OpenLiveClient = open_live_client.OpenLiveClient
|
|
||||||
|
|
||||||
__all__ = (
|
__all__ = (
|
||||||
'BLiveClient',
|
'BLiveClient',
|
||||||
)
|
)
|
||||||
@ -98,17 +96,11 @@ class BLiveClient:
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
room_id=0,
|
room_id,
|
||||||
uid=0,
|
uid=0,
|
||||||
session: Optional[aiohttp.ClientSession] = None,
|
session: Optional[aiohttp.ClientSession] = None,
|
||||||
heartbeat_interval=30,
|
heartbeat_interval=30,
|
||||||
ssl: Union[bool, ssl_.SSLContext] = True,
|
ssl: Union[bool, ssl_.SSLContext] = True,
|
||||||
|
|
||||||
use_open_live: bool = False,
|
|
||||||
open_live_app_id: Optional[int] = None,
|
|
||||||
open_live_access_key: Optional[str] = None,
|
|
||||||
open_live_access_secret: Optional[str] = None,
|
|
||||||
open_live_code: Optional[str] = None,
|
|
||||||
):
|
):
|
||||||
self._tmp_room_id = room_id
|
self._tmp_room_id = room_id
|
||||||
"""用来init_room的临时房间ID,可以用短ID"""
|
"""用来init_room的临时房间ID,可以用短ID"""
|
||||||
@ -125,6 +117,7 @@ class BLiveClient:
|
|||||||
self._heartbeat_interval = heartbeat_interval
|
self._heartbeat_interval = heartbeat_interval
|
||||||
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
|
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
|
||||||
|
|
||||||
|
# TODO 没必要支持多个handler,改成单个吧
|
||||||
self._handlers: List[handlers.HandlerInterface] = []
|
self._handlers: List[handlers.HandlerInterface] = []
|
||||||
"""消息处理器,可动态增删"""
|
"""消息处理器,可动态增删"""
|
||||||
|
|
||||||
@ -151,14 +144,6 @@ class BLiveClient:
|
|||||||
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
|
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
|
||||||
"""发心跳包定时器的handle"""
|
"""发心跳包定时器的handle"""
|
||||||
|
|
||||||
self._open_live_client = None
|
|
||||||
self._host_server_auth_body: Dict = None
|
|
||||||
"""开放平台的完整鉴权body"""
|
|
||||||
|
|
||||||
if use_open_live:
|
|
||||||
self._open_live_client = OpenLiveClient(open_live_app_id, open_live_access_key, open_live_access_secret, self._session, self._ssl)
|
|
||||||
self._open_live_auth_code = open_live_code
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_running(self) -> bool:
|
def is_running(self) -> bool:
|
||||||
"""
|
"""
|
||||||
@ -233,10 +218,6 @@ class BLiveClient:
|
|||||||
"""
|
"""
|
||||||
便利函数,停止本客户端并释放本客户端的资源,调用后本客户端将不可用
|
便利函数,停止本客户端并释放本客户端的资源,调用后本客户端将不可用
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self._open_live_client:
|
|
||||||
await self._open_live_client.end()
|
|
||||||
|
|
||||||
if self.is_running:
|
if self.is_running:
|
||||||
self.stop()
|
self.stop()
|
||||||
await self.join()
|
await self.join()
|
||||||
@ -270,9 +251,6 @@ class BLiveClient:
|
|||||||
:return: True代表没有降级,如果需要降级后还可用,重载这个函数返回True
|
:return: True代表没有降级,如果需要降级后还可用,重载这个函数返回True
|
||||||
"""
|
"""
|
||||||
res = True
|
res = True
|
||||||
if self._open_live_client and await self._init_room_by_open_live():
|
|
||||||
return res
|
|
||||||
|
|
||||||
if not await self._init_room_id_and_owner():
|
if not await self._init_room_id_and_owner():
|
||||||
res = False
|
res = False
|
||||||
# 失败了则降级
|
# 失败了则降级
|
||||||
@ -286,22 +264,6 @@ class BLiveClient:
|
|||||||
self._host_server_token = None
|
self._host_server_token = None
|
||||||
return res
|
return res
|
||||||
|
|
||||||
async def _init_room_by_open_live(self):
|
|
||||||
"""
|
|
||||||
通过开放平台初始化房间
|
|
||||||
"""
|
|
||||||
if not self._open_live_client:
|
|
||||||
logger.warning('_init_room_by_open_live() failed, open_live_client is None')
|
|
||||||
return False
|
|
||||||
if not await self._open_live_client.start(self._open_live_auth_code):
|
|
||||||
logger.warning('app=%d _init_room_by_open_live() failed, open_live_client.start() failed', self._open_live_client.app_id)
|
|
||||||
return False
|
|
||||||
self._room_id = self._open_live_client.anchor_room_id
|
|
||||||
self._room_owner_uid = self._open_live_client.anchor_uid
|
|
||||||
self._host_server_auth_body = self._open_live_client.ws_auth_body
|
|
||||||
self._host_server_list = self._open_live_client.wss_link
|
|
||||||
return True
|
|
||||||
|
|
||||||
async def _init_room_id_and_owner(self):
|
async def _init_room_id_and_owner(self):
|
||||||
try:
|
try:
|
||||||
async with self._session.get(
|
async with self._session.get(
|
||||||
@ -414,7 +376,7 @@ class BLiveClient:
|
|||||||
网络协程,负责连接服务器、接收消息、解包
|
网络协程,负责连接服务器、接收消息、解包
|
||||||
"""
|
"""
|
||||||
# 如果之前未初始化则初始化
|
# 如果之前未初始化则初始化
|
||||||
if self._host_server_auth_body is None and self._host_server_token is None:
|
if self._host_server_token is None:
|
||||||
if not await self.init_room():
|
if not await self.init_room():
|
||||||
raise InitError('init_room() failed')
|
raise InitError('init_room() failed')
|
||||||
|
|
||||||
@ -424,7 +386,6 @@ class BLiveClient:
|
|||||||
# 连接
|
# 连接
|
||||||
host_server = self._host_server_list[retry_count % len(self._host_server_list)]
|
host_server = self._host_server_list[retry_count % len(self._host_server_list)]
|
||||||
async with self._session.ws_connect(
|
async with self._session.ws_connect(
|
||||||
host_server if isinstance(host_server, str) else
|
|
||||||
f"wss://{host_server['host']}:{host_server['wss_port']}/sub",
|
f"wss://{host_server['host']}:{host_server['wss_port']}/sub",
|
||||||
headers={
|
headers={
|
||||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
|
||||||
@ -493,10 +454,6 @@ class BLiveClient:
|
|||||||
}
|
}
|
||||||
if self._host_server_token is not None:
|
if self._host_server_token is not None:
|
||||||
auth_params['key'] = self._host_server_token
|
auth_params['key'] = self._host_server_token
|
||||||
|
|
||||||
# 开放平台连接则直接替换认证包
|
|
||||||
if self._host_server_auth_body is not None:
|
|
||||||
auth_params = self._host_server_auth_body
|
|
||||||
await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH))
|
await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH))
|
||||||
|
|
||||||
def _on_send_heartbeat(self):
|
def _on_send_heartbeat(self):
|
||||||
@ -526,13 +483,6 @@ class BLiveClient:
|
|||||||
except Exception: # noqa
|
except Exception: # noqa
|
||||||
logger.exception('room=%d _send_heartbeat() failed:', self.room_id)
|
logger.exception('room=%d _send_heartbeat() failed:', self.room_id)
|
||||||
|
|
||||||
try:
|
|
||||||
await self._open_live_client.heartbeat()
|
|
||||||
except (ConnectionResetError, aiohttp.ClientConnectionError) as e:
|
|
||||||
logger.warning('room=%d _send_heartbeat() failed: %r', self.room_id, e)
|
|
||||||
except Exception: # noqa
|
|
||||||
logger.exception('room=%d _send_heartbeat() failed:', self.room_id)
|
|
||||||
|
|
||||||
async def _on_ws_message(self, message: aiohttp.WSMessage):
|
async def _on_ws_message(self, message: aiohttp.WSMessage):
|
||||||
"""
|
"""
|
||||||
收到WebSocket消息
|
收到WebSocket消息
|
||||||
@ -611,6 +561,10 @@ class BLiveClient:
|
|||||||
# 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行
|
# 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行
|
||||||
body = await asyncio.get_running_loop().run_in_executor(None, brotli.decompress, body)
|
body = await asyncio.get_running_loop().run_in_executor(None, brotli.decompress, body)
|
||||||
await self._parse_ws_message(body)
|
await self._parse_ws_message(body)
|
||||||
|
elif header.ver == ProtoVer.DEFLATE:
|
||||||
|
# web端已经不用zlib压缩了,但是开放平台会用
|
||||||
|
body = await asyncio.get_running_loop().run_in_executor(None, zlib.decompress, body)
|
||||||
|
await self._parse_ws_message(body)
|
||||||
elif header.ver == ProtoVer.NORMAL:
|
elif header.ver == ProtoVer.NORMAL:
|
||||||
# 没压缩过的直接反序列化,因为有万恶的GIL,这里不能并行避免阻塞
|
# 没压缩过的直接反序列化,因为有万恶的GIL,这里不能并行避免阻塞
|
||||||
if len(body) != 0:
|
if len(body) != 0:
|
||||||
@ -645,6 +599,7 @@ class BLiveClient:
|
|||||||
|
|
||||||
:param command: 业务消息
|
:param command: 业务消息
|
||||||
"""
|
"""
|
||||||
|
# TODO 考虑解析完整个WS包后再一次处理所有消息。另外用call_soon就不会阻塞网络协程了,也不用加shield
|
||||||
# 外部代码可能不能正常处理取消,所以这里加shield
|
# 外部代码可能不能正常处理取消,所以这里加shield
|
||||||
results = await asyncio.shield(
|
results = await asyncio.shield(
|
||||||
asyncio.gather(
|
asyncio.gather(
|
||||||
|
@ -51,6 +51,8 @@ class HandlerInterface:
|
|||||||
async def handle(self, client: client_.BLiveClient, command: dict):
|
async def handle(self, client: client_.BLiveClient, command: dict):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
# TODO 加个异常停止的回调
|
||||||
|
|
||||||
|
|
||||||
class BaseHandler(HandlerInterface):
|
class BaseHandler(HandlerInterface):
|
||||||
"""
|
"""
|
||||||
|
@ -1,35 +1,57 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import aiohttp
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import hashlib
|
import hashlib
|
||||||
import hmac
|
import hmac
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
import ssl as ssl_
|
import ssl as ssl_
|
||||||
import time
|
import datetime
|
||||||
import json
|
|
||||||
from hashlib import sha256
|
|
||||||
from typing import *
|
from typing import *
|
||||||
|
|
||||||
logger = logging.getLogger('open-live-client')
|
import aiohttp
|
||||||
|
|
||||||
OPEN_LIVE_START_URL = 'https://live-open.biliapi.com/v2/app/start'
|
from . import client, handlers
|
||||||
OPEN_LIVE_HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
|
|
||||||
OPEN_LIVE_END_URL = 'https://live-open.biliapi.com/v2/app/end'
|
logger = logging.getLogger('blivedm')
|
||||||
|
|
||||||
|
START_URL = 'https://live-open.biliapi.com/v2/app/start'
|
||||||
|
HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
|
||||||
|
END_URL = 'https://live-open.biliapi.com/v2/app/end'
|
||||||
|
|
||||||
|
|
||||||
|
# TODO 抽出公共基类,现在BLiveClient和OpenLiveClient还有不重合的代码
|
||||||
|
class OpenLiveClient(client.BLiveClient):
|
||||||
|
"""
|
||||||
|
B站直播开放平台客户端,负责连接房间
|
||||||
|
|
||||||
|
文档参考:https://open-live.bilibili.com/document/
|
||||||
|
|
||||||
|
:param access_key: 在开放平台申请的access_key
|
||||||
|
:param access_secret: 在开放平台申请的access_secret
|
||||||
|
:param app_id: 在开放平台创建的项目ID
|
||||||
|
:param room_owner_auth_code: 主播身份码
|
||||||
|
:param session: cookie、连接池
|
||||||
|
:param heartbeat_interval: 发送连接心跳包的间隔时间(秒)
|
||||||
|
:param game_heartbeat_interval: 发送项目心跳包的间隔时间(秒)
|
||||||
|
:param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext
|
||||||
|
"""
|
||||||
|
|
||||||
class OpenLiveClient:
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
app_id: int,
|
|
||||||
access_key: str,
|
access_key: str,
|
||||||
access_secret: str,
|
access_secret: str,
|
||||||
|
app_id: int,
|
||||||
|
room_owner_auth_code: str,
|
||||||
session: Optional[aiohttp.ClientSession] = None,
|
session: Optional[aiohttp.ClientSession] = None,
|
||||||
|
heartbeat_interval=30,
|
||||||
|
game_heartbeat_interval=20,
|
||||||
ssl: Union[bool, ssl_.SSLContext] = True,
|
ssl: Union[bool, ssl_.SSLContext] = True,
|
||||||
):
|
):
|
||||||
self.app_id = app_id
|
self._access_key = access_key
|
||||||
self.access_key = access_key
|
self._access_secret = access_secret
|
||||||
self.access_secret = access_secret
|
self._app_id = app_id
|
||||||
self.session = session
|
self._room_owner_auth_code = room_owner_auth_code
|
||||||
|
|
||||||
if session is None:
|
if session is None:
|
||||||
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
|
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
|
||||||
@ -38,156 +60,273 @@ class OpenLiveClient:
|
|||||||
self._session = session
|
self._session = session
|
||||||
self._own_session = False
|
self._own_session = False
|
||||||
assert self._session.loop is asyncio.get_event_loop() # noqa
|
assert self._session.loop is asyncio.get_event_loop() # noqa
|
||||||
|
|
||||||
|
self._heartbeat_interval = heartbeat_interval
|
||||||
|
self._game_heartbeat_interval = game_heartbeat_interval
|
||||||
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
|
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
|
||||||
|
|
||||||
|
self._handlers: List[handlers.HandlerInterface] = []
|
||||||
|
"""消息处理器,可动态增删"""
|
||||||
|
|
||||||
|
# 在调用init_room后初始化的字段
|
||||||
|
self._room_id = None
|
||||||
|
"""真实房间ID"""
|
||||||
|
self._room_owner_uid = None
|
||||||
|
"""主播用户ID"""
|
||||||
|
self._host_server_list: Optional[List[str]] = []
|
||||||
|
"""弹幕服务器URL列表"""
|
||||||
|
self._auth_body = None
|
||||||
|
"""连接弹幕服务器用的认证包内容"""
|
||||||
|
self._game_id = None
|
||||||
|
"""项目场次ID,仅用于互动玩法类项目,其他项目为空字符串"""
|
||||||
|
|
||||||
|
# 在运行时初始化的字段
|
||||||
|
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
|
||||||
|
"""WebSocket连接"""
|
||||||
|
self._network_future: Optional[asyncio.Future] = None
|
||||||
|
"""网络协程的future"""
|
||||||
|
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
|
||||||
|
"""发连接心跳包定时器的handle"""
|
||||||
|
self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
|
||||||
|
"""发项目心跳包定时器的handle"""
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def game_id(self) -> Optional[int]:
|
def room_id(self) -> Optional[int]:
|
||||||
|
"""
|
||||||
|
房间ID,调用init_room后初始化
|
||||||
|
"""
|
||||||
|
return self._room_id
|
||||||
|
|
||||||
|
@property
|
||||||
|
def room_owner_uid(self) -> Optional[int]:
|
||||||
|
"""
|
||||||
|
主播用户ID,调用init_room后初始化
|
||||||
|
"""
|
||||||
|
return self._room_owner_uid
|
||||||
|
|
||||||
|
@property
|
||||||
|
def room_owner_auth_code(self):
|
||||||
|
"""
|
||||||
|
主播身份码
|
||||||
|
"""
|
||||||
|
return self._room_owner_auth_code
|
||||||
|
|
||||||
|
@property
|
||||||
|
def app_id(self):
|
||||||
|
"""
|
||||||
|
在开放平台创建的项目ID
|
||||||
|
"""
|
||||||
|
return self._app_id
|
||||||
|
|
||||||
|
@property
|
||||||
|
def game_id(self) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
项目场次ID,仅用于互动玩法类项目,其他项目为空字符串,调用init_room后初始化
|
||||||
|
"""
|
||||||
return self._game_id
|
return self._game_id
|
||||||
|
|
||||||
@property
|
async def close(self):
|
||||||
def ws_auth_body(self) -> Optional[Dict]:
|
"""
|
||||||
return self._ws_auth_body
|
释放本客户端的资源,调用后本客户端将不可用
|
||||||
|
"""
|
||||||
|
if self.is_running:
|
||||||
|
logger.warning('room=%s is calling close(), but client is running', self.room_id)
|
||||||
|
|
||||||
@property
|
if self._game_heartbeat_timer_handle is not None:
|
||||||
def wss_link(self) -> Optional[List[str]]:
|
self._game_heartbeat_timer_handle.cancel()
|
||||||
return self._wss_link
|
self._game_heartbeat_timer_handle = None
|
||||||
|
await self._end_game()
|
||||||
|
|
||||||
@property
|
await super().close()
|
||||||
def anchor_room_id(self) -> Optional[int]:
|
|
||||||
return self._anchor_room_id
|
|
||||||
|
|
||||||
@property
|
def _request_open_live(self, url, body: dict):
|
||||||
def anchor_uname(self) -> Optional[str]:
|
body_bytes = json.dumps(body).encode('utf-8')
|
||||||
return self._anchor_uname
|
headers = {
|
||||||
|
'x-bili-accesskeyid': self._access_key,
|
||||||
@property
|
'x-bili-content-md5': hashlib.md5(body_bytes).hexdigest(),
|
||||||
def anchor_uface(self) -> Optional[str]:
|
'x-bili-signature-method': 'HMAC-SHA256',
|
||||||
return self._anchor_uface
|
'x-bili-signature-nonce': str(random.randint(0, 999999999)),
|
||||||
|
'x-bili-signature-version': '1.0',
|
||||||
@property
|
'x-bili-timestamp': str(int(datetime.datetime.now().timestamp())),
|
||||||
def anchor_uid(self) -> Optional[int]:
|
|
||||||
return self._anchor_uid
|
|
||||||
|
|
||||||
def _sign_request_header(
|
|
||||||
self,
|
|
||||||
body: str,
|
|
||||||
):
|
|
||||||
md5 = hashlib.md5()
|
|
||||||
md5.update(body.encode())
|
|
||||||
ts = time.time()
|
|
||||||
nonce = random.randint(1,100000)+time.time()
|
|
||||||
md5data = md5.hexdigest()
|
|
||||||
headerMap = {
|
|
||||||
"x-bili-timestamp": str(int(ts)),
|
|
||||||
"x-bili-signature-method": "HMAC-SHA256",
|
|
||||||
"x-bili-signature-nonce": str(nonce),
|
|
||||||
"x-bili-accesskeyid": self.access_key,
|
|
||||||
"x-bili-signature-version": "1.0",
|
|
||||||
"x-bili-content-md5": md5data,
|
|
||||||
}
|
}
|
||||||
headerList = sorted(headerMap)
|
|
||||||
headerStr = ''
|
|
||||||
|
|
||||||
for key in headerList:
|
str_to_sign = '\n'.join(
|
||||||
headerStr = headerStr+ key+":"+str(headerMap[key])+"\n"
|
f'{key}:{value}'
|
||||||
headerStr = headerStr.rstrip("\n")
|
for key, value in headers.items()
|
||||||
|
)
|
||||||
|
signature = hmac.new(
|
||||||
|
self._access_secret.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256
|
||||||
|
).hexdigest()
|
||||||
|
headers['Authorization'] = signature
|
||||||
|
|
||||||
appsecret = self.access_secret.encode()
|
headers['Content-Type'] = 'application/json'
|
||||||
data = headerStr.encode()
|
headers['Accept'] = 'application/json'
|
||||||
|
return self._session.post(url, headers=headers, data=body_bytes, ssl=self._ssl)
|
||||||
|
|
||||||
signature = hmac.new(appsecret, data, digestmod=sha256).hexdigest()
|
async def init_room(self):
|
||||||
headerMap["Authorization"] = signature
|
"""
|
||||||
headerMap["Content-Type"] = "application/json"
|
开启项目,并初始化连接房间需要的字段
|
||||||
headerMap["Accept"] = "application/json"
|
|
||||||
return headerMap
|
|
||||||
|
|
||||||
# 通过身份码获取直播间及wss连接信息
|
:return: 是否成功
|
||||||
async def start(
|
"""
|
||||||
self,
|
if not await self._start_game():
|
||||||
code: str
|
return False
|
||||||
):
|
|
||||||
|
if self._game_id != '' and self._game_heartbeat_timer_handle is None:
|
||||||
|
self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later(
|
||||||
|
self._game_heartbeat_interval, self._on_send_game_heartbeat
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def _start_game(self):
|
||||||
try:
|
try:
|
||||||
params = f'{{"code":"{code}","app_id":{self.app_id}}}'
|
async with self._request_open_live(
|
||||||
headers = self._sign_request_header(params)
|
START_URL,
|
||||||
async with self._session.post(
|
{'code': self._room_owner_auth_code, 'app_id': self._app_id}
|
||||||
OPEN_LIVE_START_URL, headers=headers, data=params, ssl=self._ssl
|
|
||||||
) as res:
|
) as res:
|
||||||
if res.status != 200:
|
if res.status != 200:
|
||||||
logger.warning('app=%d start failed, status=%d, reason=%s', self.app_id, res.status, res.reason)
|
logger.warning('init_room() failed, status=%d, reason=%s', res.status, res.reason)
|
||||||
return False
|
return False
|
||||||
data = await res.json()
|
data = await res.json()
|
||||||
if data['code'] != 0:
|
if data['code'] != 0:
|
||||||
logger.warning('app=%d start failed, code=%d, message=%s', self.app_id, data['code'], data['message'])
|
logger.warning('init_room() failed, code=%d, message=%s, request_id=%s',
|
||||||
|
data['code'], data['message'], data['request_id'])
|
||||||
return False
|
return False
|
||||||
if not self._parse_start_data(
|
if not self._parse_start_game(data['data']):
|
||||||
data
|
|
||||||
):
|
|
||||||
return False
|
return False
|
||||||
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
||||||
logger.exception('app=%d start failed', self.app_id)
|
logger.exception('init_room() failed:')
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _parse_start_data(
|
def _parse_start_game(self, data):
|
||||||
self,
|
self._game_id = data['game_info']['game_id']
|
||||||
data: dict
|
websocket_info = data['websocket_info']
|
||||||
):
|
self._auth_body = websocket_info['auth_body']
|
||||||
self._game_id = data['data']['game_info']['game_id']
|
self._host_server_list = websocket_info['wss_link']
|
||||||
self._ws_auth_body = json.loads(data['data']['websocket_info']['auth_body'])
|
anchor_info = data['anchor_info']
|
||||||
self._wss_link = data['data']['websocket_info']['wss_link']
|
self._room_id = anchor_info['room_id']
|
||||||
self._anchor_room_id = data['data']['anchor_info']['room_id']
|
self._room_owner_uid = anchor_info['uid']
|
||||||
self._anchor_uname = data['data']['anchor_info']['uname']
|
|
||||||
self._anchor_uface = data['data']['anchor_info']['uface']
|
|
||||||
self._anchor_uid = data['data']['anchor_info']['uid']
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def end(
|
async def _end_game(self):
|
||||||
self
|
"""
|
||||||
):
|
关闭项目。互动玩法类项目建议断开连接时保证调用到这个函数(close会调用),否则短时间内无法重复连接同一个房间
|
||||||
if not self._game_id:
|
"""
|
||||||
logger.warning('app=%d end failed, game_id not found', self.app_id)
|
if self._game_id in (None, ''):
|
||||||
return False
|
return True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
params = f'{{"game_id":"{self._game_id}", "app_id":{self.app_id}}}'
|
async with self._request_open_live(
|
||||||
headers = self._sign_request_header(params)
|
END_URL,
|
||||||
async with self._session.post(
|
{'app_id': self._app_id, 'game_id': self._game_id}
|
||||||
OPEN_LIVE_END_URL, headers=headers, data=params, ssl=self._ssl
|
|
||||||
) as res:
|
) as res:
|
||||||
if res.status != 200:
|
if res.status != 200:
|
||||||
logger.warning('app=%d end failed, status=%d, reason=%s', self.app_id, res.status, res.reason)
|
logger.warning('room=%d _end_game() failed, status=%d, reason=%s',
|
||||||
|
self._room_id, res.status, res.reason)
|
||||||
return False
|
return False
|
||||||
data = await res.json()
|
data = await res.json()
|
||||||
if data['code'] != 0:
|
if data['code'] != 0:
|
||||||
logger.warning('app=%d end failed, code=%d, message=%s', self.app_id, data['code'], data['message'])
|
logger.warning('room=%d _end_game() failed, code=%d, message=%s, request_id=%s',
|
||||||
|
self._room_id, data['code'], data['message'], data['request_id'])
|
||||||
return False
|
return False
|
||||||
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
||||||
logger.exception('app=%d end failed', self.app_id)
|
logger.exception('room=%d _end_game() failed:', self._room_id)
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# 开放平台互动玩法心跳, 用于维持直播间内定制礼物及统计使用数据, 非互动玩法类暂时不需要
|
def _on_send_game_heartbeat(self):
|
||||||
async def heartbeat(
|
"""
|
||||||
self
|
定时发送项目心跳包的回调
|
||||||
):
|
"""
|
||||||
if not self._game_id:
|
if not self.is_running:
|
||||||
|
self._game_heartbeat_timer_handle = None
|
||||||
|
return
|
||||||
|
|
||||||
|
self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later(
|
||||||
|
self._game_heartbeat_interval, self._on_send_game_heartbeat
|
||||||
|
)
|
||||||
|
asyncio.create_task(self._send_game_heartbeat())
|
||||||
|
|
||||||
|
async def _send_game_heartbeat(self):
|
||||||
|
"""
|
||||||
|
发送项目心跳包,仅用于互动玩法类项目
|
||||||
|
"""
|
||||||
|
if self._game_id in (None, ''):
|
||||||
logger.warning('game=%d heartbeat failed, game_id not found', self._game_id)
|
logger.warning('game=%d heartbeat failed, game_id not found', self._game_id)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
params = f'{{"game_id":"{self._game_id}"}}'
|
async with self._request_open_live(
|
||||||
headers = self._sign_request_header(params)
|
HEARTBEAT_URL,
|
||||||
async with self._session.post(
|
{'game_id': self._game_id}
|
||||||
OPEN_LIVE_HEARTBEAT_URL, headers=headers, data=params, ssl=self._ssl
|
|
||||||
) as res:
|
) as res:
|
||||||
if res.status != 200:
|
if res.status != 200:
|
||||||
logger.warning('game=%d heartbeat failed, status=%d, reason=%s', self._game_id, res.status, res.reason)
|
logger.warning('room=%d _send_game_heartbeat() failed, status=%d, reason=%s',
|
||||||
|
self._room_id, res.status, res.reason)
|
||||||
return False
|
return False
|
||||||
data = await res.json()
|
data = await res.json()
|
||||||
if data['code'] != 0:
|
if data['code'] != 0:
|
||||||
logger.warning('game=%d heartbeat failed, code=%d, message=%s', self._game_id, data['code'], data['message'])
|
logger.warning('room=%d _send_game_heartbeat() failed, code=%d, message=%s, request_id=%s',
|
||||||
|
self._room_id, data['code'], data['message'], data['request_id'])
|
||||||
return False
|
return False
|
||||||
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
||||||
logger.exception('game=%d heartbeat failed', self._game_id)
|
logger.exception('room=%d _send_game_heartbeat() failed:', self._room_id)
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
async def _network_coroutine(self):
|
||||||
|
"""
|
||||||
|
网络协程,负责连接服务器、接收消息、解包
|
||||||
|
"""
|
||||||
|
# 如果之前未初始化则初始化
|
||||||
|
if self._auth_body is None:
|
||||||
|
if not await self.init_room():
|
||||||
|
raise client.InitError('init_room() failed')
|
||||||
|
|
||||||
|
retry_count = 0
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# 连接
|
||||||
|
host_server_url = self._host_server_list[retry_count % len(self._host_server_list)]
|
||||||
|
async with self._session.ws_connect(
|
||||||
|
host_server_url,
|
||||||
|
receive_timeout=self._heartbeat_interval + 5,
|
||||||
|
ssl=self._ssl
|
||||||
|
) as websocket:
|
||||||
|
self._websocket = websocket
|
||||||
|
await self._on_ws_connect()
|
||||||
|
|
||||||
|
# 处理消息
|
||||||
|
message: aiohttp.WSMessage
|
||||||
|
async for message in websocket:
|
||||||
|
await self._on_ws_message(message)
|
||||||
|
# 至少成功处理1条消息
|
||||||
|
retry_count = 0
|
||||||
|
|
||||||
|
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
||||||
|
# 掉线重连
|
||||||
|
pass
|
||||||
|
except client.AuthError:
|
||||||
|
# 认证失败了,应该重新获取auth_body再重连
|
||||||
|
logger.exception('room=%d auth failed, trying init_room() again', self.room_id)
|
||||||
|
if not await self.init_room():
|
||||||
|
raise client.InitError('init_room() failed')
|
||||||
|
except ssl_.SSLError:
|
||||||
|
logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id)
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
self._websocket = None
|
||||||
|
await self._on_ws_close()
|
||||||
|
|
||||||
|
# 准备重连
|
||||||
|
retry_count += 1
|
||||||
|
logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count)
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
async def _send_auth(self):
|
||||||
|
"""
|
||||||
|
发送认证包
|
||||||
|
"""
|
||||||
|
auth_body = json.loads(self._auth_body)
|
||||||
|
await self._websocket.send_bytes(self._make_packet(auth_body, client.Operation.AUTH))
|
||||||
|
@ -1,37 +1,47 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
import blivedm
|
import blivedm
|
||||||
|
import blivedm.open_live_client as open_live_client
|
||||||
|
|
||||||
TEST_AUTH_CODE = ''
|
|
||||||
APP_ID = ''
|
|
||||||
ACCESS_KEY = ''
|
ACCESS_KEY = ''
|
||||||
ACCESS_KEY_SECRET = ''
|
ACCESS_SECRET = ''
|
||||||
|
APP_ID = 0
|
||||||
|
ROOM_OWNER_AUTH_CODE = ''
|
||||||
|
|
||||||
class OpenLiveHandlerInterface:
|
|
||||||
"""
|
|
||||||
开放平台直播消息处理器接口
|
|
||||||
"""
|
|
||||||
|
|
||||||
async def handle(self, client: blivedm.BLiveClient, command: dict):
|
|
||||||
print(f'{command}')
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
await run_start()
|
await run_single_client()
|
||||||
|
|
||||||
async def run_start():
|
|
||||||
client = blivedm.BLiveClient(use_open_live=True, open_live_app_id=APP_ID, open_live_access_key=ACCESS_KEY, open_live_access_secret=ACCESS_KEY_SECRET, open_live_code=TEST_AUTH_CODE, ssl=True)
|
async def run_single_client():
|
||||||
handler = OpenLiveHandlerInterface()
|
"""
|
||||||
|
演示监听一个直播间
|
||||||
|
"""
|
||||||
|
client = open_live_client.OpenLiveClient(
|
||||||
|
access_key=ACCESS_KEY,
|
||||||
|
access_secret=ACCESS_SECRET,
|
||||||
|
app_id=APP_ID,
|
||||||
|
room_owner_auth_code=ROOM_OWNER_AUTH_CODE,
|
||||||
|
)
|
||||||
|
handler = MyHandler()
|
||||||
client.add_handler(handler)
|
client.add_handler(handler)
|
||||||
|
|
||||||
client.start()
|
client.start()
|
||||||
try:
|
try:
|
||||||
# 演示20秒后停止
|
# 演示70秒后停止
|
||||||
await asyncio.sleep(60)
|
await asyncio.sleep(70)
|
||||||
client.stop()
|
client.stop()
|
||||||
|
|
||||||
await client.join()
|
await client.join()
|
||||||
finally:
|
finally:
|
||||||
await client.stop_and_close()
|
await client.stop_and_close()
|
||||||
|
|
||||||
|
|
||||||
|
class MyHandler(blivedm.HandlerInterface):
|
||||||
|
async def handle(self, client: open_live_client.OpenLiveClient, command: dict):
|
||||||
|
print(command)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
Loading…
Reference in New Issue
Block a user