将processor与event解耦,增加断线重连机制

This commit is contained in:
Cam 2022-02-22 13:34:46 +08:00
parent 5d60ef5add
commit 4dfd297758
4 changed files with 259 additions and 75 deletions

40
app.py
View File

@ -1,20 +1,24 @@
from blive import BLiver, Events, BLiverCtx
from blive.msg import (
DanMuMsg,
EntryEffectMsg,
HotRankChangeV2Msg,
InteractWordMsg,
OnlineRankCountMsg,
SendGiftMsg,
StopLiveRoomListMsg,
SuperChatMsg,
)
app = BLiver(605)
app = BLiver(510)
@app.on(Events.DANMU_MSG)
async def listen(ctx: BLiverCtx):
danmu = DanMuMsg(ctx.body)
print(
f"\n{danmu.sender['name']}({danmu.sender['medal']['medal_name']}:{danmu.sender['medal']['medal_level']}): \"{danmu.content}\"\n"
f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
)
@ -31,17 +35,18 @@ async def listen_join(ctx: BLiverCtx):
@app.on(Events.SUPER_CHAT_MESSAGE)
async def listen_sc(ctx: BLiverCtx):
msg = SuperChatMsg(ctx.body)
print(msg.sender)
print(msg.content)
print(msg.start_time)
print(msg.time)
print(msg.price)
print("sc 来了")
print(
f"""\n感谢 {msg.sender['name']}({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']})的价值{msg.price}的sc\n\n\t{msg.content}\n"""
)
@app.on(Events.SEND_GIFT)
async def listen_gift(ctx: BLiverCtx):
msg = SendGiftMsg(ctx.body)
print(f"{msg.sender['name']} 送出 {msg.gift['gift_name']}")
print(
f"{msg.sender['name']} ({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']}) 送出 {msg.gift['gift_name']}"
)
@app.on(Events.HOT_RANK_CHANGED_V2)
@ -52,4 +57,23 @@ async def hot(ctx: BLiverCtx):
)
@app.on(Events.ENTRY_EFFECT)
async def welcome_captain(ctx: BLiverCtx):
msg = EntryEffectMsg(ctx.body)
print(f"\n{msg.copy_writting}\n")
@app.on(Events.STOP_LIVE_ROOM_LIST)
async def stop_live_room_list(ctx: BLiverCtx):
# 监听停止直播的房间
msg = StopLiveRoomListMsg(ctx.body)
print(f"停止直播的房间列表:{msg.room_id_list}")
@app.on(Events.ONLINE_RANK_COUNT)
async def online_rank(ctx):
msg = OnlineRankCountMsg(ctx.body)
print(f"当前在线人气排名 {msg.count}")
app.run()

View File

@ -2,6 +2,7 @@ import asyncio
from collections import namedtuple
from multiprocessing import RawValue, Lock
import json
from random import randint, random
import requests
import struct
import enum
@ -15,17 +16,18 @@ def get_blive_ws_url(roomid, ssl=True, platform="pc", player="web"):
params={"room_id": roomid, "platform": platform, "player": player},
)
data = resp.json()
url_obj = data["data"]["host_server_list"][1]
lens = len(data["data"]["host_server_list"])
url_obj = data["data"]["host_server_list"][randint(0, lens - 1)]
if ssl:
url = f"ws://{url_obj['host']}:{url_obj['ws_port']}/sub"
else:
url = f"wss://{url_obj['host']}:{url_obj['wss_port']}/sub"
else:
url = f"ws://{url_obj['host']}:{url_obj['ws_port']}/sub"
return url, data["data"]["token"]
def get_blive_room_info(roomid):
"""
得到b站直播间id,短id不是真实的id
得到b站直播间id,(短id不是真实的id)
Return: true_room_id,up_name
"""
@ -257,7 +259,7 @@ class Events(str, enum.Enum):
ANCHOR_LOT_START = "ANCHOR_LOT_START" # 开启天选
ANCHOR_LOT_END = "ANCHOR_LOT_END" # 天选结束
ANCHOR_LOT_AWARD = "ANCHOR_LOT_AWARD" # 天选结果推送
VOICE_JOIN_ROOM_COUNT_INFO = "VOICE_JOIN_ROOM_COUNT_INFO" # 申请连麦队列变化
VOICE_JOIN_ROOM_COUNT_INFO = "VOICE_JOIN_ROOM_COUNT_INFO" # 申请连麦队列变化
VOICE_JOIN_LIST = "VOICE_JOIN_LIST" # 连麦申请、取消连麦申请
VOICE_JOIN_STATUS = "VOICE_JOIN_STATUS" # 开始连麦、结束连麦
WARNING = "WARNING" # 被警告,%text%可获取内容
@ -266,7 +268,7 @@ class Events(str, enum.Enum):
ROOM_ADMINS = "ROOM_ADMINS" # 房管数量改变
# 勋章升级,仅送礼物后触发,需设置中开启“监听勋章升级”。%medal_level%获取新等级(但用户当前勋章不一定是本直播间)
MEDAL_UPGRADE = "MEDAL_UPGRADE"
STOP_LIVE_ROOM_LIST = "STOP_LIVE_ROOM_LIST" # 停止直播的房间
STOP_LIVE_ROOM_LIST = "STOP_LIVE_ROOM_LIST" # 停止直播的房间这些房间会关闭ws连接
WIDGET_BANNER = "WIDGET_BANNER" # 小部件横幅
PK_BATTLE_PROCESS_NEW = "PK_BATTLE_PROCESS_NEW" # 开始pk
PK_BATTLE_PROCESS = "PK_BATTLE_PROCESS" # pk
@ -274,3 +276,4 @@ class Events(str, enum.Enum):
HOT_RANK_CHANGED_V2 = "HOT_RANK_CHANGED_V2" # 热门榜改变v2
PK_BATTLE_SETTLE = "PK_BATTLE_SETTLE" # pk结果
PK_BATTLE_PRE_NEW = "PK_BATTLE_PRE_NEW" # pk预创建
LIVE_INTERACTIVE_GAME = "LIVE_INTERACTIVE_GAME" # 在线互动游戏 送礼物参与

View File

@ -1,19 +1,20 @@
import sys
import json
import asyncio
from typing import Awaitable, Dict, List, Tuple, Union
from typing import Awaitable, Dict, List, Union
import loguru
import aiohttp
from aiohttp.client_ws import ClientWebSocketResponse
from aiohttp.http_websocket import WSMessage
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.util import _Undefined
from requests.exceptions import ConnectionError
from .core import (
PackageHeader,
packman,
Events,
Operation,
PackageHeader,
get_blive_room_info,
get_blive_ws_url,
certification,
@ -24,13 +25,18 @@ from .core import (
undefined = _Undefined()
class ExitedException(Exception):
pass
class BLiverCtx(object):
def __init__(self, bliver, msg) -> None:
super().__init__()
self.ws: ClientWebSocketResponse = bliver.ws
self.msg: Tuple = msg # 原始消息
self.msg = msg # 原始消息
self.header: PackageHeader = msg[0] # 消息头部
self.bliver: BLiver = bliver
self.body: Dict = None # 消息内容
self.body = json.loads(msg[1])
class Channel:
@ -55,19 +61,15 @@ class Processor:
def __init__(self, logger=None) -> None:
self.logger = logger or loguru.logger
self.channels: Dict[str, Channel] = {}
for e in Events:
self.channels[e] = Channel()
def register(self, channel: str, handler: Awaitable):
channel = self.channels[channel]
channel.register_handler(handler)
c = self.channels.get(channel, Channel())
c.register_handler(handler)
self.channels[channel] = c
async def process(self, ctx):
header: PackageHeader = ctx.msg[0]
if header.operation == Operation.NOTIFY:
msg = json.loads(ctx.msg[1])
ctx.body = msg
listeners = self.channels.get(msg["cmd"], []) # 根据cmd 得到相应的处理句柄
if ctx.header.operation == Operation.NOTIFY:
listeners = self.channels.get(ctx.body["cmd"], []) # 根据cmd 得到相应的处理句柄
return await asyncio.gather(*[f(ctx) for f in listeners])
@ -84,6 +86,7 @@ class BLiver:
self._ws: ClientWebSocketResponse = None
self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai")
self.processor = Processor(logger=self.logger)
self.aio_session = aiohttp.ClientSession()
def on(self, event: Union[Events, List[Events]]):
def f_wrapper(func):
@ -143,32 +146,86 @@ class BLiver:
return self._ws
async def heartbeat(self):
assert self._ws is not None
await self._ws.send_bytes(packman.pack(heartbeat(), Operation.HEARTBEAT))
self.logger.debug("heartbeat sended")
try:
if self._ws is not None and not self._ws.closed:
await self._ws.send_bytes(
packman.pack(heartbeat(), Operation.HEARTBEAT)
)
self.logger.debug("heartbeat sended")
return
else:
self.logger.warning(
"heartbeat msg not send successfully, because ws had closed"
)
except (
aiohttp.ClientConnectionError,
asyncio.TimeoutError,
ConnectionResetError,
):
self.logger.warning("send heartbeat error, will reconnect ws")
await self.connect() # 重新连接
async def connect(self, retries=5):
for i in range(retries):
try:
url, token = get_blive_ws_url(self.real_roomid)
ws = await self.aio_session.ws_connect(url)
self._ws = ws
# 发送认证
await ws.send_bytes(
packman.pack(certification(self.real_roomid, token), Operation.AUTH)
)
return
except (
aiohttp.ClientConnectionError,
asyncio.TimeoutError,
ConnectionError,
):
self.logger.warning(
"connect failed, will retry {}, current: {}", retries, i + 1
)
await asyncio.sleep(1)
self.logger.warning("reconnect fail")
async def listen(self):
# start listening
url, token = get_blive_ws_url(self.real_roomid)
async with aiohttp.ClientSession().ws_connect(url) as ws:
self._ws = ws
await ws.send_bytes(
packman.pack(certification(self.real_roomid, token), Operation.AUTH)
)
await self.connect()
# 开始30s发送心跳包的定时任务
self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30)
self.scheduler.start()
# 开始30s发送心跳包的定时任务
self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30)
self.scheduler.start()
# 开始监听
while True:
msg: WSMessage = await ws.receive()
# 开始监听
while True:
try:
msg: WSMessage = await self.ws.receive(timeout=60)
if msg.type in (
aiohttp.WSMsgType.CLOSING,
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.ERROR,
):
self.logger.warning("ws closed")
await self.connect() # reconnect
continue
if msg.type != aiohttp.WSMsgType.BINARY:
continue
mq = packman.unpack(msg.data)
self.logger.debug("received msg:\n{}", mq)
tasks = [self.processor.process(BLiverCtx(self, m)) for m in mq]
await asyncio.gather(*tasks)
except (
aiohttp.ClientConnectionError,
ConnectionResetError,
asyncio.TimeoutError,
):
self.logger.warning("ws conn will reconnect")
await self.connect()
async def graceful_close(self):
await self._ws.close()
await self.aio_session.close()
self.scheduler.shutdown()
self.running = False
def run(self):
loop = asyncio.get_event_loop()

View File

@ -1,11 +1,39 @@
from abc import ABC
import json
from re import L
from typing import List
"""
消息操作封装类,目前只封装了弹幕消息操作
"""
class DictObject:
def __getitem__(self, idx):
return getattr(self, idx)
def __setitem__(self, k, v):
setattr(self, k, v)
def __delitem__(self, k):
delattr(self, k)
class Medal(DictObject):
def __init__(self, medal_name, medal_level) -> None:
super(DictObject, self).__init__()
self.medal_name = medal_name
self.medal_level = medal_level
class Sender(DictObject):
def __init__(self, id, name, medal_name, medal_level) -> None:
super(DictObject, self).__init__()
self.id = id
self.name = name
self.medal = Medal(medal_name, medal_level)
class BaseMsg(ABC):
def __init__(self, body) -> None:
super().__init__()
@ -29,14 +57,14 @@ class DanMuMsg(BaseMsg):
@property
def sender(self):
return {
"id": self.body["info"][2][0],
"name": self.body["info"][2][1],
"medal": {
"medal_name": self.body["info"][3][1] if self.body["info"][3] else "",
"medal_level": self.body["info"][3][0] if self.body["info"][3] else 0,
},
}
if not hasattr(self, "_sender"):
self._sender = Sender(
id=self.body["info"][2][0],
name=self.body["info"][2][1],
medal_name=self.body["info"][3][1] if self.body["info"][3] else "",
medal_level=self.body["info"][3][0] if self.body["info"][3] else 0,
)
return self._sender
@property
def timestamp(self):
@ -49,14 +77,14 @@ class InteractWordMsg(BaseMsg):
@property
def user(self):
return {
"id": self.body["data"]["uid"],
"name": self.body["data"]["uname"],
"medal": {
"medal_name": self.body["data"]["fans_medal"]["medal_name"],
"medal_level": self.body["data"]["fans_medal"]["medal_level"],
},
}
if not hasattr(self, "_user"):
self._user = Sender(
id=self.body["data"]["uid"],
name=self.body["data"]["uname"],
medal_name=self.body["data"]["fans_medal"]["medal_name"],
medal_level=self.body["data"]["fans_medal"]["medal_level"],
)
return self._user
@property
def timestamp(self):
@ -68,7 +96,7 @@ class StopLiveRoomListMsg(BaseMsg):
super().__init__(body)
@property
def room_id_list(self):
def room_id_list(self) -> List[int]:
return self.body["data"]["room_id_list"]
@ -105,14 +133,14 @@ class SendGiftMsg(BaseMsg):
@property
def sender(self):
return {
"id": self.body["data"]["uid"],
"name": self.body["data"]["uname"],
"medal": {
"medal_name": self.body["data"]["medal_info"]["medal_name"],
"medal_level": self.body["data"]["medal_info"]["medal_level"],
},
}
if not hasattr(self, "_sender"):
self._sender = Sender(
id=self.body["data"]["uid"],
name=self.body["data"]["uname"],
medal_name=self.body["data"]["medal_info"]["medal_name"],
medal_level=self.body["data"]["medal_info"]["medal_level"],
)
return self._sender
@property
def action(self):
@ -148,14 +176,14 @@ class SuperChatMsg(BaseMsg):
@property
def sender(self):
return {
"id": self.body["data"]["user_info"]["uname"],
"name": self.body["data"]["uid"],
"medal": {
"medal_name": self.body["data"]["medal_info"]["medal_name"],
"medal_level": self.body["data"]["medal_info"]["medal_level"],
},
}
if not hasattr(self, "_sender"):
self._sender = Sender(
id=self.body["data"]["user_info"]["uname"],
name=self.body["data"]["uid"],
medal_name=self.body["data"]["medal_info"]["medal_name"],
medal_level=self.body["data"]["medal_info"]["medal_level"],
)
return self._sender
@property
def price(self):
@ -168,3 +196,75 @@ class SuperChatMsg(BaseMsg):
@property
def time(self):
return self.body["data"]["time"]
class EntryEffectMsg(BaseMsg):
def __init__(self, body) -> None:
super().__init__(body)
@property
def uid(self):
return self.body["data"]["uid"]
@property
def face(self):
return self.body["data"]["face"]
@property
def copy_writting(self):
return self.body["data"]["copy_writing"]
@property
def web_basemap_url(self):
return self.body["data"]["web_basemap_url"]
@property
def basemap_url(self):
return self.body["data"]["basemap_url"]
class LiveInteractiveGameMsg(BaseMsg):
def __init__(self, body) -> None:
super().__init__(body)
@property
def uid(self):
return self.body["data"]["uid"]
@property
def uname(self):
return self.body["data"]["uname"]
@property
def uface(self):
return self.body["data"]["uface"]
@property
def fans_medal_level(self):
return self.body["data"]["fans_medal_level"]
@property
def guard_level(self):
return self.body["data"]["guard_level"]
@property
def gift(self):
return {
"gift_id": self.body["data"]["gift_id"],
"gift_name": self.body["data"]["gift_name"],
"gift_num": self.body["data"]["gift_num"],
"price": self.body["data"]["price"],
"paid": self.body["data"]["paid"],
}
def timestamp(self):
return self.body["data"]["timestamp"]
class OnlineRankCountMsg(BaseMsg):
def __init__(self, body) -> None:
super().__init__(body)
@property
def count(self):
return self.body["data"]["count"]