diff --git a/blivedm.py b/blivedm.py index 88a3876..bd1043a 100644 --- a/blivedm.py +++ b/blivedm.py @@ -1,17 +1,15 @@ # -*- coding: utf-8 -*- +import asyncio import json import struct import sys -from asyncio import get_event_loop, gather, sleep, CancelledError from collections import namedtuple from enum import IntEnum # noinspection PyProtectedMember from ssl import _create_unverified_context import aiohttp -import websockets -from websockets.exceptions import ConnectionClosed class Operation(IntEnum): @@ -29,22 +27,39 @@ class BLiveClient: HEADER_STRUCT = struct.Struct('>I2H2I') HeaderTuple = namedtuple('HeaderTuple', ('total_len', 'header_len', 'proto_ver', 'operation', 'sequence')) - def __init__(self, room_id, ssl=True, loop=None): + def __init__(self, room_id, ssl=True, loop=None, session: aiohttp.ClientSession=None, + uid=0): """ :param room_id: URL中的房间ID :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext :param loop: 协程事件循环 + :param session: cookie、连接池 + :param uid: B站用户ID,0表示未登录 """ self._short_id = room_id self._room_id = None - # 未登录 - self._uid = 0 + self._uid = uid + self._loop = loop or asyncio.get_event_loop() + self._future = None + + if session is None: + self._session = aiohttp.ClientSession(loop=self._loop) + self._own_session = True + else: + self._session = session + self._own_session = False + if self._session.loop is not self._loop: + raise RuntimeError('BLiveClient and session has to use same event loop') self._ssl = ssl if ssl else _create_unverified_context() self._websocket = None - self._loop = loop or get_event_loop() - self._future = None + async def close(self): + """ + 如果session是自己创建的则关闭session + """ + if self._own_session: + await self._session.close() def start(self): """ @@ -53,7 +68,7 @@ class BLiveClient: """ if self._future is not None: return False - self._future = gather( + self._future = asyncio.gather( self._message_loop(), self._heartbeat_loop(), loop=self._loop @@ -74,18 +89,17 @@ class BLiveClient: async def _get_room_id(self): try: - async with aiohttp.ClientSession(loop=self._loop) as session: - async with session.get(self.ROOM_INIT_URL, - params={'id': self._short_id}, - ssl=self._ssl) as res: - if res.status == 200: - data = await res.json() - if data['code'] == 0: - self._room_id = data['data']['room_id'] - else: - raise ConnectionAbortedError('获取房间ID失败:' + data['msg']) + async with self._session.get(self.ROOM_INIT_URL, + params={'id': self._short_id}, + ssl=self._ssl) as res: + if res.status == 200: + data = await res.json() + if data['code'] == 0: + self._room_id = data['data']['room_id'] else: - raise ConnectionAbortedError('获取房间ID失败:' + res.reason) + raise ConnectionAbortedError('获取房间ID失败:' + data['msg']) + else: + raise ConnectionAbortedError('获取房间ID失败:' + res.reason) except Exception as e: if not self._handle_error(e): self._future.cancel() @@ -110,7 +124,7 @@ class BLiveClient: 'platform': 'web', 'clientver': '1.4.0' } - await self._websocket.send(self._make_packet(auth_params, Operation.AUTH)) + await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH)) async def _message_loop(self): # 获取房间ID @@ -120,25 +134,27 @@ class BLiveClient: while True: try: # 连接 - async with websockets.connect(self.WEBSOCKET_URL, - ssl=self._ssl, - loop=self._loop) as websocket: + async with self._session.ws_connect(self.WEBSOCKET_URL, + ssl=self._ssl) as websocket: self._websocket = websocket await self._send_auth() # 处理消息 - async for message in websocket: - await self._handle_message(message) + async for message in websocket: # type: aiohttp.WSMessage + if message.type == aiohttp.WSMsgType.BINARY: + await self._handle_message(message.data) + else: + print('未知的websocket消息:', message.type, message.data) - except CancelledError: + except asyncio.CancelledError: break - except ConnectionClosed: + except aiohttp.ClientConnectorError: self._websocket = None # 重连 print('掉线重连中', file=sys.stderr) try: - await sleep(5) - except CancelledError: + await asyncio.sleep(5) + except asyncio.CancelledError: break continue except Exception as e: @@ -153,14 +169,14 @@ class BLiveClient: while True: try: if self._websocket is None: - await sleep(0.5) + await asyncio.sleep(0.5) else: - await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT)) - await sleep(30) + await self._websocket.send_bytes(self._make_packet({}, Operation.SEND_HEARTBEAT)) + await asyncio.sleep(30) - except CancelledError: + except asyncio.CancelledError: break - except ConnectionClosed: + except aiohttp.ClientConnectorError: # 等待重连 continue except Exception as e: @@ -189,7 +205,7 @@ class BLiveClient: await self._handle_command(body) elif header.operation == Operation.RECV_HEARTBEAT: - await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT)) + await self._websocket.send_bytes(self._make_packet({}, Operation.SEND_HEARTBEAT)) else: body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] diff --git a/requirements.txt b/requirements.txt index 47db4db..72d6259 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1 @@ -aiohttp==3.2.1 -websockets==4.0.1 +aiohttp==3.5.4 diff --git a/sample.py b/sample.py index 96dfe35..b6f8ff7 100644 --- a/sample.py +++ b/sample.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- +import asyncio import sys -from asyncio import get_event_loop from ssl import SSLError from blivedm import BLiveClient @@ -16,7 +16,12 @@ class MyBLiveClient(BLiveClient): print(user_name, '说:', content) def _on_stop(self, exc): - self._loop.stop() + # 执行self.close,然后关闭事件循环 + asyncio.ensure_future( + self.close(), loop=self._loop + ).add_done_callback( + lambda future: self._loop.stop() + ) def _handle_error(self, exc): print(exc, file=sys.stderr) @@ -26,8 +31,9 @@ class MyBLiveClient(BLiveClient): def main(): - loop = get_event_loop() + loop = asyncio.get_event_loop() + # 139是黑桐谷歌的直播间 # 如果SSL验证失败就把第二个参数设为False client = MyBLiveClient(139, True) client.start() @@ -41,7 +47,6 @@ def main(): try: loop.run_forever() finally: - loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()