From b0b38bf0f0e38a28475442f1f9091adebbf3adf7 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 5 Nov 2023 16:29:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E6=8F=92=E4=BB=B6=E5=92=8Cbl?= =?UTF-8?q?ivechat=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/plugin.py | 19 +++ blcsdk/__init__.py | 5 + blcsdk/api.py | 156 ++++++++++++++++++++ blcsdk/client.py | 222 ++++++++++++++++++++++++++++ blcsdk/exc.py | 13 ++ blcsdk/handlers.py | 94 ++++++++++++ blcsdk/models.py | 253 ++++++++++++++++++++++++++++++++ main.py | 1 + plugins/msg-logging/config.py | 5 + plugins/msg-logging/listener.py | 42 ++++++ plugins/msg-logging/main.py | 73 ++++++++- services/plugin.py | 11 ++ 12 files changed, 893 insertions(+), 1 deletion(-) create mode 100644 blcsdk/api.py create mode 100644 blcsdk/exc.py create mode 100644 blcsdk/handlers.py mode change 100644 => 100755 main.py create mode 100644 plugins/msg-logging/config.py create mode 100644 plugins/msg-logging/listener.py mode change 100644 => 100755 plugins/msg-logging/main.py diff --git a/api/plugin.py b/api/plugin.py index 8c09a6c..bf54d24 100644 --- a/api/plugin.py +++ b/api/plugin.py @@ -82,6 +82,25 @@ class PluginWsHandler(_PluginHandlerBase, tornado.websocket.WebSocketHandler): def on_close(self): logger.info('plugin=%s disconnected', self.plugin.id) self.plugin.on_client_close(self) + if self._heartbeat_timer_handle is not None: + self._heartbeat_timer_handle.cancel() + self._heartbeat_timer_handle = None + if self._receive_timeout_timer_handle is not None: + self._receive_timeout_timer_handle.cancel() + self._receive_timeout_timer_handle = None + + def on_message(self, message): + try: + body = json.loads(message) + cmd = int(body['cmd']) + + if cmd == models.Command.HEARTBEAT: + self._refresh_receive_timeout_timer() + else: + logger.warning('plugin=%s unknown cmd=%d, body=%s', self.plugin.id, cmd, body) + + except Exception: # noqa + logger.exception('plugin=%s on_message error, message=%s', self.plugin.id, message) def send_cmd_data(self, cmd, data, extra: Optional[dict] = None): self.send_body_no_raise(make_message_body(cmd, data, extra)) diff --git a/blcsdk/__init__.py b/blcsdk/__init__.py index 2bb0f2c..e5a674e 100644 --- a/blcsdk/__init__.py +++ b/blcsdk/__init__.py @@ -1,2 +1,7 @@ # -*- coding: utf-8 -*- __version__ = '0.0.1' + +from .handlers import * +from .client import * +from .exc import * +from .api import * diff --git a/blcsdk/api.py b/blcsdk/api.py new file mode 100644 index 0000000..71ce0b0 --- /dev/null +++ b/blcsdk/api.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +import asyncio +import logging +import os +import re +from typing import * + +import aiohttp + +from . import ( + __version__, + client as cli, + exc, + handlers, + models, +) + +__all__ = ( + 'init', + 'shut_down', + 'set_msg_handler', + 'is_sdk_version_compatible', +) + +logger = logging.getLogger('blcsdk') + +# 环境变量 +_base_url = '' +"""HTTP API的URL""" +_token = '' +"""插件认证用的token""" + +# 初始化消息 +_init_future: Optional[asyncio.Future] = None +"""初始化消息的future""" +_init_msg: Optional[dict] = None +"""初始化消息,包含版本等信息""" + +# 其他和blivechat通信用的对象 +_http_session: Optional[aiohttp.ClientSession] = None +"""插件请求专用的HTTP客户端""" +_plugin_client: Optional[cli.BlcPluginClient] = None +"""插件客户端""" +_msg_handler: Optional[handlers.HandlerInterface] = None +"""插件消息处理器""" +_msg_handler_wrapper: Optional['_HandlerWrapper'] = None +"""用于SDK处理一些消息,然后转发给插件消息处理器""" + + +async def init(): + """ + 初始化SDK + + 在调用除了set_msg_handler以外的其他接口之前必须先调用这个。如果抛出任何异常,应该退出当前程序 + """ + try: + global _base_url, _token, _init_future, _init_msg, _http_session, _plugin_client, _msg_handler_wrapper + if _init_future is not None: + raise exc.InitError('Cannot call init() again') + _init_future = asyncio.get_running_loop().create_future() + + # 初始化环境变量信息 + blc_port = int(os.environ['BLC_PORT']) + _base_url = f'http://localhost:{blc_port}' + blc_ws_url = f'ws://localhost:{blc_port}/api/plugin/websocket' + _token = os.environ['BLC_TOKEN'] + + _http_session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=10), + headers={'Authorization': f'Bearer {_token}'}, + ) + + # 连接blivechat + _msg_handler_wrapper = _HandlerWrapper() + _plugin_client = cli.BlcPluginClient(blc_ws_url, session=_http_session) + _plugin_client.set_handler(_msg_handler_wrapper) + _plugin_client.start() + + # 等待初始化消息 + _init_msg = await _init_future + logger.debug('SDK initialized, _init_msg=%s', _init_msg) + except exc.InitError: + raise + except Exception as e: + raise exc.InitError(f'Error in init(): {e}') from e + + +async def shut_down(): + """退出程序之前建议调用""" + if _plugin_client is not None: + await _plugin_client.stop_and_close() + if _http_session is not None: + await _http_session.close() + + +def set_msg_handler(handler: Optional[handlers.HandlerInterface]): + """ + 设置消息处理器 + + 注意消息处理器和网络协程运行在同一个协程,如果处理消息耗时太长会阻塞接收消息。如果是CPU密集型的任务,建议将消息推到线程池处理; + 如果是IO密集型的任务,应该使用async函数,并且在handler里使用create_task创建新的协程 + + :param handler: 消息处理器 + """ + global _msg_handler + _msg_handler = handler + + +class _HandlerWrapper(handlers.HandlerInterface): + """用于SDK处理一些消息,然后转发给插件消息处理器""" + + def handle(self, client: cli.BlcPluginClient, command: dict): + if not _init_future.done(): + if command['cmd'] == models.Command.BLC_INIT: + _init_future.set_result(command['data']) + + if _msg_handler is not None: + _msg_handler.handle(client, command) + + def on_client_stopped(self, client: cli.BlcPluginClient, exception: Optional[Exception]): + if not _init_future.done(): + if exception is not None: + _init_future.set_exception(exception) + else: + _init_future.set_exception(exc.InitError('Connection closed before init msg')) + + if _msg_handler is not None: + _msg_handler.on_client_stopped(client, exception) + + +def is_sdk_version_compatible(): + """ + 检查SDK版本和blivechat的版本是否兼容 + + 如果不兼容,建议退出当前程序。如果继续执行有可能不能正常工作 + """ + if _init_msg is None: + raise exc.SdkError('Please call init() first') + + major_ver_pattern = r'(\d+)\.\d+\.\d+' + remote_ver = _init_msg['sdkVersion'] + + m = re.match(major_ver_pattern, remote_ver) + if m is None: + raise exc.SdkError(f"Bad remote version format: {remote_ver}") + remote_major_ver = m[1] + + m = re.match(major_ver_pattern, __version__) + if m is None: + raise exc.SdkError(f"Bad local version format: {__version__}") + local_major_ver = m[1] + + res = remote_major_ver == local_major_ver + if not res: + logger.warning('SDK version is not compatible, remote=%s, local=%s', remote_ver, __version__) + return res diff --git a/blcsdk/client.py b/blcsdk/client.py index 40a96af..60a59fd 100644 --- a/blcsdk/client.py +++ b/blcsdk/client.py @@ -1 +1,223 @@ # -*- coding: utf-8 -*- +import asyncio +import logging +from typing import * + +import aiohttp + +from . import handlers +from . import models + +__all__ = ( + 'BlcPluginClient', +) + +logger = logging.getLogger('blcsdk') + + +class BlcPluginClient: + """ + blivechat插件服务的客户端 + + :param ws_url: blivechat消息转发服务WebSocket地址 + :param session: 连接池 + :param heartbeat_interval: 发送心跳包的间隔时间(秒) + """ + + def __init__( + self, + ws_url: str, + *, + session: Optional[aiohttp.ClientSession] = None, + heartbeat_interval: float = 10, + ): + self._ws_url = ws_url + + if session is None: + self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) + self._own_session = True + else: + self._session = session + self._own_session = False + assert self._session.loop is asyncio.get_event_loop() # noqa + + self._heartbeat_interval = heartbeat_interval + + self._handler: Optional[handlers.HandlerInterface] = None + """消息处理器""" + + # 在运行时初始化的字段 + self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None + """WebSocket连接""" + self._network_future: Optional[asyncio.Future] = None + """网络协程的future""" + self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None + """发心跳包定时器的handle""" + + @property + def is_running(self) -> bool: + """本客户端正在运行,注意调用stop后还没完全停止也算正在运行""" + return self._network_future is not None + + def set_handler(self, handler: Optional['handlers.HandlerInterface']): + """ + 设置消息处理器 + + 注意消息处理器和网络协程运行在同一个协程,如果处理消息耗时太长会阻塞接收消息。如果是CPU密集型的任务,建议将消息推到线程池处理; + 如果是IO密集型的任务,应该使用async函数,并且在handler里使用create_task创建新的协程 + + :param handler: 消息处理器 + """ + self._handler = handler + + def start(self): + """启动本客户端""" + if self.is_running: + logger.warning('Plugin client is running, cannot start() again') + return + + self._network_future = asyncio.create_task(self._network_coroutine_wrapper()) + + def stop(self): + """停止本客户端""" + if not self.is_running: + logger.warning('Plugin client is stopped, cannot stop() again') + return + + self._network_future.cancel() + + async def stop_and_close(self): + """便利函数,停止本客户端并释放本客户端的资源,调用后本客户端将不可用""" + if self.is_running: + self.stop() + await self.join() + await self.close() + + async def join(self): + """等待本客户端停止""" + if not self.is_running: + logger.warning('Plugin client is stopped, cannot join()') + return + + await asyncio.shield(self._network_future) + + async def close(self): + """释放本客户端的资源,调用后本客户端将不可用""" + if self.is_running: + logger.warning('Plugin is calling close(), but client is running') + + # 如果session是自己创建的则关闭session + if self._own_session: + await self._session.close() + + async def send_cmd_data(self, cmd: models.Command, data: dict): + """ + 发送消息给服务器 + + :param cmd: 消息类型,见Command + :param data: 消息体JSON数据 + """ + if self._websocket is None or self._websocket.closed: + raise ConnectionResetError('websocket is closed') + + body = {'cmd': cmd, 'data': data} + await self._websocket.send_json(body) + + async def _network_coroutine_wrapper(self): + """负责处理网络协程的异常,网络协程具体逻辑在_network_coroutine里""" + exc = None + try: + await self._network_coroutine() + except asyncio.CancelledError: + # 正常停止 + pass + except Exception as e: + logger.exception('_network_coroutine() finished with exception:') + exc = e + finally: + logger.debug('_network_coroutine() finished') + self._network_future = None + + if self._handler is not None: + self._handler.on_client_stopped(self, exc) + + async def _network_coroutine(self): + """网络协程,负责连接服务器、接收消息、解包""" + try: + # 连接 + async with self._session.ws_connect( + self._ws_url, + receive_timeout=self._heartbeat_interval + 5, + ) as websocket: + self._websocket = websocket + await self._on_ws_connect() + + # 处理消息 + message: aiohttp.WSMessage + async for message in websocket: + self._on_ws_message(message) + finally: + self._websocket = None + await self._on_ws_close() + # 插件消息都是本地通信的,这里不可能是因为网络问题而掉线,所以不尝试重连 + + async def _on_ws_connect(self): + """WebSocket连接成功""" + self._heartbeat_timer_handle = asyncio.get_running_loop().call_later( + self._heartbeat_interval, self._on_send_heartbeat + ) + + async def _on_ws_close(self): + """WebSocket连接断开""" + if self._heartbeat_timer_handle is not None: + self._heartbeat_timer_handle.cancel() + self._heartbeat_timer_handle = None + + def _on_send_heartbeat(self): + """定时发送心跳包的回调""" + if self._websocket is None or self._websocket.closed: + self._heartbeat_timer_handle = None + return + + self._heartbeat_timer_handle = asyncio.get_running_loop().call_later( + self._heartbeat_interval, self._on_send_heartbeat + ) + asyncio.create_task(self._send_heartbeat()) + + async def _send_heartbeat(self): + """发送心跳包""" + try: + await self.send_cmd_data(models.Command.HEARTBEAT, {}) + except (ConnectionResetError, aiohttp.ClientConnectionError) as e: + logger.warning('Plugin client _send_heartbeat() failed: %r', e) + except Exception: # noqa + logger.exception('Plugin client _send_heartbeat() failed:') + + def _on_ws_message(self, message: aiohttp.WSMessage): + """ + 收到WebSocket消息 + + :param message: WebSocket消息 + """ + if message.type != aiohttp.WSMsgType.TEXT: + logger.warning('Unknown websocket message type=%s, data=%s', message.type, message.data) + return + + try: + body = message.json() + self._handle_command(body) + except Exception: + logger.error('body=%s', message.data) + raise + + def _handle_command(self, command: dict): + """ + 处理业务消息 + + :param command: 业务消息 + """ + if self._handler is not None: + try: + self._handler.handle(self, command) + except Exception as e: + logger.exception('Plugin client _handle_command() failed, command=%s', command, exc_info=e) diff --git a/blcsdk/exc.py b/blcsdk/exc.py new file mode 100644 index 0000000..7c175fb --- /dev/null +++ b/blcsdk/exc.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +__all__ = ( + 'SdkError', + 'InitError', +) + + +class SdkError(Exception): + """SDK错误的基类""" + + +class InitError(SdkError): + """初始化失败""" diff --git a/blcsdk/handlers.py b/blcsdk/handlers.py new file mode 100644 index 0000000..d5950f1 --- /dev/null +++ b/blcsdk/handlers.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +from typing import * + +from . import client as cli +from . import models + +__all__ = ( + 'HandlerInterface', + 'BaseHandler', +) + + +class HandlerInterface: + """blivechat插件消息处理器接口""" + + def handle(self, client: cli.BlcPluginClient, command: dict): + raise NotImplementedError + + def on_client_stopped(self, client: cli.BlcPluginClient, exception: Optional[Exception]): + """ + 当客户端停止时调用 + + 这种情况说明blivechat已经退出了,或者插件被禁用了,因此重连基本会失败。这里唯一建议的操作是退出当前程序 + """ + + +def _make_msg_callback(method_name, message_cls): + def callback(self: 'BaseHandler', client: cli.BlcPluginClient, command: dict): + method = getattr(self, method_name) + msg = message_cls.from_command(command['data']) + extra = _get_extra(command) + return method(client, msg, extra) + return callback + + +def _get_extra(command: dict): + extra = command.get('extra', {}) + room_key_dict = extra.get('roomKey', None) + if room_key_dict is not None: + extra['roomKey'] = models.RoomKey( + type=models.RoomKeyType(room_key_dict['type']), + value=room_key_dict['value'], + ) + return extra + + +class BaseHandler(HandlerInterface): + """一个简单的消息处理器实现,带消息分发和消息类型转换。继承并重写_on_xxx方法即可实现自己的处理器""" + + _CMD_CALLBACK_DICT: Dict[ + int, + Optional[Callable[ + ['BaseHandler', cli.BlcPluginClient, dict], + Any + ]] + ] = { + # 收到弹幕 + models.Command.ADD_TEXT: _make_msg_callback('_on_add_text', models.AddTextMsg), + # 有人送礼 + models.Command.ADD_GIFT: _make_msg_callback('_on_add_gift', models.AddGiftMsg), + # 有人上舰 + models.Command.ADD_MEMBER: _make_msg_callback('_on_add_member', models.AddMemberMsg), + # 醒目留言 + models.Command.ADD_SUPER_CHAT: _make_msg_callback('_on_add_super_chat', models.AddSuperChatMsg), + # 删除醒目留言 + models.Command.DEL_SUPER_CHAT: _make_msg_callback('_on_del_super_chat', models.DelSuperChatMsg), + # 更新翻译 + models.Command.UPDATE_TRANSLATION: _make_msg_callback('_on_update_translation', models.UpdateTranslationMsg), + } + """cmd -> 处理回调""" + + def handle(self, client: cli.BlcPluginClient, command: dict): + cmd = command['cmd'] + callback = self._CMD_CALLBACK_DICT.get(cmd, None) + if callback is not None: + callback(self, client, command) + + def _on_add_text(self, client: cli.BlcPluginClient, message: models.AddTextMsg): + """收到弹幕""" + + def _on_add_gift(self, client: cli.BlcPluginClient, message: models.AddGiftMsg): + """有人送礼""" + + def _on_add_member(self, client: cli.BlcPluginClient, message: models.AddMemberMsg): + """有人上舰""" + + def _on_add_super_chat(self, client: cli.BlcPluginClient, message: models.AddSuperChatMsg): + """醒目留言""" + + def _on_del_super_chat(self, client: cli.BlcPluginClient, message: models.DelSuperChatMsg): + """删除醒目留言""" + + def _on_update_translation(self, client: cli.BlcPluginClient, message: models.UpdateTranslationMsg): + """更新翻译""" diff --git a/blcsdk/models.py b/blcsdk/models.py index 7411d31..ffba3d5 100644 --- a/blcsdk/models.py +++ b/blcsdk/models.py @@ -1,6 +1,259 @@ # -*- coding: utf-8 -*- +import dataclasses import enum +from typing import * + +__all__ = ( + 'RoomKeyType', + 'RoomKey', + 'Command', + 'AuthorType', + 'GuardLevel', + 'ContentType', + 'AddTextMsg', + 'AddGiftMsg', + 'AddMemberMsg', + 'AddSuperChatMsg', + 'DelSuperChatMsg', + 'UpdateTranslationMsg', +) + + +class RoomKeyType(enum.IntEnum): + ROOM_ID = 1 + AUTH_CODE = 2 + + +class RoomKey(NamedTuple): + """用来标识一个房间""" + type: RoomKeyType + value: Union[int, str] + + def __str__(self): + res = str(self.value) + if self.type == RoomKeyType.AUTH_CODE: + # 身份码要脱敏 + res = '***' + res[-3:] + return res + __repr__ = __str__ class Command(enum.IntEnum): HEARTBEAT = 0 + BLC_INIT = 1 + + ADD_TEXT = 20 + ADD_GIFT = 21 + ADD_MEMBER = 22 + ADD_SUPER_CHAT = 23 + DEL_SUPER_CHAT = 24 + UPDATE_TRANSLATION = 25 + + +class AuthorType(enum.IntEnum): + NORMAL = 0 + GUARD = 1 + """舰队""" + ADMIN = 2 + """房管""" + ROOM_OWNER = 3 + """主播""" + + +class GuardLevel(enum.IntEnum): + """舰队等级""" + + NONE = 0 + LV3 = 1 + """总督""" + LV2 = 2 + """提督""" + LV1 = 3 + """舰长""" + + +class ContentType(enum.IntEnum): + TEXT = 0 + EMOTICON = 1 + + +@dataclasses.dataclass +class AddTextMsg: + """弹幕消息""" + + avatar_url: str = '' + """用户头像URL""" + timestamp: int = 0 + """时间戳(秒)""" + author_name: str = '' + """用户名""" + author_type: int = AuthorType.NORMAL.value + """用户类型,见AuthorType""" + content: str = '' + """弹幕内容""" + privilege_type: int = GuardLevel.NONE.value + """舰队等级,见GuardLevel""" + is_gift_danmaku: bool = False + """是否礼物弹幕""" + author_level: int = 1 + """用户等级""" + is_newbie: bool = False + """是否正式会员""" + is_mobile_verified: bool = True + """是否绑定手机""" + medal_level: int = 0 + """勋章等级,如果没戴当前房间勋章则为0""" + id: str = '' + """消息ID""" + translation: str = '' + """弹幕内容翻译""" + content_type: int = ContentType.TEXT.value + """内容类型,见ContentType""" + content_type_params: Union[dict, list] = dataclasses.field(default_factory=dict) + """跟内容类型相关的参数""" + + @classmethod + def from_command(cls, data: list): + content_type = data[13] + content_type_params = data[14] + if content_type == ContentType.EMOTICON: + content_type_params = {'url': content_type_params[0]} + + return cls( + avatar_url=data[0], + timestamp=data[1], + author_name=data[2], + author_type=data[3], + content=data[4], + privilege_type=data[5], + is_gift_danmaku=bool(data[6]), + author_level=data[7], + is_newbie=bool(data[8]), + is_mobile_verified=bool(data[9]), + medal_level=data[10], + id=data[11], + translation=data[12], + content_type=content_type, + content_type_params=content_type_params, + ) + + +@dataclasses.dataclass +class AddGiftMsg: + """礼物消息""" + + id: str = '' + """消息ID""" + avatar_url: str = '' + """用户头像URL""" + timestamp: int = 0 + """时间戳(秒)""" + author_name: str = '' + """用户名""" + total_coin: int = 0 + """总价瓜子数,1000金瓜子 = 1元""" + gift_name: str = '' + """礼物名""" + num: int = 0 + """数量""" + + @classmethod + def from_command(cls, data: dict): + return cls( + id=data['id'], + avatar_url=data['avatarUrl'], + timestamp=data['timestamp'], + author_name=data['authorName'], + total_coin=data['totalCoin'], + gift_name=data['giftName'], + num=data['num'], + ) + + +@dataclasses.dataclass +class AddMemberMsg: + """上舰消息""" + + id: str = '' + """消息ID""" + avatar_url: str = '' + """用户头像URL""" + timestamp: int = 0 + """时间戳(秒)""" + author_name: str = '' + """用户名""" + privilege_type: int = GuardLevel.NONE.value + """舰队等级,见GuardLevel""" + + @classmethod + def from_command(cls, data: dict): + return cls( + id=data['id'], + avatar_url=data['avatarUrl'], + timestamp=data['timestamp'], + author_name=data['authorName'], + privilege_type=data['privilegeType'], + ) + + +@dataclasses.dataclass +class AddSuperChatMsg: + """醒目留言消息""" + + id: str = '' + """消息ID""" + avatar_url: str = '' + """用户头像URL""" + timestamp: int = 0 + """时间戳(秒)""" + author_name: str = '' + """用户名""" + price: int = 0 + """价格(元)""" + content: str = '' + """内容""" + translation: str = '' + """内容翻译""" + + @classmethod + def from_command(cls, data: dict): + return cls( + id=data['id'], + avatar_url=data['avatarUrl'], + timestamp=data['timestamp'], + author_name=data['authorName'], + price=data['price'], + content=data['content'], + translation=data['translation'], + ) + + +@dataclasses.dataclass +class DelSuperChatMsg: + """删除醒目留言消息""" + + ids: List[str] = dataclasses.field(default_factory=list) + """醒目留言ID数组""" + + @classmethod + def from_command(cls, data: dict): + return cls( + ids=data['ids'], + ) + + +@dataclasses.dataclass +class UpdateTranslationMsg: + """更新内容翻译消息""" + + id: str = '' + """消息ID""" + translation: str = '' + """内容翻译""" + + @classmethod + def from_command(cls, data: list): + return cls( + id=data[0], + translation=data[1], + ) diff --git a/main.py b/main.py old mode 100644 new mode 100755 index 2462340..b763f25 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python # -*- coding: utf-8 -*- import argparse import asyncio diff --git a/plugins/msg-logging/config.py b/plugins/msg-logging/config.py new file mode 100644 index 0000000..b0273fa --- /dev/null +++ b/plugins/msg-logging/config.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +import os + +BASE_PATH = os.path.realpath(os.getcwd()) +LOG_PATH = os.path.join(BASE_PATH, 'log') diff --git a/plugins/msg-logging/listener.py b/plugins/msg-logging/listener.py new file mode 100644 index 0000000..f30a9ab --- /dev/null +++ b/plugins/msg-logging/listener.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +import __main__ +import logging +from typing import * + +import blcsdk +import blcsdk.models as sdk_models +from blcsdk import client as cli + +logger = logging.getLogger(__name__) + +_msg_handler: Optional['MsgHandler'] = None + + +async def init(): + global _msg_handler + _msg_handler = MsgHandler() + blcsdk.set_msg_handler(_msg_handler) + + +class MsgHandler(blcsdk.BaseHandler): + def on_client_stopped(self, client: cli.BlcPluginClient, exception: Optional[Exception]): + logger.info('blivechat disconnected') + __main__.start_shut_down() + + def _on_add_text(self, client: cli.BlcPluginClient, message: sdk_models.AddTextMsg): + """收到弹幕""" + + def _on_add_gift(self, client: cli.BlcPluginClient, message: sdk_models.AddGiftMsg): + """有人送礼""" + + def _on_add_member(self, client: cli.BlcPluginClient, message: sdk_models.AddMemberMsg): + """有人上舰""" + + def _on_add_super_chat(self, client: cli.BlcPluginClient, message: sdk_models.AddSuperChatMsg): + """醒目留言""" + + def _on_del_super_chat(self, client: cli.BlcPluginClient, message: sdk_models.DelSuperChatMsg): + """删除醒目留言""" + + def _on_update_translation(self, client: cli.BlcPluginClient, message: sdk_models.UpdateTranslationMsg): + """更新翻译""" diff --git a/plugins/msg-logging/main.py b/plugins/msg-logging/main.py old mode 100644 new mode 100755 index 4644440..3722063 --- a/plugins/msg-logging/main.py +++ b/plugins/msg-logging/main.py @@ -1,14 +1,85 @@ +#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio +import logging.handlers +import os +import signal import sys +from typing import * import blcsdk +import config +import listener + +logger = logging.getLogger('msg-logging') + +shut_down_event: Optional[asyncio.Event] = None async def main(): - print('hello world!', blcsdk.__version__) + try: + await init() + await run() + finally: + await shut_down() return 0 +async def init(): + init_signal_handlers() + + init_logging() + + await blcsdk.init() + if not blcsdk.is_sdk_version_compatible(): + raise RuntimeError('SDK version is not compatible') + + await listener.init() + + +def init_signal_handlers(): + global shut_down_event + shut_down_event = asyncio.Event() + + signums = (signal.SIGINT, signal.SIGTERM) + try: + loop = asyncio.get_running_loop() + for signum in signums: + loop.add_signal_handler(signum, start_shut_down) + except NotImplementedError: + # 不太安全,但Windows只能用这个 + for signum in signums: + signal.signal(signum, start_shut_down) + + +def start_shut_down(*_args): + shut_down_event.set() + + +def init_logging(): + filename = os.path.join(config.LOG_PATH, 'msg-logging.log') + stream_handler = logging.StreamHandler() + file_handler = logging.handlers.TimedRotatingFileHandler( + filename, encoding='utf-8', when='midnight', backupCount=7, delay=True + ) + logging.basicConfig( + format='{asctime} {levelname} [{name}]: {message}', + style='{', + level=logging.INFO, + # level=logging.DEBUG, + handlers=[stream_handler, file_handler], + ) + + +async def run(): + logger.info('Running event loop') + await shut_down_event.wait() + logger.info('Start to shut down') + + +async def shut_down(): + await blcsdk.shut_down() + + if __name__ == '__main__': sys.exit(asyncio.run(main())) diff --git a/services/plugin.py b/services/plugin.py index 8c56e37..5f86706 100644 --- a/services/plugin.py +++ b/services/plugin.py @@ -10,7 +10,10 @@ import subprocess from typing import * import api.plugin +import blcsdk +import blcsdk.models as sdk_models import config +import update logger = logging.getLogger(__name__) @@ -233,12 +236,20 @@ class Plugin: if self._client is client: return if self._client is not None: + logger.info('plugin=%s closing old client', self._id) self._client.close() self._client = client def on_client_connect(self, client: 'api.plugin.PluginWsHandler'): self._set_client(client) + # 发送初始化消息 + self.send_cmd_data(sdk_models.Command.BLC_INIT, { + 'blcVersion': update.VERSION, + 'sdkVersion': blcsdk.__version__, + 'pluginId': self._id, + }) + def on_client_close(self, client: 'api.plugin.PluginWsHandler'): if self._client is client: self._set_client(None)