修改重连过程、简化一些代码

This commit is contained in:
John Smith 2019-02-20 14:12:11 +08:00
parent 27c51d1342
commit e0959dddfb
2 changed files with 54 additions and 95 deletions

View File

@ -40,8 +40,13 @@ class BLiveClient:
self._room_id = None
self._uid = uid
self._loop = loop or asyncio.get_event_loop()
self._future = None
if loop is not None:
self._loop = loop
elif session is not None:
self._loop = session.loop
else:
self._loop = asyncio.get_event_loop()
self._is_running = False
if session is None:
self._session = aiohttp.ClientSession(loop=self._loop)
@ -54,6 +59,10 @@ class BLiveClient:
self._ssl = ssl if ssl else _create_unverified_context()
self._websocket = None
@property
def is_running(self):
return self._is_running
async def close(self):
"""
如果session是自己创建的则关闭session
@ -61,49 +70,28 @@ class BLiveClient:
if self._own_session:
await self._session.close()
def start(self):
def run(self):
"""
创建相关的协程不会执行事件循环
:return: True表示成功创建协程False表示之前创建的协程未结束
:return: 协程的future
"""
if self._future is not None:
return False
self._future = asyncio.gather(
self._message_loop(),
self._heartbeat_loop(),
loop=self._loop
)
self._future.add_done_callback(self.__on_done)
return True
def stop(self):
"""
取消相关的协程不会停止事件循环
"""
if self._future is not None:
self._future.cancel()
def __on_done(self, future):
self._future = None
self._on_stop(future.exception())
if self._is_running:
raise RuntimeError('This client is already running')
self._is_running = True
return asyncio.ensure_future(self._message_loop(), loop=self._loop)
async def _get_room_id(self):
try:
async with self._session.get(self.ROOM_INIT_URL,
params={'id': self._short_id},
ssl=self._ssl) as res:
if res.status == 200:
data = await res.json()
if data['code'] == 0:
self._room_id = data['data']['room_id']
else:
raise ConnectionAbortedError('获取房间ID失败' + data['msg'])
async with self._session.get(self.ROOM_INIT_URL,
params={'id': self._short_id},
ssl=self._ssl) as res:
if res.status == 200:
data = await res.json()
if data['code'] == 0:
self._room_id = data['data']['room_id']
else:
raise ConnectionAbortedError('获取房间ID失败' + res.reason)
except Exception as e:
if not self._handle_error(e):
self._future.cancel()
raise
raise ConnectionAbortedError('获取房间ID失败' + data['msg'])
else:
raise ConnectionAbortedError('获取房间ID失败' + res.reason)
def _make_packet(self, data, operation):
body = json.dumps(data).encode('utf-8')
@ -132,12 +120,14 @@ class BLiveClient:
await self._get_room_id()
while True:
heartbeat_future = None
try:
# 连接
async with self._session.ws_connect(self.WEBSOCKET_URL,
ssl=self._ssl) as websocket:
self._websocket = websocket
await self._send_auth()
heartbeat_future = asyncio.ensure_future(self._heartbeat_loop(), loop=self._loop)
# 处理消息
async for message in websocket: # type: aiohttp.WSMessage
@ -149,41 +139,31 @@ class BLiveClient:
except asyncio.CancelledError:
break
except aiohttp.ClientConnectorError:
self._websocket = None
# 重连
print('掉线重连中', file=sys.stderr)
try:
await asyncio.sleep(5)
except asyncio.CancelledError:
break
continue
except Exception as e:
if not self._handle_error(e):
self._future.cancel()
raise
continue
finally:
if heartbeat_future is not None:
heartbeat_future.cancel()
try:
await heartbeat_future
except asyncio.CancelledError:
break
self._websocket = None
self._is_running = False
async def _heartbeat_loop(self):
while True:
try:
if self._websocket is None:
await asyncio.sleep(0.5)
else:
await self._websocket.send_bytes(self._make_packet({}, Operation.SEND_HEARTBEAT))
await asyncio.sleep(30)
await self._websocket.send_bytes(self._make_packet({}, Operation.SEND_HEARTBEAT))
await asyncio.sleep(30)
except asyncio.CancelledError:
except (asyncio.CancelledError, aiohttp.ClientConnectorError):
break
except aiohttp.ClientConnectorError:
# 等待重连
continue
except Exception as e:
if not self._handle_error(e):
self._future.cancel()
raise
continue
async def _handle_message(self, message):
offset = 0
@ -270,11 +250,3 @@ class BLiveClient:
:param exc: 如果是异常结束则为异常否则为None
"""
pass
def _handle_error(self, exc):
"""
处理异常时被调用
:param exc: 异常
:return: True表示异常被处理False表示异常没被处理
"""
return False

View File

@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
import asyncio
import sys
from ssl import SSLError
from blivedm import BLiveClient
@ -15,37 +13,26 @@ class MyBLiveClient(BLiveClient):
async def _on_get_danmaku(self, content, user_name):
print(user_name, '说:', content)
def _on_stop(self, exc):
# 执行self.close然后关闭事件循环
asyncio.ensure_future(
self.close(), loop=self._loop
).add_done_callback(
lambda future: self._loop.stop()
)
def _handle_error(self, exc):
print(exc, file=sys.stderr)
if isinstance(exc, SSLError):
print('SSL验证失败', file=sys.stderr)
return False
async def async_main():
# 139是黑桐谷歌的直播间
# 如果SSL验证失败就把第二个参数设为False
client = MyBLiveClient(139, True)
future = client.run()
try:
# 5秒后停止测试用
# await asyncio.sleep(5)
# future.cancel()
await future
finally:
await client.close()
def main():
loop = asyncio.get_event_loop()
# 139是黑桐谷歌的直播间
# 如果SSL验证失败就把第二个参数设为False
client = MyBLiveClient(139, True)
client.start()
# 5秒后停止测试用
# loop.call_later(5, client.stop)
# 按Ctrl+C停止
# import signal
# signal.signal(signal.SIGINT, lambda signum, frame: client.stop())
try:
loop.run_forever()
loop.run_until_complete(async_main())
finally:
loop.close()