From a52af864ad4ac9ff732db175cfbfec77acf8e91b Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 3 Sep 2023 12:22:01 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=BD=E5=87=BA=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E5=9F=BA=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/__init__.py | 3 +- blivedm/clients/__init__.py | 3 + .../open_live.py} | 101 ++----- blivedm/clients/web.py | 272 ++++++++++++++++++ blivedm/{client.py => clients/ws_base.py} | 242 ++-------------- blivedm/handlers.py | 32 +-- blivedm/models/__init__.py | 2 +- open_live_sample.py | 5 +- 8 files changed, 342 insertions(+), 318 deletions(-) create mode 100644 blivedm/clients/__init__.py rename blivedm/{open_live_client.py => clients/open_live.py} (75%) create mode 100644 blivedm/clients/web.py rename blivedm/{client.py => clients/ws_base.py} (64%) diff --git a/blivedm/__init__.py b/blivedm/__init__.py index 92170b5..01259a4 100644 --- a/blivedm/__init__.py +++ b/blivedm/__init__.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- from .models import * from .handlers import * -from .client import * -from .open_live_client import * +from .clients import * diff --git a/blivedm/clients/__init__.py b/blivedm/clients/__init__.py new file mode 100644 index 0000000..13974f0 --- /dev/null +++ b/blivedm/clients/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +from .web import * +from .open_live import * diff --git a/blivedm/open_live_client.py b/blivedm/clients/open_live.py similarity index 75% rename from blivedm/open_live_client.py rename to blivedm/clients/open_live.py index bd50087..89301e6 100644 --- a/blivedm/open_live_client.py +++ b/blivedm/clients/open_live.py @@ -1,17 +1,21 @@ # -*- coding: utf-8 -*- import asyncio +import datetime import hashlib import hmac import json import logging import random import ssl as ssl_ -import datetime from typing import * import aiohttp -from . import client, handlers +from . import ws_base + +__all__ = ( + 'OpenLiveClient', +) logger = logging.getLogger('blivedm') @@ -20,10 +24,9 @@ HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat' END_URL = 'https://live-open.biliapi.com/v2/app/end' -# TODO 抽出公共基类,现在BLiveClient和OpenLiveClient还有不重合的代码 -class OpenLiveClient(client.BLiveClient): +class OpenLiveClient(ws_base.WebSocketClientBase): """ - B站直播开放平台客户端,负责连接房间 + 开放平台客户端 文档参考:https://open-live.bilibili.com/document/ @@ -43,41 +46,28 @@ class OpenLiveClient(client.BLiveClient): access_secret: str, app_id: int, room_owner_auth_code: str, + *, session: Optional[aiohttp.ClientSession] = None, heartbeat_interval=30, game_heartbeat_interval=20, ssl: Union[bool, ssl_.SSLContext] = True, ): + super().__init__(session, heartbeat_interval, ssl) + self._access_key = access_key self._access_secret = access_secret self._app_id = app_id self._room_owner_auth_code = room_owner_auth_code - - if session is None: - self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) - self._own_session = True - else: - self._session = session - self._own_session = False - assert self._session.loop is asyncio.get_event_loop() # noqa - - self._heartbeat_interval = heartbeat_interval self._game_heartbeat_interval = game_heartbeat_interval - self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa - - self._handlers: List[handlers.HandlerInterface] = [] - """消息处理器,可动态增删""" # 在调用init_room后初始化的字段 - self._room_id = None - """真实房间ID""" - self._room_owner_uid = None + self._room_owner_uid: Optional[int] = None """主播用户ID""" - self._host_server_list: Optional[List[str]] = [] + self._host_server_url_list: Optional[List[str]] = [] """弹幕服务器URL列表""" - self._auth_body = None + self._auth_body: Optional[str] = None """连接弹幕服务器用的认证包内容""" - self._game_id = None + self._game_id: Optional[str] = None """项目场次ID,仅用于互动玩法类项目,其他项目为空字符串""" # 在运行时初始化的字段 @@ -90,13 +80,6 @@ class OpenLiveClient(client.BLiveClient): self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None """发项目心跳包定时器的handle""" - @property - def room_id(self) -> Optional[int]: - """ - 房间ID,调用init_room后初始化 - """ - return self._room_id - @property def room_owner_uid(self) -> Optional[int]: """ @@ -203,7 +186,7 @@ class OpenLiveClient(client.BLiveClient): self._game_id = data['game_info']['game_id'] websocket_info = data['websocket_info'] self._auth_body = websocket_info['auth_body'] - self._host_server_list = websocket_info['wss_link'] + self._host_server_url_list = websocket_info['wss_link'] anchor_info = data['anchor_info'] self._room_id = anchor_info['room_id'] self._room_owner_uid = anchor_info['uid'] @@ -275,58 +258,24 @@ class OpenLiveClient(client.BLiveClient): return False return True - async def _network_coroutine(self): + async def _on_network_coroutine_start(self): """ - 网络协程,负责连接服务器、接收消息、解包 + 在_network_coroutine开头运行,可以用来初始化房间 """ # 如果之前未初始化则初始化 if self._auth_body is None: if not await self.init_room(): - raise client.InitError('init_room() failed') + raise ws_base.InitError('init_room() failed') - retry_count = 0 - while True: - try: - # 连接 - host_server_url = self._host_server_list[retry_count % len(self._host_server_list)] - async with self._session.ws_connect( - host_server_url, - receive_timeout=self._heartbeat_interval + 5, - ssl=self._ssl - ) as websocket: - self._websocket = websocket - await self._on_ws_connect() - - # 处理消息 - message: aiohttp.WSMessage - async for message in websocket: - await self._on_ws_message(message) - # 至少成功处理1条消息 - retry_count = 0 - - except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - # 掉线重连 - pass - except client.AuthError: - # 认证失败了,应该重新获取auth_body再重连 - logger.exception('room=%d auth failed, trying init_room() again', self.room_id) - if not await self.init_room(): - raise client.InitError('init_room() failed') - except ssl_.SSLError: - logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id) - raise - finally: - self._websocket = None - await self._on_ws_close() - - # 准备重连 - retry_count += 1 - logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count) - await asyncio.sleep(1) + def _get_ws_url(self, retry_count) -> str: + """ + 返回WebSocket连接的URL,可以在这里做故障转移和负载均衡 + """ + return self._host_server_url_list[retry_count % len(self._host_server_url_list)] async def _send_auth(self): """ 发送认证包 """ auth_body = json.loads(self._auth_body) - await self._websocket.send_bytes(self._make_packet(auth_body, client.Operation.AUTH)) + await self._websocket.send_bytes(self._make_packet(auth_body, ws_base.Operation.AUTH)) diff --git a/blivedm/clients/web.py b/blivedm/clients/web.py new file mode 100644 index 0000000..e62313e --- /dev/null +++ b/blivedm/clients/web.py @@ -0,0 +1,272 @@ +# -*- coding: utf-8 -*- +import asyncio +import logging +import ssl as ssl_ +from typing import * + +import aiohttp +import yarl + +from . import ws_base + +__all__ = ( + 'BLiveClient', +) + +logger = logging.getLogger('blivedm') + +USER_AGENT = ( + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36' +) + +UID_INIT_URL = 'https://api.bilibili.com/x/web-interface/nav' +BUVID_INIT_URL = 'https://data.bilibili.com/v/' +ROOM_INIT_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom' +DANMAKU_SERVER_CONF_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo' +DEFAULT_DANMAKU_SERVER_LIST = [ + {'host': 'broadcastlv.chat.bilibili.com', 'port': 2243, 'wss_port': 443, 'ws_port': 2244} +] + + +class BLiveClient(ws_base.WebSocketClientBase): + """ + web端客户端 + + :param room_id: URL中的房间ID,可以用短ID + :param uid: B站用户ID,0表示未登录,None表示自动获取 + :param session: cookie、连接池 + :param heartbeat_interval: 发送心跳包的间隔时间(秒) + :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext + """ + + def __init__( + self, + room_id: int, + *, + uid: Optional[int] = None, + session: Optional[aiohttp.ClientSession] = None, + heartbeat_interval=30, + ssl: Union[bool, ssl_.SSLContext] = True, + ): + super().__init__(session, heartbeat_interval, ssl) + + self._tmp_room_id = room_id + """用来init_room的临时房间ID,可以用短ID""" + self._uid = uid + + # 在调用init_room后初始化的字段 + # TODO 移除短ID + self._room_short_id: Optional[int] = None + """房间短ID,没有则为0""" + self._room_owner_uid: Optional[int] = None + """主播用户ID""" + self._host_server_list: Optional[List[dict]] = None + """ + 弹幕服务器列表 + + `[{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...]` + """ + self._host_server_token: Optional[str] = None + """连接弹幕服务器用的token""" + + @property + def room_short_id(self) -> Optional[int]: + """ + 房间短ID,没有则为0,调用init_room后初始化 + """ + return self._room_short_id + + @property + def room_owner_uid(self) -> Optional[int]: + """ + 主播用户ID,调用init_room后初始化 + """ + return self._room_owner_uid + + @property + def uid(self) -> Optional[int]: + """ + 当前登录的用户ID,未登录则为0,调用init_room后初始化 + """ + return self._uid + + async def init_room(self): + """ + 初始化连接房间需要的字段 + + :return: True代表没有降级,如果需要降级后还可用,重载这个函数返回True + """ + if self._uid is None: + if not await self._init_uid(): + logger.warning('room=%d _init_uid() failed', self._tmp_room_id) + self._uid = 0 + + if self._get_buvid() == '': + if not await self._init_buvid(): + logger.warning('room=%d _init_buvid() failed', self._tmp_room_id) + + res = True + if not await self._init_room_id_and_owner(): + res = False + # 失败了则降级 + self._room_id = self._room_short_id = self._tmp_room_id + self._room_owner_uid = 0 + + if not await self._init_host_server(): + res = False + # 失败了则降级 + self._host_server_list = DEFAULT_DANMAKU_SERVER_LIST + self._host_server_token = None + return res + + async def _init_uid(self): + try: + async with self._session.get( + UID_INIT_URL, + headers={'User-Agent': USER_AGENT}, + ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('room=%d _init_uid() failed, status=%d, reason=%s', self._tmp_room_id, + res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + if data['code'] == -101: + # 未登录 + self._uid = 0 + return True + logger.warning('room=%d _init_uid() failed, message=%s', self._tmp_room_id, + data['message']) + return False + + data = data['data'] + if not data['isLogin']: + # 未登录 + self._uid = 0 + else: + self._uid = data['mid'] + return True + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('room=%d _init_uid() failed:', self._tmp_room_id) + return False + + def _get_buvid(self): + cookies = self._session.cookie_jar.filter_cookies(yarl.URL(BUVID_INIT_URL)) + buvid_cookie = cookies.get('buvid3', None) + if buvid_cookie is None: + return '' + return buvid_cookie.value + + async def _init_buvid(self): + try: + async with self._session.get( + BUVID_INIT_URL, + headers={'User-Agent': USER_AGENT}, + ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('room=%d _init_buvid() status error, status=%d, reason=%s', + self._tmp_room_id, res.status, res.reason) + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('room=%d _init_buvid() exception:', self._tmp_room_id) + return self._get_buvid() != '' + + async def _init_room_id_and_owner(self): + try: + async with self._session.get( + ROOM_INIT_URL, + headers={'User-Agent': USER_AGENT}, + params={ + 'room_id': self._tmp_room_id + }, + ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id, + res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('room=%d _init_room_id_and_owner() failed, message=%s', self._tmp_room_id, + data['message']) + return False + if not self._parse_room_init(data['data']): + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('room=%d _init_room_id_and_owner() failed:', self._tmp_room_id) + return False + return True + + def _parse_room_init(self, data): + room_info = data['room_info'] + self._room_id = room_info['room_id'] + self._room_short_id = room_info['short_id'] + self._room_owner_uid = room_info['uid'] + return True + + async def _init_host_server(self): + try: + async with self._session.get( + DANMAKU_SERVER_CONF_URL, + headers={'User-Agent': USER_AGENT}, + params={ + 'id': self._room_id, + 'type': 0 + }, + ssl=self._ssl + ) as res: + if res.status != 200: + logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id, + res.status, res.reason) + return False + data = await res.json() + if data['code'] != 0: + logger.warning('room=%d _init_host_server() failed, message=%s', self._room_id, data['message']) + return False + if not self._parse_danmaku_server_conf(data['data']): + return False + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('room=%d _init_host_server() failed:', self._room_id) + return False + return True + + def _parse_danmaku_server_conf(self, data): + self._host_server_list = data['host_list'] + self._host_server_token = data['token'] + if not self._host_server_list: + logger.warning('room=%d _parse_danmaku_server_conf() failed: host_server_list is empty', self._room_id) + return False + return True + + async def _on_network_coroutine_start(self): + """ + 在_network_coroutine开头运行,可以用来初始化房间 + """ + # 如果之前未初始化则初始化 + if self._host_server_token is None: + if not await self.init_room(): + raise ws_base.InitError('init_room() failed') + + def _get_ws_url(self, retry_count) -> str: + """ + 返回WebSocket连接的URL,可以在这里做故障转移和负载均衡 + """ + host_server = self._host_server_list[retry_count % len(self._host_server_list)] + return f"wss://{host_server['host']}:{host_server['wss_port']}/sub" + + async def _send_auth(self): + """ + 发送认证包 + """ + auth_params = { + 'uid': self._uid, + 'roomid': self._room_id, + 'protover': 3, + 'platform': 'web', + 'type': 2, + 'buvid': self._get_buvid(), + } + if self._host_server_token is not None: + auth_params['key'] = self._host_server_token + await self._websocket.send_bytes(self._make_packet(auth_params, ws_base.Operation.AUTH)) diff --git a/blivedm/client.py b/blivedm/clients/ws_base.py similarity index 64% rename from blivedm/client.py rename to blivedm/clients/ws_base.py index 1699569..41495b9 100644 --- a/blivedm/client.py +++ b/blivedm/clients/ws_base.py @@ -10,13 +10,8 @@ from typing import * import aiohttp import brotli -import yarl -from . import handlers - -__all__ = ( - 'BLiveClient', -) +from .. import handlers logger = logging.getLogger('blivedm') @@ -24,14 +19,6 @@ USER_AGENT = ( 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36' ) -UID_INIT_URL = 'https://api.bilibili.com/x/web-interface/nav' -BUVID_INIT_URL = 'https://data.bilibili.com/v/' -ROOM_INIT_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom' -DANMAKU_SERVER_CONF_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo' -DEFAULT_DANMAKU_SERVER_LIST = [ - {'host': 'broadcastlv.chat.bilibili.com', 'port': 2243, 'wss_port': 443, 'ws_port': 2244} -] - HEADER_STRUCT = struct.Struct('>I2H2I') @@ -90,12 +77,10 @@ class AuthError(Exception): """认证失败""" -class BLiveClient: +class WebSocketClientBase: """ - B站直播弹幕客户端,负责连接房间 + 基于WebSocket的客户端 - :param room_id: URL中的房间ID,可以用短ID - :param uid: B站用户ID,0表示未登录,None表示自动获取 :param session: cookie、连接池 :param heartbeat_interval: 发送心跳包的间隔时间(秒) :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext @@ -103,16 +88,10 @@ class BLiveClient: def __init__( self, - room_id, - uid=None, session: Optional[aiohttp.ClientSession] = None, - heartbeat_interval=30, + heartbeat_interval: float = 30, ssl: Union[bool, ssl_.SSLContext] = True, ): - self._tmp_room_id = room_id - """用来init_room的临时房间ID,可以用短ID""" - self._uid = uid - if session is None: self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) self._own_session = True @@ -122,6 +101,7 @@ class BLiveClient: assert self._session.loop is asyncio.get_event_loop() # noqa self._heartbeat_interval = heartbeat_interval + # TODO 移除SSL配置 self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa # TODO 没必要支持多个handler,改成单个吧 @@ -129,19 +109,7 @@ class BLiveClient: """消息处理器,可动态增删""" # 在调用init_room后初始化的字段 - self._room_id = None - """真实房间ID""" - self._room_short_id = None - """房间短ID,没有则为0""" - self._room_owner_uid = None - """主播用户ID""" - self._host_server_list: Optional[List[dict]] = None - """ - 弹幕服务器列表 - [{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...] - """ - self._host_server_token = None - """连接弹幕服务器用的token""" + self._room_id: Optional[int] = None # 在运行时初始化的字段 self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None @@ -165,27 +133,6 @@ class BLiveClient: """ return self._room_id - @property - def room_short_id(self) -> Optional[int]: - """ - 房间短ID,没有则为0,调用init_room后初始化 - """ - return self._room_short_id - - @property - def room_owner_uid(self) -> Optional[int]: - """ - 主播用户ID,调用init_room后初始化 - """ - return self._room_owner_uid - - @property - def uid(self) -> Optional[int]: - """ - 当前登录的用户ID,未登录则为0,调用init_room后初始化 - """ - return self._uid - def add_handler(self, handler: 'handlers.HandlerInterface'): """ 添加消息处理器 @@ -258,154 +205,13 @@ class BLiveClient: if self._own_session: await self._session.close() - async def init_room(self): + async def init_room(self) -> bool: """ 初始化连接房间需要的字段 :return: True代表没有降级,如果需要降级后还可用,重载这个函数返回True """ - if self._uid is None: - if not await self._init_uid(): - logger.warning('room=%d _init_uid() failed', self._tmp_room_id) - self._uid = 0 - - if self._get_buvid() == '': - if not await self._init_buvid(): - logger.warning('room=%d _init_buvid() failed', self._tmp_room_id) - - res = True - if not await self._init_room_id_and_owner(): - res = False - # 失败了则降级 - self._room_id = self._room_short_id = self._tmp_room_id - self._room_owner_uid = 0 - - if not await self._init_host_server(): - res = False - # 失败了则降级 - self._host_server_list = DEFAULT_DANMAKU_SERVER_LIST - self._host_server_token = None - return res - - async def _init_uid(self): - try: - async with self._session.get( - UID_INIT_URL, - headers={'User-Agent': USER_AGENT}, - ssl=self._ssl - ) as res: - if res.status != 200: - logger.warning('room=%d _init_uid() failed, status=%d, reason=%s', self._tmp_room_id, - res.status, res.reason) - return False - data = await res.json() - if data['code'] != 0: - if data['code'] == -101: - # 未登录 - self._uid = 0 - return True - logger.warning('room=%d _init_uid() failed, message=%s', self._tmp_room_id, - data['message']) - return False - - data = data['data'] - if not data['isLogin']: - # 未登录 - self._uid = 0 - else: - self._uid = data['mid'] - return True - except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('room=%d _init_uid() failed:', self._tmp_room_id) - return False - - def _get_buvid(self): - cookies = self._session.cookie_jar.filter_cookies(yarl.URL(BUVID_INIT_URL)) - buvid_cookie = cookies.get('buvid3', None) - if buvid_cookie is None: - return '' - return buvid_cookie.value - - async def _init_buvid(self): - try: - async with self._session.get( - BUVID_INIT_URL, - headers={'User-Agent': USER_AGENT}, - ssl=self._ssl - ) as res: - if res.status != 200: - logger.warning('room=%d _init_buvid() status error, status=%d, reason=%s', - self._tmp_room_id, res.status, res.reason) - except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('room=%d _init_buvid() exception:', self._tmp_room_id) - return self._get_buvid() != '' - - async def _init_room_id_and_owner(self): - try: - async with self._session.get( - ROOM_INIT_URL, - headers={'User-Agent': USER_AGENT}, - params={ - 'room_id': self._tmp_room_id - }, - ssl=self._ssl - ) as res: - if res.status != 200: - logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id, - res.status, res.reason) - return False - data = await res.json() - if data['code'] != 0: - logger.warning('room=%d _init_room_id_and_owner() failed, message=%s', self._tmp_room_id, - data['message']) - return False - if not self._parse_room_init(data['data']): - return False - except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('room=%d _init_room_id_and_owner() failed:', self._tmp_room_id) - return False - return True - - def _parse_room_init(self, data): - room_info = data['room_info'] - self._room_id = room_info['room_id'] - self._room_short_id = room_info['short_id'] - self._room_owner_uid = room_info['uid'] - return True - - async def _init_host_server(self): - try: - async with self._session.get( - DANMAKU_SERVER_CONF_URL, - headers={'User-Agent': USER_AGENT}, - params={ - 'id': self._room_id, - 'type': 0 - }, - ssl=self._ssl - ) as res: - if res.status != 200: - logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id, - res.status, res.reason) - return False - data = await res.json() - if data['code'] != 0: - logger.warning('room=%d _init_host_server() failed, message=%s', self._room_id, data['message']) - return False - if not self._parse_danmaku_server_conf(data['data']): - return False - except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('room=%d _init_host_server() failed:', self._room_id) - return False - return True - - def _parse_danmaku_server_conf(self, data): - self._host_server_list = data['host_list'] - self._host_server_token = data['token'] - if not self._host_server_list: - logger.warning('room=%d _parse_danmaku_server_conf() failed: host_server_list is empty', self._room_id) - return False - return True + raise NotImplementedError @staticmethod def _make_packet(data: dict, operation: int) -> bytes: @@ -445,19 +251,14 @@ class BLiveClient: """ 网络协程,负责连接服务器、接收消息、解包 """ - # 如果之前未初始化则初始化 - if self._host_server_token is None: - if not await self.init_room(): - raise InitError('init_room() failed') + await self._on_network_coroutine_start() retry_count = 0 while True: try: # 连接 - host_server = self._host_server_list[retry_count % len(self._host_server_list)] async with self._session.ws_connect( - f"wss://{host_server['host']}:{host_server['wss_port']}/sub", - headers={'User-Agent': USER_AGENT}, + self._get_ws_url(retry_count), receive_timeout=self._heartbeat_interval + 5, ssl=self._ssl ) as websocket: @@ -491,6 +292,17 @@ class BLiveClient: logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count) await asyncio.sleep(1) + async def _on_network_coroutine_start(self): + """ + 在_network_coroutine开头运行,可以用来初始化房间 + """ + + def _get_ws_url(self, retry_count) -> str: + """ + 返回WebSocket连接的URL,可以在这里做故障转移和负载均衡 + """ + raise NotImplementedError + async def _on_ws_connect(self): """ WebSocket连接成功 @@ -512,17 +324,7 @@ class BLiveClient: """ 发送认证包 """ - auth_params = { - 'uid': self._uid, - 'roomid': self._room_id, - 'protover': 3, - 'platform': 'web', - 'type': 2, - 'buvid': self._get_buvid(), - } - if self._host_server_token is not None: - auth_params['key'] = self._host_server_token - await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH)) + raise NotImplementedError def _on_send_heartbeat(self): """ diff --git a/blivedm/handlers.py b/blivedm/handlers.py index 614aecb..bdb2e8c 100644 --- a/blivedm/handlers.py +++ b/blivedm/handlers.py @@ -2,7 +2,7 @@ import logging from typing import * -from . import client as client_ +from .clients import ws_base from . import models __all__ = ( @@ -48,7 +48,7 @@ class HandlerInterface: 直播消息处理器接口 """ - async def handle(self, client: client_.BLiveClient, command: dict): + async def handle(self, client: ws_base.WebSocketClientBase, command: dict): raise NotImplementedError # TODO 加个异常停止的回调 @@ -59,28 +59,28 @@ class BaseHandler(HandlerInterface): 一个简单的消息处理器实现,带消息分发和消息类型转换。继承并重写_on_xxx方法即可实现自己的处理器 """ - def __heartbeat_callback(self, client: client_.BLiveClient, command: dict): + def __heartbeat_callback(self, client: ws_base.WebSocketClientBase, command: dict): return self._on_heartbeat(client, models.HeartbeatMessage.from_command(command['data'])) - def __danmu_msg_callback(self, client: client_.BLiveClient, command: dict): + def __danmu_msg_callback(self, client: ws_base.WebSocketClientBase, command: dict): return self._on_danmaku(client, models.DanmakuMessage.from_command(command['info'], command.get('dm_v2', ''))) - def __send_gift_callback(self, client: client_.BLiveClient, command: dict): + def __send_gift_callback(self, client: ws_base.WebSocketClientBase, command: dict): return self._on_gift(client, models.GiftMessage.from_command(command['data'])) - def __guard_buy_callback(self, client: client_.BLiveClient, command: dict): + def __guard_buy_callback(self, client: ws_base.WebSocketClientBase, command: dict): return self._on_buy_guard(client, models.GuardBuyMessage.from_command(command['data'])) - def __super_chat_message_callback(self, client: client_.BLiveClient, command: dict): + def __super_chat_message_callback(self, client: ws_base.WebSocketClientBase, command: dict): return self._on_super_chat(client, models.SuperChatMessage.from_command(command['data'])) - def __super_chat_message_delete_callback(self, client: client_.BLiveClient, command: dict): + def __super_chat_message_delete_callback(self, client: ws_base.WebSocketClientBase, command: dict): return self._on_super_chat_delete(client, models.SuperChatDeleteMessage.from_command(command['data'])) _CMD_CALLBACK_DICT: Dict[ str, Optional[Callable[ - ['BaseHandler', client_.BLiveClient, dict], + ['BaseHandler', ws_base.WebSocketClientBase, dict], Awaitable ]] ] = { @@ -104,7 +104,7 @@ class BaseHandler(HandlerInterface): _CMD_CALLBACK_DICT[cmd] = None del cmd - async def handle(self, client: client_.BLiveClient, command: dict): + async def handle(self, client: ws_base.WebSocketClientBase, command: dict): cmd = command.get('cmd', '') pos = cmd.find(':') # 2019-5-29 B站弹幕升级新增了参数 if pos != -1: @@ -121,32 +121,32 @@ class BaseHandler(HandlerInterface): if callback is not None: await callback(self, client, command) - async def _on_heartbeat(self, client: client_.BLiveClient, message: models.HeartbeatMessage): + async def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: models.HeartbeatMessage): """ 收到心跳包(人气值) """ - async def _on_danmaku(self, client: client_.BLiveClient, message: models.DanmakuMessage): + async def _on_danmaku(self, client: ws_base.WebSocketClientBase, message: models.DanmakuMessage): """ 收到弹幕 """ - async def _on_gift(self, client: client_.BLiveClient, message: models.GiftMessage): + async def _on_gift(self, client: ws_base.WebSocketClientBase, message: models.GiftMessage): """ 收到礼物 """ - async def _on_buy_guard(self, client: client_.BLiveClient, message: models.GuardBuyMessage): + async def _on_buy_guard(self, client: ws_base.WebSocketClientBase, message: models.GuardBuyMessage): """ 有人上舰 """ - async def _on_super_chat(self, client: client_.BLiveClient, message: models.SuperChatMessage): + async def _on_super_chat(self, client: ws_base.WebSocketClientBase, message: models.SuperChatMessage): """ 醒目留言 """ - async def _on_super_chat_delete(self, client: client_.BLiveClient, message: models.SuperChatDeleteMessage): + async def _on_super_chat_delete(self, client: ws_base.WebSocketClientBase, message: models.SuperChatDeleteMessage): """ 删除醒目留言 """ diff --git a/blivedm/models/__init__.py b/blivedm/models/__init__.py index fc9e9d3..a88aef4 100644 --- a/blivedm/models/__init__.py +++ b/blivedm/models/__init__.py @@ -24,7 +24,7 @@ class HeartbeatMessage: """ popularity: int = 0 - """人气值""" + """人气值,已废弃""" @classmethod def from_command(cls, data: dict): diff --git a/open_live_sample.py b/open_live_sample.py index dce2a99..188a592 100644 --- a/open_live_sample.py +++ b/open_live_sample.py @@ -2,7 +2,6 @@ import asyncio import blivedm -import blivedm.open_live_client as open_live_client ACCESS_KEY = '' ACCESS_SECRET = '' @@ -18,7 +17,7 @@ async def run_single_client(): """ 演示监听一个直播间 """ - client = open_live_client.OpenLiveClient( + client = blivedm.OpenLiveClient( access_key=ACCESS_KEY, access_secret=ACCESS_SECRET, app_id=APP_ID, @@ -39,7 +38,7 @@ async def run_single_client(): class MyHandler(blivedm.HandlerInterface): - async def handle(self, client: open_live_client.OpenLiveClient, command: dict): + async def handle(self, client: blivedm.OpenLiveClient, command: dict): print(command)