From 61e6825d4e5b1e252903b668c2b4f5d599266393 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 25 Mar 2023 18:28:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=BC=E5=AE=B9Python=203.10?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm/client.py | 37 ++++++++++++++----------------------- sample.py | 4 ++-- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/blivedm/client.py b/blivedm/client.py index 90e19cc..91bb4b9 100644 --- a/blivedm/client.py +++ b/blivedm/client.py @@ -85,7 +85,6 @@ class BLiveClient: :param session: cookie、连接池 :param heartbeat_interval: 发送心跳包的间隔时间(秒) :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext - :param loop: 协程事件循环 """ def __init__( @@ -95,27 +94,18 @@ class BLiveClient: session: Optional[aiohttp.ClientSession] = None, heartbeat_interval=30, ssl: Union[bool, ssl_.SSLContext] = True, - loop: Optional[asyncio.BaseEventLoop] = None, ): # 用来init_room的临时房间ID,可以用短ID self._tmp_room_id = room_id self._uid = uid - if loop is not None: - self._loop = loop - elif session is not None: - self._loop = session.loop # noqa - else: - self._loop = asyncio.get_event_loop() - if session is None: - self._session = aiohttp.ClientSession(loop=self._loop, timeout=aiohttp.ClientTimeout(total=10)) + self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) self._own_session = True else: self._session = session self._own_session = False - if self._session.loop is not self._loop: # noqa - raise RuntimeError('BLiveClient and session must use the same event loop') + assert self._session.loop is asyncio.get_event_loop() # noqa self._heartbeat_interval = heartbeat_interval self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa @@ -202,7 +192,7 @@ class BLiveClient: logger.warning('room=%s client is running, cannot start() again', self.room_id) return - self._network_future = asyncio.ensure_future(self._network_coroutine_wrapper(), loop=self._loop) + self._network_future = asyncio.ensure_future(self._network_coroutine_wrapper()) def stop(self): """ @@ -422,14 +412,16 @@ class BLiveClient: # 准备重连 retry_count += 1 logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count) - await asyncio.sleep(1, loop=self._loop) + await asyncio.sleep(1) async def _on_ws_connect(self): """ websocket连接成功 """ await self._send_auth() - self._heartbeat_timer_handle = self._loop.call_later(self._heartbeat_interval, self._on_send_heartbeat) + self._heartbeat_timer_handle = asyncio.get_running_loop().call_later( + self._heartbeat_interval, self._on_send_heartbeat + ) async def _on_ws_close(self): """ @@ -462,8 +454,10 @@ class BLiveClient: self._heartbeat_timer_handle = None return - self._heartbeat_timer_handle = self._loop.call_later(self._heartbeat_interval, self._on_send_heartbeat) - asyncio.ensure_future(self._send_heartbeat(), loop=self._loop) + self._heartbeat_timer_handle = asyncio.get_running_loop().call_later( + self._heartbeat_interval, self._on_send_heartbeat + ) + asyncio.ensure_future(self._send_heartbeat()) async def _send_heartbeat(self): """ @@ -555,7 +549,7 @@ class BLiveClient: # 业务消息 if header.ver == ProtoVer.BROTLI: # 压缩过的先解压,为了避免阻塞网络线程,放在其他线程执行 - body = await self._loop.run_in_executor(None, brotli.decompress, body) + body = await asyncio.get_running_loop().run_in_executor(None, brotli.decompress, body) await self._parse_ws_message(body) elif header.ver == ProtoVer.NORMAL: # 没压缩过的直接反序列化,因为有万恶的GIL,这里不能并行避免阻塞 @@ -594,11 +588,8 @@ class BLiveClient: # 外部代码可能不能正常处理取消,所以这里加shield results = await asyncio.shield( asyncio.gather( - *(handler.handle(self, command) for handler in self._handlers), - loop=self._loop, - return_exceptions=True - ), - loop=self._loop + *(handler.handle(self, command) for handler in self._handlers), return_exceptions=True + ) ) for res in results: if isinstance(res, Exception): diff --git a/sample.py b/sample.py index ea3f51e..0ee567d 100644 --- a/sample.py +++ b/sample.py @@ -16,7 +16,7 @@ TEST_ROOM_IDS = [ async def main(): await run_single_client() - await run_multi_client() + await run_multi_clients() async def run_single_client(): @@ -40,7 +40,7 @@ async def run_single_client(): await client.stop_and_close() -async def run_multi_client(): +async def run_multi_clients(): """ 演示同时监听多个直播间 """