From b5fc26c3e8f89df08db4084fd30341ce249aa405 Mon Sep 17 00:00:00 2001 From: John Smith Date: Mon, 13 Dec 2021 00:07:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=87=BA=E6=B6=88=E6=81=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/__init__.py | 3 +- blivedm/blivedm.py | 171 +++++++++++++++++--------------------------- blivedm/handlers.py | 147 +++++++++++++++++++++++++++++++++++++ blivedm/models.py | 25 ++++++- sample.py | 34 +++++---- 5 files changed, 257 insertions(+), 123 deletions(-) create mode 100644 blivedm/handlers.py diff --git a/blivedm/__init__.py b/blivedm/__init__.py index 5dc4d0e..770e2f6 100644 --- a/blivedm/__init__.py +++ b/blivedm/__init__.py @@ -1,3 +1,4 @@ # -*- coding: utf-8 -*- -from .blivedm import * from .models import * +from .handlers import * +from .blivedm import * diff --git a/blivedm/blivedm.py b/blivedm/blivedm.py index 0e61135..c76dd5f 100644 --- a/blivedm/blivedm.py +++ b/blivedm/blivedm.py @@ -11,7 +11,7 @@ from typing import * import aiohttp -from . import models +from . import handlers logger = logging.getLogger('blivedm') @@ -58,49 +58,26 @@ class InitError(Exception): class BLiveClient: - _COMMAND_HANDLERS: Dict[str, Optional[Callable[['BLiveClient', dict], Awaitable]]] = { - # 收到弹幕 - # go-common\app\service\live\live-dm\service\v1\send.go - 'DANMU_MSG': lambda client, command: client._on_receive_danmaku( # noqa - models.DanmakuMessage.from_command(command['info']) - ), - # 有人送礼 - 'SEND_GIFT': lambda client, command: client._on_receive_gift( # noqa - models.GiftMessage.from_command(command['data']) - ), - # 有人上舰 - 'GUARD_BUY': lambda client, command: client._on_buy_guard( # noqa - models.GuardBuyMessage.from_command(command['data']) - ), - # 醒目留言 - 'SUPER_CHAT_MESSAGE': lambda client, command: client._on_super_chat( # noqa - models.SuperChatMessage.from_command(command['data']) - ), - # 删除醒目留言 - 'SUPER_CHAT_MESSAGE_DELETE': lambda client, command: client._on_super_chat_delete( # noqa - models.SuperChatDeleteMessage.from_command(command['data']) - ) - } - # 其他常见命令 - for cmd in ( - 'INTERACT_WORD', 'ROOM_BANNER', 'ROOM_REAL_TIME_MESSAGE_UPDATE', 'NOTICE_MSG', 'COMBO_SEND', - 'COMBO_END', 'ENTRY_EFFECT', 'WELCOME_GUARD', 'WELCOME', 'ROOM_RANK', 'ACTIVITY_BANNER_UPDATE_V2', - 'PANEL', 'SUPER_CHAT_MESSAGE_JPN', 'USER_TOAST_MSG', 'ROOM_BLOCK_MSG', 'LIVE', 'PREPARING', - 'room_admin_entrance', 'ROOM_ADMINS', 'ROOM_CHANGE' - ): - _COMMAND_HANDLERS[cmd] = None - del cmd + """ + B站直播弹幕客户端,负责连接房间 - def __init__(self, room_id, uid=0, session: aiohttp.ClientSession = None, - heartbeat_interval=30, ssl=True, loop=None): - """ - :param room_id: URL中的房间ID,可以为短ID - :param uid: B站用户ID,0表示未登录 - :param session: cookie、连接池 - :param heartbeat_interval: 发送心跳包的间隔时间(秒) - :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext - :param loop: 协程事件循环 - """ + :param room_id: URL中的房间ID,可以用短ID + :param uid: B站用户ID,0表示未登录 + :param session: cookie、连接池 + :param heartbeat_interval: 发送心跳包的间隔时间(秒) + :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext + :param loop: 协程事件循环 + """ + + def __init__( + self, + room_id, + uid=0, + session: aiohttp.ClientSession = None, + heartbeat_interval=30, + ssl: Union[bool, ssl_.SSLContext] = True, + loop: asyncio.BaseEventLoop = None, + ): # 用来init_room的临时房间ID self._tmp_room_id = room_id # 调用init_room后初始化 @@ -132,6 +109,8 @@ class BLiveClient: self._websocket = None self._heartbeat_timer_handle = None + self._handlers: List[handlers.HandlerInterface] = [] + @property def is_running(self): return self._future is not None @@ -157,16 +136,27 @@ class BLiveClient: """ return self._room_owner_uid - async def close(self): + def add_handler(self, handler: 'handlers.HandlerInterface'): """ - 如果session是自己创建的则关闭session + 添加消息处理器 + :param handler: 消息处理器 """ - if self._own_session: - await self._session.close() + if handler not in self._handlers: + self._handlers.append(handler) + + def remove_handler(self, handler: 'handlers.HandlerInterface'): + """ + 移除消息处理器 + :param handler: 消息处理器 + """ + try: + self._handlers.remove(handler) + except ValueError: + pass def start(self): """ - 创建相关的协程,不会执行事件循环 + 创建相关的协程 :return: 协程的future """ if self._future is not None: @@ -193,6 +183,13 @@ class BLiveClient: self._future.cancel() return self._future + async def close(self): + """ + 如果session是自己创建的则关闭session + """ + if self._own_session: + await self._session.close() + async def init_room(self): """ :return: True代表没有降级,如果需要降级后还可用,重载这个函数返回True @@ -321,11 +318,11 @@ class BLiveClient: continue try: - await self._handle_message(message.data) + await self._handle_ws_message(message.data) except asyncio.CancelledError: raise except Exception: # noqa - logger.exception('room %d 处理消息时发生错误:', self.room_id) + logger.exception('room %d 处理websocket消息时发生错误:', self.room_id) except asyncio.CancelledError: break @@ -354,7 +351,7 @@ class BLiveClient: asyncio.ensure_future(coro, loop=self._loop) self._heartbeat_timer_handle = self._loop.call_later(self._heartbeat_interval, self._on_send_heartbeat) - async def _handle_message(self, data): + async def _handle_ws_message(self, data): offset = 0 while offset < len(data): try: @@ -363,22 +360,29 @@ class BLiveClient: break if header.operation == Operation.HEARTBEAT_REPLY: - popularity = int.from_bytes(data[offset + HEADER_STRUCT.size: - offset + HEADER_STRUCT.size + 4], - 'big') - await self._on_receive_popularity(popularity) + popularity = int.from_bytes( + data[offset + HEADER_STRUCT.size: offset + HEADER_STRUCT.size + 4], + 'big' + ) + body = { + 'cmd': '_HEARTBEAT', + 'data': { + 'popularity': popularity + } + } + await self._handle_command(body) elif header.operation == Operation.SEND_MSG_REPLY: body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE: body = await self._loop.run_in_executor(None, zlib.decompress, body) - await self._handle_message(body) + await self._handle_ws_message(body) else: try: body = json.loads(body.decode('utf-8')) await self._handle_command(body) except Exception: - logger.error('body: %s', body) + logger.error('body=%s', body) raise elif header.operation == Operation.AUTH_REPLY: @@ -397,51 +401,8 @@ class BLiveClient: await self._handle_command(one_command) return - cmd = command.get('cmd', '') - pos = cmd.find(':') # 2019-5-29 B站弹幕升级新增了参数 - if pos != -1: - cmd = cmd[:pos] - if cmd in self._COMMAND_HANDLERS: - handler = self._COMMAND_HANDLERS[cmd] - if handler is not None: - await handler(self, command) - else: - logger.warning('room %d 未知命令:cmd=%s %s', self.room_id, cmd, command) - # 只有第一次遇到未知命令时log - self._COMMAND_HANDLERS[cmd] = None - - async def _on_receive_popularity(self, popularity: int): - """ - 收到人气值 - """ - pass - - async def _on_receive_danmaku(self, danmaku: models.DanmakuMessage): - """ - 收到弹幕 - """ - pass - - async def _on_receive_gift(self, gift: models.GiftMessage): - """ - 收到礼物 - """ - pass - - async def _on_buy_guard(self, message: models.GuardBuyMessage): - """ - 有人上舰 - """ - pass - - async def _on_super_chat(self, message: models.SuperChatMessage): - """ - 醒目留言 - """ - pass - - async def _on_super_chat_delete(self, message: models.SuperChatDeleteMessage): - """ - 删除醒目留言 - """ - pass + for handler in self._handlers: + try: + await handler.handle(self, command) + except Exception: # noqa + logger.exception('room %d 处理消息时发生错误,command=%s', self.room_id, command) diff --git a/blivedm/handlers.py b/blivedm/handlers.py new file mode 100644 index 0000000..5cb46f8 --- /dev/null +++ b/blivedm/handlers.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +import logging +from typing import * + +from . import blivedm +from . import models + +__all__ = ( + 'HandlerInterface', + 'BaseHandler', +) + +logger = logging.getLogger('blivedm') + +# 常见可忽略的cmd +FREQUENT_CMDS = ( + 'INTERACT_WORD', + 'ROOM_BANNER', + 'ROOM_REAL_TIME_MESSAGE_UPDATE', + 'NOTICE_MSG', + 'COMBO_SEND', + 'COMBO_END', + 'ENTRY_EFFECT', + 'WELCOME_GUARD', + 'WELCOME', + 'ROOM_RANK', + 'ACTIVITY_BANNER_UPDATE_V2', + 'PANEL', + 'SUPER_CHAT_MESSAGE_JPN', + 'USER_TOAST_MSG', + 'ROOM_BLOCK_MSG', + 'LIVE', + 'PREPARING', + 'room_admin_entrance', + 'ROOM_ADMINS', + 'ROOM_CHANGE', +) + +# 已打日志的未知cmd +logged_unknown_cmds = set() + + +class HandlerInterface: + """ + 直播消息处理器接口 + """ + + async def handle(self, client: blivedm.BLiveClient, command: dict): + raise NotImplementedError + + +class BaseHandler(HandlerInterface): + """ + 一个简单的消息处理器实现,带消息分发和消息类型转换。继承并重写_on_xxx方法即可实现自己的处理器 + """ + + def __heartbeat_callback(self, client: blivedm.BLiveClient, command: dict): + return self._on_popularity(client, models.HeartbeatMessage.from_command(command['data'])) + + def __danmu_msg_callback(self, client: blivedm.BLiveClient, command: dict): + return self._on_danmaku(client, models.DanmakuMessage.from_command(command['info'])) + + def __send_gift_callback(self, client: blivedm.BLiveClient, command: dict): + return self._on_gift(client, models.GiftMessage.from_command(command['data'])) + + def __guard_buy_callback(self, client: blivedm.BLiveClient, command: dict): + return self._on_buy_guard(client, models.GuardBuyMessage.from_command(command['data'])) + + def __super_chat_message_callback(self, client: blivedm.BLiveClient, command: dict): + return self._on_super_chat(client, models.SuperChatMessage.from_command(command['data'])) + + def __super_chat_message_delete_callback(self, client: blivedm.BLiveClient, command: dict): + return self._on_super_chat_delete(client, models.SuperChatDeleteMessage.from_command(command['data'])) + + # cmd -> 处理回调 + _CMD_CALLBACK_DICT: Dict[ + str, + Optional[Callable[ + ['BaseHandler', blivedm.BLiveClient, dict], + Awaitable + ]] + ] = { + # 收到心跳包,这是blivedm自造的消息,原本的心跳包格式不一样 + '_HEARTBEAT': __heartbeat_callback, + # 收到弹幕 + # go-common\app\service\live\live-dm\service\v1\send.go + 'DANMU_MSG': __danmu_msg_callback, + # 有人送礼 + 'SEND_GIFT': __send_gift_callback, + # 有人上舰 + 'GUARD_BUY': __guard_buy_callback, + # 醒目留言 + 'SUPER_CHAT_MESSAGE': __super_chat_message_callback, + # 删除醒目留言 + 'SUPER_CHAT_MESSAGE_DELETE': __super_chat_message_delete_callback, + } + # 忽略其他常见cmd + for cmd in FREQUENT_CMDS: + _CMD_CALLBACK_DICT[cmd] = None + del cmd + + async def handle(self, client: blivedm.BLiveClient, command: dict): + cmd = command.get('cmd', '') + pos = cmd.find(':') # 2019-5-29 B站弹幕升级新增了参数 + if pos != -1: + cmd = cmd[:pos] + + if cmd not in self._CMD_CALLBACK_DICT: + # 只有第一次遇到未知cmd时打日志 + if cmd not in logged_unknown_cmds: + logger.warning('room %d 未知cmd:cmd=%s %s', client.room_id, cmd, command) + logged_unknown_cmds.add(cmd) + return + + callback = self._CMD_CALLBACK_DICT[cmd] + if callback is not None: + await callback(self, client, command) + + async def _on_popularity(self, client: blivedm.BLiveClient, message: models.HeartbeatMessage): + """ + 收到人气值 + """ + + async def _on_danmaku(self, client: blivedm.BLiveClient, message: models.DanmakuMessage): + """ + 收到弹幕 + """ + + async def _on_gift(self, client: blivedm.BLiveClient, message: models.GiftMessage): + """ + 收到礼物 + """ + + async def _on_buy_guard(self, client: blivedm.BLiveClient, message: models.GuardBuyMessage): + """ + 有人上舰 + """ + + async def _on_super_chat(self, client: blivedm.BLiveClient, message: models.SuperChatMessage): + """ + 醒目留言 + """ + + async def _on_super_chat_delete(self, client: blivedm.BLiveClient, message: models.SuperChatDeleteMessage): + """ + 删除醒目留言 + """ diff --git a/blivedm/models.py b/blivedm/models.py index ca20033..aff237c 100644 --- a/blivedm/models.py +++ b/blivedm/models.py @@ -2,6 +2,7 @@ from typing import * __all__ = ( + 'HeartbeatMessage', 'DanmakuMessage', 'GiftMessage', 'GuardBuyMessage', @@ -10,6 +11,26 @@ __all__ = ( ) +class HeartbeatMessage: + """ + 心跳消息 + + :param popularity: 人气值 + """ + + def __init__( + self, + popularity: int = None, + ): + self.popularity: int = popularity + + @classmethod + def from_command(cls, data: dict): + return cls( + popularity=data['popularity'], + ) + + class DanmakuMessage: """ 弹幕消息 @@ -18,7 +39,7 @@ class DanmakuMessage: :param font_size: 字体尺寸 :param color: 颜色 :param timestamp: 时间戳(毫秒) - :param rnd: 随机数 + :param rnd: 随机数,可能是去重用的 :param uid_crc32: 用户ID文本的CRC32 :param msg_type: 是否礼物弹幕(节奏风暴) :param bubble: 右侧评论栏气泡 @@ -196,7 +217,7 @@ class GiftMessage: :param gift_type: 礼物类型(未知) :param action: 目前遇到的有'喂食'、'赠送' :param price: 礼物单价瓜子数 - :param rnd: 随机数,估计是去重用的 + :param rnd: 随机数,可能是去重用的 :param coin_type: 瓜子类型,'silver'或'gold' :param total_coin: 总瓜子数 """ diff --git a/sample.py b/sample.py index 471e266..9d870e7 100644 --- a/sample.py +++ b/sample.py @@ -7,7 +7,10 @@ import blivedm async def main(): # 直播间ID的取值看直播间URL # 如果SSL验证失败就把ssl设为False,B站真的有过忘续证书的情况 - client = MyBLiveClient(room_id=21224291, ssl=True) + client = blivedm.BLiveClient(room_id=411318, ssl=True) + handler = MyHandler() + client.add_handler(handler) + future = client.start() try: # 5秒后停止,测试用 @@ -19,27 +22,28 @@ async def main(): await client.close() -class MyBLiveClient(blivedm.BLiveClient): - # 演示如何自定义handler - _COMMAND_HANDLERS = blivedm.BLiveClient._COMMAND_HANDLERS.copy() +class MyHandler(blivedm.BaseHandler): + # 演示如何添加自定义回调 + _CMD_CALLBACK_DICT = blivedm.BaseHandler._CMD_CALLBACK_DICT.copy() - async def __on_vip_enter(self, command): - print(command) - _COMMAND_HANDLERS['WELCOME'] = __on_vip_enter # 老爷入场 + # 入场消息回调 + async def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict): + print(f"self_type={type(self).__name__}, room_id={client.room_id}, uname={command['data']['uname']}") + _CMD_CALLBACK_DICT['INTERACT_WORD'] = __interact_word_callback # noqa - async def _on_receive_popularity(self, popularity: int): - print(f'当前人气值:{popularity}') + async def _on_popularity(self, client: blivedm.BLiveClient, message: blivedm.HeartbeatMessage): + print(f'当前人气值:{message.popularity}') - async def _on_receive_danmaku(self, danmaku: blivedm.DanmakuMessage): - print(f'{danmaku.uname}:{danmaku.msg}') + async def _on_danmaku(self, client: blivedm.BLiveClient, message: blivedm.DanmakuMessage): + print(f'{message.uname}:{message.msg}') - async def _on_receive_gift(self, gift: blivedm.GiftMessage): - print(f'{gift.uname} 赠送{gift.gift_name}x{gift.num} ({gift.coin_type}币x{gift.total_coin})') + async def _on_gift(self, client: blivedm.BLiveClient, message: blivedm.GiftMessage): + print(f'{message.uname} 赠送{message.gift_name}x{message.num} ({message.coin_type}币x{message.total_coin})') - async def _on_buy_guard(self, message: blivedm.GuardBuyMessage): + async def _on_buy_guard(self, client: blivedm.BLiveClient, message: blivedm.GuardBuyMessage): print(f'{message.username} 购买{message.gift_name}') - async def _on_super_chat(self, message: blivedm.SuperChatMessage): + async def _on_super_chat(self, client: blivedm.BLiveClient, message: blivedm.SuperChatMessage): print(f'醒目留言 ¥{message.price} {message.uname}:{message.message}')