简单的B站直播弹幕websocket监听处理框架
Go to file
2022-01-06 12:18:29 +08:00
blive add new msg operator class and fix some bug 2022-01-06 12:18:29 +08:00
.gitignore init repo 2022-01-06 00:50:50 +08:00
app.py add new msg operator class and fix some bug 2022-01-06 12:18:29 +08:00
README.md update readme 2022-01-06 00:51:51 +08:00
requirements.txt init repo 2022-01-06 00:50:50 +08:00

B 站弹幕监听框架

特点

  • 简单,只需房间号即可监听
  • 异步io 不阻塞,及时获取消息

快速开始

  1. 创建 app

目前请克隆该代码仓库,并执行pip install -r rquirements.txt


from blive import  BLiver

app = BLiver(123) #123为房间号
  1. 创建处理器

from blive import  BLiver, Events, BLiverCtx

app = BLiver(123)

# 标记该方法监听弹幕消息,更多消息类型请参考Events类源代码
@app.handler(Events.DANMU_MSG)
async def listen_danmu(ctx: BLiverCtx):
    danmu = DanMuMsg(ctx.body) #ctx.body套上相应的消息操作类即可得到消息的基本内容,也可直接操作ctx.body
    print(danmu.content())
    print(danmu.sender())
    print(danmu.timestamp())

  1. 运行

from blive import  BLiver, Events, BLiverCtx

app = BLiver(123)
@app.handler(Events.DANMU_MSG)
async def listen_danmu(ctx: BLiverCtx):
    danmu = DanMuMsg(ctx.body)
    print(danmu.content())
    print(danmu.sender())
    print(danmu.timestamp())

app.run() # 运行代码!

项目简介

  • blive 文件夹为框架代码
  • app.py 为一个简单示例

TODO

  • 打包发布
  • 更多的消息操作类
  • 尝试加入中间件架构