From ad99879fafcf4711d739a47555bda1c22a1b1969 Mon Sep 17 00:00:00 2001
From: Cam <cam@Cams-MacBook-Pro.local>
Date: Thu, 6 Jan 2022 00:50:50 +0800
Subject: [PATCH] init repo

---
 .gitignore         |   3 +
 README.md          |  66 ++++++++++++
 app.py             |  14 +++
 blive/__init__.py  |   3 +
 blive/core.py      | 256 +++++++++++++++++++++++++++++++++++++++++++++
 blive/framework.py | 148 ++++++++++++++++++++++++++
 blive/msg.py       |  35 +++++++
 requirements.txt   |  37 +++++++
 8 files changed, 562 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 README.md
 create mode 100644 app.py
 create mode 100644 blive/__init__.py
 create mode 100644 blive/core.py
 create mode 100644 blive/framework.py
 create mode 100644 blive/msg.py
 create mode 100644 requirements.txt

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d832596
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+.venv
+.vscode
+**/__pycache__
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b1f3645
--- /dev/null
+++ b/README.md
@@ -0,0 +1,66 @@
+# B 站弹幕监听框架
+
+## 特点
+
+- 简单,只需房间号即可监听
+- 异步,io 不阻塞,及时获取消息
+
+## 快速开始
+
+1. 创建 app
+
+### 目前请克隆该代码仓库
+
+```python
+
+from blive import  BLiver
+
+app = BLiver(123) #123为房间号
+```
+
+2. 创建处理器
+
+```python
+
+from blive import  BLiver, Events, BLiverCtx
+
+app = BLiver(123)
+
+# 标记该方法监听弹幕消息,更多消息类型请参考Events类源代码
+@app.handler(Events.DANMU_MSG)
+async def listen_danmu(ctx: BLiverCtx):
+    danmu = DanMuMsg(ctx.body) #ctx.body套上相应的消息操作类即可得到消息的基本内容,也可直接操作ctx.body
+    print(danmu.content())
+    print(danmu.sender())
+    print(danmu.timestamp())
+
+```
+
+3. 运行
+
+```python
+
+from blive import  BLiver, Events, BLiverCtx
+
+app = BLiver(123)
+@app.handler(Events.DANMU_MSG)
+async def listen_danmu(ctx: BLiverCtx):
+    danmu = DanMuMsg(ctx.body)
+    print(danmu.content())
+    print(danmu.sender())
+    print(danmu.timestamp())
+
+app.run() # 运行代码!
+
+```
+
+## 项目简介
+
+- blive 文件夹为框架代码
+- app.py 为一个简单示例
+
+## TODO
+
+- 打包发布
+- 更多的消息操作类
+- 尝试加入中间件架构
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..5e4888a
--- /dev/null
+++ b/app.py
@@ -0,0 +1,14 @@
+from blive import BLiver, Events, BLiverCtx, DanMuMsg
+
+app = BLiver(510)
+
+
+@app.handler(Events.DANMU_MSG)
+async def listen(ctx: BLiverCtx):
+    danmu = DanMuMsg(ctx.body)
+    print(danmu.content())
+    print(danmu.sender())
+    print(danmu.timestamp())
+
+
+app.run()
diff --git a/blive/__init__.py b/blive/__init__.py
new file mode 100644
index 0000000..7e83a65
--- /dev/null
+++ b/blive/__init__.py
@@ -0,0 +1,3 @@
+from .framework import *
+from .core import *
+from .msg import *
\ No newline at end of file
diff --git a/blive/core.py b/blive/core.py
new file mode 100644
index 0000000..bea73c3
--- /dev/null
+++ b/blive/core.py
@@ -0,0 +1,256 @@
+from collections import namedtuple
+from multiprocessing import RawValue, Lock
+import json
+import requests
+import struct
+import enum
+import brotli
+import zlib
+
+
+def get_blive_ws_url(roomid, ssl=False):
+    resp = requests.get(
+        f"https://api.live.bilibili.com/room/v1/Danmu/getConf?room_id={roomid}&platform=pc&player=web"
+    )
+    data = resp.json()
+    url_obj = data["data"]["host_server_list"][1]
+    if ssl:
+        url = f"ws://{url_obj['host']}:{url_obj['ws_port']}/sub"
+    else:
+        url = f"wss://{url_obj['host']}:{url_obj['wss_port']}/sub"
+    return url, data["data"]["token"]
+
+
+def get_blive_room_id(roomid):
+    """
+    得到b站直播间id,(短id不是真实的id)
+
+    Return: ture_id,short_id,up_id
+    """
+    resp = requests.get(
+        "https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom",
+        params={"room_id": roomid},
+    )
+    data = resp.json()
+    return (
+        data["data"]["room_info"]["room_id"],
+        data["data"]["room_info"]["short_id"],
+        data["data"]["room_info"]["uid"],
+    )
+
+
+def get_blive_dm_history(roomid):
+    resp = requests.post(
+        "https://api.live.bilibili.com/xlive/web-room/v1/dM/gethistory",
+        headers={
+            "Host": "api.live.bilibili.com",
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:78.0) Gecko/20100101 Firefox/78.0",
+        },
+        data={
+            "roomid": roomid,
+            "csrf_token": "",
+            "csrf": "",
+            "visit_id": "",
+        },
+    )
+    return resp.json()
+
+
+def certification(roomid, token, uid=0, protover=3, platform="web"):
+    return {
+        "uid": uid,
+        "roomid": roomid,
+        "protover": protover,
+        "platform": platform,
+        "type": 2,
+        "clientver": "1.4.3",
+        "key": token,
+    }
+
+
+def heartbeat():
+    return {}
+
+
+class AuthReplyCode(enum.IntEnum):
+    OK = 0
+    TOKEN_ERROR = -101
+
+
+# WS_BODY_PROTOCOL_VERSION
+class ProtocolVersion(enum.IntEnum):
+    NORMAL = 0  # 未压缩
+    HEARTBEAT = 1  # 心跳
+    INFLATE = 2  # zlib压缩
+    BROTLI = 3  # brotil 压缩
+
+
+class Operation(enum.IntEnum):
+    HEARTBEAT = 2  # 心跳
+    HEARTBEAT_REPLY = 3  # 心跳回应
+    NOTIFY = 5  # 通知
+    AUTH = 7  # # 认证
+    AUTH_REPLY = 8  # 认证回应
+
+
+class Counter(object):
+    # 线程安全计数器
+    def __init__(self, init_value) -> None:
+        self.current = RawValue("i", init_value)
+        self.lock = Lock()
+
+    def increment(self):
+        with self.lock:
+            self.current.value += 1
+
+    def value(self):
+        with self.lock:
+            return self.current.value
+
+    def increment_get(self):
+        with self.lock:
+            self.current.value += 1
+            return self.current.value
+
+    def get_increment(self):
+        with self.lock:
+            yield self.current.value
+            self.current.value += 1
+
+
+PackageHeader = namedtuple(
+    "PackageHeader",
+    ["package_size", "header_size", "version", "operation", "sequence_id"],
+)
+
+HeaderStruct = struct.Struct(">I2H2I")
+
+
+class B_MsgPackage:
+    def __init__(self) -> None:
+        self.sequence = Counter(0)
+
+    def pack(self, data, operation, version=ProtocolVersion.NORMAL):
+        body = json.dumps(data).encode("utf-8")
+        header = HeaderStruct.pack(
+            *PackageHeader(
+                package_size=HeaderStruct.size + len(body),
+                header_size=HeaderStruct.size,
+                version=version,
+                operation=operation,
+                sequence_id=self.sequence.increment_get(),
+            )
+        )
+        return header + body
+
+    def unpack(self, data) -> list:
+        packages = []
+        header = PackageHeader(*HeaderStruct.unpack(data[:16]))  #
+        data = data[header.header_size :]
+
+        # 心跳包处理
+        if header.operation == Operation.HEARTBEAT_REPLY:
+            # 心跳不会粘包
+            packages.append((header, data[4:].decode("utf-8")))
+            
+
+        # 通知包处理
+        elif header.operation == Operation.NOTIFY:
+
+            def notify_pk_process(data):
+                # 粘包处理代码 ,抽取为公共函数
+                header = PackageHeader(*HeaderStruct.unpack(data[:16]))  # 包头
+                if len(data) > header.package_size:  # 如果数据大小大于包头,说明是粘包
+                    while True:
+                        # 先把第一个包放进去
+                        packages.append(
+                            (header, data[16 : header.package_size].decode("utf-8"))
+                        )
+                        # 移动到下一个包
+                        data = data[header.package_size :]
+                        header = PackageHeader(*HeaderStruct.unpack(data[:16]))
+
+                        if len(data) > header.package_size:
+                            # 如果数据还大于package,说明还有1个以上的包
+                            continue
+                        else:
+                            packages.append(
+                                (header, data[16:].decode("utf-8"))
+                            )  # 直接放第二个包
+                            break
+                else:
+                    packages.append((header, data[16:].decode("utf-8")))
+
+            # NOTIFY 消息可能会粘包
+            if header.version == ProtocolVersion.INFLATE:
+                # 先zlib解码
+                data = zlib.decompress(data)
+                notify_pk_process(data)
+
+            elif header.version == ProtocolVersion.BROTLI:
+                # 与zlib 逻辑相同,先解码,然后数据可能要拆包
+                data = brotli.decompress(data)
+                notify_pk_process(data)
+
+            elif header.version == ProtocolVersion.NORMAL:
+                # normal 直接decode,feature
+                packages.append((header, data.decode("utf-8")))
+            else:
+                # TODO 抛出错误或者打印日志
+                pass
+
+        elif header.operation == Operation.AUTH_REPLY:
+            packages.append((header, data.decode("utf-8")))
+
+        return packages
+
+
+packman = B_MsgPackage()
+
+
+class Events(str, enum.Enum):
+    PREPARING = "PREPARING"  # 下播【结束语】
+    ROOM_CHANGE = "ROOM_CHANGE"  # 房间信息改变
+    ROOM_RANK = "ROOM_RANK"  # 排名改变
+    DANMU_MSG = "DANMU_MSG"  # 接收到弹幕【自动回复】
+    SEND_GIFT = "ROOM_RANK"  # 有人送礼【答谢送礼】
+    WELCOME_GUARD = "WELCOME_GUARD"  # 舰长进入(不会触发)
+    ENTRY_EFFECT = "ENTRY_EFFECT"  # 舰长、高能榜、老爷进入【欢迎舰长】
+    WELCOME = "WELCOME"  # 老爷进入
+    INTERACT_WORD = "INTERACT_WORD"  # 用户进入【欢迎】
+    ATTENTION = "ATTENTION"  # 用户关注【答谢关注】
+    SHARE = "SHARE"  # 用户分享直播间
+    SPECIAL_ATTENTION = "SPECIAL_ATTENTION"  # 特别关注直播间,可用%special%判断
+    ROOM_REAL_TIME_MESSAGE_UPDATE = "ROOM_REAL_TIME_MESSAGE_UPDATE"  # 粉丝数量改变
+    SUPER_CHAT_MESSAGE = "ROOM_REAL_TIME_MESSAGE_UPDATE"  # 醒目留言
+    SUPER_CHAT_MESSAGE_JPN = "SUPER_CHAT_MESSAGE_JPN"  # 醒目留言日文翻译
+    SUPER_CHAT_MESSAGE_DELETE = "SUPER_CHAT_MESSAGE_DELETE"  # 删除醒目留言
+    ROOM_BLOCK_MSG = "ROOM_BLOCK_MSG"  # 用户被禁言,%uname%昵称
+    GUARD_BUY = "GUARD_BUY"  # 有人上船
+    FIRST_GUARD = "FIRST_GUARD"  # 用户初次上船
+    # 船员数量改变事件,%uname%新船员昵称,%num%获取大航海数量,附带直播间信息json数据
+    NEW_GUARD_COUNT = "NEW_GUARD_COUNT"
+    USER_TOAST_MSG = "USER_TOAST_MSG"  # 上船附带的通知
+    HOT_RANK_CHANGED = "HOT_RANK_CHANGED"  # 热门榜排名改变
+    HOT_RANK_SETTLEMENT = "HOT_RANK_SETTLEMENT"  # 荣登热门榜topX
+    HOT_RANK = "HOT_RANK"  # 热门榜xx榜topX,%text%获取排名
+    ONLINE_RANK_V2 = "ONLINE_RANK_V2"  # 礼物榜(高能榜)刷新
+    ONLINE_RANK_TOP3 = "ONLINE_RANK_TOP3"  # 高能榜TOP3改变
+    ONLINE_RANK_COUNT = "ONLINE_RANK_COUNT"  # 高能榜改变
+    NOTICE_MSG = "NOTICE_MSG"  # 上船等带的通知
+    COMBO_SEND = "COMBO_SEND"  # 礼物连击
+    SPECIAL_GIFT = "SPECIAL_GIFT"  # 定制的专属礼物
+    ANCHOR_LOT_CHECKSTATUS = "ANCHOR_LOT_CHECKSTATUS"  # 天选时刻前的审核
+    ANCHOR_LOT_START = "ANCHOR_LOT_START"  # 开启天选
+    ANCHOR_LOT_END = "ANCHOR_LOT_END"  # 天选结束
+    ANCHOR_LOT_AWARD = "ANCHOR_LOT_AWARD"  # 天选结果推送
+    VOICE_JOIN_ROOM_COUNT_INFO = "VOICE_JOIN_ROOM_COUNT_INFO"  # 	申请连麦队列变化
+    VOICE_JOIN_LIST = "VOICE_JOIN_LIST"  # 连麦申请、取消连麦申请
+    VOICE_JOIN_STATUS = "VOICE_JOIN_STATUS"  # 开始连麦、结束连麦
+    WARNING = "WARNING"  # 被警告,%text%可获取内容
+    CUT_OFF = "CUT_OFF"  # 被超管切断
+    room_admin_entrance = "room_admin_entrance"  # 设置房管
+    ROOM_ADMINS = "ROOM_ADMINS"  # 房管数量改变
+    # 勋章升级,仅送礼物后触发,需设置中开启“监听勋章升级”。%medal_level%获取新等级(但用户当前勋章不一定是本直播间)
+    MEDAL_UPGRADE = "MEDAL_UPGRADE"
+    STOP_LIVE_ROOM_LIST = "STOP_LIVE_ROOM_LIST"  # 停止直播的房间
diff --git a/blive/framework.py b/blive/framework.py
new file mode 100644
index 0000000..e1c3d3c
--- /dev/null
+++ b/blive/framework.py
@@ -0,0 +1,148 @@
+import sys
+import json
+import asyncio
+from typing import Dict, Tuple
+import aiohttp
+from aiohttp.client_ws import ClientWebSocketResponse
+from aiohttp.http_websocket import WSMessage
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+import loguru
+from apscheduler.util import _Undefined
+from .core import (
+    Events,
+    Operation,
+    PackageHeader,
+    packman,
+    get_blive_room_id,
+    get_blive_ws_url,
+    certification,
+    heartbeat,
+)
+
+
+undefined = _Undefined()
+
+
+class BLiverCtx(object):
+    def __init__(self, bliver, msg) -> None:
+        super().__init__()
+        self.ws: ClientWebSocketResponse = bliver.ws
+        self.msg: Tuple = msg  # 原始消息
+        self.bliver: BLiver = bliver
+        self.body: Dict = None  # 消息内容
+
+
+class Processor:
+    def __init__(self, logger=None) -> None:
+        self.logger = logger or loguru.logger
+        self.channels = {}
+        for e in Events:
+            self.channels[e] = []
+
+    def register(self, channel, handler):
+        handlers = self.channels.get(channel, None)
+        handlers.append(handler)
+
+    async def process(self, ctx):
+        header: PackageHeader = ctx.msg[0]
+        msg = json.loads(ctx.msg[1])
+        ctx.body = msg
+        if header.operation == Operation.NOTIFY:
+            handlers = self.channels.get(msg["cmd"], [])  # 根据cmd 得到相应的处理句柄
+            await asyncio.gather(*[c(ctx) for c in handlers])
+
+
+class BLiver:
+    def __init__(self, roomid, logger=None, log_level="INFO"):
+        self.roomid = roomid
+        if not logger:
+            self.logger = loguru.logger
+            self.logger.remove()
+        else:
+            self.logger = logger
+        self.logger.add(sys.stderr, level=log_level)
+        self._ws: ClientWebSocketResponse = None
+        self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai")
+        self.processor = Processor(logger=self.logger)
+
+    def handler(self, event: Events):
+        def f_wrapper(func):
+            self.logger.debug("handler added")
+            self.processor.register(event, func)
+            return func
+
+        return f_wrapper
+
+    def scheduled(
+        self,
+        trigger,
+        args=None,
+        kwargs=None,
+        id=None,
+        name=None,
+        misfire_grace_time=undefined,
+        coalesce=undefined,
+        max_instances=undefined,
+        next_run_time=undefined,
+        jobstore="default",
+        executor="default",
+        **trigger_args,
+    ):
+        def s_func_wrapper(func):
+            self.logger.debug("scheduler job added,{}", func)
+            self.scheduler.add_job(
+                func,
+                trigger=trigger,
+                args=args,
+                kwargs=kwargs,
+                id=id,
+                name=name,
+                misfire_grace_time=misfire_grace_time,
+                coalesce=coalesce,
+                max_instances=max_instances,
+                next_run_time=next_run_time,
+                jobstore=jobstore,
+                executor=executor,
+                replace_existing=True,
+                **trigger_args,
+            )
+            return func
+
+        return s_func_wrapper
+
+    @property
+    def ws(self):
+        assert self._ws
+        return self._ws
+
+    async def heartbeat(self):
+        assert self._ws
+        await self._ws.send_bytes(packman.pack(heartbeat(), Operation.HEARTBEAT))
+        self.logger.debug("heartbeat sended")
+
+    async def listen(self):
+        rommid, _, _ = get_blive_room_id(self.roomid)  # 如果是短id,就得到直播间真实id
+        url, token = get_blive_ws_url(rommid)
+        async with aiohttp.ClientSession().ws_connect(url) as ws:
+            self._ws = ws
+            await ws.send_bytes(
+                packman.pack(certification(rommid, token), Operation.AUTH)
+            )
+            self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30)
+            self.scheduler.start()
+            # 开始监听
+            while True:
+                msg: WSMessage = await ws.receive()
+                # print(msg)
+                if msg.type != aiohttp.WSMsgType.BINARY:
+                    continue
+                # print(msg.data)
+                mq = packman.unpack(msg.data)
+                self.logger.debug("received msg:\n{}", mq)
+                tasks = [self.processor.process(BLiverCtx(self, m)) for m in mq]
+                await asyncio.gather(*tasks)
+
+    def run(self):
+        loop = asyncio.get_event_loop()
+        loop.create_task(self.listen())
+        loop.run_forever()
diff --git a/blive/msg.py b/blive/msg.py
new file mode 100644
index 0000000..6aba14d
--- /dev/null
+++ b/blive/msg.py
@@ -0,0 +1,35 @@
+from abc import ABC
+import json
+
+"""
+消息操作封装类,目前只封装了弹幕消息操作
+"""
+
+
+class BaseMsg(ABC):
+    def __init__(self, body) -> None:
+        super().__init__()
+        self.body = body
+
+    def cmd(self):
+        return self.body["cmd"]
+
+    def info(self):
+        return self.body["info"]
+
+    def __repr__(self) -> str:
+        return json.dumps(self.body)
+
+
+class DanMuMsg(BaseMsg):
+    def __init__(self, body) -> None:
+        super(DanMuMsg, self).__init__(body)
+
+    def content(self):
+        return self.info()[1]
+
+    def sender(self):
+        return {"id": self.body["info"][2][0], "name": self.body["info"][2][1]}
+
+    def timestamp(self):
+        return self.body["info"][9]
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..d9b9d7f
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,37 @@
+aiodns==3.0.0
+aiohttp==3.8.1
+aiosignal==1.2.0
+APScheduler==3.8.1
+async-timeout==4.0.2
+asynctest==0.13.0
+attrs==21.4.0
+backports.zoneinfo==0.2.1
+black==21.12b0
+Brotli==1.0.9
+cchardet==2.1.7
+certifi==2021.10.8
+cffi==1.15.0
+charset-normalizer==2.0.10
+click==8.0.3
+frozenlist==1.2.0
+idna==3.3
+importlib-metadata==4.10.0
+loguru==0.5.3
+multidict==5.2.0
+mypy-extensions==0.4.3
+pathspec==0.9.0
+platformdirs==2.4.1
+pycares==4.1.2
+pycparser==2.21
+pytz==2021.3
+pytz-deprecation-shim==0.1.0.post0
+requests==2.27.0
+six==1.16.0
+tomli==1.2.3
+typed-ast==1.5.1
+typing_extensions==4.0.1
+tzdata==2021.5
+tzlocal==4.1
+urllib3==1.26.7
+yarl==1.7.2
+zipp==3.7.0