Revert "几乎完全重写"

This reverts commit 07bc699f2c.
This commit is contained in:
John Smith 2019-02-19 23:15:00 +08:00
parent 356a59d54a
commit 0091ed9a77
2 changed files with 286 additions and 187 deletions

View File

@ -1,189 +1,264 @@
import asyncio # -*- coding: utf-8 -*-
import struct
import json import json
import struct
import sys import sys
from asyncio import get_event_loop, gather, sleep, CancelledError
from collections import namedtuple
from enum import IntEnum
# noinspection PyProtectedMember
from ssl import _create_unverified_context
import aiohttp import aiohttp
import websockets
from websockets.exceptions import ConnectionClosed
class BaseDanmu(): class Operation(IntEnum):
structer = struct.Struct('!I2H2I') SEND_HEARTBEAT = 2
POPULARITY = 3
COMMAND = 5
AUTH = 7
RECV_HEARTBEAT = 8
def __init__(self, room_id, area_id, client_session=None):
if client_session is None:
self.client = aiohttp.ClientSession()
else:
self.client = client_session
self.ws = None
self._area_id = area_id
self.room_id = room_id
# 建立连接过程中难以处理重设置房间问题
self.lock_for_reseting_roomid_manually = asyncio.Lock()
self.task_main = None
self._waiting = None
self._closed = False
self._bytes_heartbeat = self._wrap_str(opt=2, body='')
@property
def room_id(self):
return self._room_id
@room_id.setter
def room_id(self, room_id):
self._room_id = room_id
str_conn_room = f'{{"uid":0,"roomid":{room_id},"protover":1,"platform":"web","clientver":"1.3.3"}}'
self._bytes_conn_room = self._wrap_str(opt=7, body=str_conn_room)
def _wrap_str(self, opt, body, len_header=16, ver=1, seq=1):
remain_data = body.encode('utf-8')
len_data = len(remain_data) + len_header
header = self.structer.pack(len_data, len_header, ver, opt, seq)
data = header + remain_data
return data
async def _send_bytes(self, bytes_data): class BLiveClient:
try: ROOM_INIT_URL = 'https://api.live.bilibili.com/room/v1/Room/room_init'
await self.ws.send_bytes(bytes_data) WEBSOCKET_URL = 'wss://broadcastlv.chat.bilibili.com:2245/sub'
except asyncio.CancelledError:
return False HEADER_STRUCT = struct.Struct('>I2H2I')
except: HeaderTuple = namedtuple('HeaderTuple', ('total_len', 'header_len', 'proto_ver', 'operation', 'sequence'))
print(sys.exc_info()[0], sys.exc_info()[1])
def __init__(self, room_id, ssl=True, loop=None):
"""
:param room_id: URL中的房间ID
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
:param loop: 协程事件循环
"""
self._short_id = room_id
self._room_id = None
# 未登录
self._uid = 0
self._ssl = ssl if ssl else _create_unverified_context()
self._websocket = None
self._loop = loop or get_event_loop()
self._future = None
def start(self):
"""
创建相关的协程不会执行事件循环
:return: True表示成功创建协程False表示之前创建的协程未结束
"""
if self._future is not None:
return False return False
self._future = gather(
self._message_loop(),
self._heartbeat_loop(),
loop=self._loop
)
self._future.add_done_callback(self.__on_done)
return True return True
async def _read_bytes(self): def stop(self):
bytes_data = None """
取消相关的协程不会停止事件循环
"""
if self._future is not None:
self._future.cancel()
def __on_done(self, future):
self._future = None
self._on_stop(future.exception())
async def _get_room_id(self):
try: try:
# 如果调用aiohttp的bytes readnone的时候会raise exception async with aiohttp.ClientSession(loop=self._loop) as session:
msg = await asyncio.wait_for(self.ws.receive(), timeout=35.0) async with session.get(self.ROOM_INIT_URL,
bytes_data = msg.data params={'id': self._short_id},
except asyncio.TimeoutError: ssl=self._ssl) as res:
print('# 由于心跳包30s一次但是发现35内没有收到任何包说明已经悄悄失联了主动断开') if res.status == 200:
return None data = await res.json()
except: if data['code'] == 0:
print(sys.exc_info()[0], sys.exc_info()[1]) self._room_id = data['data']['room_id']
print('请联系开发者') else:
return None raise ConnectionAbortedError('获取房间ID失败' + data['msg'])
else:
return bytes_data raise ConnectionAbortedError('获取房间ID失败' + res.reason)
except Exception as e:
async def connect_ws(self): if not self._handle_error(e):
try: self._future.cancel()
url = 'wss://broadcastlv.chat.bilibili.com:443/sub' raise
self.ws = await asyncio.wait_for(self.client.ws_connect(url), timeout=3)
except: def _make_packet(self, data, operation):
print("# 连接无法建立,请检查本地网络状况") body = json.dumps(data).encode('utf-8')
print(sys.exc_info()[0], sys.exc_info()[1]) header = self.HEADER_STRUCT.pack(
return False self.HEADER_STRUCT.size + len(body),
print(f'{self._area_id}号弹幕监控已连接b站服务器') self.HEADER_STRUCT.size,
return (await self._send_bytes(self._bytes_conn_room)) 1,
operation,
async def heart_beat(self): 1
try: )
while True: return header + body
if not (await self._send_bytes(self._bytes_heartbeat)):
return async def _send_auth(self):
await asyncio.sleep(30) auth_params = {
except asyncio.CancelledError: 'uid': self._uid,
pass 'roomid': self._room_id,
'protover': 1,
async def read_datas(self): 'platform': 'web',
'clientver': '1.4.0'
}
await self._websocket.send(self._make_packet(auth_params, Operation.AUTH))
async def _message_loop(self):
# 获取房间ID
if self._room_id is None:
await self._get_room_id()
while True: while True:
datas = await self._read_bytes() try:
# 本函数对bytes进行相关操作不特别声明均为bytes # 连接
if datas is None: async with websockets.connect(self.WEBSOCKET_URL,
return ssl=self._ssl,
data_l = 0 loop=self._loop) as websocket:
len_datas = len(datas) self._websocket = websocket
while data_l != len_datas: await self._send_auth()
# 每片data都分为header和bodydata和data可能粘连
# data_l == header_l && next_data_l = next_header_l
# ||header_l...header_r|body_l...body_r||next_data_l...
tuple_header = self.structer.unpack_from(datas[data_l:])
len_data, len_header, ver, opt, seq = tuple_header
body_l = data_l + len_header
next_data_l = data_l + len_data
body = datas[body_l:next_data_l]
# 人气值(或者在线人数或者类似)以及心跳
if opt == 3:
UserCount, = struct.unpack('!I', body)
print(f'弹幕心跳检测{self._area_id}')
pass
# cmd
elif opt == 5:
if not self.handle_danmu(body):
return
# 握手确认
elif opt == 8:
print(f'{self._area_id}号弹幕监控进入房间({self._room_id}')
else:
print(datas[data_l:next_data_l])
data_l = next_data_l # 处理消息
async for message in websocket:
await self._handle_message(message)
# 待确认 except CancelledError:
async def close_ws(self): break
try: except ConnectionClosed:
await self.ws.close() self._websocket = None
except: # 重连
print('请联系开发者', sys.exc_info()[0], sys.exc_info()[1]) print('掉线重连中', file=sys.stderr)
if not self.ws.closed: try:
print(f'请联系开发者 {self._area_id}号弹幕收尾模块状态{self.ws.closed}') await sleep(5)
except CancelledError:
def handle_danmu(self, body):
return True
async def run_forever(self):
self._waiting = asyncio.Future()
while not self._closed:
print(f'正在启动{self._area_id}号弹幕姬')
async with self.lock_for_reseting_roomid_manually:
if self._closed:
break break
is_open = await self.connect_ws()
if not is_open:
continue continue
self.task_main = asyncio.ensure_future(self.read_datas()) except Exception as e:
task_heartbeat = asyncio.ensure_future(self.heart_beat()) if not self._handle_error(e):
tasks = [self.task_main, task_heartbeat] self._future.cancel()
_, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) raise
print(f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息') continue
if not task_heartbeat.done(): finally:
task_heartbeat.cancel() self._websocket = None
await self.close_ws()
await asyncio.wait(pending)
print(f'{self._area_id}号弹幕姬退出,剩余任务处理完毕')
self._waiting.set_result(True)
async def reconnect(self, room_id):
async with self.lock_for_reseting_roomid_manually:
# not None是判断是否已经连接了的(重连过程中也可以处理)
if self.ws is not None:
await self.close_ws()
if self.task_main is not None:
await self.task_main
# 由于锁的存在,绝对不可能到达下一个的自动重连状态,这里是保证正确显示当前监控房间号
self.room_id = room_id
print(f'{self._area_id}号弹幕姬已经切换房间({room_id}')
async def close(self):
if not self._closed:
self._closed = True
async with self.lock_for_reseting_roomid_manually:
if self.ws is not None:
await self.close_ws()
if self._waiting is not None:
await self._waiting
return True
else:
return False
class DanmuPrinter(BaseDanmu):
def handle_danmu(self, body):
dic = json.loads(body.decode('utf-8'))
cmd = dic['cmd']
if cmd == 'DANMU_MSG':
print(dic)
return True
async def _heartbeat_loop(self):
while True:
try:
if self._websocket is None:
await sleep(0.5)
else:
await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT))
await sleep(30)
except CancelledError:
break
except ConnectionClosed:
# 等待重连
continue
except Exception as e:
if not self._handle_error(e):
self._future.cancel()
raise
continue
async def _handle_message(self, message):
offset = 0
while offset < len(message):
try:
header = self.HeaderTuple(*self.HEADER_STRUCT.unpack_from(message, offset))
except struct.error:
break
if header.operation == Operation.POPULARITY:
popularity = int.from_bytes(message[offset + self.HEADER_STRUCT.size:
offset + self.HEADER_STRUCT.size + 4],
'big')
await self._on_get_popularity(popularity)
elif header.operation == Operation.COMMAND:
body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len]
body = json.loads(body.decode('utf-8'))
await self._handle_command(body)
elif header.operation == Operation.RECV_HEARTBEAT:
await self._websocket.send(self._make_packet({}, Operation.SEND_HEARTBEAT))
else:
body = message[offset + self.HEADER_STRUCT.size: offset + header.total_len]
print('未知包类型:', header, body, file=sys.stderr)
offset += header.total_len
async def _handle_command(self, command):
if isinstance(command, list):
for one_command in command:
await self._handle_command(one_command)
return
cmd = command['cmd']
# print(command)
if cmd == 'DANMU_MSG': # 收到弹幕
await self._on_get_danmaku(command['info'][1], command['info'][2][1])
elif cmd == 'SEND_GIFT': # 送礼物
pass
elif cmd == 'WELCOME': # 欢迎
pass
elif cmd == 'WELCOME_GUARD': # 欢迎房管
pass
elif cmd == 'SYS_MSG': # 系统消息
pass
elif cmd == 'PREPARING': # 房主准备中
pass
elif cmd == 'LIVE': # 直播开始
pass
elif cmd == 'WISH_BOTTLE': # 许愿瓶?
pass
else:
print('未知命令:', command, file=sys.stderr)
async def _on_get_popularity(self, popularity):
"""
获取到人气值
:param popularity: 人气值
"""
pass
async def _on_get_danmaku(self, content, user_name):
"""
获取到弹幕
:param content: 弹幕内容
:param user_name: 弹幕作者
"""
pass
def _on_stop(self, exc):
"""
协程结束后被调用
:param exc: 如果是异常结束则为异常否则为None
"""
pass
def _handle_error(self, exc):
"""
处理异常时被调用
:param exc: 异常
:return: True表示异常被处理False表示异常没被处理
"""
return False

View File

@ -1,24 +1,48 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import asyncio import sys
from time import time from asyncio import get_event_loop
from blivedm import DanmuPrinter from ssl import SSLError
async def test1(): from blivedm import BLiveClient
connection = DanmuPrinter(23058, 0)
task_run = asyncio.ensure_future(connection.run_forever())
await asyncio.sleep(30) class MyBLiveClient(BLiveClient):
print(time(), 'closing')
await connection.close() async def _on_get_popularity(self, popularity):
print(time(), 'closed') print('当前人气值:', popularity)
await task_run
print(time(), 'all done') async def _on_get_danmaku(self, content, user_name):
print(user_name, '说:', content)
def _on_stop(self, exc):
self._loop.stop()
def _handle_error(self, exc):
print(exc, file=sys.stderr)
if isinstance(exc, SSLError):
print('SSL验证失败', file=sys.stderr)
return False
def main(): def main():
loop = asyncio.get_event_loop() loop = get_event_loop()
loop.run_until_complete(test1())
loop.close() # 如果SSL验证失败就把第二个参数设为False
client = MyBLiveClient(139, True)
client.start()
# 5秒后停止测试用
# loop.call_later(5, client.stop)
# 按Ctrl+C停止
# import signal
# signal.signal(signal.SIGINT, lambda signum, frame: client.stop())
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
if __name__ == '__main__': if __name__ == '__main__':