diff --git a/blivedm.py b/blivedm.py index 69ba24a..adde68e 100644 --- a/blivedm.py +++ b/blivedm.py @@ -2,6 +2,7 @@ import json import struct +from asyncio import gather, sleep, CancelledError from collections import namedtuple from enum import IntEnum @@ -24,7 +25,7 @@ class BLiveClient: HEADER_STRUCT = struct.Struct('>I2H2I') HeaderTuple = namedtuple('HeaderTuple', ('total_len', 'header_len', 'proto_ver', 'operation', 'sequence')) - def __init__(self, room_id): + def __init__(self, room_id, loop): """ :param room_id: URL中的房间ID """ @@ -34,7 +35,10 @@ class BLiveClient: # 未登录 self._uid = 0 - async def start(self): + self._loop = loop + self._future = None + + def start(self): # 获取房间ID if self._room_id is None: res = requests.get(self.ROOM_INIT_URL, {'id': self._short_id}) @@ -43,14 +47,17 @@ class BLiveClient: else: self._room_id = res.json()['data']['room_id'] - # 连接 - async with websockets.connect(self.WEBSOCKET_URL) as websocket: - self._websocket = websocket - await self._send_auth() + self._future = gather( + self._message_loop(), + self._heartbeat_loop() + ) + self._loop.run_until_complete(self._future) - # 处理消息 - async for message in websocket: - await self._handle_message(message) + def stop(self): + if self._future is None: + return + self._future.cancel() + self._future = None def _make_packet(self, data, operation): body = json.dumps(data).encode('utf-8') @@ -63,6 +70,21 @@ class BLiveClient: ) return header + body + async def _message_loop(self): + try: + # 连接 + async with websockets.connect(self.WEBSOCKET_URL) as websocket: + self._websocket = websocket + await self._send_auth() + + # 处理消息 + async for message in websocket: + await self._handle_message(message) + except CancelledError: + pass + finally: + self._websocket = None + async def _send_auth(self): auth_params = { 'uid': self._uid, @@ -73,9 +95,16 @@ class BLiveClient: } await self._websocket.send(self._make_packet(auth_params, Operation.AUTH)) - async def _send_heartbeat(self): - self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT)) - # TODO 每30s调用 + async def _heartbeat_loop(self): + try: + while True: + if self._websocket is None: + await sleep(0.5) + else: + await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT)) + await sleep(30) + except CancelledError: + pass async def _handle_message(self, message): offset = 0 @@ -96,8 +125,8 @@ class BLiveClient: body = json.loads(body.decode('utf-8')) await self._handle_command(body) - elif header.operation == Operation.RECV_HEARTBEAT: - await self._send_heartbeat() + # elif header.operation == Operation.RECV_HEARTBEAT: + # pass offset += header.total_len @@ -110,19 +139,25 @@ class BLiveClient: cmd = command['cmd'] # print(command) - if cmd == 'DANMU_MSG': # 收到弹幕 + if cmd == 'DANMU_MSG': # 收到弹幕 await self._on_get_danmaku(command['info'][1], command['info'][2][1]) - elif cmd == 'SEND_GIFT': # 送礼物 + elif cmd == 'SEND_GIFT': # 送礼物 pass - elif cmd == 'WELCOME': # 欢迎 + elif cmd == 'WELCOME': # 欢迎 pass - elif cmd == 'PREPARING': # 房主准备中 + elif cmd == 'SYS_MSG': # 系统消息 pass - elif cmd == 'LIVE': # 直播开始 + elif cmd == 'PREPARING': # 房主准备中 + pass + + elif cmd == 'LIVE': # 直播开始 + pass + + elif cmd == 'WISH_BOTTLE': # 许愿瓶? pass else: diff --git a/sample.py b/sample.py index 4e88be2..b2e0814 100644 --- a/sample.py +++ b/sample.py @@ -15,8 +15,11 @@ class MyBLiveClient(BLiveClient): def main(): - client = MyBLiveClient(6) - get_event_loop().run_until_complete(client.start()) + loop = get_event_loop() + client = MyBLiveClient(6, loop) + # loop.call_later(5, lambda: client.stop()) + client.start() + loop.close() if __name__ == '__main__':