From 4dfd29775833257a6c87536f785c107d988175ee Mon Sep 17 00:00:00 2001 From: Cam Date: Tue, 22 Feb 2022 13:34:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86processor=E4=B8=8Eevent=E8=A7=A3?= =?UTF-8?q?=E8=80=A6=EF=BC=8C=E5=A2=9E=E5=8A=A0=E6=96=AD=E7=BA=BF=E9=87=8D?= =?UTF-8?q?=E8=BF=9E=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 40 ++++++++--- blive/core.py | 15 ++-- blive/framework.py | 113 ++++++++++++++++++++++-------- blive/msg.py | 166 ++++++++++++++++++++++++++++++++++++--------- 4 files changed, 259 insertions(+), 75 deletions(-) diff --git a/app.py b/app.py index d4f7042..b86ff1e 100644 --- a/app.py +++ b/app.py @@ -1,20 +1,24 @@ from blive import BLiver, Events, BLiverCtx + from blive.msg import ( DanMuMsg, + EntryEffectMsg, HotRankChangeV2Msg, InteractWordMsg, + OnlineRankCountMsg, SendGiftMsg, + StopLiveRoomListMsg, SuperChatMsg, ) -app = BLiver(605) +app = BLiver(510) @app.on(Events.DANMU_MSG) async def listen(ctx: BLiverCtx): danmu = DanMuMsg(ctx.body) print( - f"\n{danmu.sender['name']}({danmu.sender['medal']['medal_name']}:{danmu.sender['medal']['medal_level']}): \"{danmu.content}\"\n" + f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n' ) @@ -31,17 +35,18 @@ async def listen_join(ctx: BLiverCtx): @app.on(Events.SUPER_CHAT_MESSAGE) async def listen_sc(ctx: BLiverCtx): msg = SuperChatMsg(ctx.body) - print(msg.sender) - print(msg.content) - print(msg.start_time) - print(msg.time) - print(msg.price) + print("sc 来了") + print( + f"""\n感谢 {msg.sender['name']}({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']})的价值{msg.price}的sc\n\n\t{msg.content}\n""" + ) @app.on(Events.SEND_GIFT) async def listen_gift(ctx: BLiverCtx): msg = SendGiftMsg(ctx.body) - print(f"{msg.sender['name']} 送出 {msg.gift['gift_name']}") + print( + f"{msg.sender['name']} ({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']}) 送出 {msg.gift['gift_name']}" + ) @app.on(Events.HOT_RANK_CHANGED_V2) @@ -52,4 +57,23 @@ async def hot(ctx: BLiverCtx): ) +@app.on(Events.ENTRY_EFFECT) +async def welcome_captain(ctx: BLiverCtx): + msg = EntryEffectMsg(ctx.body) + print(f"\n{msg.copy_writting}\n") + + +@app.on(Events.STOP_LIVE_ROOM_LIST) +async def stop_live_room_list(ctx: BLiverCtx): + # 监听停止直播的房间 + msg = StopLiveRoomListMsg(ctx.body) + print(f"停止直播的房间列表:{msg.room_id_list}") + + +@app.on(Events.ONLINE_RANK_COUNT) +async def online_rank(ctx): + msg = OnlineRankCountMsg(ctx.body) + print(f"当前在线人气排名 {msg.count}") + + app.run() diff --git a/blive/core.py b/blive/core.py index d0c4ebf..4373351 100644 --- a/blive/core.py +++ b/blive/core.py @@ -2,6 +2,7 @@ import asyncio from collections import namedtuple from multiprocessing import RawValue, Lock import json +from random import randint, random import requests import struct import enum @@ -15,17 +16,18 @@ def get_blive_ws_url(roomid, ssl=True, platform="pc", player="web"): params={"room_id": roomid, "platform": platform, "player": player}, ) data = resp.json() - url_obj = data["data"]["host_server_list"][1] + lens = len(data["data"]["host_server_list"]) + url_obj = data["data"]["host_server_list"][randint(0, lens - 1)] if ssl: - url = f"ws://{url_obj['host']}:{url_obj['ws_port']}/sub" - else: url = f"wss://{url_obj['host']}:{url_obj['wss_port']}/sub" + else: + url = f"ws://{url_obj['host']}:{url_obj['ws_port']}/sub" return url, data["data"]["token"] def get_blive_room_info(roomid): """ - 得到b站直播间id,(短id不是真实的id) + 得到b站直播间id,(短id不是真实的id) Return: true_room_id,up_name """ @@ -257,7 +259,7 @@ class Events(str, enum.Enum): ANCHOR_LOT_START = "ANCHOR_LOT_START" # 开启天选 ANCHOR_LOT_END = "ANCHOR_LOT_END" # 天选结束 ANCHOR_LOT_AWARD = "ANCHOR_LOT_AWARD" # 天选结果推送 - VOICE_JOIN_ROOM_COUNT_INFO = "VOICE_JOIN_ROOM_COUNT_INFO" # 申请连麦队列变化 + VOICE_JOIN_ROOM_COUNT_INFO = "VOICE_JOIN_ROOM_COUNT_INFO" # 申请连麦队列变化 VOICE_JOIN_LIST = "VOICE_JOIN_LIST" # 连麦申请、取消连麦申请 VOICE_JOIN_STATUS = "VOICE_JOIN_STATUS" # 开始连麦、结束连麦 WARNING = "WARNING" # 被警告,%text%可获取内容 @@ -266,7 +268,7 @@ class Events(str, enum.Enum): ROOM_ADMINS = "ROOM_ADMINS" # 房管数量改变 # 勋章升级,仅送礼物后触发,需设置中开启“监听勋章升级”。%medal_level%获取新等级(但用户当前勋章不一定是本直播间) MEDAL_UPGRADE = "MEDAL_UPGRADE" - STOP_LIVE_ROOM_LIST = "STOP_LIVE_ROOM_LIST" # 停止直播的房间 + STOP_LIVE_ROOM_LIST = "STOP_LIVE_ROOM_LIST" # 停止直播的房间(这些房间会关闭ws连接) WIDGET_BANNER = "WIDGET_BANNER" # 小部件横幅 PK_BATTLE_PROCESS_NEW = "PK_BATTLE_PROCESS_NEW" # 开始pk PK_BATTLE_PROCESS = "PK_BATTLE_PROCESS" # pk @@ -274,3 +276,4 @@ class Events(str, enum.Enum): HOT_RANK_CHANGED_V2 = "HOT_RANK_CHANGED_V2" # 热门榜改变v2 PK_BATTLE_SETTLE = "PK_BATTLE_SETTLE" # pk结果 PK_BATTLE_PRE_NEW = "PK_BATTLE_PRE_NEW" # pk预创建 + LIVE_INTERACTIVE_GAME = "LIVE_INTERACTIVE_GAME" # 在线互动游戏 送礼物参与 diff --git a/blive/framework.py b/blive/framework.py index f4de1fb..491d8c8 100644 --- a/blive/framework.py +++ b/blive/framework.py @@ -1,19 +1,20 @@ import sys import json import asyncio -from typing import Awaitable, Dict, List, Tuple, Union +from typing import Awaitable, Dict, List, Union import loguru import aiohttp from aiohttp.client_ws import ClientWebSocketResponse from aiohttp.http_websocket import WSMessage from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.util import _Undefined +from requests.exceptions import ConnectionError from .core import ( + PackageHeader, packman, Events, Operation, - PackageHeader, get_blive_room_info, get_blive_ws_url, certification, @@ -24,13 +25,18 @@ from .core import ( undefined = _Undefined() +class ExitedException(Exception): + pass + + class BLiverCtx(object): def __init__(self, bliver, msg) -> None: super().__init__() self.ws: ClientWebSocketResponse = bliver.ws - self.msg: Tuple = msg # 原始消息 + self.msg = msg # 原始消息 + self.header: PackageHeader = msg[0] # 消息头部 self.bliver: BLiver = bliver - self.body: Dict = None # 消息内容 + self.body = json.loads(msg[1]) class Channel: @@ -55,19 +61,15 @@ class Processor: def __init__(self, logger=None) -> None: self.logger = logger or loguru.logger self.channels: Dict[str, Channel] = {} - for e in Events: - self.channels[e] = Channel() def register(self, channel: str, handler: Awaitable): - channel = self.channels[channel] - channel.register_handler(handler) + c = self.channels.get(channel, Channel()) + c.register_handler(handler) + self.channels[channel] = c async def process(self, ctx): - header: PackageHeader = ctx.msg[0] - if header.operation == Operation.NOTIFY: - msg = json.loads(ctx.msg[1]) - ctx.body = msg - listeners = self.channels.get(msg["cmd"], []) # 根据cmd 得到相应的处理句柄 + if ctx.header.operation == Operation.NOTIFY: + listeners = self.channels.get(ctx.body["cmd"], []) # 根据cmd 得到相应的处理句柄 return await asyncio.gather(*[f(ctx) for f in listeners]) @@ -84,6 +86,7 @@ class BLiver: self._ws: ClientWebSocketResponse = None self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai") self.processor = Processor(logger=self.logger) + self.aio_session = aiohttp.ClientSession() def on(self, event: Union[Events, List[Events]]): def f_wrapper(func): @@ -143,32 +146,86 @@ class BLiver: return self._ws async def heartbeat(self): - assert self._ws is not None - await self._ws.send_bytes(packman.pack(heartbeat(), Operation.HEARTBEAT)) - self.logger.debug("heartbeat sended") + try: + if self._ws is not None and not self._ws.closed: + await self._ws.send_bytes( + packman.pack(heartbeat(), Operation.HEARTBEAT) + ) + self.logger.debug("heartbeat sended") + return + else: + self.logger.warning( + "heartbeat msg not send successfully, because ws had closed" + ) + except ( + aiohttp.ClientConnectionError, + asyncio.TimeoutError, + ConnectionResetError, + ): + self.logger.warning("send heartbeat error, will reconnect ws") + await self.connect() # 重新连接 + + async def connect(self, retries=5): + for i in range(retries): + try: + url, token = get_blive_ws_url(self.real_roomid) + ws = await self.aio_session.ws_connect(url) + self._ws = ws + # 发送认证 + await ws.send_bytes( + packman.pack(certification(self.real_roomid, token), Operation.AUTH) + ) + return + except ( + aiohttp.ClientConnectionError, + asyncio.TimeoutError, + ConnectionError, + ): + self.logger.warning( + "connect failed, will retry {}, current: {}", retries, i + 1 + ) + await asyncio.sleep(1) + self.logger.warning("reconnect fail") async def listen(self): # start listening - url, token = get_blive_ws_url(self.real_roomid) - async with aiohttp.ClientSession().ws_connect(url) as ws: - self._ws = ws - await ws.send_bytes( - packman.pack(certification(self.real_roomid, token), Operation.AUTH) - ) + await self.connect() - # 开始30s发送心跳包的定时任务 - self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30) - self.scheduler.start() + # 开始30s发送心跳包的定时任务 + self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30) + self.scheduler.start() - # 开始监听 - while True: - msg: WSMessage = await ws.receive() + # 开始监听 + while True: + try: + msg: WSMessage = await self.ws.receive(timeout=60) + if msg.type in ( + aiohttp.WSMsgType.CLOSING, + aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.ERROR, + ): + self.logger.warning("ws closed") + await self.connect() # reconnect + continue if msg.type != aiohttp.WSMsgType.BINARY: continue mq = packman.unpack(msg.data) self.logger.debug("received msg:\n{}", mq) tasks = [self.processor.process(BLiverCtx(self, m)) for m in mq] await asyncio.gather(*tasks) + except ( + aiohttp.ClientConnectionError, + ConnectionResetError, + asyncio.TimeoutError, + ): + self.logger.warning("ws conn will reconnect") + await self.connect() + + async def graceful_close(self): + await self._ws.close() + await self.aio_session.close() + self.scheduler.shutdown() + self.running = False def run(self): loop = asyncio.get_event_loop() diff --git a/blive/msg.py b/blive/msg.py index 3d3e6ec..c4741d8 100644 --- a/blive/msg.py +++ b/blive/msg.py @@ -1,11 +1,39 @@ from abc import ABC import json +from re import L +from typing import List """ 消息操作封装类,目前只封装了弹幕消息操作 """ +class DictObject: + def __getitem__(self, idx): + return getattr(self, idx) + + def __setitem__(self, k, v): + setattr(self, k, v) + + def __delitem__(self, k): + delattr(self, k) + + +class Medal(DictObject): + def __init__(self, medal_name, medal_level) -> None: + super(DictObject, self).__init__() + self.medal_name = medal_name + self.medal_level = medal_level + + +class Sender(DictObject): + def __init__(self, id, name, medal_name, medal_level) -> None: + super(DictObject, self).__init__() + self.id = id + self.name = name + self.medal = Medal(medal_name, medal_level) + + class BaseMsg(ABC): def __init__(self, body) -> None: super().__init__() @@ -29,14 +57,14 @@ class DanMuMsg(BaseMsg): @property def sender(self): - return { - "id": self.body["info"][2][0], - "name": self.body["info"][2][1], - "medal": { - "medal_name": self.body["info"][3][1] if self.body["info"][3] else "", - "medal_level": self.body["info"][3][0] if self.body["info"][3] else 0, - }, - } + if not hasattr(self, "_sender"): + self._sender = Sender( + id=self.body["info"][2][0], + name=self.body["info"][2][1], + medal_name=self.body["info"][3][1] if self.body["info"][3] else "", + medal_level=self.body["info"][3][0] if self.body["info"][3] else 0, + ) + return self._sender @property def timestamp(self): @@ -49,14 +77,14 @@ class InteractWordMsg(BaseMsg): @property def user(self): - return { - "id": self.body["data"]["uid"], - "name": self.body["data"]["uname"], - "medal": { - "medal_name": self.body["data"]["fans_medal"]["medal_name"], - "medal_level": self.body["data"]["fans_medal"]["medal_level"], - }, - } + if not hasattr(self, "_user"): + self._user = Sender( + id=self.body["data"]["uid"], + name=self.body["data"]["uname"], + medal_name=self.body["data"]["fans_medal"]["medal_name"], + medal_level=self.body["data"]["fans_medal"]["medal_level"], + ) + return self._user @property def timestamp(self): @@ -68,7 +96,7 @@ class StopLiveRoomListMsg(BaseMsg): super().__init__(body) @property - def room_id_list(self): + def room_id_list(self) -> List[int]: return self.body["data"]["room_id_list"] @@ -105,14 +133,14 @@ class SendGiftMsg(BaseMsg): @property def sender(self): - return { - "id": self.body["data"]["uid"], - "name": self.body["data"]["uname"], - "medal": { - "medal_name": self.body["data"]["medal_info"]["medal_name"], - "medal_level": self.body["data"]["medal_info"]["medal_level"], - }, - } + if not hasattr(self, "_sender"): + self._sender = Sender( + id=self.body["data"]["uid"], + name=self.body["data"]["uname"], + medal_name=self.body["data"]["medal_info"]["medal_name"], + medal_level=self.body["data"]["medal_info"]["medal_level"], + ) + return self._sender @property def action(self): @@ -148,14 +176,14 @@ class SuperChatMsg(BaseMsg): @property def sender(self): - return { - "id": self.body["data"]["user_info"]["uname"], - "name": self.body["data"]["uid"], - "medal": { - "medal_name": self.body["data"]["medal_info"]["medal_name"], - "medal_level": self.body["data"]["medal_info"]["medal_level"], - }, - } + if not hasattr(self, "_sender"): + self._sender = Sender( + id=self.body["data"]["user_info"]["uname"], + name=self.body["data"]["uid"], + medal_name=self.body["data"]["medal_info"]["medal_name"], + medal_level=self.body["data"]["medal_info"]["medal_level"], + ) + return self._sender @property def price(self): @@ -168,3 +196,75 @@ class SuperChatMsg(BaseMsg): @property def time(self): return self.body["data"]["time"] + + +class EntryEffectMsg(BaseMsg): + def __init__(self, body) -> None: + super().__init__(body) + + @property + def uid(self): + return self.body["data"]["uid"] + + @property + def face(self): + return self.body["data"]["face"] + + @property + def copy_writting(self): + return self.body["data"]["copy_writing"] + + @property + def web_basemap_url(self): + return self.body["data"]["web_basemap_url"] + + @property + def basemap_url(self): + return self.body["data"]["basemap_url"] + + +class LiveInteractiveGameMsg(BaseMsg): + def __init__(self, body) -> None: + super().__init__(body) + + @property + def uid(self): + return self.body["data"]["uid"] + + @property + def uname(self): + return self.body["data"]["uname"] + + @property + def uface(self): + return self.body["data"]["uface"] + + @property + def fans_medal_level(self): + return self.body["data"]["fans_medal_level"] + + @property + def guard_level(self): + return self.body["data"]["guard_level"] + + @property + def gift(self): + return { + "gift_id": self.body["data"]["gift_id"], + "gift_name": self.body["data"]["gift_name"], + "gift_num": self.body["data"]["gift_num"], + "price": self.body["data"]["price"], + "paid": self.body["data"]["paid"], + } + + def timestamp(self): + return self.body["data"]["timestamp"] + + +class OnlineRankCountMsg(BaseMsg): + def __init__(self, body) -> None: + super().__init__(body) + + @property + def count(self): + return self.body["data"]["count"]