2023-11-04 16:52:07 +08:00
|
|
|
|
# -*- coding: utf-8 -*-
|
2023-11-05 16:29:11 +08:00
|
|
|
|
import asyncio
|
|
|
|
|
import logging
|
|
|
|
|
from typing import *
|
|
|
|
|
|
|
|
|
|
import aiohttp
|
|
|
|
|
|
|
|
|
|
from . import handlers
|
|
|
|
|
from . import models
|
|
|
|
|
|
|
|
|
|
__all__ = (
|
|
|
|
|
'BlcPluginClient',
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger('blcsdk')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BlcPluginClient:
|
|
|
|
|
"""
|
|
|
|
|
blivechat插件服务的客户端
|
|
|
|
|
|
|
|
|
|
:param ws_url: blivechat消息转发服务WebSocket地址
|
|
|
|
|
:param session: 连接池
|
|
|
|
|
:param heartbeat_interval: 发送心跳包的间隔时间(秒)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
ws_url: str,
|
|
|
|
|
*,
|
|
|
|
|
session: Optional[aiohttp.ClientSession] = None,
|
2024-03-12 23:52:17 +08:00
|
|
|
|
heartbeat_interval: float = 30,
|
2023-11-05 16:29:11 +08:00
|
|
|
|
):
|
|
|
|
|
self._ws_url = ws_url
|
|
|
|
|
|
|
|
|
|
if session is None:
|
|
|
|
|
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
|
|
|
|
|
self._own_session = True
|
|
|
|
|
else:
|
|
|
|
|
self._session = session
|
|
|
|
|
self._own_session = False
|
|
|
|
|
assert self._session.loop is asyncio.get_event_loop() # noqa
|
|
|
|
|
|
|
|
|
|
self._heartbeat_interval = heartbeat_interval
|
|
|
|
|
|
|
|
|
|
self._handler: Optional[handlers.HandlerInterface] = None
|
|
|
|
|
"""消息处理器"""
|
|
|
|
|
|
|
|
|
|
# 在运行时初始化的字段
|
|
|
|
|
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
|
|
|
|
|
"""WebSocket连接"""
|
|
|
|
|
self._network_future: Optional[asyncio.Future] = None
|
|
|
|
|
"""网络协程的future"""
|
|
|
|
|
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
|
|
|
|
|
"""发心跳包定时器的handle"""
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def is_running(self) -> bool:
|
|
|
|
|
"""本客户端正在运行,注意调用stop后还没完全停止也算正在运行"""
|
|
|
|
|
return self._network_future is not None
|
|
|
|
|
|
|
|
|
|
def set_handler(self, handler: Optional['handlers.HandlerInterface']):
|
|
|
|
|
"""
|
|
|
|
|
设置消息处理器
|
|
|
|
|
|
|
|
|
|
注意消息处理器和网络协程运行在同一个协程,如果处理消息耗时太长会阻塞接收消息。如果是CPU密集型的任务,建议将消息推到线程池处理;
|
|
|
|
|
如果是IO密集型的任务,应该使用async函数,并且在handler里使用create_task创建新的协程
|
|
|
|
|
|
|
|
|
|
:param handler: 消息处理器
|
|
|
|
|
"""
|
|
|
|
|
self._handler = handler
|
|
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
|
"""启动本客户端"""
|
|
|
|
|
if self.is_running:
|
|
|
|
|
logger.warning('Plugin client is running, cannot start() again')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
self._network_future = asyncio.create_task(self._network_coroutine_wrapper())
|
|
|
|
|
|
|
|
|
|
def stop(self):
|
|
|
|
|
"""停止本客户端"""
|
|
|
|
|
if not self.is_running:
|
|
|
|
|
logger.warning('Plugin client is stopped, cannot stop() again')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
self._network_future.cancel()
|
|
|
|
|
|
|
|
|
|
async def stop_and_close(self):
|
|
|
|
|
"""便利函数,停止本客户端并释放本客户端的资源,调用后本客户端将不可用"""
|
|
|
|
|
if self.is_running:
|
|
|
|
|
self.stop()
|
|
|
|
|
await self.join()
|
|
|
|
|
await self.close()
|
|
|
|
|
|
|
|
|
|
async def join(self):
|
|
|
|
|
"""等待本客户端停止"""
|
|
|
|
|
if not self.is_running:
|
|
|
|
|
logger.warning('Plugin client is stopped, cannot join()')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
await asyncio.shield(self._network_future)
|
|
|
|
|
|
|
|
|
|
async def close(self):
|
|
|
|
|
"""释放本客户端的资源,调用后本客户端将不可用"""
|
|
|
|
|
if self.is_running:
|
|
|
|
|
logger.warning('Plugin is calling close(), but client is running')
|
|
|
|
|
|
|
|
|
|
# 如果session是自己创建的则关闭session
|
|
|
|
|
if self._own_session:
|
|
|
|
|
await self._session.close()
|
|
|
|
|
|
|
|
|
|
async def send_cmd_data(self, cmd: models.Command, data: dict):
|
|
|
|
|
"""
|
|
|
|
|
发送消息给服务器
|
|
|
|
|
|
|
|
|
|
:param cmd: 消息类型,见Command
|
|
|
|
|
:param data: 消息体JSON数据
|
|
|
|
|
"""
|
|
|
|
|
if self._websocket is None or self._websocket.closed:
|
|
|
|
|
raise ConnectionResetError('websocket is closed')
|
|
|
|
|
|
|
|
|
|
body = {'cmd': cmd, 'data': data}
|
|
|
|
|
await self._websocket.send_json(body)
|
|
|
|
|
|
|
|
|
|
async def _network_coroutine_wrapper(self):
|
|
|
|
|
"""负责处理网络协程的异常,网络协程具体逻辑在_network_coroutine里"""
|
|
|
|
|
exc = None
|
|
|
|
|
try:
|
|
|
|
|
await self._network_coroutine()
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
# 正常停止
|
|
|
|
|
pass
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception('_network_coroutine() finished with exception:')
|
|
|
|
|
exc = e
|
|
|
|
|
finally:
|
|
|
|
|
logger.debug('_network_coroutine() finished')
|
|
|
|
|
self._network_future = None
|
|
|
|
|
|
|
|
|
|
if self._handler is not None:
|
|
|
|
|
self._handler.on_client_stopped(self, exc)
|
|
|
|
|
|
|
|
|
|
async def _network_coroutine(self):
|
|
|
|
|
"""网络协程,负责连接服务器、接收消息、解包"""
|
|
|
|
|
try:
|
|
|
|
|
# 连接
|
|
|
|
|
async with self._session.ws_connect(
|
|
|
|
|
self._ws_url,
|
|
|
|
|
receive_timeout=self._heartbeat_interval + 5,
|
|
|
|
|
) as websocket:
|
|
|
|
|
self._websocket = websocket
|
|
|
|
|
await self._on_ws_connect()
|
|
|
|
|
|
|
|
|
|
# 处理消息
|
|
|
|
|
message: aiohttp.WSMessage
|
|
|
|
|
async for message in websocket:
|
|
|
|
|
self._on_ws_message(message)
|
|
|
|
|
finally:
|
|
|
|
|
self._websocket = None
|
|
|
|
|
await self._on_ws_close()
|
|
|
|
|
# 插件消息都是本地通信的,这里不可能是因为网络问题而掉线,所以不尝试重连
|
|
|
|
|
|
|
|
|
|
async def _on_ws_connect(self):
|
|
|
|
|
"""WebSocket连接成功"""
|
|
|
|
|
self._heartbeat_timer_handle = asyncio.get_running_loop().call_later(
|
|
|
|
|
self._heartbeat_interval, self._on_send_heartbeat
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def _on_ws_close(self):
|
|
|
|
|
"""WebSocket连接断开"""
|
|
|
|
|
if self._heartbeat_timer_handle is not None:
|
|
|
|
|
self._heartbeat_timer_handle.cancel()
|
|
|
|
|
self._heartbeat_timer_handle = None
|
|
|
|
|
|
|
|
|
|
def _on_send_heartbeat(self):
|
|
|
|
|
"""定时发送心跳包的回调"""
|
|
|
|
|
if self._websocket is None or self._websocket.closed:
|
|
|
|
|
self._heartbeat_timer_handle = None
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
self._heartbeat_timer_handle = asyncio.get_running_loop().call_later(
|
|
|
|
|
self._heartbeat_interval, self._on_send_heartbeat
|
|
|
|
|
)
|
|
|
|
|
asyncio.create_task(self._send_heartbeat())
|
|
|
|
|
|
|
|
|
|
async def _send_heartbeat(self):
|
|
|
|
|
"""发送心跳包"""
|
|
|
|
|
try:
|
|
|
|
|
await self.send_cmd_data(models.Command.HEARTBEAT, {})
|
|
|
|
|
except (ConnectionResetError, aiohttp.ClientConnectionError) as e:
|
|
|
|
|
logger.warning('Plugin client _send_heartbeat() failed: %r', e)
|
|
|
|
|
except Exception: # noqa
|
|
|
|
|
logger.exception('Plugin client _send_heartbeat() failed:')
|
|
|
|
|
|
|
|
|
|
def _on_ws_message(self, message: aiohttp.WSMessage):
|
|
|
|
|
"""
|
|
|
|
|
收到WebSocket消息
|
|
|
|
|
|
|
|
|
|
:param message: WebSocket消息
|
|
|
|
|
"""
|
|
|
|
|
if message.type != aiohttp.WSMsgType.TEXT:
|
|
|
|
|
logger.warning('Unknown websocket message type=%s, data=%s', message.type, message.data)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
body = message.json()
|
|
|
|
|
self._handle_command(body)
|
|
|
|
|
except Exception:
|
|
|
|
|
logger.error('body=%s', message.data)
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
def _handle_command(self, command: dict):
|
|
|
|
|
"""
|
|
|
|
|
处理业务消息
|
|
|
|
|
|
|
|
|
|
:param command: 业务消息
|
|
|
|
|
"""
|
|
|
|
|
if self._handler is not None:
|
|
|
|
|
try:
|
|
|
|
|
self._handler.handle(self, command)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception('Plugin client _handle_command() failed, command=%s', command, exc_info=e)
|