拆出消息处理模块

This commit is contained in:
John Smith 2021-12-13 00:07:00 +08:00
parent 104a080167
commit b5fc26c3e8
5 changed files with 257 additions and 123 deletions

View File

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from .blivedm import *
from .models import * from .models import *
from .handlers import *
from .blivedm import *

View File

@ -11,7 +11,7 @@ from typing import *
import aiohttp import aiohttp
from . import models from . import handlers
logger = logging.getLogger('blivedm') logger = logging.getLogger('blivedm')
@ -58,49 +58,26 @@ class InitError(Exception):
class BLiveClient: class BLiveClient:
_COMMAND_HANDLERS: Dict[str, Optional[Callable[['BLiveClient', dict], Awaitable]]] = {
# 收到弹幕
# go-common\app\service\live\live-dm\service\v1\send.go
'DANMU_MSG': lambda client, command: client._on_receive_danmaku( # noqa
models.DanmakuMessage.from_command(command['info'])
),
# 有人送礼
'SEND_GIFT': lambda client, command: client._on_receive_gift( # noqa
models.GiftMessage.from_command(command['data'])
),
# 有人上舰
'GUARD_BUY': lambda client, command: client._on_buy_guard( # noqa
models.GuardBuyMessage.from_command(command['data'])
),
# 醒目留言
'SUPER_CHAT_MESSAGE': lambda client, command: client._on_super_chat( # noqa
models.SuperChatMessage.from_command(command['data'])
),
# 删除醒目留言
'SUPER_CHAT_MESSAGE_DELETE': lambda client, command: client._on_super_chat_delete( # noqa
models.SuperChatDeleteMessage.from_command(command['data'])
)
}
# 其他常见命令
for cmd in (
'INTERACT_WORD', 'ROOM_BANNER', 'ROOM_REAL_TIME_MESSAGE_UPDATE', 'NOTICE_MSG', 'COMBO_SEND',
'COMBO_END', 'ENTRY_EFFECT', 'WELCOME_GUARD', 'WELCOME', 'ROOM_RANK', 'ACTIVITY_BANNER_UPDATE_V2',
'PANEL', 'SUPER_CHAT_MESSAGE_JPN', 'USER_TOAST_MSG', 'ROOM_BLOCK_MSG', 'LIVE', 'PREPARING',
'room_admin_entrance', 'ROOM_ADMINS', 'ROOM_CHANGE'
):
_COMMAND_HANDLERS[cmd] = None
del cmd
def __init__(self, room_id, uid=0, session: aiohttp.ClientSession = None,
heartbeat_interval=30, ssl=True, loop=None):
""" """
:param room_id: URL中的房间ID可以为短ID B站直播弹幕客户端负责连接房间
:param room_id: URL中的房间ID可以用短ID
:param uid: B站用户ID0表示未登录 :param uid: B站用户ID0表示未登录
:param session: cookie连接池 :param session: cookie连接池
:param heartbeat_interval: 发送心跳包的间隔时间 :param heartbeat_interval: 发送心跳包的间隔时间
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext :param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
:param loop: 协程事件循环 :param loop: 协程事件循环
""" """
def __init__(
self,
room_id,
uid=0,
session: aiohttp.ClientSession = None,
heartbeat_interval=30,
ssl: Union[bool, ssl_.SSLContext] = True,
loop: asyncio.BaseEventLoop = None,
):
# 用来init_room的临时房间ID # 用来init_room的临时房间ID
self._tmp_room_id = room_id self._tmp_room_id = room_id
# 调用init_room后初始化 # 调用init_room后初始化
@ -132,6 +109,8 @@ class BLiveClient:
self._websocket = None self._websocket = None
self._heartbeat_timer_handle = None self._heartbeat_timer_handle = None
self._handlers: List[handlers.HandlerInterface] = []
@property @property
def is_running(self): def is_running(self):
return self._future is not None return self._future is not None
@ -157,16 +136,27 @@ class BLiveClient:
""" """
return self._room_owner_uid return self._room_owner_uid
async def close(self): def add_handler(self, handler: 'handlers.HandlerInterface'):
""" """
如果session是自己创建的则关闭session 添加消息处理器
:param handler: 消息处理器
""" """
if self._own_session: if handler not in self._handlers:
await self._session.close() self._handlers.append(handler)
def remove_handler(self, handler: 'handlers.HandlerInterface'):
"""
移除消息处理器
:param handler: 消息处理器
"""
try:
self._handlers.remove(handler)
except ValueError:
pass
def start(self): def start(self):
""" """
创建相关的协程不会执行事件循环 创建相关的协程
:return: 协程的future :return: 协程的future
""" """
if self._future is not None: if self._future is not None:
@ -193,6 +183,13 @@ class BLiveClient:
self._future.cancel() self._future.cancel()
return self._future return self._future
async def close(self):
"""
如果session是自己创建的则关闭session
"""
if self._own_session:
await self._session.close()
async def init_room(self): async def init_room(self):
""" """
:return: True代表没有降级如果需要降级后还可用重载这个函数返回True :return: True代表没有降级如果需要降级后还可用重载这个函数返回True
@ -321,11 +318,11 @@ class BLiveClient:
continue continue
try: try:
await self._handle_message(message.data) await self._handle_ws_message(message.data)
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
except Exception: # noqa except Exception: # noqa
logger.exception('room %d 处理消息时发生错误:', self.room_id) logger.exception('room %d 处理websocket消息时发生错误:', self.room_id)
except asyncio.CancelledError: except asyncio.CancelledError:
break break
@ -354,7 +351,7 @@ class BLiveClient:
asyncio.ensure_future(coro, loop=self._loop) asyncio.ensure_future(coro, loop=self._loop)
self._heartbeat_timer_handle = self._loop.call_later(self._heartbeat_interval, self._on_send_heartbeat) self._heartbeat_timer_handle = self._loop.call_later(self._heartbeat_interval, self._on_send_heartbeat)
async def _handle_message(self, data): async def _handle_ws_message(self, data):
offset = 0 offset = 0
while offset < len(data): while offset < len(data):
try: try:
@ -363,22 +360,29 @@ class BLiveClient:
break break
if header.operation == Operation.HEARTBEAT_REPLY: if header.operation == Operation.HEARTBEAT_REPLY:
popularity = int.from_bytes(data[offset + HEADER_STRUCT.size: popularity = int.from_bytes(
offset + HEADER_STRUCT.size + 4], data[offset + HEADER_STRUCT.size: offset + HEADER_STRUCT.size + 4],
'big') 'big'
await self._on_receive_popularity(popularity) )
body = {
'cmd': '_HEARTBEAT',
'data': {
'popularity': popularity
}
}
await self._handle_command(body)
elif header.operation == Operation.SEND_MSG_REPLY: elif header.operation == Operation.SEND_MSG_REPLY:
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len] body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE: if header.ver == WS_BODY_PROTOCOL_VERSION_DEFLATE:
body = await self._loop.run_in_executor(None, zlib.decompress, body) body = await self._loop.run_in_executor(None, zlib.decompress, body)
await self._handle_message(body) await self._handle_ws_message(body)
else: else:
try: try:
body = json.loads(body.decode('utf-8')) body = json.loads(body.decode('utf-8'))
await self._handle_command(body) await self._handle_command(body)
except Exception: except Exception:
logger.error('body: %s', body) logger.error('body=%s', body)
raise raise
elif header.operation == Operation.AUTH_REPLY: elif header.operation == Operation.AUTH_REPLY:
@ -397,51 +401,8 @@ class BLiveClient:
await self._handle_command(one_command) await self._handle_command(one_command)
return return
cmd = command.get('cmd', '') for handler in self._handlers:
pos = cmd.find(':') # 2019-5-29 B站弹幕升级新增了参数 try:
if pos != -1: await handler.handle(self, command)
cmd = cmd[:pos] except Exception: # noqa
if cmd in self._COMMAND_HANDLERS: logger.exception('room %d 处理消息时发生错误command=%s', self.room_id, command)
handler = self._COMMAND_HANDLERS[cmd]
if handler is not None:
await handler(self, command)
else:
logger.warning('room %d 未知命令cmd=%s %s', self.room_id, cmd, command)
# 只有第一次遇到未知命令时log
self._COMMAND_HANDLERS[cmd] = None
async def _on_receive_popularity(self, popularity: int):
"""
收到人气值
"""
pass
async def _on_receive_danmaku(self, danmaku: models.DanmakuMessage):
"""
收到弹幕
"""
pass
async def _on_receive_gift(self, gift: models.GiftMessage):
"""
收到礼物
"""
pass
async def _on_buy_guard(self, message: models.GuardBuyMessage):
"""
有人上舰
"""
pass
async def _on_super_chat(self, message: models.SuperChatMessage):
"""
醒目留言
"""
pass
async def _on_super_chat_delete(self, message: models.SuperChatDeleteMessage):
"""
删除醒目留言
"""
pass

147
blivedm/handlers.py Normal file
View File

@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
import logging
from typing import *
from . import blivedm
from . import models
__all__ = (
'HandlerInterface',
'BaseHandler',
)
logger = logging.getLogger('blivedm')
# 常见可忽略的cmd
FREQUENT_CMDS = (
'INTERACT_WORD',
'ROOM_BANNER',
'ROOM_REAL_TIME_MESSAGE_UPDATE',
'NOTICE_MSG',
'COMBO_SEND',
'COMBO_END',
'ENTRY_EFFECT',
'WELCOME_GUARD',
'WELCOME',
'ROOM_RANK',
'ACTIVITY_BANNER_UPDATE_V2',
'PANEL',
'SUPER_CHAT_MESSAGE_JPN',
'USER_TOAST_MSG',
'ROOM_BLOCK_MSG',
'LIVE',
'PREPARING',
'room_admin_entrance',
'ROOM_ADMINS',
'ROOM_CHANGE',
)
# 已打日志的未知cmd
logged_unknown_cmds = set()
class HandlerInterface:
"""
直播消息处理器接口
"""
async def handle(self, client: blivedm.BLiveClient, command: dict):
raise NotImplementedError
class BaseHandler(HandlerInterface):
"""
一个简单的消息处理器实现带消息分发和消息类型转换继承并重写_on_xxx方法即可实现自己的处理器
"""
def __heartbeat_callback(self, client: blivedm.BLiveClient, command: dict):
return self._on_popularity(client, models.HeartbeatMessage.from_command(command['data']))
def __danmu_msg_callback(self, client: blivedm.BLiveClient, command: dict):
return self._on_danmaku(client, models.DanmakuMessage.from_command(command['info']))
def __send_gift_callback(self, client: blivedm.BLiveClient, command: dict):
return self._on_gift(client, models.GiftMessage.from_command(command['data']))
def __guard_buy_callback(self, client: blivedm.BLiveClient, command: dict):
return self._on_buy_guard(client, models.GuardBuyMessage.from_command(command['data']))
def __super_chat_message_callback(self, client: blivedm.BLiveClient, command: dict):
return self._on_super_chat(client, models.SuperChatMessage.from_command(command['data']))
def __super_chat_message_delete_callback(self, client: blivedm.BLiveClient, command: dict):
return self._on_super_chat_delete(client, models.SuperChatDeleteMessage.from_command(command['data']))
# cmd -> 处理回调
_CMD_CALLBACK_DICT: Dict[
str,
Optional[Callable[
['BaseHandler', blivedm.BLiveClient, dict],
Awaitable
]]
] = {
# 收到心跳包这是blivedm自造的消息原本的心跳包格式不一样
'_HEARTBEAT': __heartbeat_callback,
# 收到弹幕
# go-common\app\service\live\live-dm\service\v1\send.go
'DANMU_MSG': __danmu_msg_callback,
# 有人送礼
'SEND_GIFT': __send_gift_callback,
# 有人上舰
'GUARD_BUY': __guard_buy_callback,
# 醒目留言
'SUPER_CHAT_MESSAGE': __super_chat_message_callback,
# 删除醒目留言
'SUPER_CHAT_MESSAGE_DELETE': __super_chat_message_delete_callback,
}
# 忽略其他常见cmd
for cmd in FREQUENT_CMDS:
_CMD_CALLBACK_DICT[cmd] = None
del cmd
async def handle(self, client: blivedm.BLiveClient, command: dict):
cmd = command.get('cmd', '')
pos = cmd.find(':') # 2019-5-29 B站弹幕升级新增了参数
if pos != -1:
cmd = cmd[:pos]
if cmd not in self._CMD_CALLBACK_DICT:
# 只有第一次遇到未知cmd时打日志
if cmd not in logged_unknown_cmds:
logger.warning('room %d 未知cmdcmd=%s %s', client.room_id, cmd, command)
logged_unknown_cmds.add(cmd)
return
callback = self._CMD_CALLBACK_DICT[cmd]
if callback is not None:
await callback(self, client, command)
async def _on_popularity(self, client: blivedm.BLiveClient, message: models.HeartbeatMessage):
"""
收到人气值
"""
async def _on_danmaku(self, client: blivedm.BLiveClient, message: models.DanmakuMessage):
"""
收到弹幕
"""
async def _on_gift(self, client: blivedm.BLiveClient, message: models.GiftMessage):
"""
收到礼物
"""
async def _on_buy_guard(self, client: blivedm.BLiveClient, message: models.GuardBuyMessage):
"""
有人上舰
"""
async def _on_super_chat(self, client: blivedm.BLiveClient, message: models.SuperChatMessage):
"""
醒目留言
"""
async def _on_super_chat_delete(self, client: blivedm.BLiveClient, message: models.SuperChatDeleteMessage):
"""
删除醒目留言
"""

View File

@ -2,6 +2,7 @@
from typing import * from typing import *
__all__ = ( __all__ = (
'HeartbeatMessage',
'DanmakuMessage', 'DanmakuMessage',
'GiftMessage', 'GiftMessage',
'GuardBuyMessage', 'GuardBuyMessage',
@ -10,6 +11,26 @@ __all__ = (
) )
class HeartbeatMessage:
"""
心跳消息
:param popularity: 人气值
"""
def __init__(
self,
popularity: int = None,
):
self.popularity: int = popularity
@classmethod
def from_command(cls, data: dict):
return cls(
popularity=data['popularity'],
)
class DanmakuMessage: class DanmakuMessage:
""" """
弹幕消息 弹幕消息
@ -18,7 +39,7 @@ class DanmakuMessage:
:param font_size: 字体尺寸 :param font_size: 字体尺寸
:param color: 颜色 :param color: 颜色
:param timestamp: 时间戳毫秒 :param timestamp: 时间戳毫秒
:param rnd: 随机数 :param rnd: 随机数可能是去重用的
:param uid_crc32: 用户ID文本的CRC32 :param uid_crc32: 用户ID文本的CRC32
:param msg_type: 是否礼物弹幕节奏风暴 :param msg_type: 是否礼物弹幕节奏风暴
:param bubble: 右侧评论栏气泡 :param bubble: 右侧评论栏气泡
@ -196,7 +217,7 @@ class GiftMessage:
:param gift_type: 礼物类型未知 :param gift_type: 礼物类型未知
:param action: 目前遇到的有'喂食''赠送' :param action: 目前遇到的有'喂食''赠送'
:param price: 礼物单价瓜子数 :param price: 礼物单价瓜子数
:param rnd: 随机数估计是去重用的 :param rnd: 随机数可能是去重用的
:param coin_type: 瓜子类型'silver''gold' :param coin_type: 瓜子类型'silver''gold'
:param total_coin: 总瓜子数 :param total_coin: 总瓜子数
""" """

View File

@ -7,7 +7,10 @@ import blivedm
async def main(): async def main():
# 直播间ID的取值看直播间URL # 直播间ID的取值看直播间URL
# 如果SSL验证失败就把ssl设为FalseB站真的有过忘续证书的情况 # 如果SSL验证失败就把ssl设为FalseB站真的有过忘续证书的情况
client = MyBLiveClient(room_id=21224291, ssl=True) client = blivedm.BLiveClient(room_id=411318, ssl=True)
handler = MyHandler()
client.add_handler(handler)
future = client.start() future = client.start()
try: try:
# 5秒后停止测试用 # 5秒后停止测试用
@ -19,27 +22,28 @@ async def main():
await client.close() await client.close()
class MyBLiveClient(blivedm.BLiveClient): class MyHandler(blivedm.BaseHandler):
# 演示如何自定义handler # 演示如何添加自定义回调
_COMMAND_HANDLERS = blivedm.BLiveClient._COMMAND_HANDLERS.copy() _CMD_CALLBACK_DICT = blivedm.BaseHandler._CMD_CALLBACK_DICT.copy()
async def __on_vip_enter(self, command): # 入场消息回调
print(command) async def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict):
_COMMAND_HANDLERS['WELCOME'] = __on_vip_enter # 老爷入场 print(f"self_type={type(self).__name__}, room_id={client.room_id}, uname={command['data']['uname']}")
_CMD_CALLBACK_DICT['INTERACT_WORD'] = __interact_word_callback # noqa
async def _on_receive_popularity(self, popularity: int): async def _on_popularity(self, client: blivedm.BLiveClient, message: blivedm.HeartbeatMessage):
print(f'当前人气值:{popularity}') print(f'当前人气值:{message.popularity}')
async def _on_receive_danmaku(self, danmaku: blivedm.DanmakuMessage): async def _on_danmaku(self, client: blivedm.BLiveClient, message: blivedm.DanmakuMessage):
print(f'{danmaku.uname}{danmaku.msg}') print(f'{message.uname}{message.msg}')
async def _on_receive_gift(self, gift: blivedm.GiftMessage): async def _on_gift(self, client: blivedm.BLiveClient, message: blivedm.GiftMessage):
print(f'{gift.uname} 赠送{gift.gift_name}x{gift.num} {gift.coin_type}币x{gift.total_coin}') print(f'{message.uname} 赠送{message.gift_name}x{message.num} {message.coin_type}币x{message.total_coin}')
async def _on_buy_guard(self, message: blivedm.GuardBuyMessage): async def _on_buy_guard(self, client: blivedm.BLiveClient, message: blivedm.GuardBuyMessage):
print(f'{message.username} 购买{message.gift_name}') print(f'{message.username} 购买{message.gift_name}')
async def _on_super_chat(self, message: blivedm.SuperChatMessage): async def _on_super_chat(self, client: blivedm.BLiveClient, message: blivedm.SuperChatMessage):
print(f'醒目留言 ¥{message.price} {message.uname}{message.message}') print(f'醒目留言 ¥{message.price} {message.uname}{message.message}')