replace framework to pyee and remove loguru

This commit is contained in:
Cam 2022-11-25 20:30:45 +08:00
parent 596d3d0c33
commit bc6b01b22d
9 changed files with 197 additions and 178 deletions

View File

@ -159,33 +159,13 @@ async def show():
return list(BLIVER_POOL.keys()) return list(BLIVER_POOL.keys())
``` ```
## 全局异常处理
全局异常处理分为两个共享级别,分别为类级别和实例级别,在类上注册的异常处理为所有类实例共享,实例级别的异常处理只有实例自身拥有
```python
app = BLiver(510)
@app.catch(ZeroDivisionError)
def err_handler(e, app: BLiver):
print(f"{app.uname} catch ZeroDivisionError", e)
@app.on(Events.DANMU_MSG)
async def danmu_handler(ctx):
1 / 0 # will raise ZeroDivisionError
azi.run()
```
## 项目简介 ## 项目简介
- blive 文件夹为框架代码 - blive 文件夹为框架代码
- core.py 为B站ws直播聊天室协议包处理的核心代码 - core.py 为B站ws直播聊天室协议包处理的核心代码
- framework.py 为框架代码 - eeframework.py 为框架代码
- msg.py 为消息操作类代码 - msg.py 为消息操作类代码

View File

@ -1,3 +1,3 @@
from .framework import * from .eeframework import *
from .core import * from .core import *
from .msg import * from .msg import *

View File

@ -1,4 +1,3 @@
import asyncio
from collections import namedtuple from collections import namedtuple
import json import json
from random import randint from random import randint
@ -95,40 +94,6 @@ class Operation(enum.IntEnum):
AUTH_REPLY = 8 # 认证回应 AUTH_REPLY = 8 # 认证回应
class Counter(object):
def __init__(self, init_value=0) -> None:
self.current = init_value
def increment(self):
self.current += 1
def value(self):
return self.current
def increment_get(self):
self.current += 1
return self.current
class AsyncCounter:
def __init__(self) -> None:
self.current = 0
self.lock = asyncio.Lock()
async def increment(self):
async with self.lock:
self.current += 1
async def increment_get(self):
async with self.lock:
self.current += 1
return self.current
async def value(self):
async with self.lock:
return self.current
PackageHeader = namedtuple( PackageHeader = namedtuple(
"PackageHeader", "PackageHeader",
["package_size", "header_size", "version", "operation", "sequence_id"], ["package_size", "header_size", "version", "operation", "sequence_id"],
@ -137,11 +102,18 @@ PackageHeader = namedtuple(
HeaderStruct = struct.Struct(">I2H2I") HeaderStruct = struct.Struct(">I2H2I")
class BWS_MsgPackage: def counter(start=0):
while True:
start += 1
yield start
class BLiveMsgPackage:
"""bilibili websocket message package""" """bilibili websocket message package"""
def __init__(self) -> None: def __init__(self) -> None:
self.sequence = Counter(0) self.sequence = counter(0)
def pack(self, data, operation, version=ProtocolVersion.NORMAL): def pack(self, data, operation, version=ProtocolVersion.NORMAL):
body = json.dumps(data).encode("utf-8") body = json.dumps(data).encode("utf-8")
@ -151,25 +123,14 @@ class BWS_MsgPackage:
header_size=HeaderStruct.size, header_size=HeaderStruct.size,
version=version, version=version,
operation=operation, operation=operation,
sequence_id=self.sequence.increment_get(), sequence_id=next(self.sequence),
) )
) )
return header + body return header + body
def unpack(self, data) -> list: def zipped_notify_pkg_process(
packages = [] # 装处理好的数据包用 self, packages: list, data
header = PackageHeader(*HeaderStruct.unpack(data[:16])) # 读取数据包的头部 ): # 解压后的包处理代码 ,抽取为公共函数, data: 解压后的原始数据
data = data[16:] # 读取数据包的数据段
# 心跳包处理
if header.operation == Operation.HEARTBEAT_REPLY:
# 心跳不会粘包,前4位有不明含义的数据
packages.append((header, data[4:].decode("utf-8")))
# 通知包处理
elif header.operation == Operation.NOTIFY:
def zipped_notify_pkg_process(data): # 解压后的包处理代码 ,抽取为公共函数, data: 解压后的原始数据
header = PackageHeader(*HeaderStruct.unpack(data[:16])) # 读取包头 header = PackageHeader(*HeaderStruct.unpack(data[:16])) # 读取包头
if len(data) > header.package_size: # 如果数据大小大于包头声明的大小,说明是粘包 if len(data) > header.package_size: # 如果数据大小大于包头声明的大小,说明是粘包
while True: while True:
@ -186,24 +147,35 @@ class BWS_MsgPackage:
continue continue
else: else:
# 剩下的数据刚好就是一个包,直接放,然后退出循环 # 剩下的数据刚好就是一个包,直接放,然后退出循环
packages.append( packages.append((header, data[16:].decode("utf-8"))) # 直接放第二个包
(header, data[16:].decode("utf-8"))
) # 直接放第二个包
break break
else: else:
# 如果数据大小不大于包头声明的大小,说明是单个包太大压缩的。直接放入 # 如果数据大小不大于包头声明的大小,说明是单个包太大压缩的。直接放入
packages.append((header, data[16:].decode("utf-8"))) packages.append((header, data[16:].decode("utf-8")))
def unpack(self, data) -> list:
packages = [] # 装处理好的数据包用
header = PackageHeader(*HeaderStruct.unpack(data[:16])) # 读取数据包的头部
data = data[16:] # 读取数据包的数据段
# 心跳包处理
if header.operation == Operation.HEARTBEAT_REPLY:
# 心跳不会粘包,前4位有不明含义的数据
packages.append((header, data[4:].decode("utf-8")))
# 通知包处理
elif header.operation == Operation.NOTIFY:
# NOTIFY 消息可能会粘包 # NOTIFY 消息可能会粘包
if header.version == ProtocolVersion.DEFLATE: if header.version == ProtocolVersion.DEFLATE:
# 先zlib解码拆包 # 先zlib解码拆包
data = zlib.decompress(data) data = zlib.decompress(data)
zipped_notify_pkg_process(data) self.zipped_notify_pkg_process(packages, data)
elif header.version == ProtocolVersion.BROTLI: elif header.version == ProtocolVersion.BROTLI:
# 与 zlib 逻辑相同,先解码,然后数据可能要拆包 # 与 zlib 逻辑相同,先解码,然后数据可能要拆包
data = brotli.decompress(data) data = brotli.decompress(data)
zipped_notify_pkg_process(data) self.zipped_notify_pkg_process(packages, data)
elif header.version == ProtocolVersion.NORMAL: elif header.version == ProtocolVersion.NORMAL:
# normal 直接decode # normal 直接decode
@ -218,7 +190,7 @@ class BWS_MsgPackage:
return packages return packages
packman = BWS_MsgPackage() packman = BLiveMsgPackage()
class Events(str, enum.Enum): class Events(str, enum.Enum):
@ -242,8 +214,9 @@ class Events(str, enum.Enum):
ROOM_BLOCK_MSG = "ROOM_BLOCK_MSG" # 用户被禁言,%uname%昵称 ROOM_BLOCK_MSG = "ROOM_BLOCK_MSG" # 用户被禁言,%uname%昵称
GUARD_BUY = "GUARD_BUY" # 有人上船 GUARD_BUY = "GUARD_BUY" # 有人上船
FIRST_GUARD = "FIRST_GUARD" # 用户初次上船 FIRST_GUARD = "FIRST_GUARD" # 用户初次上船
# 船员数量改变事件,%uname%新船员昵称,%num%获取大航海数量附带直播间信息json数据 NEW_GUARD_COUNT = (
NEW_GUARD_COUNT = "NEW_GUARD_COUNT" "NEW_GUARD_COUNT" # 船员数量改变事件,%uname%新船员昵称,%num%获取大航海数量附带直播间信息json数据
)
USER_TOAST_MSG = "USER_TOAST_MSG" # 上船附带的通知 USER_TOAST_MSG = "USER_TOAST_MSG" # 上船附带的通知
HOT_RANK_CHANGED = "HOT_RANK_CHANGED" # 热门榜排名改变 HOT_RANK_CHANGED = "HOT_RANK_CHANGED" # 热门榜排名改变
HOT_RANK_SETTLEMENT = "HOT_RANK_SETTLEMENT" # 荣登热门榜topX HOT_RANK_SETTLEMENT = "HOT_RANK_SETTLEMENT" # 荣登热门榜topX
@ -265,8 +238,7 @@ class Events(str, enum.Enum):
CUT_OFF = "CUT_OFF" # 被超管切断 CUT_OFF = "CUT_OFF" # 被超管切断
room_admin_entrance = "room_admin_entrance" # 设置房管 room_admin_entrance = "room_admin_entrance" # 设置房管
ROOM_ADMINS = "ROOM_ADMINS" # 房管数量改变 ROOM_ADMINS = "ROOM_ADMINS" # 房管数量改变
# 勋章升级,仅送礼物后触发,需设置中开启“监听勋章升级”。%medal_level%获取新等级(但用户当前勋章不一定是本直播间) MEDAL_UPGRADE = "MEDAL_UPGRADE" # 勋章升级,仅送礼物后触发,需设置中开启“监听勋章升级”。%medal_level%获取新等级(但用户当前勋章不一定是本直播间)
MEDAL_UPGRADE = "MEDAL_UPGRADE"
STOP_LIVE_ROOM_LIST = "STOP_LIVE_ROOM_LIST" # 停止直播的房间这些房间会关闭ws连接 STOP_LIVE_ROOM_LIST = "STOP_LIVE_ROOM_LIST" # 停止直播的房间这些房间会关闭ws连接
WIDGET_BANNER = "WIDGET_BANNER" # 小部件横幅 WIDGET_BANNER = "WIDGET_BANNER" # 小部件横幅
PK_BATTLE_PROCESS_NEW = "PK_BATTLE_PROCESS_NEW" # 开始pk PK_BATTLE_PROCESS_NEW = "PK_BATTLE_PROCESS_NEW" # 开始pk

127
blive/eeframework.py Normal file
View File

@ -0,0 +1,127 @@
import json
import asyncio
from typing import List, Union
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from requests.exceptions import ConnectionError
from pyee import AsyncIOEventEmitter
from .core import (
BLiveMsgPackage,
PackageHeader,
Events,
Operation,
get_blive_room_info,
get_blive_ws_url,
certification,
heartbeat,
)
class BLiverCtx:
def __init__(self, bliver, msg) -> None:
self.ws = bliver.ws
self.bliver: BLiver = bliver
self.msg:tuple[PackageHeader,dict] = msg # 原始消息
self.header: PackageHeader = self.msg[0] # 消息头部
self.body:dict = json.loads(msg[1])
class BLiver(AsyncIOEventEmitter):
def __init__(self, room_id, uid=0):
super().__init__()
self.running = False
self.ws = None
self.room_id = room_id
self.uid = uid
self.real_room_id, self.uname = get_blive_room_info(room_id)
self.packman = BLiveMsgPackage()
self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai")
self.aio_session = aiohttp.ClientSession()
def register_handler(self, event: Union[Events, List[Events]], handler):
self.on(event, handler)
async def heartbeat(self):
try:
if self.ws is not None and not self.ws.closed:
await self.ws.send_bytes(
self.packman.pack(heartbeat(), Operation.HEARTBEAT)
)
return
except (
aiohttp.ClientConnectionError,
asyncio.TimeoutError,
ConnectionError,
ConnectionResetError,
):
await self.connect() # 重新连接
async def connect(self, retries=5):
for _ in range(retries):
try:
url, token = get_blive_ws_url(self.real_room_id)
self.ws = await self.aio_session.ws_connect(url)
# 发送认证
await self.ws.send_bytes(
self.packman.pack(
certification(self.real_room_id, token, uid=self.uid),
Operation.AUTH,
)
)
return
except (
aiohttp.ClientConnectionError,
asyncio.TimeoutError,
ConnectionError,
ConnectionResetError,
):
await asyncio.sleep(1)
raise aiohttp.ClientConnectionError("与服务器连接失败")
async def listen(self):
self.running = True
# start listening
await self.connect()
# 开始30s发送心跳包的定时任务
self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30)
self.scheduler.start()
# 开始监听
while True:
try:
msg = await self.ws.receive(timeout=60)
if msg.type in (
aiohttp.WSMsgType.CLOSING,
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.ERROR,
):
await self.connect() # reconnect
continue
if msg.type != aiohttp.WSMsgType.BINARY:
continue
mq = self.packman.unpack(msg.data)
ctxs = [BLiverCtx(self, m) for m in mq]
ctxs = filter(lambda ctx:ctx.body.get("cmd",None), ctxs)
for ctx in ctxs:
self.emit(ctx.body["cmd"],ctx)
except (
aiohttp.ClientConnectionError,
ConnectionResetError,
asyncio.TimeoutError,
):
await self.connect()
async def graceful_close(self):
self.scheduler.shutdown()
await self._ws.close()
await self.aio_session.close()
self.running = False
def run(self):
loop = asyncio.get_event_loop()
loop.create_task(self.listen())
loop.run_forever()
def run_as_task(self):
loop = asyncio.get_event_loop()
return loop.create_task(self.listen())

View File

@ -1,4 +1,3 @@
from socket import gaierror
import sys import sys
import json import json
import asyncio import asyncio

View File

@ -1,5 +1,4 @@
from blive import BLiver, Events, BLiverCtx from blive import BLiver, Events, BLiverCtx
from blive.msg import ( from blive.msg import (
DanMuMsg, DanMuMsg,
EntryEffectMsg, EntryEffectMsg,
@ -18,7 +17,7 @@ app = BLiver(510)
async def listen(ctx: BLiverCtx): async def listen(ctx: BLiverCtx):
danmu = DanMuMsg(ctx.body) danmu = DanMuMsg(ctx.body)
print( print(
f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n' f'[弹幕] {danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
) )
@ -26,18 +25,17 @@ async def listen(ctx: BLiverCtx):
async def listen_join(ctx: BLiverCtx): async def listen_join(ctx: BLiverCtx):
join = InteractWordMsg(ctx.body) join = InteractWordMsg(ctx.body)
print( print(
"欢迎", "[欢迎]",
f"{join.user['name']} ({join.user['medal']['medal_name']}:{join.user['medal']['medal_level']})", f"{join.user['name']} ({join.user['medal']['medal_name']}:{join.user['medal']['medal_level']})",
"进入直播间", "进入直播间\n",
) )
@app.on(Events.SUPER_CHAT_MESSAGE) @app.on(Events.SUPER_CHAT_MESSAGE)
async def listen_sc(ctx: BLiverCtx): async def listen_sc(ctx: BLiverCtx):
msg = SuperChatMsg(ctx.body) msg = SuperChatMsg(ctx.body)
print("sc 来了")
print( print(
f"\n感谢 {msg.sender['name']}({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']})的价值 {msg.price} 的sc\n\n\t{msg.content}\n" f"[sc] 感谢 {msg.sender['name']}({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']})的价值 {msg.price} 的sc\n\n\t{msg.content}\n"
) )
@ -45,7 +43,7 @@ async def listen_sc(ctx: BLiverCtx):
async def listen_gift(ctx: BLiverCtx): async def listen_gift(ctx: BLiverCtx):
msg = SendGiftMsg(ctx.body) msg = SendGiftMsg(ctx.body)
print( print(
f"{msg.sender['name']} ({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']}) 送出 {msg.gift['gift_name']}" f"[礼物] {msg.sender['name']} ({msg.sender['medal']['medal_name']}:{msg.sender['medal']['medal_level']}) 送出 {msg.gift['gift_name']}\n"
) )
@ -53,27 +51,27 @@ async def listen_gift(ctx: BLiverCtx):
async def hot(ctx: BLiverCtx): async def hot(ctx: BLiverCtx):
msg = HotRankChangeV2Msg(ctx.body) msg = HotRankChangeV2Msg(ctx.body)
print( print(
f"恭喜 {ctx.bliver.uname}{msg.area_name} 区 的 {msg.rank_desc} 榜单中获得第 {msg.rank}" f"[通知] 恭喜 {ctx.bliver.uname}{msg.area_name} 区 的 {msg.rank_desc} 榜单中获得第 {msg.rank}\n"
) )
@app.on(Events.ENTRY_EFFECT) @app.on(Events.ENTRY_EFFECT)
async def welcome_captain(ctx: BLiverCtx): async def welcome_captain(ctx: BLiverCtx):
msg = EntryEffectMsg(ctx.body) msg = EntryEffectMsg(ctx.body)
print(f"\n{msg.copy_writting}\n") print(f"[热烈欢迎] {msg.copy_writting}\n")
@app.on(Events.STOP_LIVE_ROOM_LIST) @app.on(Events.STOP_LIVE_ROOM_LIST)
async def stop_live_room_list(ctx: BLiverCtx): async def stop_live_room_list(ctx: BLiverCtx):
# 监听停止直播的房间 # 监听停止直播的房间
msg = StopLiveRoomListMsg(ctx.body) msg = StopLiveRoomListMsg(ctx.body)
print(f"停止直播的房间列表:{msg.room_id_list}") print(f"[通知] 停止直播的房间列表:{msg.room_id_list}\n")
@app.on(Events.ONLINE_RANK_COUNT) @app.on(Events.ONLINE_RANK_COUNT)
async def online_rank(ctx): async def online_rank(ctx):
msg = OnlineRankCountMsg(ctx.body) msg = OnlineRankCountMsg(ctx.body)
print(f"当前在线人气排名 {msg.count}") print(f"[通知] 当前在线人气排名 {msg.count}\n")
app.run() app.run()

View File

@ -1,66 +0,0 @@
"""监听多个直播间的例子"""
import asyncio
from blive import BLiver, Events, BLiverCtx
from blive.msg import DanMuMsg
# 多个对象共用的全局异常处理
# 首先定义全局异常处理handler
def global_error_handler(e, app: BLiver):
print(f"{app.uname} 全局异常捕获", e)
# 调用类方法注册异常以及其处理函数,需在实例化之前注册注册后所有BLiver共同拥有该异常处理
BLiver.register_global_error_handler(ZeroDivisionError, global_error_handler)
# 定义弹幕事件handler,为了演示异常处理直接在方法中抛出异常
async def azi_timeout_error(ctx: BLiverCtx):
raise TimeoutError
async def ke_type_error(ctx):
raise TypeError
async def zero_division_error(ctx):
1 / 0
# 两个直播间
ke = BLiver(21716679)
azi = BLiver(7983476)
# 注册handler
ke.register_handler(Events.INTERACT_WORD, zero_division_error)
azi.register_handler(Events.INTERACT_WORD, zero_division_error)
ke.register_handler(Events.DANMU_MSG, ke_type_error)
azi.register_handler(Events.DANMU_MSG, azi_timeout_error)
# 类实例级别的异常处理,实例与实例之间不共享
ke.register_error_handler(
TypeError, lambda e, app: print(f"{app.uname} catch TypeError", e)
)
# 实例级别的异常处理可以用注解方式进行注册
@azi.catch(TimeoutError)
def azi_handler(e, app):
print(f"{app.uname} catch TimeoutError", e)
async def main():
# 以异步task的形式运行
task1 = ke.run_as_task()
task2 = azi.run_as_task()
# await 两个任务
await asyncio.gather(*[task1, task2])
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@ -2,7 +2,7 @@
import asyncio import asyncio
from blive import BLiver, Events, BLiverCtx from blive import BLiver, Events, BLiverCtx
from blive.msg import DanMuMsg from blive.msg import DanMuMsg,InteractWordMsg
# 定义弹幕事件handler # 定义弹幕事件handler
@ -12,18 +12,27 @@ async def listen(ctx: BLiverCtx):
f'\n{ctx.bliver.uname}{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n' f'\n{ctx.bliver.uname}{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
) )
async def listen_join(ctx: BLiverCtx):
join = InteractWordMsg(ctx.body)
print(
f"\n{ctx.bliver.uname}】欢迎",
f"{join.user['name']} ({join.user['medal']['medal_name']}:{join.user['medal']['medal_level']})",
"进入直播间",
)
async def main(): async def main():
# 两个直播间 # 两个直播间
ke = BLiver(605) hai7 = BLiver(21452505)
azi = BLiver(510) azi = BLiver(510)
# 注册handler
ke.register_handler(Events.DANMU_MSG, listen)
azi.register_handler(Events.DANMU_MSG, listen) azi.register_handler(Events.DANMU_MSG, listen)
azi.register_handler(Events.INTERACT_WORD, listen_join)
# 注册handler
hai7.register_handler(Events.DANMU_MSG, listen)
hai7.register_handler(Events.INTERACT_WORD, listen_join)
# 以异步task的形式运行 # 以异步task的形式运行
task1 = ke.run_as_task() task1 = hai7.run_as_task()
task2 = azi.run_as_task() task2 = azi.run_as_task()
# await 两个任务 # await 两个任务

View File

@ -5,7 +5,7 @@ with open("./README.md", "r") as f:
setuptools.setup( setuptools.setup(
name="blive", name="blive",
version="0.0.8", version="0.0.9",
author="cam", author="cam",
author_email="yulinfeng000@gmail.com", author_email="yulinfeng000@gmail.com",
long_description=description, long_description=description,
@ -16,5 +16,5 @@ setuptools.setup(
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
], ],
install_requires=["aiohttp","loguru","requests","APScheduler","brotli"] install_requires=["aiohttp","requests","APScheduler","brotli","pyee"]
) )