diff --git a/blivedm/client.py b/blivedm/client.py index c76dd5f..2cf3035 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -73,18 +73,13 @@ class BLiveClient: self, room_id, uid=0, - session: aiohttp.ClientSession = None, + session: Optional[aiohttp.ClientSession] = None, heartbeat_interval=30, ssl: Union[bool, ssl_.SSLContext] = True, - loop: asyncio.BaseEventLoop = None, + loop: Optional[asyncio.BaseEventLoop] = None, ): - # 用来init_room的临时房间ID + # 用来init_room的临时房间ID,可以用短ID self._tmp_room_id = room_id - # 调用init_room后初始化 - self._room_id = self._room_short_id = self._room_owner_uid = None - # [{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...] - self._host_server_list = None - self._host_server_token = None self._uid = uid if loop is not None: @@ -93,7 +88,6 @@ class BLiveClient: self._loop = session.loop # noqa else: self._loop = asyncio.get_event_loop() - self._future = None if session is None: self._session = aiohttp.ClientSession(loop=self._loop, timeout=aiohttp.ClientTimeout(total=10)) @@ -102,43 +96,67 @@ class BLiveClient: self._session = session self._own_session = False if self._session.loop is not self._loop: # noqa - raise RuntimeError('BLiveClient and session has to use same event loop') + raise RuntimeError('BLiveClient and session must use the same event loop') self._heartbeat_interval = heartbeat_interval self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa - self._websocket = None - self._heartbeat_timer_handle = None + # 消息处理器,可动态增删 self._handlers: List[handlers.HandlerInterface] = [] - @property - def is_running(self): - return self._future is not None + # 在调用init_room后初始化的字段 + # 真实房间ID + self._room_id = None + # 房间短ID,没有则为0 + self._room_short_id = None + # 主播用户ID + self._room_owner_uid = None + # 弹幕服务器列表 + # [{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...] + self._host_server_list: Optional[List[dict]] = None + # 连接弹幕服务器用的token + self._host_server_token = None + + # 在运行时初始化的字段 + # websocket连接 + self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None + # 网络协程的future + self._network_future: Optional[asyncio.Future] = None + # 发心跳包定时器的handle + self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None @property - def room_id(self): + def is_running(self) -> bool: + """ + 本客户端正在运行,注意调用stop后还没完全停止也算正在运行 + """ + return self._network_future is not None + + @property + def room_id(self) -> Optional[int]: """ 房间ID,调用init_room后初始化 """ return self._room_id @property - def room_short_id(self): + def room_short_id(self) -> Optional[int]: """ 房间短ID,没有则为0,调用init_room后初始化 """ return self._room_short_id @property - def room_owner_uid(self): + def room_owner_uid(self) -> Optional[int]: """ - 主播ID,调用init_room后初始化 + 主播用户ID,调用init_room后初始化 """ return self._room_owner_uid def add_handler(self, handler: 'handlers.HandlerInterface'): """ 添加消息处理器 + :param handler: 消息处理器 """ if handler not in self._handlers: @@ -147,6 +165,7 @@ class BLiveClient: def remove_handler(self, handler: 'handlers.HandlerInterface'): """ 移除消息处理器 + :param handler: 消息处理器 """ try: @@ -156,37 +175,60 @@ class BLiveClient: def start(self): """ - 创建相关的协程 - :return: 协程的future + 启动本客户端 """ - if self._future is not None: - raise RuntimeError('This client is already running') - self._future = asyncio.ensure_future(self._message_loop(), loop=self._loop) - self._future.add_done_callback(self.__on_message_loop_done) - return self._future + if self._network_future is not None: + logger.warning('room %s 已经在运行中,不能再次start', self.room_id) + return - def __on_message_loop_done(self, future): - self._future = None - logger.debug('room %s 消息协程结束', self.room_id) + self._network_future = asyncio.ensure_future(self._network_coroutine(), loop=self._loop) + self._network_future.add_done_callback(self.__on_network_coroutine_done) + + def __on_network_coroutine_done(self, future): + self._network_future = None + + logger.debug('room %s 网络协程结束', self.room_id) exception = future.exception() if exception is not None: - logger.exception('room %s 消息协程异常结束:', self.room_id, - exc_info=(type(exception), exception, exception.__traceback__)) + exc_info = (type(exception), exception, exception.__traceback__) + logger.exception('room %s 网络协程异常结束:', self.room_id, exc_info=exc_info) def stop(self): """ - 停止相关的协程 - :return: 协程的future + 停止本客户端 """ - if self._future is None: - raise RuntimeError('This client is not running') - self._future.cancel() - return self._future + if self._network_future is None: + logger.warning('room %s 已经停止,不能再次stop', self.room_id) + return + + self._network_future.cancel() + + async def stop_and_close(self): + """ + 停止本客户端并释放本客户端的资源,调用后本客户端将不可用 + """ + self.stop() + await self.join() + await self.close() + + async def join(self): + """ + 等待本客户端停止 + """ + if self._network_future is None: + logger.warning('room %s 已经停止,不能join', self.room_id) + return + + await self._network_future async def close(self): """ - 如果session是自己创建的则关闭session + 释放本客户端的资源,调用后本客户端将不可用 """ + if self._network_future is not None: + logger.warning('room %s 在运行状态中调用了close', self.room_id) + + # 如果session是自己创建的则关闭session if self._own_session: await self._session.close() @@ -275,18 +317,18 @@ class BLiveClient: async def _send_auth(self): auth_params = { - 'uid': self._uid, - 'roomid': self._room_id, - 'protover': 2, - 'platform': 'web', + 'uid': self._uid, + 'roomid': self._room_id, + 'protover': 2, + 'platform': 'web', 'clientver': '1.14.3', - 'type': 2 + 'type': 2 } if self._host_server_token is not None: auth_params['key'] = self._host_server_token await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH)) - async def _message_loop(self): + async def _network_coroutine(self): # 如果之前未初始化则初始化 if self._host_server_token is None: if not await self.init_room(): diff --git a/blivedm/handlers.py b/blivedm/handlers.py index c3788af..3a27121 100644 --- a/blivedm/handlers.py +++ b/blivedm/handlers.py @@ -13,7 +13,7 @@ __all__ = ( logger = logging.getLogger('blivedm') # 常见可忽略的cmd -FREQUENT_CMDS = ( +IGNORED_CMDS = ( 'INTERACT_WORD', 'ROOM_BANNER', 'ROOM_REAL_TIME_MESSAGE_UPDATE', @@ -55,7 +55,7 @@ class BaseHandler(HandlerInterface): """ def __heartbeat_callback(self, client: client_.BLiveClient, command: dict): - return self._on_popularity(client, models.HeartbeatMessage.from_command(command['data'])) + return self._on_heartbeat(client, models.HeartbeatMessage.from_command(command['data'])) def __danmu_msg_callback(self, client: client_.BLiveClient, command: dict): return self._on_danmaku(client, models.DanmakuMessage.from_command(command['info'])) @@ -95,7 +95,7 @@ class BaseHandler(HandlerInterface): 'SUPER_CHAT_MESSAGE_DELETE': __super_chat_message_delete_callback, } # 忽略其他常见cmd - for cmd in FREQUENT_CMDS: + for cmd in IGNORED_CMDS: _CMD_CALLBACK_DICT[cmd] = None del cmd @@ -116,9 +116,9 @@ class BaseHandler(HandlerInterface): if callback is not None: await callback(self, client, command) - async def _on_popularity(self, client: client_.BLiveClient, message: models.HeartbeatMessage): + async def _on_heartbeat(self, client: client_.BLiveClient, message: models.HeartbeatMessage): """ - 收到人气值 + 收到心跳包(人气值) """ async def _on_danmaku(self, client: client_.BLiveClient, message: models.DanmakuMessage): diff --git a/sample.py b/sample.py index 9d870e7..cc421c7 100644 --- a/sample.py +++ b/sample.py @@ -7,17 +7,17 @@ import blivedm async def main(): # 直播间ID的取值看直播间URL # 如果SSL验证失败就把ssl设为False,B站真的有过忘续证书的情况 - client = blivedm.BLiveClient(room_id=411318, ssl=True) + client = blivedm.BLiveClient(room_id=21449083, ssl=True) handler = MyHandler() client.add_handler(handler) - future = client.start() + client.start() try: # 5秒后停止,测试用 # await asyncio.sleep(5) - # future = client.stop() + # client.stop() - await future + await client.join() finally: await client.close() @@ -28,10 +28,11 @@ class MyHandler(blivedm.BaseHandler): # 入场消息回调 async def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict): - print(f"self_type={type(self).__name__}, room_id={client.room_id}, uname={command['data']['uname']}") + print(f"INTERACT_WORD: self_type={type(self).__name__}, room_id={client.room_id}," + f" uname={command['data']['uname']}") _CMD_CALLBACK_DICT['INTERACT_WORD'] = __interact_word_callback # noqa - async def _on_popularity(self, client: blivedm.BLiveClient, message: blivedm.HeartbeatMessage): + async def _on_heartbeat(self, client: blivedm.BLiveClient, message: blivedm.HeartbeatMessage): print(f'当前人气值:{message.popularity}') async def _on_danmaku(self, client: blivedm.BLiveClient, message: blivedm.DanmakuMessage):