From 8d7d63f1bb8811610df566ca4b7cabb17c3dad8d Mon Sep 17 00:00:00 2001 From: yjqiang <13307130285@fudan.edu.cn> Date: Sun, 17 Feb 2019 17:01:52 +0800 Subject: [PATCH] close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit close的设计参考了aiohttp 的ws client,非常感谢 --- blivedm.py | 34 +++++++++++++++++++++++++--------- sample.py | 17 ++++++++++++++--- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/blivedm.py b/blivedm.py index 529c377..95176c3 100644 --- a/blivedm.py +++ b/blivedm.py @@ -19,6 +19,8 @@ class BaseDanmu(): # 建立连接过程中难以处理重设置房间问题 self.lock_for_reseting_roomid_manually = asyncio.Lock() self.task_main = None + self._waiting = None + self._closed = False self._bytes_heartbeat = self._wrap_str(opt=2, body='') @property @@ -64,7 +66,7 @@ class BaseDanmu(): return bytes_data - async def open(self): + async def connect_ws(self): try: url = 'wss://broadcastlv.chat.bilibili.com:443/sub' self.ws = await asyncio.wait_for(self.client.ws_connect(url), timeout=3) @@ -103,8 +105,8 @@ class BaseDanmu(): body = datas[body_l:next_data_l] # 人气值(或者在线人数或者类似)以及心跳 if opt == 3: - # UserCount, = struct.unpack('!I', remain_data) - # printer.debug(f'弹幕心跳检测{self._area_id}') + UserCount, = struct.unpack('!I', body) + print(f'弹幕心跳检测{self._area_id}') pass # cmd elif opt == 5: @@ -119,7 +121,7 @@ class BaseDanmu(): data_l = next_data_l # 待确认 - async def close(self): + async def close_ws(self): try: await self.ws.close() except: @@ -131,11 +133,14 @@ class BaseDanmu(): return True async def run_forever(self): - while True: + self._waiting = asyncio.Future() + while not self._closed: print(f'正在启动{self._area_id}号弹幕姬') async with self.lock_for_reseting_roomid_manually: - is_open = await self.open() + if self._closed: + break + is_open = await self.connect_ws() if not is_open: continue self.task_main = asyncio.ensure_future(self.read_datas()) @@ -145,9 +150,10 @@ class BaseDanmu(): print(f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息') if not task_heartbeat.done(): task_heartbeat.cancel() - await self.close() + await self.close_ws() await asyncio.wait(pending) print(f'{self._area_id}号弹幕姬退出,剩余任务处理完毕') + self._waiting.set_result(True) async def reconnect(self, room_id): async with self.lock_for_reseting_roomid_manually: @@ -159,8 +165,18 @@ class BaseDanmu(): # 由于锁的存在,绝对不可能到达下一个的自动重连状态,这里是保证正确显示当前监控房间号 self.room_id = room_id print(f'{self._area_id}号弹幕姬已经切换房间({room_id})') - - + + async def close(self): + if not self._closed: + self._closed = True + await self.close_ws() + if self._waiting is not None: + await self._waiting + return True + else: + return False + + class DanmuPrinter(BaseDanmu): def handle_danmu(self, body): dic = json.loads(body.decode('utf-8')) diff --git a/sample.py b/sample.py index 38ea8fd..c4eb37e 100644 --- a/sample.py +++ b/sample.py @@ -1,12 +1,23 @@ # -*- coding: utf-8 -*- -from asyncio import get_event_loop +import asyncio +from time import time from blivedm import DanmuPrinter +async def test1(): + connection = DanmuPrinter(23058, 0) + task_run = asyncio.ensure_future(connection.run_forever()) + await asyncio.sleep(30) + print(time(), 'closing') + await connection.close() + print(time(), 'closed') + await task_run + print(time(), 'all done') + def main(): - loop = get_event_loop() - loop.run_until_complete(DanmuPrinter(23058, 0).run_forever()) + loop = asyncio.get_event_loop() + loop.run_until_complete(test1()) loop.close()