diff --git a/blivedm.py b/blivedm.py index 2586489..88a3876 100644 --- a/blivedm.py +++ b/blivedm.py @@ -1,189 +1,264 @@ -import asyncio -import struct +# -*- coding: utf-8 -*- + import json +import struct import sys +from asyncio import get_event_loop, gather, sleep, CancelledError +from collections import namedtuple +from enum import IntEnum +# noinspection PyProtectedMember +from ssl import _create_unverified_context + import aiohttp +import websockets +from websockets.exceptions import ConnectionClosed -class BaseDanmu(): - structer = struct.Struct('!I2H2I') +class Operation(IntEnum): + SEND_HEARTBEAT = 2 + POPULARITY = 3 + COMMAND = 5 + AUTH = 7 + RECV_HEARTBEAT = 8 - def __init__(self, room_id, area_id, client_session=None): - if client_session is None: - self.client = aiohttp.ClientSession() - else: - self.client = client_session - self.ws = None - self._area_id = area_id - self.room_id = room_id - # 建立连接过程中难以处理重设置房间问题 - self.lock_for_reseting_roomid_manually = asyncio.Lock() - self.task_main = None - self._waiting = None - self._closed = False - self._bytes_heartbeat = self._wrap_str(opt=2, body='') - - @property - def room_id(self): - return self._room_id - - @room_id.setter - def room_id(self, room_id): - self._room_id = room_id - str_conn_room = f'{{"uid":0,"roomid":{room_id},"protover":1,"platform":"web","clientver":"1.3.3"}}' - self._bytes_conn_room = self._wrap_str(opt=7, body=str_conn_room) - - def _wrap_str(self, opt, body, len_header=16, ver=1, seq=1): - remain_data = body.encode('utf-8') - len_data = len(remain_data) + len_header - header = self.structer.pack(len_data, len_header, ver, opt, seq) - data = header + remain_data - return data - async def _send_bytes(self, bytes_data): - try: - await self.ws.send_bytes(bytes_data) - except asyncio.CancelledError: - return False - except: - print(sys.exc_info()[0], sys.exc_info()[1]) +class BLiveClient: + ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init' + WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub' + + HEADER_STRUCT = struct.Struct('>I2H2I') + HeaderTuple = namedtuple('HeaderTuple', ('total_len', 'header_len', 'proto_ver', 'operation', 'sequence')) + + def __init__(self, room_id, ssl=True, loop=None): + """ + :param room_id: URL中的房间ID + :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext + :param loop: 协程事件循环 + """ + self._short_id = room_id + self._room_id = None + # 未登录 + self._uid = 0 + + self._ssl = ssl if ssl else _create_unverified_context() + self._websocket = None + + self._loop = loop or get_event_loop() + self._future = None + + def start(self): + """ + 创建相关的协程,不会执行事件循环 + :return: True表示成功创建协程,False表示之前创建的协程未结束 + """ + if self._future is not None: return False + self._future = gather( + self._message_loop(), + self._heartbeat_loop(), + loop=self._loop + ) + self._future.add_done_callback(self.__on_done) return True - async def _read_bytes(self): - bytes_data = None + def stop(self): + """ + 取消相关的协程,不会停止事件循环 + """ + if self._future is not None: + self._future.cancel() + + def __on_done(self, future): + self._future = None + self._on_stop(future.exception()) + + async def _get_room_id(self): try: - # 如果调用aiohttp的bytes read,none的时候,会raise exception - msg = await asyncio.wait_for(self.ws.receive(), timeout=35.0) - bytes_data = msg.data - except asyncio.TimeoutError: - print('# 由于心跳包30s一次,但是发现35内没有收到任何包,说明已经悄悄失联了,主动断开') - return None - except: - print(sys.exc_info()[0], sys.exc_info()[1]) - print('请联系开发者') - return None - - return bytes_data - - async def connect_ws(self): - try: - url = 'wss://broadcastlv.chat.bilibili.com:443/sub' - self.ws = await asyncio.wait_for(self.client.ws_connect(url), timeout=3) - except: - print("# 连接无法建立,请检查本地网络状况") - print(sys.exc_info()[0], sys.exc_info()[1]) - return False - print(f'{self._area_id}号弹幕监控已连接b站服务器') - return (await self._send_bytes(self._bytes_conn_room)) - - async def heart_beat(self): - try: - while True: - if not (await self._send_bytes(self._bytes_heartbeat)): - return - await asyncio.sleep(30) - except asyncio.CancelledError: - pass - - async def read_datas(self): + async with aiohttp.ClientSession(loop=self._loop) as session: + async with session.get(self.ROOM_INIT_URL, + params={'id': self._short_id}, + ssl=self._ssl) as res: + if res.status == 200: + data = await res.json() + if data['code'] == 0: + self._room_id = data['data']['room_id'] + else: + raise ConnectionAbortedError('获取房间ID失败:' + data['msg']) + else: + raise ConnectionAbortedError('获取房间ID失败:' + res.reason) + except Exception as e: + if not self._handle_error(e): + self._future.cancel() + raise + + def _make_packet(self, data, operation): + body = json.dumps(data).encode('utf-8') + header = self.HEADER_STRUCT.pack( + self.HEADER_STRUCT.size + len(body), + self.HEADER_STRUCT.size, + 1, + operation, + 1 + ) + return header + body + + async def _send_auth(self): + auth_params = { + 'uid': self._uid, + 'roomid': self._room_id, + 'protover': 1, + 'platform': 'web', + 'clientver': '1.4.0' + } + await self._websocket.send(self._make_packet(auth_params, Operation.AUTH)) + + async def _message_loop(self): + # 获取房间ID + if self._room_id is None: + await self._get_room_id() + while True: - datas = await self._read_bytes() - # 本函数对bytes进行相关操作,不特别声明,均为bytes - if datas is None: - return - data_l = 0 - len_datas = len(datas) - while data_l != len_datas: - # 每片data都分为header和body,data和data可能粘连 - # data_l == header_l && next_data_l = next_header_l - # ||header_l...header_r|body_l...body_r||next_data_l... - tuple_header = self.structer.unpack_from(datas[data_l:]) - len_data, len_header, ver, opt, seq = tuple_header - body_l = data_l + len_header - next_data_l = data_l + len_data - body = datas[body_l:next_data_l] - # 人气值(或者在线人数或者类似)以及心跳 - if opt == 3: - UserCount, = struct.unpack('!I', body) - print(f'弹幕心跳检测{self._area_id}') - pass - # cmd - elif opt == 5: - if not self.handle_danmu(body): - return - # 握手确认 - elif opt == 8: - print(f'{self._area_id}号弹幕监控进入房间({self._room_id})') - else: - print(datas[data_l:next_data_l]) + try: + # 连接 + async with websockets.connect(self.WEBSOCKET_URL, + ssl=self._ssl, + loop=self._loop) as websocket: + self._websocket = websocket + await self._send_auth() - data_l = next_data_l + # 处理消息 + async for message in websocket: + await self._handle_message(message) - # 待确认 - async def close_ws(self): - try: - await self.ws.close() - except: - print('请联系开发者', sys.exc_info()[0], sys.exc_info()[1]) - if not self.ws.closed: - print(f'请联系开发者 {self._area_id}号弹幕收尾模块状态{self.ws.closed}') - - def handle_danmu(self, body): - return True - - async def run_forever(self): - self._waiting = asyncio.Future() - while not self._closed: - print(f'正在启动{self._area_id}号弹幕姬') - - async with self.lock_for_reseting_roomid_manually: - if self._closed: + except CancelledError: + break + except ConnectionClosed: + self._websocket = None + # 重连 + print('掉线重连中', file=sys.stderr) + try: + await sleep(5) + except CancelledError: break - is_open = await self.connect_ws() - if not is_open: continue - self.task_main = asyncio.ensure_future(self.read_datas()) - task_heartbeat = asyncio.ensure_future(self.heart_beat()) - tasks = [self.task_main, task_heartbeat] - _, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - print(f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息') - if not task_heartbeat.done(): - task_heartbeat.cancel() - await self.close_ws() - await asyncio.wait(pending) - print(f'{self._area_id}号弹幕姬退出,剩余任务处理完毕') - self._waiting.set_result(True) - - async def reconnect(self, room_id): - async with self.lock_for_reseting_roomid_manually: - # not None是判断是否已经连接了的(重连过程中也可以处理) - if self.ws is not None: - await self.close_ws() - if self.task_main is not None: - await self.task_main - # 由于锁的存在,绝对不可能到达下一个的自动重连状态,这里是保证正确显示当前监控房间号 - self.room_id = room_id - print(f'{self._area_id}号弹幕姬已经切换房间({room_id})') - - async def close(self): - if not self._closed: - self._closed = True - async with self.lock_for_reseting_roomid_manually: - if self.ws is not None: - await self.close_ws() - if self._waiting is not None: - await self._waiting - return True - else: - return False - - -class DanmuPrinter(BaseDanmu): - def handle_danmu(self, body): - dic = json.loads(body.decode('utf-8')) - cmd = dic['cmd'] - if cmd == 'DANMU_MSG': - print(dic) - return True + except Exception as e: + if not self._handle_error(e): + self._future.cancel() + raise + continue + finally: + self._websocket = None + async def _heartbeat_loop(self): + while True: + try: + if self._websocket is None: + await sleep(0.5) + else: + await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT)) + await sleep(30) + + except CancelledError: + break + except ConnectionClosed: + # 等待重连 + continue + except Exception as e: + if not self._handle_error(e): + self._future.cancel() + raise + continue + + async def _handle_message(self, message): + offset = 0 + while offset < len(message): + try: + header = self.HeaderTuple(*self.HEADER_STRUCT.unpack_from(message, offset)) + except struct.error: + break + + if header.operation == Operation.POPULARITY: + popularity = int.from_bytes(message[offset + self.HEADER_STRUCT.size: + offset + self.HEADER_STRUCT.size + 4], + 'big') + await self._on_get_popularity(popularity) + + elif header.operation == Operation.COMMAND: + body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] + body = json.loads(body.decode('utf-8')) + await self._handle_command(body) + + elif header.operation == Operation.RECV_HEARTBEAT: + await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT)) + + else: + body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] + print('未知包类型:', header, body, file=sys.stderr) + + offset += header.total_len + + async def _handle_command(self, command): + if isinstance(command, list): + for one_command in command: + await self._handle_command(one_command) + return + + cmd = command['cmd'] + # print(command) + + if cmd == 'DANMU_MSG': # 收到弹幕 + await self._on_get_danmaku(command['info'][1], command['info'][2][1]) + + elif cmd == 'SEND_GIFT': # 送礼物 + pass + + elif cmd == 'WELCOME': # 欢迎 + pass + + elif cmd == 'WELCOME_GUARD': # 欢迎房管 + pass + + elif cmd == 'SYS_MSG': # 系统消息 + pass + + elif cmd == 'PREPARING': # 房主准备中 + pass + + elif cmd == 'LIVE': # 直播开始 + pass + + elif cmd == 'WISH_BOTTLE': # 许愿瓶? + pass + + else: + print('未知命令:', command, file=sys.stderr) + + async def _on_get_popularity(self, popularity): + """ + 获取到人气值 + :param popularity: 人气值 + """ + pass + + async def _on_get_danmaku(self, content, user_name): + """ + 获取到弹幕 + :param content: 弹幕内容 + :param user_name: 弹幕作者 + """ + pass + + def _on_stop(self, exc): + """ + 协程结束后被调用 + :param exc: 如果是异常结束则为异常,否则为None + """ + pass + + def _handle_error(self, exc): + """ + 处理异常时被调用 + :param exc: 异常 + :return: True表示异常被处理,False表示异常没被处理 + """ + return False diff --git a/sample.py b/sample.py index c4eb37e..96dfe35 100644 --- a/sample.py +++ b/sample.py @@ -1,24 +1,48 @@ # -*- coding: utf-8 -*- -import asyncio -from time import time -from blivedm import DanmuPrinter +import sys +from asyncio import get_event_loop +from ssl import SSLError -async def test1(): - connection = DanmuPrinter(23058, 0) - task_run = asyncio.ensure_future(connection.run_forever()) - await asyncio.sleep(30) - print(time(), 'closing') - await connection.close() - print(time(), 'closed') - await task_run - print(time(), 'all done') +from blivedm import BLiveClient + + +class MyBLiveClient(BLiveClient): + + async def _on_get_popularity(self, popularity): + print('当前人气值:', popularity) + + async def _on_get_danmaku(self, content, user_name): + print(user_name, '说:', content) + + def _on_stop(self, exc): + self._loop.stop() + + def _handle_error(self, exc): + print(exc, file=sys.stderr) + if isinstance(exc, SSLError): + print('SSL验证失败!', file=sys.stderr) + return False def main(): - loop = asyncio.get_event_loop() - loop.run_until_complete(test1()) - loop.close() + loop = get_event_loop() + + # 如果SSL验证失败就把第二个参数设为False + client = MyBLiveClient(139, True) + client.start() + + # 5秒后停止,测试用 + # loop.call_later(5, client.stop) + # 按Ctrl+C停止 + # import signal + # signal.signal(signal.SIGINT, lambda signum, frame: client.stop()) + + try: + loop.run_forever() + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() if __name__ == '__main__':