From 300840fd65d90ba9954e0c13894c6f037476a6f4 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 3 Sep 2023 18:09:12 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E4=BA=86=E4=B8=80=E5=A0=86?= =?UTF-8?q?=E6=B2=A1=E7=94=A8=E7=9A=84=E7=89=B9=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 消息处理接口改成同步函数 消息处理器从数组改成单个对象 移除web端短ID属性 移除SSL配置 消息处理器添加客户端异常停止的回调 --- blivedm/clients/open_live.py | 9 ++--- blivedm/clients/web.py | 21 +++-------- blivedm/clients/ws_base.py | 70 ++++++++++++++---------------------- blivedm/handlers.py | 37 ++++++++++--------- open_live_sample.py | 16 ++++----- sample.py | 25 ++++++------- 6 files changed, 75 insertions(+), 103 deletions(-) diff --git a/blivedm/clients/open_live.py b/blivedm/clients/open_live.py index 89301e6..4f4c3fd 100644 --- a/blivedm/clients/open_live.py +++ b/blivedm/clients/open_live.py @@ -6,7 +6,6 @@ import hmac import json import logging import random -import ssl as ssl_ from typing import * import aiohttp @@ -37,7 +36,6 @@ class OpenLiveClient(ws_base.WebSocketClientBase): :param session: cookie、连接池 :param heartbeat_interval: 发送连接心跳包的间隔时间(秒) :param game_heartbeat_interval: 发送项目心跳包的间隔时间(秒) - :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext """ def __init__( @@ -50,9 +48,8 @@ class OpenLiveClient(ws_base.WebSocketClientBase): session: Optional[aiohttp.ClientSession] = None, heartbeat_interval=30, game_heartbeat_interval=20, - ssl: Union[bool, ssl_.SSLContext] = True, ): - super().__init__(session, heartbeat_interval, ssl) + super().__init__(session, heartbeat_interval) self._access_key = access_key self._access_secret = access_secret @@ -144,7 +141,7 @@ class OpenLiveClient(ws_base.WebSocketClientBase): headers['Content-Type'] = 'application/json' headers['Accept'] = 'application/json' - return self._session.post(url, headers=headers, data=body_bytes, ssl=self._ssl) + return self._session.post(url, headers=headers, data=body_bytes) async def init_room(self): """ @@ -194,7 +191,7 @@ class OpenLiveClient(ws_base.WebSocketClientBase): async def _end_game(self): """ - 关闭项目。互动玩法类项目建议断开连接时保证调用到这个函数(close会调用),否则短时间内无法重复连接同一个房间 + 关闭项目。互动玩法类项目建议断开连接时保证调用到这个函数(close会调用),否则可能短时间内无法重复连接同一个房间 """ if self._game_id in (None, ''): return True diff --git a/blivedm/clients/web.py b/blivedm/clients/web.py index b091bca..48400ae 100644 --- a/blivedm/clients/web.py +++ b/blivedm/clients/web.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- import asyncio import logging -import ssl as ssl_ from typing import * import aiohttp @@ -33,7 +32,6 @@ class BLiveClient(ws_base.WebSocketClientBase): :param uid: B站用户ID,0表示未登录,None表示自动获取 :param session: cookie、连接池 :param heartbeat_interval: 发送心跳包的间隔时间(秒) - :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext """ def __init__( @@ -43,18 +41,14 @@ class BLiveClient(ws_base.WebSocketClientBase): uid: Optional[int] = None, session: Optional[aiohttp.ClientSession] = None, heartbeat_interval=30, - ssl: Union[bool, ssl_.SSLContext] = True, ): - super().__init__(session, heartbeat_interval, ssl) + super().__init__(session, heartbeat_interval) self._tmp_room_id = room_id """用来init_room的临时房间ID,可以用短ID""" self._uid = uid # 在调用init_room后初始化的字段 - # TODO 移除短ID - self._room_short_id: Optional[int] = None - """房间短ID,没有则为0""" self._room_owner_uid: Optional[int] = None """主播用户ID""" self._host_server_list: Optional[List[dict]] = None @@ -67,11 +61,11 @@ class BLiveClient(ws_base.WebSocketClientBase): """连接弹幕服务器用的token""" @property - def room_short_id(self) -> Optional[int]: + def tmp_room_id(self) -> int: """ - 房间短ID,没有则为0,调用init_room后初始化 + 构造时传进来的room_id参数 """ - return self._room_short_id + return self._tmp_room_id @property def room_owner_uid(self) -> Optional[int]: @@ -106,7 +100,7 @@ class BLiveClient(ws_base.WebSocketClientBase): if not await self._init_room_id_and_owner(): res = False # 失败了则降级 - self._room_id = self._room_short_id = self._tmp_room_id + self._room_id = self._tmp_room_id self._room_owner_uid = 0 if not await self._init_host_server(): @@ -128,7 +122,6 @@ class BLiveClient(ws_base.WebSocketClientBase): async with self._session.get( UID_INIT_URL, headers={'User-Agent': utils.USER_AGENT}, - ssl=self._ssl ) as res: if res.status != 200: logger.warning('room=%d _init_uid() failed, status=%d, reason=%s', self._tmp_room_id, @@ -167,7 +160,6 @@ class BLiveClient(ws_base.WebSocketClientBase): async with self._session.get( BUVID_INIT_URL, headers={'User-Agent': utils.USER_AGENT}, - ssl=self._ssl ) as res: if res.status != 200: logger.warning('room=%d _init_buvid() status error, status=%d, reason=%s', @@ -184,7 +176,6 @@ class BLiveClient(ws_base.WebSocketClientBase): params={ 'room_id': self._tmp_room_id }, - ssl=self._ssl ) as res: if res.status != 200: logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id, @@ -205,7 +196,6 @@ class BLiveClient(ws_base.WebSocketClientBase): def _parse_room_init(self, data): room_info = data['room_info'] self._room_id = room_info['room_id'] - self._room_short_id = room_info['short_id'] self._room_owner_uid = room_info['uid'] return True @@ -218,7 +208,6 @@ class BLiveClient(ws_base.WebSocketClientBase): 'id': self._room_id, 'type': 0 }, - ssl=self._ssl ) as res: if res.status != 200: logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id, diff --git a/blivedm/clients/ws_base.py b/blivedm/clients/ws_base.py index b569e02..da21755 100644 --- a/blivedm/clients/ws_base.py +++ b/blivedm/clients/ws_base.py @@ -3,7 +3,6 @@ import asyncio import enum import json import logging -import ssl as ssl_ import struct import zlib from typing import * @@ -83,14 +82,12 @@ class WebSocketClientBase: :param session: cookie、连接池 :param heartbeat_interval: 发送心跳包的间隔时间(秒) - :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext """ def __init__( self, session: Optional[aiohttp.ClientSession] = None, heartbeat_interval: float = 30, - ssl: Union[bool, ssl_.SSLContext] = True, ): if session is None: self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) @@ -101,12 +98,9 @@ class WebSocketClientBase: assert self._session.loop is asyncio.get_event_loop() # noqa self._heartbeat_interval = heartbeat_interval - # TODO 移除SSL配置 - self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa - # TODO 没必要支持多个handler,改成单个吧 - self._handlers: List[handlers.HandlerInterface] = [] - """消息处理器,可动态增删""" + self._handler: Optional[handlers.HandlerInterface] = None + """消息处理器""" # 在调用init_room后初始化的字段 self._room_id: Optional[int] = None @@ -133,27 +127,16 @@ class WebSocketClientBase: """ return self._room_id - def add_handler(self, handler: 'handlers.HandlerInterface'): + def set_handler(self, handler: Optional['handlers.HandlerInterface']): """ - 添加消息处理器 - 注意多个处理器是并发处理的,不要依赖处理的顺序 - 消息处理器和接收消息运行在同一协程,如果处理消息耗时太长会阻塞接收消息,这种情况建议将消息推到队列,让另一个协程处理 + 设置消息处理器 + + 注意消息处理器和网络协程运行在同一个协程,如果处理消息耗时太长会阻塞接收消息。如果是CPU密集型的任务,建议将消息推到线程池处理; + 如果是IO密集型的任务,应该使用async函数,并且在handler里使用create_task创建新的协程 :param handler: 消息处理器 """ - if handler not in self._handlers: - self._handlers.append(handler) - - def remove_handler(self, handler: 'handlers.HandlerInterface'): - """ - 移除消息处理器 - - :param handler: 消息处理器 - """ - try: - self._handlers.remove(handler) - except ValueError: - pass + self._handler = handler def start(self): """ @@ -236,17 +219,22 @@ class WebSocketClientBase: """ 负责处理网络协程的异常,网络协程具体逻辑在_network_coroutine里 """ + exc = None try: await self._network_coroutine() except asyncio.CancelledError: # 正常停止 pass - except Exception: # noqa + except Exception as e: logger.exception('room=%s _network_coroutine() finished with exception:', self.room_id) + exc = e finally: logger.debug('room=%s _network_coroutine() finished', self.room_id) self._network_future = None + if exc is not None and self._handler is not None: + self._handler.on_stopped_by_exception(self, exc) + async def _network_coroutine(self): """ 网络协程,负责连接服务器、接收消息、解包 @@ -261,7 +249,6 @@ class WebSocketClientBase: self._get_ws_url(retry_count), headers={'User-Agent': utils.USER_AGENT}, # web端的token也会签名UA receive_timeout=self._heartbeat_interval + 5, - ssl=self._ssl ) as websocket: self._websocket = websocket await self._on_ws_connect() @@ -281,9 +268,6 @@ class WebSocketClientBase: logger.exception('room=%d auth failed, trying init_room() again', self.room_id) if not await self.init_room(): raise InitError('init_room() failed') - except ssl_.SSLError: - logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id) - raise finally: self._websocket = None await self._on_ws_close() @@ -414,7 +398,7 @@ class WebSocketClientBase: 'popularity': popularity } } - await self._handle_command(body) + self._handle_command(body) else: # 未知消息 @@ -441,7 +425,7 @@ class WebSocketClientBase: if len(body) != 0: try: body = json.loads(body.decode('utf-8')) - await self._handle_command(body) + self._handle_command(body) except asyncio.CancelledError: raise except Exception: @@ -464,19 +448,17 @@ class WebSocketClientBase: logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id, header.operation, header, body) - async def _handle_command(self, command: dict): + def _handle_command(self, command: dict): """ - 解析并处理业务消息 + 处理业务消息 :param command: 业务消息 """ - # TODO 考虑解析完整个WS包后再一次处理所有消息。另外用call_soon就不会阻塞网络协程了,也不用加shield - # 外部代码可能不能正常处理取消,所以这里加shield - results = await asyncio.shield( - asyncio.gather( - *(handler.handle(self, command) for handler in self._handlers), return_exceptions=True - ) - ) - for res in results: - if isinstance(res, Exception): - logger.exception('room=%d _handle_command() failed, command=%s', self.room_id, command, exc_info=res) + try: + # 为什么不做成异步的: + # 1. 为了保持处理消息的顺序,这里不使用call_soon、create_task等方法延迟处理 + # 2. 如果支持handle使用async函数,用户可能会在里面处理耗时很长的异步操作,导致网络协程阻塞 + # 这里做成同步的,强制用户使用create_task或消息队列处理异步操作,这样就不会阻塞网络协程 + self._handler.handle(self, command) + except Exception as e: + logger.exception('room=%d _handle_command() failed, command=%s', self.room_id, command, exc_info=e) diff --git a/blivedm/handlers.py b/blivedm/handlers.py index e879e90..c53cea6 100644 --- a/blivedm/handlers.py +++ b/blivedm/handlers.py @@ -45,10 +45,13 @@ class HandlerInterface: 直播消息处理器接口 """ - async def handle(self, client: ws_base.WebSocketClientBase, command: dict): + def handle(self, client: ws_base.WebSocketClientBase, command: dict): raise NotImplementedError - # TODO 加个异常停止的回调 + def on_stopped_by_exception(self, client: ws_base.WebSocketClientBase, exception: Exception): + """ + 当客户端被异常停止时调用。可以在这里close或者重新start + """ def _make_msg_callback(method_name, message_cls): @@ -72,7 +75,7 @@ class BaseHandler(HandlerInterface): str, Optional[Callable[ ['BaseHandler', ws_base.WebSocketClientBase, dict], - Awaitable + Any ]] ] = { # 收到心跳包,这是blivedm自造的消息,原本的心跳包格式不一样 @@ -110,7 +113,7 @@ class BaseHandler(HandlerInterface): } """cmd -> 处理回调""" - async def handle(self, client: ws_base.WebSocketClientBase, command: dict): + def handle(self, client: ws_base.WebSocketClientBase, command: dict): cmd = command.get('cmd', '') pos = cmd.find(':') # 2019-5-29 B站弹幕升级新增了参数 if pos != -1: @@ -125,34 +128,34 @@ class BaseHandler(HandlerInterface): callback = self._CMD_CALLBACK_DICT[cmd] if callback is not None: - await callback(self, client, command) + callback(self, client, command) - async def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: web_models.HeartbeatMessage): + def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: web_models.HeartbeatMessage): """ 收到心跳包 """ - async def _on_danmaku(self, client: ws_base.WebSocketClientBase, message: web_models.DanmakuMessage): + def _on_danmaku(self, client: ws_base.WebSocketClientBase, message: web_models.DanmakuMessage): """ 收到弹幕 """ - async def _on_gift(self, client: ws_base.WebSocketClientBase, message: web_models.GiftMessage): + def _on_gift(self, client: ws_base.WebSocketClientBase, message: web_models.GiftMessage): """ 收到礼物 """ - async def _on_buy_guard(self, client: ws_base.WebSocketClientBase, message: web_models.GuardBuyMessage): + def _on_buy_guard(self, client: ws_base.WebSocketClientBase, message: web_models.GuardBuyMessage): """ 有人上舰 """ - async def _on_super_chat(self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatMessage): + def _on_super_chat(self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatMessage): """ 醒目留言 """ - async def _on_super_chat_delete( + def _on_super_chat_delete( self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatDeleteMessage ): """ @@ -163,36 +166,36 @@ class BaseHandler(HandlerInterface): # 开放平台消息 # - async def _on_open_live_danmaku(self, client: ws_base.WebSocketClientBase, message: open_models.DanmakuMessage): + def _on_open_live_danmaku(self, client: ws_base.WebSocketClientBase, message: open_models.DanmakuMessage): """ 收到弹幕 """ - async def _on_open_live_gift(self, client: ws_base.WebSocketClientBase, message: open_models.GiftMessage): + def _on_open_live_gift(self, client: ws_base.WebSocketClientBase, message: open_models.GiftMessage): """ 收到礼物 """ - async def _on_open_live_buy_guard(self, client: ws_base.WebSocketClientBase, message: open_models.GuardBuyMessage): + def _on_open_live_buy_guard(self, client: ws_base.WebSocketClientBase, message: open_models.GuardBuyMessage): """ 有人上舰 """ - async def _on_open_live_super_chat( + def _on_open_live_super_chat( self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatMessage ): """ 醒目留言 """ - async def _on_open_live_super_chat_delete( + def _on_open_live_super_chat_delete( self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatDeleteMessage ): """ 删除醒目留言 """ - async def _on_open_live_like(self, client: ws_base.WebSocketClientBase, message: open_models.LikeMessage): + def _on_open_live_like(self, client: ws_base.WebSocketClientBase, message: open_models.LikeMessage): """ 点赞 """ diff --git a/open_live_sample.py b/open_live_sample.py index c179674..f963af6 100644 --- a/open_live_sample.py +++ b/open_live_sample.py @@ -29,7 +29,7 @@ async def run_single_client(): room_owner_auth_code=ROOM_OWNER_AUTH_CODE, ) handler = MyHandler() - client.add_handler(handler) + client.set_handler(handler) client.start() try: @@ -43,31 +43,31 @@ async def run_single_client(): class MyHandler(blivedm.BaseHandler): - async def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage): + def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage): print(f'[{client.room_id}] 心跳') - async def _on_open_live_danmaku(self, client: blivedm.OpenLiveClient, message: open_models.DanmakuMessage): + def _on_open_live_danmaku(self, client: blivedm.OpenLiveClient, message: open_models.DanmakuMessage): print(f'[{message.room_id}] {message.uname}:{message.msg}') - async def _on_open_live_gift(self, client: blivedm.OpenLiveClient, message: open_models.GiftMessage): + def _on_open_live_gift(self, client: blivedm.OpenLiveClient, message: open_models.GiftMessage): coin_type = '金瓜子' if message.paid else '银瓜子' print(f'[{message.room_id}] {message.uname} 赠送{message.gift_name}x{message.gift_num}' f' ({coin_type}x{message.price})') - async def _on_open_live_buy_guard(self, client: blivedm.OpenLiveClient, message: open_models.GuardBuyMessage): + def _on_open_live_buy_guard(self, client: blivedm.OpenLiveClient, message: open_models.GuardBuyMessage): print(f'[{message.room_id}] {message.user_info.uname} 购买 大航海等级={message.guard_level}') - async def _on_open_live_super_chat( + def _on_open_live_super_chat( self, client: blivedm.OpenLiveClient, message: open_models.SuperChatMessage ): print(f'[{message.room_id}] 醒目留言 ¥{message.rmb} {message.uname}:{message.message}') - async def _on_open_live_super_chat_delete( + def _on_open_live_super_chat_delete( self, client: blivedm.OpenLiveClient, message: open_models.SuperChatDeleteMessage ): print(f'[{message.room_id}] 删除醒目留言 message_ids={message.message_ids}') - async def _on_open_live_like(self, client: blivedm.OpenLiveClient, message: open_models.LikeMessage): + def _on_open_live_like(self, client: blivedm.OpenLiveClient, message: open_models.LikeMessage): print(f'[{message.room_id}] {message.uname} 点赞') diff --git a/sample.py b/sample.py index a4bb513..d54ef15 100644 --- a/sample.py +++ b/sample.py @@ -18,6 +18,9 @@ TEST_ROOM_IDS = [ 23105590, ] +# 这里填一个已登录账号的cookie。不填cookie也可以连接,但是收到弹幕的用户名会打码,UID会变成0 +SESSDATA = '' + session: Optional[aiohttp.ClientSession] = None @@ -31,9 +34,8 @@ async def main(): def init_session(): - # 这里填一个已登录账号的cookie。不填cookie也可以连接,但是收到弹幕的用户名会打码,UID会变成0 cookies = http.cookies.SimpleCookie() - cookies['SESSDATA'] = '' + cookies['SESSDATA'] = SESSDATA cookies['SESSDATA']['domain'] = 'bilibili.com' global session @@ -46,10 +48,9 @@ async def run_single_client(): 演示监听一个直播间 """ room_id = random.choice(TEST_ROOM_IDS) - # 如果SSL验证失败就把ssl设为False,B站真的有过忘续证书的情况 - client = blivedm.BLiveClient(room_id, session=session, ssl=True) + client = blivedm.BLiveClient(room_id, session=session) handler = MyHandler() - client.add_handler(handler) + client.set_handler(handler) client.start() try: @@ -69,7 +70,7 @@ async def run_multi_clients(): clients = [blivedm.BLiveClient(room_id, session=session) for room_id in TEST_ROOM_IDS] handler = MyHandler() for client in clients: - client.add_handler(handler) + client.set_handler(handler) client.start() try: @@ -87,25 +88,25 @@ class MyHandler(blivedm.BaseHandler): # _CMD_CALLBACK_DICT = blivedm.BaseHandler._CMD_CALLBACK_DICT.copy() # # # 入场消息回调 - # async def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict): + # def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict): # print(f"[{client.room_id}] INTERACT_WORD: self_type={type(self).__name__}, room_id={client.room_id}," # f" uname={command['data']['uname']}") # _CMD_CALLBACK_DICT['INTERACT_WORD'] = __interact_word_callback # noqa - async def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage): + def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage): print(f'[{client.room_id}] 心跳') - async def _on_danmaku(self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage): + def _on_danmaku(self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage): print(f'[{client.room_id}] {message.uname}:{message.msg}') - async def _on_gift(self, client: blivedm.BLiveClient, message: web_models.GiftMessage): + def _on_gift(self, client: blivedm.BLiveClient, message: web_models.GiftMessage): print(f'[{client.room_id}] {message.uname} 赠送{message.gift_name}x{message.num}' f' ({message.coin_type}瓜子x{message.total_coin})') - async def _on_buy_guard(self, client: blivedm.BLiveClient, message: web_models.GuardBuyMessage): + def _on_buy_guard(self, client: blivedm.BLiveClient, message: web_models.GuardBuyMessage): print(f'[{client.room_id}] {message.username} 购买{message.gift_name}') - async def _on_super_chat(self, client: blivedm.BLiveClient, message: web_models.SuperChatMessage): + def _on_super_chat(self, client: blivedm.BLiveClient, message: web_models.SuperChatMessage): print(f'[{client.room_id}] 醒目留言 ¥{message.price} {message.uname}:{message.message}')