From 16faeadab807083401f470bfab6f78b9d5d6fa18 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 3 Jun 2018 14:06:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=94=99=E8=AF=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=87=BD=E6=95=B0=E3=80=81=E4=BD=BF=E7=94=A8aiohttp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm.py | 123 ++++++++++++++++++++++++++++++----------------- requirements.txt | 2 +- sample.py | 19 ++++++-- 3 files changed, 96 insertions(+), 48 deletions(-) diff --git a/blivedm.py b/blivedm.py index 061efb5..88a3876 100644 --- a/blivedm.py +++ b/blivedm.py @@ -2,13 +2,14 @@ import json import struct -from asyncio import gather, sleep, CancelledError +import sys +from asyncio import get_event_loop, gather, sleep, CancelledError from collections import namedtuple from enum import IntEnum # noinspection PyProtectedMember from ssl import _create_unverified_context -import requests +import aiohttp import websockets from websockets.exceptions import ConnectionClosed @@ -36,54 +37,59 @@ class BLiveClient: """ self._short_id = room_id self._room_id = None - self._ssl = ssl if ssl else _create_unverified_context() - self._websocket = None # 未登录 self._uid = 0 - self._loop = loop + self._ssl = ssl if ssl else _create_unverified_context() + self._websocket = None + + self._loop = loop or get_event_loop() self._future = None def start(self): """ 创建相关的协程,不会执行事件循环 - :raise ConnectionError: 获取房间ID失败 :return: True表示成功创建协程,False表示之前创建的协程未结束 """ if self._future is not None: return False - - # 获取房间ID - if self._room_id is None: - res = requests.get(self.ROOM_INIT_URL, {'id': self._short_id}) - if res.status_code != 200: - raise ConnectionError() - else: - self._room_id = res.json()['data']['room_id'] - - # 创建协程 self._future = gather( self._message_loop(), self._heartbeat_loop(), loop=self._loop ) - - def on_done(_future): - self._future = None - self._future.add_done_callback(on_done) + self._future.add_done_callback(self.__on_done) return True - def stop(self, callback=None): + def stop(self): """ 取消相关的协程,不会停止事件循环 - :param callback: 协程结束后调用 """ - if self._future is None or self._future.cancelled(): - return + if self._future is not None: + self._future.cancel() - if callback is not None: - self._future.add_done_callback(lambda future: callback()) - self._future.cancel() + def __on_done(self, future): + self._future = None + self._on_stop(future.exception()) + + async def _get_room_id(self): + try: + async with aiohttp.ClientSession(loop=self._loop) as session: + async with session.get(self.ROOM_INIT_URL, + params={'id': self._short_id}, + ssl=self._ssl) as res: + if res.status == 200: + data = await res.json() + if data['code'] == 0: + self._room_id = data['data']['room_id'] + else: + raise ConnectionAbortedError('获取房间ID失败:' + data['msg']) + else: + raise ConnectionAbortedError('获取房间ID失败:' + res.reason) + except Exception as e: + if not self._handle_error(e): + self._future.cancel() + raise def _make_packet(self, data, operation): body = json.dumps(data).encode('utf-8') @@ -96,11 +102,27 @@ class BLiveClient: ) return header + body + async def _send_auth(self): + auth_params = { + 'uid': self._uid, + 'roomid': self._room_id, + 'protover': 1, + 'platform': 'web', + 'clientver': '1.4.0' + } + await self._websocket.send(self._make_packet(auth_params, Operation.AUTH)) + async def _message_loop(self): + # 获取房间ID + if self._room_id is None: + await self._get_room_id() + while True: try: # 连接 - async with websockets.connect(self.WEBSOCKET_URL, ssl=self._ssl, loop=self._loop) as websocket: + async with websockets.connect(self.WEBSOCKET_URL, + ssl=self._ssl, + loop=self._loop) as websocket: self._websocket = websocket await self._send_auth() @@ -113,25 +135,20 @@ class BLiveClient: except ConnectionClosed: self._websocket = None # 重连 - print('掉线重连中') + print('掉线重连中', file=sys.stderr) try: await sleep(5) except CancelledError: break continue + except Exception as e: + if not self._handle_error(e): + self._future.cancel() + raise + continue finally: self._websocket = None - async def _send_auth(self): - auth_params = { - 'uid': self._uid, - 'roomid': self._room_id, - 'protover': 1, - 'platform': 'web', - 'clientver': '1.4.0' - } - await self._websocket.send(self._make_packet(auth_params, Operation.AUTH)) - async def _heartbeat_loop(self): while True: try: @@ -146,6 +163,11 @@ class BLiveClient: except ConnectionClosed: # 等待重连 continue + except Exception as e: + if not self._handle_error(e): + self._future.cancel() + raise + continue async def _handle_message(self, message): offset = 0 @@ -157,8 +179,8 @@ class BLiveClient: if header.operation == Operation.POPULARITY: popularity = int.from_bytes(message[offset + self.HEADER_STRUCT.size: - offset + self.HEADER_STRUCT.size + 4] - , 'big') + offset + self.HEADER_STRUCT.size + 4], + 'big') await self._on_get_popularity(popularity) elif header.operation == Operation.COMMAND: @@ -171,7 +193,7 @@ class BLiveClient: else: body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] - print('未知包类型:', header, body) + print('未知包类型:', header, body, file=sys.stderr) offset += header.total_len @@ -209,7 +231,7 @@ class BLiveClient: pass else: - print('未知命令:', command) + print('未知命令:', command, file=sys.stderr) async def _on_get_popularity(self, popularity): """ @@ -225,3 +247,18 @@ class BLiveClient: :param user_name: 弹幕作者 """ pass + + def _on_stop(self, exc): + """ + 协程结束后被调用 + :param exc: 如果是异常结束则为异常,否则为None + """ + pass + + def _handle_error(self, exc): + """ + 处理异常时被调用 + :param exc: 异常 + :return: True表示异常被处理,False表示异常没被处理 + """ + return False diff --git a/requirements.txt b/requirements.txt index 296ec3e..47db4db 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -requests==2.18.4 +aiohttp==3.2.1 websockets==4.0.1 diff --git a/sample.py b/sample.py index f894359..96dfe35 100644 --- a/sample.py +++ b/sample.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- +import sys from asyncio import get_event_loop +from ssl import SSLError from blivedm import BLiveClient @@ -13,19 +15,28 @@ class MyBLiveClient(BLiveClient): async def _on_get_danmaku(self, content, user_name): print(user_name, '说:', content) + def _on_stop(self, exc): + self._loop.stop() + + def _handle_error(self, exc): + print(exc, file=sys.stderr) + if isinstance(exc, SSLError): + print('SSL验证失败!', file=sys.stderr) + return False + def main(): loop = get_event_loop() - # 如果SSL验证失败或连接卡死就把第二个参数设为False - client = MyBLiveClient(139, True, loop) + # 如果SSL验证失败就把第二个参数设为False + client = MyBLiveClient(139, True) client.start() # 5秒后停止,测试用 - # loop.call_later(5, client.stop, loop.stop) + # loop.call_later(5, client.stop) # 按Ctrl+C停止 # import signal - # signal.signal(signal.SIGINT, lambda signum, frame: client.stop(loop.stop)) + # signal.signal(signal.SIGINT, lambda signum, frame: client.stop()) try: loop.run_forever()