close的设计参考了aiohttp 的ws client,非常感谢
This commit is contained in:
yjqiang 2019-02-17 17:01:52 +08:00
parent 07bc699f2c
commit 8d7d63f1bb
2 changed files with 39 additions and 12 deletions

View File

@ -19,6 +19,8 @@ class BaseDanmu():
# 建立连接过程中难以处理重设置房间问题
self.lock_for_reseting_roomid_manually = asyncio.Lock()
self.task_main = None
self._waiting = None
self._closed = False
self._bytes_heartbeat = self._wrap_str(opt=2, body='')
@property
@ -64,7 +66,7 @@ class BaseDanmu():
return bytes_data
async def open(self):
async def connect_ws(self):
try:
url = 'wss://broadcastlv.chat.bilibili.com:443/sub'
self.ws = await asyncio.wait_for(self.client.ws_connect(url), timeout=3)
@ -103,8 +105,8 @@ class BaseDanmu():
body = datas[body_l:next_data_l]
# 人气值(或者在线人数或者类似)以及心跳
if opt == 3:
# UserCount, = struct.unpack('!I', remain_data)
# printer.debug(f'弹幕心跳检测{self._area_id}')
UserCount, = struct.unpack('!I', body)
print(f'弹幕心跳检测{self._area_id}')
pass
# cmd
elif opt == 5:
@ -119,7 +121,7 @@ class BaseDanmu():
data_l = next_data_l
# 待确认
async def close(self):
async def close_ws(self):
try:
await self.ws.close()
except:
@ -131,11 +133,14 @@ class BaseDanmu():
return True
async def run_forever(self):
while True:
self._waiting = asyncio.Future()
while not self._closed:
print(f'正在启动{self._area_id}号弹幕姬')
async with self.lock_for_reseting_roomid_manually:
is_open = await self.open()
if self._closed:
break
is_open = await self.connect_ws()
if not is_open:
continue
self.task_main = asyncio.ensure_future(self.read_datas())
@ -145,9 +150,10 @@ class BaseDanmu():
print(f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息')
if not task_heartbeat.done():
task_heartbeat.cancel()
await self.close()
await self.close_ws()
await asyncio.wait(pending)
print(f'{self._area_id}号弹幕姬退出,剩余任务处理完毕')
self._waiting.set_result(True)
async def reconnect(self, room_id):
async with self.lock_for_reseting_roomid_manually:
@ -159,8 +165,18 @@ class BaseDanmu():
# 由于锁的存在,绝对不可能到达下一个的自动重连状态,这里是保证正确显示当前监控房间号
self.room_id = room_id
print(f'{self._area_id}号弹幕姬已经切换房间({room_id}')
async def close(self):
if not self._closed:
self._closed = True
await self.close_ws()
if self._waiting is not None:
await self._waiting
return True
else:
return False
class DanmuPrinter(BaseDanmu):
def handle_danmu(self, body):
dic = json.loads(body.decode('utf-8'))

View File

@ -1,12 +1,23 @@
# -*- coding: utf-8 -*-
from asyncio import get_event_loop
import asyncio
from time import time
from blivedm import DanmuPrinter
async def test1():
connection = DanmuPrinter(23058, 0)
task_run = asyncio.ensure_future(connection.run_forever())
await asyncio.sleep(30)
print(time(), 'closing')
await connection.close()
print(time(), 'closed')
await task_run
print(time(), 'all done')
def main():
loop = get_event_loop()
loop.run_until_complete(DanmuPrinter(23058, 0).run_forever())
loop = asyncio.get_event_loop()
loop.run_until_complete(test1())
loop.close()