init repo

This commit is contained in:
Cam 2022-01-06 00:50:50 +08:00
commit ad99879faf
8 changed files with 562 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.venv
.vscode
**/__pycache__

66
README.md Normal file
View File

@ -0,0 +1,66 @@
# B 站弹幕监听框架
## 特点
- 简单,只需房间号即可监听
- 异步io 不阻塞,及时获取消息
## 快速开始
1. 创建 app
### 目前请克隆该代码仓库
```python
from blive import BLiver
app = BLiver(123) #123为房间号
```
2. 创建处理器
```python
from blive import BLiver, Events, BLiverCtx
app = BLiver(123)
# 标记该方法监听弹幕消息,更多消息类型请参考Events类源代码
@app.handler(Events.DANMU_MSG)
async def listen_danmu(ctx: BLiverCtx):
danmu = DanMuMsg(ctx.body) #ctx.body套上相应的消息操作类即可得到消息的基本内容,也可直接操作ctx.body
print(danmu.content())
print(danmu.sender())
print(danmu.timestamp())
```
3. 运行
```python
from blive import BLiver, Events, BLiverCtx
app = BLiver(123)
@app.handler(Events.DANMU_MSG)
async def listen_danmu(ctx: BLiverCtx):
danmu = DanMuMsg(ctx.body)
print(danmu.content())
print(danmu.sender())
print(danmu.timestamp())
app.run() # 运行代码!
```
## 项目简介
- blive 文件夹为框架代码
- app.py 为一个简单示例
## TODO
- 打包发布
- 更多的消息操作类
- 尝试加入中间件架构

14
app.py Normal file
View File

@ -0,0 +1,14 @@
from blive import BLiver, Events, BLiverCtx, DanMuMsg
app = BLiver(510)
@app.handler(Events.DANMU_MSG)
async def listen(ctx: BLiverCtx):
danmu = DanMuMsg(ctx.body)
print(danmu.content())
print(danmu.sender())
print(danmu.timestamp())
app.run()

3
blive/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .framework import *
from .core import *
from .msg import *

256
blive/core.py Normal file
View File

@ -0,0 +1,256 @@
from collections import namedtuple
from multiprocessing import RawValue, Lock
import json
import requests
import struct
import enum
import brotli
import zlib
def get_blive_ws_url(roomid, ssl=False):
resp = requests.get(
f"https://api.live.bilibili.com/room/v1/Danmu/getConf?room_id={roomid}&platform=pc&player=web"
)
data = resp.json()
url_obj = data["data"]["host_server_list"][1]
if ssl:
url = f"ws://{url_obj['host']}:{url_obj['ws_port']}/sub"
else:
url = f"wss://{url_obj['host']}:{url_obj['wss_port']}/sub"
return url, data["data"]["token"]
def get_blive_room_id(roomid):
"""
得到b站直播间id,短id不是真实的id
Return: ture_id,short_id,up_id
"""
resp = requests.get(
"https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom",
params={"room_id": roomid},
)
data = resp.json()
return (
data["data"]["room_info"]["room_id"],
data["data"]["room_info"]["short_id"],
data["data"]["room_info"]["uid"],
)
def get_blive_dm_history(roomid):
resp = requests.post(
"https://api.live.bilibili.com/xlive/web-room/v1/dM/gethistory",
headers={
"Host": "api.live.bilibili.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:78.0) Gecko/20100101 Firefox/78.0",
},
data={
"roomid": roomid,
"csrf_token": "",
"csrf": "",
"visit_id": "",
},
)
return resp.json()
def certification(roomid, token, uid=0, protover=3, platform="web"):
return {
"uid": uid,
"roomid": roomid,
"protover": protover,
"platform": platform,
"type": 2,
"clientver": "1.4.3",
"key": token,
}
def heartbeat():
return {}
class AuthReplyCode(enum.IntEnum):
OK = 0
TOKEN_ERROR = -101
# WS_BODY_PROTOCOL_VERSION
class ProtocolVersion(enum.IntEnum):
NORMAL = 0 # 未压缩
HEARTBEAT = 1 # 心跳
INFLATE = 2 # zlib压缩
BROTLI = 3 # brotil 压缩
class Operation(enum.IntEnum):
HEARTBEAT = 2 # 心跳
HEARTBEAT_REPLY = 3 # 心跳回应
NOTIFY = 5 # 通知
AUTH = 7 # # 认证
AUTH_REPLY = 8 # 认证回应
class Counter(object):
# 线程安全计数器
def __init__(self, init_value) -> None:
self.current = RawValue("i", init_value)
self.lock = Lock()
def increment(self):
with self.lock:
self.current.value += 1
def value(self):
with self.lock:
return self.current.value
def increment_get(self):
with self.lock:
self.current.value += 1
return self.current.value
def get_increment(self):
with self.lock:
yield self.current.value
self.current.value += 1
PackageHeader = namedtuple(
"PackageHeader",
["package_size", "header_size", "version", "operation", "sequence_id"],
)
HeaderStruct = struct.Struct(">I2H2I")
class B_MsgPackage:
def __init__(self) -> None:
self.sequence = Counter(0)
def pack(self, data, operation, version=ProtocolVersion.NORMAL):
body = json.dumps(data).encode("utf-8")
header = HeaderStruct.pack(
*PackageHeader(
package_size=HeaderStruct.size + len(body),
header_size=HeaderStruct.size,
version=version,
operation=operation,
sequence_id=self.sequence.increment_get(),
)
)
return header + body
def unpack(self, data) -> list:
packages = []
header = PackageHeader(*HeaderStruct.unpack(data[:16])) #
data = data[header.header_size :]
# 心跳包处理
if header.operation == Operation.HEARTBEAT_REPLY:
# 心跳不会粘包
packages.append((header, data[4:].decode("utf-8")))
# 通知包处理
elif header.operation == Operation.NOTIFY:
def notify_pk_process(data):
# 粘包处理代码 ,抽取为公共函数
header = PackageHeader(*HeaderStruct.unpack(data[:16])) # 包头
if len(data) > header.package_size: # 如果数据大小大于包头,说明是粘包
while True:
# 先把第一个包放进去
packages.append(
(header, data[16 : header.package_size].decode("utf-8"))
)
# 移动到下一个包
data = data[header.package_size :]
header = PackageHeader(*HeaderStruct.unpack(data[:16]))
if len(data) > header.package_size:
# 如果数据还大于package说明还有1个以上的包
continue
else:
packages.append(
(header, data[16:].decode("utf-8"))
) # 直接放第二个包
break
else:
packages.append((header, data[16:].decode("utf-8")))
# NOTIFY 消息可能会粘包
if header.version == ProtocolVersion.INFLATE:
# 先zlib解码
data = zlib.decompress(data)
notify_pk_process(data)
elif header.version == ProtocolVersion.BROTLI:
# 与zlib 逻辑相同,先解码,然后数据可能要拆包
data = brotli.decompress(data)
notify_pk_process(data)
elif header.version == ProtocolVersion.NORMAL:
# normal 直接decodefeature
packages.append((header, data.decode("utf-8")))
else:
# TODO 抛出错误或者打印日志
pass
elif header.operation == Operation.AUTH_REPLY:
packages.append((header, data.decode("utf-8")))
return packages
packman = B_MsgPackage()
class Events(str, enum.Enum):
PREPARING = "PREPARING" # 下播【结束语】
ROOM_CHANGE = "ROOM_CHANGE" # 房间信息改变
ROOM_RANK = "ROOM_RANK" # 排名改变
DANMU_MSG = "DANMU_MSG" # 接收到弹幕【自动回复】
SEND_GIFT = "ROOM_RANK" # 有人送礼【答谢送礼】
WELCOME_GUARD = "WELCOME_GUARD" # 舰长进入(不会触发)
ENTRY_EFFECT = "ENTRY_EFFECT" # 舰长、高能榜、老爷进入【欢迎舰长】
WELCOME = "WELCOME" # 老爷进入
INTERACT_WORD = "INTERACT_WORD" # 用户进入【欢迎】
ATTENTION = "ATTENTION" # 用户关注【答谢关注】
SHARE = "SHARE" # 用户分享直播间
SPECIAL_ATTENTION = "SPECIAL_ATTENTION" # 特别关注直播间,可用%special%判断
ROOM_REAL_TIME_MESSAGE_UPDATE = "ROOM_REAL_TIME_MESSAGE_UPDATE" # 粉丝数量改变
SUPER_CHAT_MESSAGE = "ROOM_REAL_TIME_MESSAGE_UPDATE" # 醒目留言
SUPER_CHAT_MESSAGE_JPN = "SUPER_CHAT_MESSAGE_JPN" # 醒目留言日文翻译
SUPER_CHAT_MESSAGE_DELETE = "SUPER_CHAT_MESSAGE_DELETE" # 删除醒目留言
ROOM_BLOCK_MSG = "ROOM_BLOCK_MSG" # 用户被禁言,%uname%昵称
GUARD_BUY = "GUARD_BUY" # 有人上船
FIRST_GUARD = "FIRST_GUARD" # 用户初次上船
# 船员数量改变事件,%uname%新船员昵称,%num%获取大航海数量附带直播间信息json数据
NEW_GUARD_COUNT = "NEW_GUARD_COUNT"
USER_TOAST_MSG = "USER_TOAST_MSG" # 上船附带的通知
HOT_RANK_CHANGED = "HOT_RANK_CHANGED" # 热门榜排名改变
HOT_RANK_SETTLEMENT = "HOT_RANK_SETTLEMENT" # 荣登热门榜topX
HOT_RANK = "HOT_RANK" # 热门榜xx榜topX%text%获取排名
ONLINE_RANK_V2 = "ONLINE_RANK_V2" # 礼物榜(高能榜)刷新
ONLINE_RANK_TOP3 = "ONLINE_RANK_TOP3" # 高能榜TOP3改变
ONLINE_RANK_COUNT = "ONLINE_RANK_COUNT" # 高能榜改变
NOTICE_MSG = "NOTICE_MSG" # 上船等带的通知
COMBO_SEND = "COMBO_SEND" # 礼物连击
SPECIAL_GIFT = "SPECIAL_GIFT" # 定制的专属礼物
ANCHOR_LOT_CHECKSTATUS = "ANCHOR_LOT_CHECKSTATUS" # 天选时刻前的审核
ANCHOR_LOT_START = "ANCHOR_LOT_START" # 开启天选
ANCHOR_LOT_END = "ANCHOR_LOT_END" # 天选结束
ANCHOR_LOT_AWARD = "ANCHOR_LOT_AWARD" # 天选结果推送
VOICE_JOIN_ROOM_COUNT_INFO = "VOICE_JOIN_ROOM_COUNT_INFO" # 申请连麦队列变化
VOICE_JOIN_LIST = "VOICE_JOIN_LIST" # 连麦申请、取消连麦申请
VOICE_JOIN_STATUS = "VOICE_JOIN_STATUS" # 开始连麦、结束连麦
WARNING = "WARNING" # 被警告,%text%可获取内容
CUT_OFF = "CUT_OFF" # 被超管切断
room_admin_entrance = "room_admin_entrance" # 设置房管
ROOM_ADMINS = "ROOM_ADMINS" # 房管数量改变
# 勋章升级,仅送礼物后触发,需设置中开启“监听勋章升级”。%medal_level%获取新等级(但用户当前勋章不一定是本直播间)
MEDAL_UPGRADE = "MEDAL_UPGRADE"
STOP_LIVE_ROOM_LIST = "STOP_LIVE_ROOM_LIST" # 停止直播的房间

148
blive/framework.py Normal file
View File

@ -0,0 +1,148 @@
import sys
import json
import asyncio
from typing import Dict, Tuple
import aiohttp
from aiohttp.client_ws import ClientWebSocketResponse
from aiohttp.http_websocket import WSMessage
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import loguru
from apscheduler.util import _Undefined
from .core import (
Events,
Operation,
PackageHeader,
packman,
get_blive_room_id,
get_blive_ws_url,
certification,
heartbeat,
)
undefined = _Undefined()
class BLiverCtx(object):
def __init__(self, bliver, msg) -> None:
super().__init__()
self.ws: ClientWebSocketResponse = bliver.ws
self.msg: Tuple = msg # 原始消息
self.bliver: BLiver = bliver
self.body: Dict = None # 消息内容
class Processor:
def __init__(self, logger=None) -> None:
self.logger = logger or loguru.logger
self.channels = {}
for e in Events:
self.channels[e] = []
def register(self, channel, handler):
handlers = self.channels.get(channel, None)
handlers.append(handler)
async def process(self, ctx):
header: PackageHeader = ctx.msg[0]
msg = json.loads(ctx.msg[1])
ctx.body = msg
if header.operation == Operation.NOTIFY:
handlers = self.channels.get(msg["cmd"], []) # 根据cmd 得到相应的处理句柄
await asyncio.gather(*[c(ctx) for c in handlers])
class BLiver:
def __init__(self, roomid, logger=None, log_level="INFO"):
self.roomid = roomid
if not logger:
self.logger = loguru.logger
self.logger.remove()
else:
self.logger = logger
self.logger.add(sys.stderr, level=log_level)
self._ws: ClientWebSocketResponse = None
self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai")
self.processor = Processor(logger=self.logger)
def handler(self, event: Events):
def f_wrapper(func):
self.logger.debug("handler added")
self.processor.register(event, func)
return func
return f_wrapper
def scheduled(
self,
trigger,
args=None,
kwargs=None,
id=None,
name=None,
misfire_grace_time=undefined,
coalesce=undefined,
max_instances=undefined,
next_run_time=undefined,
jobstore="default",
executor="default",
**trigger_args,
):
def s_func_wrapper(func):
self.logger.debug("scheduler job added,{}", func)
self.scheduler.add_job(
func,
trigger=trigger,
args=args,
kwargs=kwargs,
id=id,
name=name,
misfire_grace_time=misfire_grace_time,
coalesce=coalesce,
max_instances=max_instances,
next_run_time=next_run_time,
jobstore=jobstore,
executor=executor,
replace_existing=True,
**trigger_args,
)
return func
return s_func_wrapper
@property
def ws(self):
assert self._ws
return self._ws
async def heartbeat(self):
assert self._ws
await self._ws.send_bytes(packman.pack(heartbeat(), Operation.HEARTBEAT))
self.logger.debug("heartbeat sended")
async def listen(self):
rommid, _, _ = get_blive_room_id(self.roomid) # 如果是短id,就得到直播间真实id
url, token = get_blive_ws_url(rommid)
async with aiohttp.ClientSession().ws_connect(url) as ws:
self._ws = ws
await ws.send_bytes(
packman.pack(certification(rommid, token), Operation.AUTH)
)
self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30)
self.scheduler.start()
# 开始监听
while True:
msg: WSMessage = await ws.receive()
# print(msg)
if msg.type != aiohttp.WSMsgType.BINARY:
continue
# print(msg.data)
mq = packman.unpack(msg.data)
self.logger.debug("received msg:\n{}", mq)
tasks = [self.processor.process(BLiverCtx(self, m)) for m in mq]
await asyncio.gather(*tasks)
def run(self):
loop = asyncio.get_event_loop()
loop.create_task(self.listen())
loop.run_forever()

35
blive/msg.py Normal file
View File

@ -0,0 +1,35 @@
from abc import ABC
import json
"""
消息操作封装类,目前只封装了弹幕消息操作
"""
class BaseMsg(ABC):
def __init__(self, body) -> None:
super().__init__()
self.body = body
def cmd(self):
return self.body["cmd"]
def info(self):
return self.body["info"]
def __repr__(self) -> str:
return json.dumps(self.body)
class DanMuMsg(BaseMsg):
def __init__(self, body) -> None:
super(DanMuMsg, self).__init__(body)
def content(self):
return self.info()[1]
def sender(self):
return {"id": self.body["info"][2][0], "name": self.body["info"][2][1]}
def timestamp(self):
return self.body["info"][9]

37
requirements.txt Normal file
View File

@ -0,0 +1,37 @@
aiodns==3.0.0
aiohttp==3.8.1
aiosignal==1.2.0
APScheduler==3.8.1
async-timeout==4.0.2
asynctest==0.13.0
attrs==21.4.0
backports.zoneinfo==0.2.1
black==21.12b0
Brotli==1.0.9
cchardet==2.1.7
certifi==2021.10.8
cffi==1.15.0
charset-normalizer==2.0.10
click==8.0.3
frozenlist==1.2.0
idna==3.3
importlib-metadata==4.10.0
loguru==0.5.3
multidict==5.2.0
mypy-extensions==0.4.3
pathspec==0.9.0
platformdirs==2.4.1
pycares==4.1.2
pycparser==2.21
pytz==2021.3
pytz-deprecation-shim==0.1.0.post0
requests==2.27.0
six==1.16.0
tomli==1.2.3
typed-ast==1.5.1
typing_extensions==4.0.1
tzdata==2021.5
tzlocal==4.1
urllib3==1.26.7
yarl==1.7.2
zipp==3.7.0