修改客户端生命周期接口

This commit is contained in:
John Smith 2021-12-15 00:09:07 +08:00
parent fd9fdae6ef
commit 34d1a8a44a
3 changed files with 98 additions and 55 deletions

View File

@ -73,18 +73,13 @@ class BLiveClient:
self,
room_id,
uid=0,
session: aiohttp.ClientSession = None,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30,
ssl: Union[bool, ssl_.SSLContext] = True,
loop: asyncio.BaseEventLoop = None,
loop: Optional[asyncio.BaseEventLoop] = None,
):
# 用来init_room的临时房间ID
# 用来init_room的临时房间ID可以用短ID
self._tmp_room_id = room_id
# 调用init_room后初始化
self._room_id = self._room_short_id = self._room_owner_uid = None
# [{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...]
self._host_server_list = None
self._host_server_token = None
self._uid = uid
if loop is not None:
@ -93,7 +88,6 @@ class BLiveClient:
self._loop = session.loop # noqa
else:
self._loop = asyncio.get_event_loop()
self._future = None
if session is None:
self._session = aiohttp.ClientSession(loop=self._loop, timeout=aiohttp.ClientTimeout(total=10))
@ -102,43 +96,67 @@ class BLiveClient:
self._session = session
self._own_session = False
if self._session.loop is not self._loop: # noqa
raise RuntimeError('BLiveClient and session has to use same event loop')
raise RuntimeError('BLiveClient and session must use the same event loop')
self._heartbeat_interval = heartbeat_interval
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
self._websocket = None
self._heartbeat_timer_handle = None
# 消息处理器,可动态增删
self._handlers: List[handlers.HandlerInterface] = []
@property
def is_running(self):
return self._future is not None
# 在调用init_room后初始化的字段
# 真实房间ID
self._room_id = None
# 房间短ID没有则为0
self._room_short_id = None
# 主播用户ID
self._room_owner_uid = None
# 弹幕服务器列表
# [{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...]
self._host_server_list: Optional[List[dict]] = None
# 连接弹幕服务器用的token
self._host_server_token = None
# 在运行时初始化的字段
# websocket连接
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
# 网络协程的future
self._network_future: Optional[asyncio.Future] = None
# 发心跳包定时器的handle
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
@property
def room_id(self):
def is_running(self) -> bool:
"""
本客户端正在运行注意调用stop后还没完全停止也算正在运行
"""
return self._network_future is not None
@property
def room_id(self) -> Optional[int]:
"""
房间ID调用init_room后初始化
"""
return self._room_id
@property
def room_short_id(self):
def room_short_id(self) -> Optional[int]:
"""
房间短ID没有则为0调用init_room后初始化
"""
return self._room_short_id
@property
def room_owner_uid(self):
def room_owner_uid(self) -> Optional[int]:
"""
主播ID调用init_room后初始化
主播用户ID调用init_room后初始化
"""
return self._room_owner_uid
def add_handler(self, handler: 'handlers.HandlerInterface'):
"""
添加消息处理器
:param handler: 消息处理器
"""
if handler not in self._handlers:
@ -147,6 +165,7 @@ class BLiveClient:
def remove_handler(self, handler: 'handlers.HandlerInterface'):
"""
移除消息处理器
:param handler: 消息处理器
"""
try:
@ -156,37 +175,60 @@ class BLiveClient:
def start(self):
"""
创建相关的协程
:return: 协程的future
启动本客户端
"""
if self._future is not None:
raise RuntimeError('This client is already running')
self._future = asyncio.ensure_future(self._message_loop(), loop=self._loop)
self._future.add_done_callback(self.__on_message_loop_done)
return self._future
if self._network_future is not None:
logger.warning('room %s 已经在运行中不能再次start', self.room_id)
return
def __on_message_loop_done(self, future):
self._future = None
logger.debug('room %s 消息协程结束', self.room_id)
self._network_future = asyncio.ensure_future(self._network_coroutine(), loop=self._loop)
self._network_future.add_done_callback(self.__on_network_coroutine_done)
def __on_network_coroutine_done(self, future):
self._network_future = None
logger.debug('room %s 网络协程结束', self.room_id)
exception = future.exception()
if exception is not None:
logger.exception('room %s 消息协程异常结束:', self.room_id,
exc_info=(type(exception), exception, exception.__traceback__))
exc_info = (type(exception), exception, exception.__traceback__)
logger.exception('room %s 网络协程异常结束:', self.room_id, exc_info=exc_info)
def stop(self):
"""
停止相关的协程
:return: 协程的future
停止本客户端
"""
if self._future is None:
raise RuntimeError('This client is not running')
self._future.cancel()
return self._future
if self._network_future is None:
logger.warning('room %s 已经停止不能再次stop', self.room_id)
return
self._network_future.cancel()
async def stop_and_close(self):
"""
停止本客户端并释放本客户端的资源调用后本客户端将不可用
"""
self.stop()
await self.join()
await self.close()
async def join(self):
"""
等待本客户端停止
"""
if self._network_future is None:
logger.warning('room %s 已经停止不能join', self.room_id)
return
await self._network_future
async def close(self):
"""
如果session是自己创建的则关闭session
释放本客户端的资源调用后本客户端将不可用
"""
if self._network_future is not None:
logger.warning('room %s 在运行状态中调用了close', self.room_id)
# 如果session是自己创建的则关闭session
if self._own_session:
await self._session.close()
@ -275,18 +317,18 @@ class BLiveClient:
async def _send_auth(self):
auth_params = {
'uid': self._uid,
'roomid': self._room_id,
'protover': 2,
'platform': 'web',
'uid': self._uid,
'roomid': self._room_id,
'protover': 2,
'platform': 'web',
'clientver': '1.14.3',
'type': 2
'type': 2
}
if self._host_server_token is not None:
auth_params['key'] = self._host_server_token
await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH))
async def _message_loop(self):
async def _network_coroutine(self):
# 如果之前未初始化则初始化
if self._host_server_token is None:
if not await self.init_room():

View File

@ -13,7 +13,7 @@ __all__ = (
logger = logging.getLogger('blivedm')
# 常见可忽略的cmd
FREQUENT_CMDS = (
IGNORED_CMDS = (
'INTERACT_WORD',
'ROOM_BANNER',
'ROOM_REAL_TIME_MESSAGE_UPDATE',
@ -55,7 +55,7 @@ class BaseHandler(HandlerInterface):
"""
def __heartbeat_callback(self, client: client_.BLiveClient, command: dict):
return self._on_popularity(client, models.HeartbeatMessage.from_command(command['data']))
return self._on_heartbeat(client, models.HeartbeatMessage.from_command(command['data']))
def __danmu_msg_callback(self, client: client_.BLiveClient, command: dict):
return self._on_danmaku(client, models.DanmakuMessage.from_command(command['info']))
@ -95,7 +95,7 @@ class BaseHandler(HandlerInterface):
'SUPER_CHAT_MESSAGE_DELETE': __super_chat_message_delete_callback,
}
# 忽略其他常见cmd
for cmd in FREQUENT_CMDS:
for cmd in IGNORED_CMDS:
_CMD_CALLBACK_DICT[cmd] = None
del cmd
@ -116,9 +116,9 @@ class BaseHandler(HandlerInterface):
if callback is not None:
await callback(self, client, command)
async def _on_popularity(self, client: client_.BLiveClient, message: models.HeartbeatMessage):
async def _on_heartbeat(self, client: client_.BLiveClient, message: models.HeartbeatMessage):
"""
收到人气值
收到心跳包人气值
"""
async def _on_danmaku(self, client: client_.BLiveClient, message: models.DanmakuMessage):

View File

@ -7,17 +7,17 @@ import blivedm
async def main():
# 直播间ID的取值看直播间URL
# 如果SSL验证失败就把ssl设为FalseB站真的有过忘续证书的情况
client = blivedm.BLiveClient(room_id=411318, ssl=True)
client = blivedm.BLiveClient(room_id=21449083, ssl=True)
handler = MyHandler()
client.add_handler(handler)
future = client.start()
client.start()
try:
# 5秒后停止测试用
# await asyncio.sleep(5)
# future = client.stop()
# client.stop()
await future
await client.join()
finally:
await client.close()
@ -28,10 +28,11 @@ class MyHandler(blivedm.BaseHandler):
# 入场消息回调
async def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict):
print(f"self_type={type(self).__name__}, room_id={client.room_id}, uname={command['data']['uname']}")
print(f"INTERACT_WORD: self_type={type(self).__name__}, room_id={client.room_id},"
f" uname={command['data']['uname']}")
_CMD_CALLBACK_DICT['INTERACT_WORD'] = __interact_word_callback # noqa
async def _on_popularity(self, client: blivedm.BLiveClient, message: blivedm.HeartbeatMessage):
async def _on_heartbeat(self, client: blivedm.BLiveClient, message: blivedm.HeartbeatMessage):
print(f'当前人气值:{message.popularity}')
async def _on_danmaku(self, client: blivedm.BLiveClient, message: blivedm.DanmakuMessage):