diff --git a/blivedm.py b/blivedm.py index 88a3876..529c377 100644 --- a/blivedm.py +++ b/blivedm.py @@ -1,264 +1,171 @@ -# -*- coding: utf-8 -*- - -import json +import asyncio import struct +import json import sys -from asyncio import get_event_loop, gather, sleep, CancelledError -from collections import namedtuple -from enum import IntEnum -# noinspection PyProtectedMember -from ssl import _create_unverified_context - import aiohttp -import websockets -from websockets.exceptions import ConnectionClosed -class Operation(IntEnum): - SEND_HEARTBEAT = 2 - POPULARITY = 3 - COMMAND = 5 - AUTH = 7 - RECV_HEARTBEAT = 8 +class BaseDanmu(): + structer = struct.Struct('!I2H2I') + def __init__(self, room_id, area_id, client_session=None): + if client_session is None: + self.client = aiohttp.ClientSession() + else: + self.client = client_session + self.ws = None + self._area_id = area_id + self.room_id = room_id + # 建立连接过程中难以处理重设置房间问题 + self.lock_for_reseting_roomid_manually = asyncio.Lock() + self.task_main = None + self._bytes_heartbeat = self._wrap_str(opt=2, body='') + + @property + def room_id(self): + return self._room_id + + @room_id.setter + def room_id(self, room_id): + self._room_id = room_id + str_conn_room = f'{{"uid":0,"roomid":{room_id},"protover":1,"platform":"web","clientver":"1.3.3"}}' + self._bytes_conn_room = self._wrap_str(opt=7, body=str_conn_room) + + def _wrap_str(self, opt, body, len_header=16, ver=1, seq=1): + remain_data = body.encode('utf-8') + len_data = len(remain_data) + len_header + header = self.structer.pack(len_data, len_header, ver, opt, seq) + data = header + remain_data + return data -class BLiveClient: - ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init' - WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub' - - HEADER_STRUCT = struct.Struct('>I2H2I') - HeaderTuple = namedtuple('HeaderTuple', ('total_len', 'header_len', 'proto_ver', 'operation', 'sequence')) - - def __init__(self, room_id, ssl=True, loop=None): - """ - :param room_id: URL中的房间ID - :param ssl: True表示用默认的SSLContext验证,False表示不验证,也可以传入SSLContext - :param loop: 协程事件循环 - """ - self._short_id = room_id - self._room_id = None - # 未登录 - self._uid = 0 - - self._ssl = ssl if ssl else _create_unverified_context() - self._websocket = None - - self._loop = loop or get_event_loop() - self._future = None - - def start(self): - """ - 创建相关的协程,不会执行事件循环 - :return: True表示成功创建协程,False表示之前创建的协程未结束 - """ - if self._future is not None: + async def _send_bytes(self, bytes_data): + try: + await self.ws.send_bytes(bytes_data) + except asyncio.CancelledError: + return False + except: + print(sys.exc_info()[0], sys.exc_info()[1]) return False - self._future = gather( - self._message_loop(), - self._heartbeat_loop(), - loop=self._loop - ) - self._future.add_done_callback(self.__on_done) return True - def stop(self): - """ - 取消相关的协程,不会停止事件循环 - """ - if self._future is not None: - self._future.cancel() - - def __on_done(self, future): - self._future = None - self._on_stop(future.exception()) - - async def _get_room_id(self): + async def _read_bytes(self): + bytes_data = None try: - async with aiohttp.ClientSession(loop=self._loop) as session: - async with session.get(self.ROOM_INIT_URL, - params={'id': self._short_id}, - ssl=self._ssl) as res: - if res.status == 200: - data = await res.json() - if data['code'] == 0: - self._room_id = data['data']['room_id'] - else: - raise ConnectionAbortedError('获取房间ID失败:' + data['msg']) - else: - raise ConnectionAbortedError('获取房间ID失败:' + res.reason) - except Exception as e: - if not self._handle_error(e): - self._future.cancel() - raise - - def _make_packet(self, data, operation): - body = json.dumps(data).encode('utf-8') - header = self.HEADER_STRUCT.pack( - self.HEADER_STRUCT.size + len(body), - self.HEADER_STRUCT.size, - 1, - operation, - 1 - ) - return header + body - - async def _send_auth(self): - auth_params = { - 'uid': self._uid, - 'roomid': self._room_id, - 'protover': 1, - 'platform': 'web', - 'clientver': '1.4.0' - } - await self._websocket.send(self._make_packet(auth_params, Operation.AUTH)) - - async def _message_loop(self): - # 获取房间ID - if self._room_id is None: - await self._get_room_id() - + # 如果调用aiohttp的bytes read,none的时候,会raise exception + msg = await asyncio.wait_for(self.ws.receive(), timeout=35.0) + bytes_data = msg.data + except asyncio.TimeoutError: + print('# 由于心跳包30s一次,但是发现35内没有收到任何包,说明已经悄悄失联了,主动断开') + return None + except: + print(sys.exc_info()[0], sys.exc_info()[1]) + print('请联系开发者') + return None + + return bytes_data + + async def open(self): + try: + url = 'wss://broadcastlv.chat.bilibili.com:443/sub' + self.ws = await asyncio.wait_for(self.client.ws_connect(url), timeout=3) + except: + print("# 连接无法建立,请检查本地网络状况") + print(sys.exc_info()[0], sys.exc_info()[1]) + return False + print(f'{self._area_id}号弹幕监控已连接b站服务器') + return (await self._send_bytes(self._bytes_conn_room)) + + async def heart_beat(self): + try: + while True: + if not (await self._send_bytes(self._bytes_heartbeat)): + return + await asyncio.sleep(30) + except asyncio.CancelledError: + pass + + async def read_datas(self): while True: - try: - # 连接 - async with websockets.connect(self.WEBSOCKET_URL, - ssl=self._ssl, - loop=self._loop) as websocket: - self._websocket = websocket - await self._send_auth() - - # 处理消息 - async for message in websocket: - await self._handle_message(message) - - except CancelledError: - break - except ConnectionClosed: - self._websocket = None - # 重连 - print('掉线重连中', file=sys.stderr) - try: - await sleep(5) - except CancelledError: - break - continue - except Exception as e: - if not self._handle_error(e): - self._future.cancel() - raise - continue - finally: - self._websocket = None - - async def _heartbeat_loop(self): - while True: - try: - if self._websocket is None: - await sleep(0.5) + datas = await self._read_bytes() + # 本函数对bytes进行相关操作,不特别声明,均为bytes + if datas is None: + return + data_l = 0 + len_datas = len(datas) + while data_l != len_datas: + # 每片data都分为header和body,data和data可能粘连 + # data_l == header_l && next_data_l = next_header_l + # ||header_l...header_r|body_l...body_r||next_data_l... + tuple_header = self.structer.unpack_from(datas[data_l:]) + len_data, len_header, ver, opt, seq = tuple_header + body_l = data_l + len_header + next_data_l = data_l + len_data + body = datas[body_l:next_data_l] + # 人气值(或者在线人数或者类似)以及心跳 + if opt == 3: + # UserCount, = struct.unpack('!I', remain_data) + # printer.debug(f'弹幕心跳检测{self._area_id}') + pass + # cmd + elif opt == 5: + if not self.handle_danmu(body): + return + # 握手确认 + elif opt == 8: + print(f'{self._area_id}号弹幕监控进入房间({self._room_id})') else: - await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT)) - await sleep(30) + print(datas[data_l:next_data_l]) - except CancelledError: - break - except ConnectionClosed: - # 等待重连 - continue - except Exception as e: - if not self._handle_error(e): - self._future.cancel() - raise + data_l = next_data_l + + # 待确认 + async def close(self): + try: + await self.ws.close() + except: + print('请联系开发者', sys.exc_info()[0], sys.exc_info()[1]) + if not self.ws.closed: + print(f'请联系开发者 {self._area_id}号弹幕收尾模块状态{self.ws.closed}') + + def handle_danmu(self, body): + return True + + async def run_forever(self): + while True: + print(f'正在启动{self._area_id}号弹幕姬') + + async with self.lock_for_reseting_roomid_manually: + is_open = await self.open() + if not is_open: continue + self.task_main = asyncio.ensure_future(self.read_datas()) + task_heartbeat = asyncio.ensure_future(self.heart_beat()) + tasks = [self.task_main, task_heartbeat] + _, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + print(f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息') + if not task_heartbeat.done(): + task_heartbeat.cancel() + await self.close() + await asyncio.wait(pending) + print(f'{self._area_id}号弹幕姬退出,剩余任务处理完毕') + + async def reconnect(self, room_id): + async with self.lock_for_reseting_roomid_manually: + # not None是判断是否已经连接了的(重连过程中也可以处理) + if self.ws is not None: + await self.close() + if self.task_main is not None: + await self.task_main + # 由于锁的存在,绝对不可能到达下一个的自动重连状态,这里是保证正确显示当前监控房间号 + self.room_id = room_id + print(f'{self._area_id}号弹幕姬已经切换房间({room_id})') + + +class DanmuPrinter(BaseDanmu): + def handle_danmu(self, body): + dic = json.loads(body.decode('utf-8')) + cmd = dic['cmd'] + if cmd == 'DANMU_MSG': + print(dic) + return True - async def _handle_message(self, message): - offset = 0 - while offset < len(message): - try: - header = self.HeaderTuple(*self.HEADER_STRUCT.unpack_from(message, offset)) - except struct.error: - break - - if header.operation == Operation.POPULARITY: - popularity = int.from_bytes(message[offset + self.HEADER_STRUCT.size: - offset + self.HEADER_STRUCT.size + 4], - 'big') - await self._on_get_popularity(popularity) - - elif header.operation == Operation.COMMAND: - body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] - body = json.loads(body.decode('utf-8')) - await self._handle_command(body) - - elif header.operation == Operation.RECV_HEARTBEAT: - await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT)) - - else: - body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len] - print('未知包类型:', header, body, file=sys.stderr) - - offset += header.total_len - - async def _handle_command(self, command): - if isinstance(command, list): - for one_command in command: - await self._handle_command(one_command) - return - - cmd = command['cmd'] - # print(command) - - if cmd == 'DANMU_MSG': # 收到弹幕 - await self._on_get_danmaku(command['info'][1], command['info'][2][1]) - - elif cmd == 'SEND_GIFT': # 送礼物 - pass - - elif cmd == 'WELCOME': # 欢迎 - pass - - elif cmd == 'WELCOME_GUARD': # 欢迎房管 - pass - - elif cmd == 'SYS_MSG': # 系统消息 - pass - - elif cmd == 'PREPARING': # 房主准备中 - pass - - elif cmd == 'LIVE': # 直播开始 - pass - - elif cmd == 'WISH_BOTTLE': # 许愿瓶? - pass - - else: - print('未知命令:', command, file=sys.stderr) - - async def _on_get_popularity(self, popularity): - """ - 获取到人气值 - :param popularity: 人气值 - """ - pass - - async def _on_get_danmaku(self, content, user_name): - """ - 获取到弹幕 - :param content: 弹幕内容 - :param user_name: 弹幕作者 - """ - pass - - def _on_stop(self, exc): - """ - 协程结束后被调用 - :param exc: 如果是异常结束则为异常,否则为None - """ - pass - - def _handle_error(self, exc): - """ - 处理异常时被调用 - :param exc: 异常 - :return: True表示异常被处理,False表示异常没被处理 - """ - return False diff --git a/sample.py b/sample.py index 96dfe35..38ea8fd 100644 --- a/sample.py +++ b/sample.py @@ -1,48 +1,13 @@ # -*- coding: utf-8 -*- -import sys from asyncio import get_event_loop -from ssl import SSLError - -from blivedm import BLiveClient - - -class MyBLiveClient(BLiveClient): - - async def _on_get_popularity(self, popularity): - print('当前人气值:', popularity) - - async def _on_get_danmaku(self, content, user_name): - print(user_name, '说:', content) - - def _on_stop(self, exc): - self._loop.stop() - - def _handle_error(self, exc): - print(exc, file=sys.stderr) - if isinstance(exc, SSLError): - print('SSL验证失败!', file=sys.stderr) - return False +from blivedm import DanmuPrinter def main(): loop = get_event_loop() - - # 如果SSL验证失败就把第二个参数设为False - client = MyBLiveClient(139, True) - client.start() - - # 5秒后停止,测试用 - # loop.call_later(5, client.stop) - # 按Ctrl+C停止 - # import signal - # signal.signal(signal.SIGINT, lambda signum, frame: client.stop()) - - try: - loop.run_forever() - finally: - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() + loop.run_until_complete(DanmuPrinter(23058, 0).run_forever()) + loop.close() if __name__ == '__main__':