mirror of
https://github.com/yulinfeng000/blive.git
synced 2024-12-26 12:50:22 +08:00
简单的B站直播弹幕websocket监听处理框架
blive | ||
example | ||
.gitignore | ||
LICENSE | ||
PROTOCOL.md | ||
README.md | ||
requirements.txt | ||
setup.py |
B 站弹幕监听框架
特点
- 简单,只需房间号即可监听
- 异步,io 不阻塞,及时获取消息
B 站直播弹幕 websocket 协议分析
快速开始
-
安装
pip install blive
-
创建 app
from blive import BLiver app = BLiver(123) #123为房间号
-
创建处理器
from blive import BLiver, Events, BLiverCtx from blive.msg import DanMuMsg app = BLiver(123) # 标记该方法监听弹幕消息,更多消息类型请参考 Events 类源代码 @app.on(Events.DANMU_MSG) async def listen_danmu(ctx: BLiverCtx): danmu = DanMuMsg(ctx.body) #ctx.body 套上相应的消息操作类即可得到消息的基本内容,也可直接操作 ctx.body print(danmu.content) print(danmu.sender) print(danmu.timestamp)
-
运行
from blive import BLiver, Events, BLiverCtx from blive.msg import DanMuMsg app = BLiver(123) @app.on(Events.DANMU_MSG) async def listen_danmu(ctx: BLiverCtx): danmu = DanMuMsg(ctx.body) print(danmu.content) print(danmu.sender) print(danmu.timestamp) app.run() # 运行app!
同时监听多个直播间
import asyncio
from blive import BLiver, Events, BLiverCtx
from blive.msg import DanMuMsg
# 定义弹幕事件handler
async def listen(ctx: BLiverCtx):
danmu = DanMuMsg(ctx.body)
print(
f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
)
async def main():
# 两个直播间
ke = BLiver(605)
azi = BLiver(510)
# 注册handler
ke.on(Events.DANMU_MSG, listen)
azi.on(Events.DANMU_MSG, listen)
# 以异步task的形式运行
task1 = ke.run_as_task()
task2 = azi.run_as_task()
# await 两个任务
await asyncio.gather(*[task1, task2])
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
作为协议解析工具在其他地方使用(伪代码)
from blive.core import BWS_MsgPackage
packman = BWS_MsgPackage() # 实例化一个消息包处理器
while True:
data = ws.receive() # 当收到消息时
msg = packman.unpack(data) # 使用packman解析消息,返回一个形如 [(header,body), (header,body), ... ] 数组
print(msg)
与 fastapi (其他asyncio生态框架) 配合使用
from fastapi import FastAPI
from blive import BLiver,Events
from blive.msg import DanMuMsg
app = FastAPI()
BLIVER_POOL = {}
# 定义弹幕事件handler
async def handler(ctx):
danmu = DanMuMsg(ctx.body)
print(
f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
)
def create_bliver(roomid):
b = BLiver(roomid)
b.on(Events.DANMU_MSG,handler)
return b
@app.get("/create")
async def create_new_bliver(roomid:int):
room = BLIVER_POOL.get(roomid,None)
if not room:
b = create_bliver(roomid)
BLIVER_POOL[roomid] = b.run_as_task() # 启动监听
return {"msg":"创建一个新直播间弹幕监听成功"}
@app.get("/del")
async def rm_bliver(roomid:int):
room = BLIVER_POOL.get(roomid,None)
if room:
room.cancel()
BLIVER_POOL.pop(roomid)
return {"msg":"移除直播间弹幕监听成功"}
@app.get("/show")
async def show():
return list(BLIVER_POOL.keys())
项目简介
-
blive 文件夹为框架代码
-
core.py 为B站ws直播聊天室协议包处理的核心代码
-
eeframework.py 为框架代码
-
msg.py 为消息操作类代码
-
-
example/app.py 以框架形式运行例子
-
example/multi_room.py 同时监听多个直播间的实现
-
example/with_fastapi.py 与fastapi 配合使用的例子
TODO
- 更多的消息操作类(欢迎各位提pr)
- 尝试加入中间件架构(目前感觉需求不大)