几乎完全重写

reset roomid支持;断线重连更好地解决;更方便的自定义;支持aiohttp的session,可以提高较多房间时的效率与性能
This commit is contained in:
yjqiang 2019-02-14 18:36:37 +08:00
parent 16faeadab8
commit 07bc699f2c
2 changed files with 158 additions and 286 deletions

View File

@ -1,264 +1,171 @@
# -*- coding: utf-8 -*- import asyncio
import json
import struct import struct
import json
import sys import sys
from asyncio import get_event_loop, gather, sleep, CancelledError
from collections import namedtuple
from enum import IntEnum
# noinspection PyProtectedMember
from ssl import _create_unverified_context
import aiohttp import aiohttp
import websockets
from websockets.exceptions import ConnectionClosed
class Operation(IntEnum): class BaseDanmu():
SEND_HEARTBEAT = 2 structer = struct.Struct('!I2H2I')
POPULARITY = 3
COMMAND = 5
AUTH = 7
RECV_HEARTBEAT = 8
def __init__(self, room_id, area_id, client_session=None):
if client_session is None:
self.client = aiohttp.ClientSession()
else:
self.client = client_session
self.ws = None
self._area_id = area_id
self.room_id = room_id
# 建立连接过程中难以处理重设置房间问题
self.lock_for_reseting_roomid_manually = asyncio.Lock()
self.task_main = None
self._bytes_heartbeat = self._wrap_str(opt=2, body='')
@property
def room_id(self):
return self._room_id
@room_id.setter
def room_id(self, room_id):
self._room_id = room_id
str_conn_room = f'{{"uid":0,"roomid":{room_id},"protover":1,"platform":"web","clientver":"1.3.3"}}'
self._bytes_conn_room = self._wrap_str(opt=7, body=str_conn_room)
def _wrap_str(self, opt, body, len_header=16, ver=1, seq=1):
remain_data = body.encode('utf-8')
len_data = len(remain_data) + len_header
header = self.structer.pack(len_data, len_header, ver, opt, seq)
data = header + remain_data
return data
class BLiveClient: async def _send_bytes(self, bytes_data):
ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init' try:
WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub' await self.ws.send_bytes(bytes_data)
except asyncio.CancelledError:
HEADER_STRUCT = struct.Struct('>I2H2I') return False
HeaderTuple = namedtuple('HeaderTuple', ('total_len', 'header_len', 'proto_ver', 'operation', 'sequence')) except:
print(sys.exc_info()[0], sys.exc_info()[1])
def __init__(self, room_id, ssl=True, loop=None):
"""
:param room_id: URL中的房间ID
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
:param loop: 协程事件循环
"""
self._short_id = room_id
self._room_id = None
# 未登录
self._uid = 0
self._ssl = ssl if ssl else _create_unverified_context()
self._websocket = None
self._loop = loop or get_event_loop()
self._future = None
def start(self):
"""
创建相关的协程不会执行事件循环
:return: True表示成功创建协程False表示之前创建的协程未结束
"""
if self._future is not None:
return False return False
self._future = gather(
self._message_loop(),
self._heartbeat_loop(),
loop=self._loop
)
self._future.add_done_callback(self.__on_done)
return True return True
def stop(self): async def _read_bytes(self):
""" bytes_data = None
取消相关的协程不会停止事件循环
"""
if self._future is not None:
self._future.cancel()
def __on_done(self, future):
self._future = None
self._on_stop(future.exception())
async def _get_room_id(self):
try: try:
async with aiohttp.ClientSession(loop=self._loop) as session: # 如果调用aiohttp的bytes readnone的时候会raise exception
async with session.get(self.ROOM_INIT_URL, msg = await asyncio.wait_for(self.ws.receive(), timeout=35.0)
params={'id': self._short_id}, bytes_data = msg.data
ssl=self._ssl) as res: except asyncio.TimeoutError:
if res.status == 200: print('# 由于心跳包30s一次但是发现35内没有收到任何包说明已经悄悄失联了主动断开')
data = await res.json() return None
if data['code'] == 0: except:
self._room_id = data['data']['room_id'] print(sys.exc_info()[0], sys.exc_info()[1])
else: print('请联系开发者')
raise ConnectionAbortedError('获取房间ID失败' + data['msg']) return None
else:
raise ConnectionAbortedError('获取房间ID失败' + res.reason) return bytes_data
except Exception as e:
if not self._handle_error(e): async def open(self):
self._future.cancel() try:
raise url = 'wss://broadcastlv.chat.bilibili.com:443/sub'
self.ws = await asyncio.wait_for(self.client.ws_connect(url), timeout=3)
def _make_packet(self, data, operation): except:
body = json.dumps(data).encode('utf-8') print("# 连接无法建立,请检查本地网络状况")
header = self.HEADER_STRUCT.pack( print(sys.exc_info()[0], sys.exc_info()[1])
self.HEADER_STRUCT.size + len(body), return False
self.HEADER_STRUCT.size, print(f'{self._area_id}号弹幕监控已连接b站服务器')
1, return (await self._send_bytes(self._bytes_conn_room))
operation,
1 async def heart_beat(self):
) try:
return header + body while True:
if not (await self._send_bytes(self._bytes_heartbeat)):
async def _send_auth(self): return
auth_params = { await asyncio.sleep(30)
'uid': self._uid, except asyncio.CancelledError:
'roomid': self._room_id, pass
'protover': 1,
'platform': 'web', async def read_datas(self):
'clientver': '1.4.0'
}
await self._websocket.send(self._make_packet(auth_params, Operation.AUTH))
async def _message_loop(self):
# 获取房间ID
if self._room_id is None:
await self._get_room_id()
while True: while True:
try: datas = await self._read_bytes()
# 连接 # 本函数对bytes进行相关操作不特别声明均为bytes
async with websockets.connect(self.WEBSOCKET_URL, if datas is None:
ssl=self._ssl, return
loop=self._loop) as websocket: data_l = 0
self._websocket = websocket len_datas = len(datas)
await self._send_auth() while data_l != len_datas:
# 每片data都分为header和bodydata和data可能粘连
# 处理消息 # data_l == header_l && next_data_l = next_header_l
async for message in websocket: # ||header_l...header_r|body_l...body_r||next_data_l...
await self._handle_message(message) tuple_header = self.structer.unpack_from(datas[data_l:])
len_data, len_header, ver, opt, seq = tuple_header
except CancelledError: body_l = data_l + len_header
break next_data_l = data_l + len_data
except ConnectionClosed: body = datas[body_l:next_data_l]
self._websocket = None # 人气值(或者在线人数或者类似)以及心跳
# 重连 if opt == 3:
print('掉线重连中', file=sys.stderr) # UserCount, = struct.unpack('!I', remain_data)
try: # printer.debug(f'弹幕心跳检测{self._area_id}')
await sleep(5) pass
except CancelledError: # cmd
break elif opt == 5:
continue if not self.handle_danmu(body):
except Exception as e: return
if not self._handle_error(e): # 握手确认
self._future.cancel() elif opt == 8:
raise print(f'{self._area_id}号弹幕监控进入房间({self._room_id}')
continue
finally:
self._websocket = None
async def _heartbeat_loop(self):
while True:
try:
if self._websocket is None:
await sleep(0.5)
else: else:
await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT)) print(datas[data_l:next_data_l])
await sleep(30)
except CancelledError: data_l = next_data_l
break
except ConnectionClosed: # 待确认
# 等待重连 async def close(self):
continue try:
except Exception as e: await self.ws.close()
if not self._handle_error(e): except:
self._future.cancel() print('请联系开发者', sys.exc_info()[0], sys.exc_info()[1])
raise if not self.ws.closed:
print(f'请联系开发者 {self._area_id}号弹幕收尾模块状态{self.ws.closed}')
def handle_danmu(self, body):
return True
async def run_forever(self):
while True:
print(f'正在启动{self._area_id}号弹幕姬')
async with self.lock_for_reseting_roomid_manually:
is_open = await self.open()
if not is_open:
continue continue
self.task_main = asyncio.ensure_future(self.read_datas())
task_heartbeat = asyncio.ensure_future(self.heart_beat())
tasks = [self.task_main, task_heartbeat]
_, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息')
if not task_heartbeat.done():
task_heartbeat.cancel()
await self.close()
await asyncio.wait(pending)
print(f'{self._area_id}号弹幕姬退出,剩余任务处理完毕')
async def reconnect(self, room_id):
async with self.lock_for_reseting_roomid_manually:
# not None是判断是否已经连接了的(重连过程中也可以处理)
if self.ws is not None:
await self.close()
if self.task_main is not None:
await self.task_main
# 由于锁的存在,绝对不可能到达下一个的自动重连状态,这里是保证正确显示当前监控房间号
self.room_id = room_id
print(f'{self._area_id}号弹幕姬已经切换房间({room_id}')
class DanmuPrinter(BaseDanmu):
def handle_danmu(self, body):
dic = json.loads(body.decode('utf-8'))
cmd = dic['cmd']
if cmd == 'DANMU_MSG':
print(dic)
return True
async def _handle_message(self, message):
offset = 0
while offset < len(message):
try:
header = self.HeaderTuple(*self.HEADER_STRUCT.unpack_from(message, offset))
except struct.error:
break
if header.operation == Operation.POPULARITY:
popularity = int.from_bytes(message[offset + self.HEADER_STRUCT.size:
offset + self.HEADER_STRUCT.size + 4],
'big')
await self._on_get_popularity(popularity)
elif header.operation == Operation.COMMAND:
body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len]
body = json.loads(body.decode('utf-8'))
await self._handle_command(body)
elif header.operation == Operation.RECV_HEARTBEAT:
await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT))
else:
body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len]
print('未知包类型:', header, body, file=sys.stderr)
offset += header.total_len
async def _handle_command(self, command):
if isinstance(command, list):
for one_command in command:
await self._handle_command(one_command)
return
cmd = command['cmd']
# print(command)
if cmd == 'DANMU_MSG': # 收到弹幕
await self._on_get_danmaku(command['info'][1], command['info'][2][1])
elif cmd == 'SEND_GIFT': # 送礼物
pass
elif cmd == 'WELCOME': # 欢迎
pass
elif cmd == 'WELCOME_GUARD': # 欢迎房管
pass
elif cmd == 'SYS_MSG': # 系统消息
pass
elif cmd == 'PREPARING': # 房主准备中
pass
elif cmd == 'LIVE': # 直播开始
pass
elif cmd == 'WISH_BOTTLE': # 许愿瓶?
pass
else:
print('未知命令:', command, file=sys.stderr)
async def _on_get_popularity(self, popularity):
"""
获取到人气值
:param popularity: 人气值
"""
pass
async def _on_get_danmaku(self, content, user_name):
"""
获取到弹幕
:param content: 弹幕内容
:param user_name: 弹幕作者
"""
pass
def _on_stop(self, exc):
"""
协程结束后被调用
:param exc: 如果是异常结束则为异常否则为None
"""
pass
def _handle_error(self, exc):
"""
处理异常时被调用
:param exc: 异常
:return: True表示异常被处理False表示异常没被处理
"""
return False

View File

@ -1,48 +1,13 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys
from asyncio import get_event_loop from asyncio import get_event_loop
from ssl import SSLError from blivedm import DanmuPrinter
from blivedm import BLiveClient
class MyBLiveClient(BLiveClient):
async def _on_get_popularity(self, popularity):
print('当前人气值:', popularity)
async def _on_get_danmaku(self, content, user_name):
print(user_name, '说:', content)
def _on_stop(self, exc):
self._loop.stop()
def _handle_error(self, exc):
print(exc, file=sys.stderr)
if isinstance(exc, SSLError):
print('SSL验证失败', file=sys.stderr)
return False
def main(): def main():
loop = get_event_loop() loop = get_event_loop()
loop.run_until_complete(DanmuPrinter(23058, 0).run_forever())
# 如果SSL验证失败就把第二个参数设为False loop.close()
client = MyBLiveClient(139, True)
client.start()
# 5秒后停止测试用
# loop.call_later(5, client.stop)
# 按Ctrl+C停止
# import signal
# signal.signal(signal.SIGINT, lambda signum, frame: client.stop())
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
if __name__ == '__main__': if __name__ == '__main__':