mirror of
https://github.com/xfgryujk/blivechat.git
synced 2025-01-14 06:10:23 +08:00
224 lines
7.4 KiB
Python
224 lines
7.4 KiB
Python
# -*- coding: utf-8 -*-
|
||
import asyncio
|
||
import logging
|
||
from typing import *
|
||
|
||
import aiohttp
|
||
|
||
from . import handlers
|
||
from . import models
|
||
|
||
__all__ = (
|
||
'BlcPluginClient',
|
||
)
|
||
|
||
logger = logging.getLogger('blcsdk')
|
||
|
||
|
||
class BlcPluginClient:
|
||
"""
|
||
blivechat插件服务的客户端
|
||
|
||
:param ws_url: blivechat消息转发服务WebSocket地址
|
||
:param session: 连接池
|
||
:param heartbeat_interval: 发送心跳包的间隔时间(秒)
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
ws_url: str,
|
||
*,
|
||
session: Optional[aiohttp.ClientSession] = None,
|
||
heartbeat_interval: float = 30,
|
||
):
|
||
self._ws_url = ws_url
|
||
|
||
if session is None:
|
||
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
|
||
self._own_session = True
|
||
else:
|
||
self._session = session
|
||
self._own_session = False
|
||
assert self._session.loop is asyncio.get_event_loop() # noqa
|
||
|
||
self._heartbeat_interval = heartbeat_interval
|
||
|
||
self._handler: Optional[handlers.HandlerInterface] = None
|
||
"""消息处理器"""
|
||
|
||
# 在运行时初始化的字段
|
||
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
|
||
"""WebSocket连接"""
|
||
self._network_future: Optional[asyncio.Future] = None
|
||
"""网络协程的future"""
|
||
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
|
||
"""发心跳包定时器的handle"""
|
||
|
||
@property
|
||
def is_running(self) -> bool:
|
||
"""本客户端正在运行,注意调用stop后还没完全停止也算正在运行"""
|
||
return self._network_future is not None
|
||
|
||
def set_handler(self, handler: Optional['handlers.HandlerInterface']):
|
||
"""
|
||
设置消息处理器
|
||
|
||
注意消息处理器和网络协程运行在同一个协程,如果处理消息耗时太长会阻塞接收消息。如果是CPU密集型的任务,建议将消息推到线程池处理;
|
||
如果是IO密集型的任务,应该使用async函数,并且在handler里使用create_task创建新的协程
|
||
|
||
:param handler: 消息处理器
|
||
"""
|
||
self._handler = handler
|
||
|
||
def start(self):
|
||
"""启动本客户端"""
|
||
if self.is_running:
|
||
logger.warning('Plugin client is running, cannot start() again')
|
||
return
|
||
|
||
self._network_future = asyncio.create_task(self._network_coroutine_wrapper())
|
||
|
||
def stop(self):
|
||
"""停止本客户端"""
|
||
if not self.is_running:
|
||
logger.warning('Plugin client is stopped, cannot stop() again')
|
||
return
|
||
|
||
self._network_future.cancel()
|
||
|
||
async def stop_and_close(self):
|
||
"""便利函数,停止本客户端并释放本客户端的资源,调用后本客户端将不可用"""
|
||
if self.is_running:
|
||
self.stop()
|
||
await self.join()
|
||
await self.close()
|
||
|
||
async def join(self):
|
||
"""等待本客户端停止"""
|
||
if not self.is_running:
|
||
logger.warning('Plugin client is stopped, cannot join()')
|
||
return
|
||
|
||
await asyncio.shield(self._network_future)
|
||
|
||
async def close(self):
|
||
"""释放本客户端的资源,调用后本客户端将不可用"""
|
||
if self.is_running:
|
||
logger.warning('Plugin is calling close(), but client is running')
|
||
|
||
# 如果session是自己创建的则关闭session
|
||
if self._own_session:
|
||
await self._session.close()
|
||
|
||
async def send_cmd_data(self, cmd: models.Command, data: dict):
|
||
"""
|
||
发送消息给服务器
|
||
|
||
:param cmd: 消息类型,见Command
|
||
:param data: 消息体JSON数据
|
||
"""
|
||
if self._websocket is None or self._websocket.closed:
|
||
raise ConnectionResetError('websocket is closed')
|
||
|
||
body = {'cmd': cmd, 'data': data}
|
||
await self._websocket.send_json(body)
|
||
|
||
async def _network_coroutine_wrapper(self):
|
||
"""负责处理网络协程的异常,网络协程具体逻辑在_network_coroutine里"""
|
||
exc = None
|
||
try:
|
||
await self._network_coroutine()
|
||
except asyncio.CancelledError:
|
||
# 正常停止
|
||
pass
|
||
except Exception as e:
|
||
logger.exception('_network_coroutine() finished with exception:')
|
||
exc = e
|
||
finally:
|
||
logger.debug('_network_coroutine() finished')
|
||
self._network_future = None
|
||
|
||
if self._handler is not None:
|
||
self._handler.on_client_stopped(self, exc)
|
||
|
||
async def _network_coroutine(self):
|
||
"""网络协程,负责连接服务器、接收消息、解包"""
|
||
try:
|
||
# 连接
|
||
async with self._session.ws_connect(
|
||
self._ws_url,
|
||
receive_timeout=self._heartbeat_interval + 5,
|
||
) as websocket:
|
||
self._websocket = websocket
|
||
await self._on_ws_connect()
|
||
|
||
# 处理消息
|
||
message: aiohttp.WSMessage
|
||
async for message in websocket:
|
||
self._on_ws_message(message)
|
||
finally:
|
||
self._websocket = None
|
||
await self._on_ws_close()
|
||
# 插件消息都是本地通信的,这里不可能是因为网络问题而掉线,所以不尝试重连
|
||
|
||
async def _on_ws_connect(self):
|
||
"""WebSocket连接成功"""
|
||
self._heartbeat_timer_handle = asyncio.get_running_loop().call_later(
|
||
self._heartbeat_interval, self._on_send_heartbeat
|
||
)
|
||
|
||
async def _on_ws_close(self):
|
||
"""WebSocket连接断开"""
|
||
if self._heartbeat_timer_handle is not None:
|
||
self._heartbeat_timer_handle.cancel()
|
||
self._heartbeat_timer_handle = None
|
||
|
||
def _on_send_heartbeat(self):
|
||
"""定时发送心跳包的回调"""
|
||
if self._websocket is None or self._websocket.closed:
|
||
self._heartbeat_timer_handle = None
|
||
return
|
||
|
||
self._heartbeat_timer_handle = asyncio.get_running_loop().call_later(
|
||
self._heartbeat_interval, self._on_send_heartbeat
|
||
)
|
||
asyncio.create_task(self._send_heartbeat())
|
||
|
||
async def _send_heartbeat(self):
|
||
"""发送心跳包"""
|
||
try:
|
||
await self.send_cmd_data(models.Command.HEARTBEAT, {})
|
||
except (ConnectionResetError, aiohttp.ClientConnectionError) as e:
|
||
logger.warning('Plugin client _send_heartbeat() failed: %r', e)
|
||
except Exception: # noqa
|
||
logger.exception('Plugin client _send_heartbeat() failed:')
|
||
|
||
def _on_ws_message(self, message: aiohttp.WSMessage):
|
||
"""
|
||
收到WebSocket消息
|
||
|
||
:param message: WebSocket消息
|
||
"""
|
||
if message.type != aiohttp.WSMsgType.TEXT:
|
||
logger.warning('Unknown websocket message type=%s, data=%s', message.type, message.data)
|
||
return
|
||
|
||
try:
|
||
body = message.json()
|
||
self._handle_command(body)
|
||
except Exception:
|
||
logger.error('body=%s', message.data)
|
||
raise
|
||
|
||
def _handle_command(self, command: dict):
|
||
"""
|
||
处理业务消息
|
||
|
||
:param command: 业务消息
|
||
"""
|
||
if self._handler is not None:
|
||
try:
|
||
self._handler.handle(self, command)
|
||
except Exception as e:
|
||
logger.exception('Plugin client _handle_command() failed, command=%s', command, exc_info=e)
|