From bc6b01b22dda391625e9cc33b8ad4d401d881c3d Mon Sep 17 00:00:00 2001 From: Cam Date: Fri, 25 Nov 2022 20:30:45 +0800 Subject: [PATCH] replace framework to pyee and remove loguru --- README.md | 22 +------ blive/__init__.py | 2 +- blive/core.py | 112 +++++++++++++--------------------- blive/eeframework.py | 127 +++++++++++++++++++++++++++++++++++++++ blive/framework.py | 1 - example/app.py | 22 +++---- example/error_handler.py | 66 -------------------- example/multi_room.py | 19 ++++-- setup.py | 4 +- 9 files changed, 197 insertions(+), 178 deletions(-) create mode 100644 blive/eeframework.py delete mode 100644 example/error_handler.py diff --git a/README.md b/README.md index 2b847aa..3ccc369 100644 --- a/README.md +++ b/README.md @@ -159,33 +159,13 @@ async def show(): return list(BLIVER_POOL.keys()) ``` - -## 全局异常处理 - -全局异常处理分为两个共享级别,分别为类级别和实例级别,在类上注册的异常处理为所有类实例共享,实例级别的异常处理只有实例自身拥有 - -```python - -app = BLiver(510) - -@app.catch(ZeroDivisionError) -def err_handler(e, app: BLiver): - print(f"{app.uname} catch ZeroDivisionError", e) - -@app.on(Events.DANMU_MSG) -async def danmu_handler(ctx): - 1 / 0 # will raise ZeroDivisionError - -azi.run() -``` - ## 项目简介 - blive 文件夹为框架代码 - core.py 为B站ws直播聊天室协议包处理的核心代码 - - framework.py 为框架代码 + - eeframework.py 为框架代码 - msg.py 为消息操作类代码 diff --git a/blive/__init__.py b/blive/__init__.py index 7e83a65..d82c439 100644 --- a/blive/__init__.py +++ b/blive/__init__.py @@ -1,3 +1,3 @@ -from .framework import * +from .eeframework import * from .core import * from .msg import * \ No newline at end of file diff --git a/blive/core.py b/blive/core.py index d85d7dd..b552e51 100644 --- a/blive/core.py +++ b/blive/core.py @@ -1,4 +1,3 @@ -import asyncio from collections import namedtuple import json from random import randint @@ -95,40 +94,6 @@ class Operation(enum.IntEnum): AUTH_REPLY = 8 # 认证回应 -class Counter(object): - def __init__(self, init_value=0) -> None: - self.current = init_value - - def increment(self): - self.current += 1 - - def value(self): - return self.current - - def increment_get(self): - self.current += 1 - return self.current - - -class AsyncCounter: - def __init__(self) -> None: - self.current = 0 - self.lock = asyncio.Lock() - - async def increment(self): - async with self.lock: - self.current += 1 - - async def increment_get(self): - async with self.lock: - self.current += 1 - return self.current - - async def value(self): - async with self.lock: - return self.current - - PackageHeader = namedtuple( "PackageHeader", ["package_size", "header_size", "version", "operation", "sequence_id"], @@ -137,11 +102,18 @@ PackageHeader = namedtuple( HeaderStruct = struct.Struct(">I2H2I") -class BWS_MsgPackage: +def counter(start=0): + while True: + start += 1 + yield start + + + +class BLiveMsgPackage: """bilibili websocket message package""" def __init__(self) -> None: - self.sequence = Counter(0) + self.sequence = counter(0) def pack(self, data, operation, version=ProtocolVersion.NORMAL): body = json.dumps(data).encode("utf-8") @@ -151,11 +123,36 @@ class BWS_MsgPackage: header_size=HeaderStruct.size, version=version, operation=operation, - sequence_id=self.sequence.increment_get(), + sequence_id=next(self.sequence), ) ) return header + body + def zipped_notify_pkg_process( + self, packages: list, data + ): # 解压后的包处理代码 ,抽取为公共函数, data: 解压后的原始数据 + header = PackageHeader(*HeaderStruct.unpack(data[:16])) # 读取包头 + if len(data) > header.package_size: # 如果数据大小大于包头声明的大小,说明是粘包 + while True: + # 先把第一个包放进去 / 放入包 + packages.append( + (header, data[16 : header.package_size].decode("utf-8")) + ) + # 移动到下一个包 + data = data[header.package_size :] + # 读取下一个包的包头 + header = PackageHeader(*HeaderStruct.unpack(data[:16])) + if len(data) > header.package_size: + # 如果数据还大于声明的package_size,说明还有1个以上的包 + continue + else: + # 剩下的数据刚好就是一个包,直接放,然后退出循环 + packages.append((header, data[16:].decode("utf-8"))) # 直接放第二个包 + break + else: + # 如果数据大小不大于包头声明的大小,说明是单个包太大压缩的。直接放入 + packages.append((header, data[16:].decode("utf-8"))) + def unpack(self, data) -> list: packages = [] # 装处理好的数据包用 header = PackageHeader(*HeaderStruct.unpack(data[:16])) # 读取数据包的头部 @@ -169,41 +166,16 @@ class BWS_MsgPackage: # 通知包处理 elif header.operation == Operation.NOTIFY: - def zipped_notify_pkg_process(data): # 解压后的包处理代码 ,抽取为公共函数, data: 解压后的原始数据 - header = PackageHeader(*HeaderStruct.unpack(data[:16])) # 读取包头 - if len(data) > header.package_size: # 如果数据大小大于包头声明的大小,说明是粘包 - while True: - # 先把第一个包放进去 / 放入包 - packages.append( - (header, data[16 : header.package_size].decode("utf-8")) - ) - # 移动到下一个包 - data = data[header.package_size :] - # 读取下一个包的包头 - header = PackageHeader(*HeaderStruct.unpack(data[:16])) - if len(data) > header.package_size: - # 如果数据还大于声明的package_size,说明还有1个以上的包 - continue - else: - # 剩下的数据刚好就是一个包,直接放,然后退出循环 - packages.append( - (header, data[16:].decode("utf-8")) - ) # 直接放第二个包 - break - else: - # 如果数据大小不大于包头声明的大小,说明是单个包太大压缩的。直接放入 - packages.append((header, data[16:].decode("utf-8"))) - # NOTIFY 消息可能会粘包 if header.version == ProtocolVersion.DEFLATE: # 先zlib解码,拆包 data = zlib.decompress(data) - zipped_notify_pkg_process(data) + self.zipped_notify_pkg_process(packages, data) elif header.version == ProtocolVersion.BROTLI: # 与 zlib 逻辑相同,先解码,然后数据可能要拆包 data = brotli.decompress(data) - zipped_notify_pkg_process(data) + self.zipped_notify_pkg_process(packages, data) elif header.version == ProtocolVersion.NORMAL: # normal 直接decode @@ -218,7 +190,7 @@ class BWS_MsgPackage: return packages -packman = BWS_MsgPackage() +packman = BLiveMsgPackage() class Events(str, enum.Enum): @@ -242,8 +214,9 @@ class Events(str, enum.Enum): ROOM_BLOCK_MSG = "ROOM_BLOCK_MSG" # 用户被禁言,%uname%昵称 GUARD_BUY = "GUARD_BUY" # 有人上船 FIRST_GUARD = "FIRST_GUARD" # 用户初次上船 - # 船员数量改变事件,%uname%新船员昵称,%num%获取大航海数量,附带直播间信息json数据 - NEW_GUARD_COUNT = "NEW_GUARD_COUNT" + NEW_GUARD_COUNT = ( + "NEW_GUARD_COUNT" # 船员数量改变事件,%uname%新船员昵称,%num%获取大航海数量,附带直播间信息json数据 + ) USER_TOAST_MSG = "USER_TOAST_MSG" # 上船附带的通知 HOT_RANK_CHANGED = "HOT_RANK_CHANGED" # 热门榜排名改变 HOT_RANK_SETTLEMENT = "HOT_RANK_SETTLEMENT" # 荣登热门榜topX @@ -265,8 +238,7 @@ class Events(str, enum.Enum): CUT_OFF = "CUT_OFF" # 被超管切断 room_admin_entrance = "room_admin_entrance" # 设置房管 ROOM_ADMINS = "ROOM_ADMINS" # 房管数量改变 - # 勋章升级,仅送礼物后触发,需设置中开启“监听勋章升级”。%medal_level%获取新等级(但用户当前勋章不一定是本直播间) - MEDAL_UPGRADE = "MEDAL_UPGRADE" + MEDAL_UPGRADE = "MEDAL_UPGRADE" # 勋章升级,仅送礼物后触发,需设置中开启“监听勋章升级”。%medal_level%获取新等级(但用户当前勋章不一定是本直播间) STOP_LIVE_ROOM_LIST = "STOP_LIVE_ROOM_LIST" # 停止直播的房间(这些房间会关闭ws连接) WIDGET_BANNER = "WIDGET_BANNER" # 小部件横幅 PK_BATTLE_PROCESS_NEW = "PK_BATTLE_PROCESS_NEW" # 开始pk diff --git a/blive/eeframework.py b/blive/eeframework.py new file mode 100644 index 0000000..1518d1b --- /dev/null +++ b/blive/eeframework.py @@ -0,0 +1,127 @@ +import json +import asyncio +from typing import List, Union +import aiohttp +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from requests.exceptions import ConnectionError +from pyee import AsyncIOEventEmitter +from .core import ( + BLiveMsgPackage, + PackageHeader, + Events, + Operation, + get_blive_room_info, + get_blive_ws_url, + certification, + heartbeat, +) + + +class BLiverCtx: + def __init__(self, bliver, msg) -> None: + self.ws = bliver.ws + self.bliver: BLiver = bliver + self.msg:tuple[PackageHeader,dict] = msg # 原始消息 + self.header: PackageHeader = self.msg[0] # 消息头部 + self.body:dict = json.loads(msg[1]) + +class BLiver(AsyncIOEventEmitter): + def __init__(self, room_id, uid=0): + super().__init__() + self.running = False + self.ws = None + self.room_id = room_id + self.uid = uid + self.real_room_id, self.uname = get_blive_room_info(room_id) + self.packman = BLiveMsgPackage() + self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai") + self.aio_session = aiohttp.ClientSession() + + def register_handler(self, event: Union[Events, List[Events]], handler): + self.on(event, handler) + + async def heartbeat(self): + try: + if self.ws is not None and not self.ws.closed: + await self.ws.send_bytes( + self.packman.pack(heartbeat(), Operation.HEARTBEAT) + ) + return + except ( + aiohttp.ClientConnectionError, + asyncio.TimeoutError, + ConnectionError, + ConnectionResetError, + ): + await self.connect() # 重新连接 + + async def connect(self, retries=5): + for _ in range(retries): + try: + url, token = get_blive_ws_url(self.real_room_id) + self.ws = await self.aio_session.ws_connect(url) + # 发送认证 + await self.ws.send_bytes( + self.packman.pack( + certification(self.real_room_id, token, uid=self.uid), + Operation.AUTH, + ) + ) + return + except ( + aiohttp.ClientConnectionError, + asyncio.TimeoutError, + ConnectionError, + ConnectionResetError, + ): + await asyncio.sleep(1) + raise aiohttp.ClientConnectionError("与服务器连接失败") + + async def listen(self): + self.running = True + # start listening + await self.connect() + + # 开始30s发送心跳包的定时任务 + self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30) + self.scheduler.start() + + # 开始监听 + while True: + try: + msg = await self.ws.receive(timeout=60) + if msg.type in ( + aiohttp.WSMsgType.CLOSING, + aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.ERROR, + ): + await self.connect() # reconnect + continue + if msg.type != aiohttp.WSMsgType.BINARY: + continue + mq = self.packman.unpack(msg.data) + ctxs = [BLiverCtx(self, m) for m in mq] + ctxs = filter(lambda ctx:ctx.body.get("cmd",None), ctxs) + for ctx in ctxs: + self.emit(ctx.body["cmd"],ctx) + except ( + aiohttp.ClientConnectionError, + ConnectionResetError, + asyncio.TimeoutError, + ): + await self.connect() + + async def graceful_close(self): + self.scheduler.shutdown() + await self._ws.close() + await self.aio_session.close() + self.running = False + + def run(self): + loop = asyncio.get_event_loop() + loop.create_task(self.listen()) + loop.run_forever() + + def run_as_task(self): + loop = asyncio.get_event_loop() + return loop.create_task(self.listen()) \ No newline at end of file diff --git a/blive/framework.py b/blive/framework.py index 05881e6..4dfb768 100644 --- a/blive/framework.py +++ b/blive/framework.py @@ -1,4 +1,3 @@ -from socket import gaierror import sys import json import asyncio diff --git a/example/app.py b/example/app.py index 07ae543..c867124 100644 --- a/example/app.py +++ b/example/app.py @@ -1,5 +1,4 @@ -from blive import BLiver, Events, BLiverCtx - +from blive import BLiver, Events, BLiverCtx from blive.msg import ( DanMuMsg, EntryEffectMsg, @@ -18,7 +17,7 @@ app = BLiver(510) async def listen(ctx: BLiverCtx): danmu = DanMuMsg(ctx.body) print( - f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n' + f'[弹幕] {danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n' ) @@ -26,18 +25,17 @@ async def listen(ctx: BLiverCtx): async def listen_join(ctx: BLiverCtx): join = InteractWordMsg(ctx.body) print( - "欢迎", + "[欢迎]", f"{join.user['name']} ({join.user['medal']['medal_name']}:{join.user['medal']['medal_level']})", - "进入直播间", + "进入直播间\n", ) @app.on(Events.SUPER_CHAT_MESSAGE) async def listen_sc(ctx: BLiverCtx): msg = SuperChatMsg(ctx.body) - print("sc 来了") print( - f"\n感谢 {msg.sender['name']}({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']})的价值 {msg.price} 的sc\n\n\t{msg.content}\n" + f"[sc] 感谢 {msg.sender['name']}({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']})的价值 {msg.price} 的sc\n\n\t{msg.content}\n" ) @@ -45,7 +43,7 @@ async def listen_sc(ctx: BLiverCtx): async def listen_gift(ctx: BLiverCtx): msg = SendGiftMsg(ctx.body) print( - f"{msg.sender['name']} ({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']}) 送出 {msg.gift['gift_name']}" + f"[礼物] {msg.sender['name']} ({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']}) 送出 {msg.gift['gift_name']}\n" ) @@ -53,27 +51,27 @@ async def listen_gift(ctx: BLiverCtx): async def hot(ctx: BLiverCtx): msg = HotRankChangeV2Msg(ctx.body) print( - f"恭喜 {ctx.bliver.uname} 在 {msg.area_name} 区 的 {msg.rank_desc} 榜单中获得第 {msg.rank} 名" + f"[通知] 恭喜 {ctx.bliver.uname} 在 {msg.area_name} 区 的 {msg.rank_desc} 榜单中获得第 {msg.rank} 名\n" ) @app.on(Events.ENTRY_EFFECT) async def welcome_captain(ctx: BLiverCtx): msg = EntryEffectMsg(ctx.body) - print(f"\n{msg.copy_writting}\n") + print(f"[热烈欢迎] {msg.copy_writting}\n") @app.on(Events.STOP_LIVE_ROOM_LIST) async def stop_live_room_list(ctx: BLiverCtx): # 监听停止直播的房间 msg = StopLiveRoomListMsg(ctx.body) - print(f"停止直播的房间列表:{msg.room_id_list}") + print(f"[通知] 停止直播的房间列表:{msg.room_id_list}\n") @app.on(Events.ONLINE_RANK_COUNT) async def online_rank(ctx): msg = OnlineRankCountMsg(ctx.body) - print(f"当前在线人气排名 {msg.count}") + print(f"[通知] 当前在线人气排名 {msg.count}\n") app.run() diff --git a/example/error_handler.py b/example/error_handler.py deleted file mode 100644 index 196fc3e..0000000 --- a/example/error_handler.py +++ /dev/null @@ -1,66 +0,0 @@ -"""监听多个直播间的例子""" - -import asyncio -from blive import BLiver, Events, BLiverCtx -from blive.msg import DanMuMsg - -# 多个对象共用的全局异常处理 -# 首先定义全局异常处理handler - - -def global_error_handler(e, app: BLiver): - print(f"{app.uname} 全局异常捕获", e) - - -# 调用类方法注册异常以及其处理函数,需在实例化之前注册,注册后所有BLiver共同拥有该异常处理 -BLiver.register_global_error_handler(ZeroDivisionError, global_error_handler) - - -# 定义弹幕事件handler,为了演示异常处理直接在方法中抛出异常 -async def azi_timeout_error(ctx: BLiverCtx): - raise TimeoutError - - -async def ke_type_error(ctx): - raise TypeError - - -async def zero_division_error(ctx): - 1 / 0 - - -# 两个直播间 -ke = BLiver(21716679) -azi = BLiver(7983476) - -# 注册handler -ke.register_handler(Events.INTERACT_WORD, zero_division_error) -azi.register_handler(Events.INTERACT_WORD, zero_division_error) -ke.register_handler(Events.DANMU_MSG, ke_type_error) -azi.register_handler(Events.DANMU_MSG, azi_timeout_error) - - -# 类实例级别的异常处理,实例与实例之间不共享 -ke.register_error_handler( - TypeError, lambda e, app: print(f"{app.uname} catch TypeError", e) -) - -# 实例级别的异常处理可以用注解方式进行注册 -@azi.catch(TimeoutError) -def azi_handler(e, app): - print(f"{app.uname} catch TimeoutError", e) - - -async def main(): - - # 以异步task的形式运行 - task1 = ke.run_as_task() - task2 = azi.run_as_task() - - # await 两个任务 - await asyncio.gather(*[task1, task2]) - - -if __name__ == "__main__": - loop = asyncio.get_event_loop() - loop.run_until_complete(main()) diff --git a/example/multi_room.py b/example/multi_room.py index 97136d8..43abbd2 100644 --- a/example/multi_room.py +++ b/example/multi_room.py @@ -2,7 +2,7 @@ import asyncio from blive import BLiver, Events, BLiverCtx -from blive.msg import DanMuMsg +from blive.msg import DanMuMsg,InteractWordMsg # 定义弹幕事件handler @@ -12,18 +12,27 @@ async def listen(ctx: BLiverCtx): f'\n【{ctx.bliver.uname}】{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n' ) +async def listen_join(ctx: BLiverCtx): + join = InteractWordMsg(ctx.body) + print( + f"\n【{ctx.bliver.uname}】欢迎", + f"{join.user['name']} ({join.user['medal']['medal_name']}:{join.user['medal']['medal_level']})", + "进入直播间", + ) async def main(): # 两个直播间 - ke = BLiver(605) + hai7 = BLiver(21452505) azi = BLiver(510) - # 注册handler - ke.register_handler(Events.DANMU_MSG, listen) azi.register_handler(Events.DANMU_MSG, listen) + azi.register_handler(Events.INTERACT_WORD, listen_join) + # 注册handler + hai7.register_handler(Events.DANMU_MSG, listen) + hai7.register_handler(Events.INTERACT_WORD, listen_join) # 以异步task的形式运行 - task1 = ke.run_as_task() + task1 = hai7.run_as_task() task2 = azi.run_as_task() # await 两个任务 diff --git a/setup.py b/setup.py index ca1a56b..0137cad 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("./README.md", "r") as f: setuptools.setup( name="blive", - version="0.0.8", + version="0.0.9", author="cam", author_email="yulinfeng000@gmail.com", long_description=description, @@ -16,5 +16,5 @@ setuptools.setup( "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", ], - install_requires=["aiohttp","loguru","requests","APScheduler","brotli"] + install_requires=["aiohttp","requests","APScheduler","brotli","pyee"] )