抽出客户端基类

This commit is contained in:
John Smith 2023-09-03 12:22:01 +08:00
parent 92e5cf56db
commit a52af864ad
8 changed files with 342 additions and 318 deletions

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from .models import *
from .handlers import *
from .client import *
from .open_live_client import *
from .clients import *

View File

@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from .web import *
from .open_live import *

View File

@ -1,17 +1,21 @@
# -*- coding: utf-8 -*-
import asyncio
import datetime
import hashlib
import hmac
import json
import logging
import random
import ssl as ssl_
import datetime
from typing import *
import aiohttp
from . import client, handlers
from . import ws_base
__all__ = (
'OpenLiveClient',
)
logger = logging.getLogger('blivedm')
@ -20,10 +24,9 @@ HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
END_URL = 'https://live-open.biliapi.com/v2/app/end'
# TODO 抽出公共基类现在BLiveClient和OpenLiveClient还有不重合的代码
class OpenLiveClient(client.BLiveClient):
class OpenLiveClient(ws_base.WebSocketClientBase):
"""
B站直播开放平台客户端负责连接房间
开放平台客户端
文档参考https://open-live.bilibili.com/document/
@ -43,41 +46,28 @@ class OpenLiveClient(client.BLiveClient):
access_secret: str,
app_id: int,
room_owner_auth_code: str,
*,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30,
game_heartbeat_interval=20,
ssl: Union[bool, ssl_.SSLContext] = True,
):
super().__init__(session, heartbeat_interval, ssl)
self._access_key = access_key
self._access_secret = access_secret
self._app_id = app_id
self._room_owner_auth_code = room_owner_auth_code
if session is None:
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
self._own_session = True
else:
self._session = session
self._own_session = False
assert self._session.loop is asyncio.get_event_loop() # noqa
self._heartbeat_interval = heartbeat_interval
self._game_heartbeat_interval = game_heartbeat_interval
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
self._handlers: List[handlers.HandlerInterface] = []
"""消息处理器,可动态增删"""
# 在调用init_room后初始化的字段
self._room_id = None
"""真实房间ID"""
self._room_owner_uid = None
self._room_owner_uid: Optional[int] = None
"""主播用户ID"""
self._host_server_list: Optional[List[str]] = []
self._host_server_url_list: Optional[List[str]] = []
"""弹幕服务器URL列表"""
self._auth_body = None
self._auth_body: Optional[str] = None
"""连接弹幕服务器用的认证包内容"""
self._game_id = None
self._game_id: Optional[str] = None
"""项目场次ID仅用于互动玩法类项目其他项目为空字符串"""
# 在运行时初始化的字段
@ -90,13 +80,6 @@ class OpenLiveClient(client.BLiveClient):
self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
"""发项目心跳包定时器的handle"""
@property
def room_id(self) -> Optional[int]:
"""
房间ID调用init_room后初始化
"""
return self._room_id
@property
def room_owner_uid(self) -> Optional[int]:
"""
@ -203,7 +186,7 @@ class OpenLiveClient(client.BLiveClient):
self._game_id = data['game_info']['game_id']
websocket_info = data['websocket_info']
self._auth_body = websocket_info['auth_body']
self._host_server_list = websocket_info['wss_link']
self._host_server_url_list = websocket_info['wss_link']
anchor_info = data['anchor_info']
self._room_id = anchor_info['room_id']
self._room_owner_uid = anchor_info['uid']
@ -275,58 +258,24 @@ class OpenLiveClient(client.BLiveClient):
return False
return True
async def _network_coroutine(self):
async def _on_network_coroutine_start(self):
"""
网络协程负责连接服务器接收消息解包
在_network_coroutine开头运行可以用来初始化房间
"""
# 如果之前未初始化则初始化
if self._auth_body is None:
if not await self.init_room():
raise client.InitError('init_room() failed')
raise ws_base.InitError('init_room() failed')
retry_count = 0
while True:
try:
# 连接
host_server_url = self._host_server_list[retry_count % len(self._host_server_list)]
async with self._session.ws_connect(
host_server_url,
receive_timeout=self._heartbeat_interval + 5,
ssl=self._ssl
) as websocket:
self._websocket = websocket
await self._on_ws_connect()
# 处理消息
message: aiohttp.WSMessage
async for message in websocket:
await self._on_ws_message(message)
# 至少成功处理1条消息
retry_count = 0
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
# 掉线重连
pass
except client.AuthError:
# 认证失败了应该重新获取auth_body再重连
logger.exception('room=%d auth failed, trying init_room() again', self.room_id)
if not await self.init_room():
raise client.InitError('init_room() failed')
except ssl_.SSLError:
logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id)
raise
finally:
self._websocket = None
await self._on_ws_close()
# 准备重连
retry_count += 1
logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count)
await asyncio.sleep(1)
def _get_ws_url(self, retry_count) -> str:
"""
返回WebSocket连接的URL可以在这里做故障转移和负载均衡
"""
return self._host_server_url_list[retry_count % len(self._host_server_url_list)]
async def _send_auth(self):
"""
发送认证包
"""
auth_body = json.loads(self._auth_body)
await self._websocket.send_bytes(self._make_packet(auth_body, client.Operation.AUTH))
await self._websocket.send_bytes(self._make_packet(auth_body, ws_base.Operation.AUTH))

272
blivedm/clients/web.py Normal file
View File

@ -0,0 +1,272 @@
# -*- coding: utf-8 -*-
import asyncio
import logging
import ssl as ssl_
from typing import *
import aiohttp
import yarl
from . import ws_base
__all__ = (
'BLiveClient',
)
logger = logging.getLogger('blivedm')
USER_AGENT = (
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'
)
UID_INIT_URL = 'https://api.bilibili.com/x/web-interface/nav'
BUVID_INIT_URL = 'https://data.bilibili.com/v/'
ROOM_INIT_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom'
DANMAKU_SERVER_CONF_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo'
DEFAULT_DANMAKU_SERVER_LIST = [
{'host': 'broadcastlv.chat.bilibili.com', 'port': 2243, 'wss_port': 443, 'ws_port': 2244}
]
class BLiveClient(ws_base.WebSocketClientBase):
"""
web端客户端
:param room_id: URL中的房间ID可以用短ID
:param uid: B站用户ID0表示未登录None表示自动获取
:param session: cookie连接池
:param heartbeat_interval: 发送心跳包的间隔时间
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
"""
def __init__(
self,
room_id: int,
*,
uid: Optional[int] = None,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30,
ssl: Union[bool, ssl_.SSLContext] = True,
):
super().__init__(session, heartbeat_interval, ssl)
self._tmp_room_id = room_id
"""用来init_room的临时房间ID可以用短ID"""
self._uid = uid
# 在调用init_room后初始化的字段
# TODO 移除短ID
self._room_short_id: Optional[int] = None
"""房间短ID没有则为0"""
self._room_owner_uid: Optional[int] = None
"""主播用户ID"""
self._host_server_list: Optional[List[dict]] = None
"""
弹幕服务器列表
`[{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...]`
"""
self._host_server_token: Optional[str] = None
"""连接弹幕服务器用的token"""
@property
def room_short_id(self) -> Optional[int]:
"""
房间短ID没有则为0调用init_room后初始化
"""
return self._room_short_id
@property
def room_owner_uid(self) -> Optional[int]:
"""
主播用户ID调用init_room后初始化
"""
return self._room_owner_uid
@property
def uid(self) -> Optional[int]:
"""
当前登录的用户ID未登录则为0调用init_room后初始化
"""
return self._uid
async def init_room(self):
"""
初始化连接房间需要的字段
:return: True代表没有降级如果需要降级后还可用重载这个函数返回True
"""
if self._uid is None:
if not await self._init_uid():
logger.warning('room=%d _init_uid() failed', self._tmp_room_id)
self._uid = 0
if self._get_buvid() == '':
if not await self._init_buvid():
logger.warning('room=%d _init_buvid() failed', self._tmp_room_id)
res = True
if not await self._init_room_id_and_owner():
res = False
# 失败了则降级
self._room_id = self._room_short_id = self._tmp_room_id
self._room_owner_uid = 0
if not await self._init_host_server():
res = False
# 失败了则降级
self._host_server_list = DEFAULT_DANMAKU_SERVER_LIST
self._host_server_token = None
return res
async def _init_uid(self):
try:
async with self._session.get(
UID_INIT_URL,
headers={'User-Agent': USER_AGENT},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_uid() failed, status=%d, reason=%s', self._tmp_room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
if data['code'] == -101:
# 未登录
self._uid = 0
return True
logger.warning('room=%d _init_uid() failed, message=%s', self._tmp_room_id,
data['message'])
return False
data = data['data']
if not data['isLogin']:
# 未登录
self._uid = 0
else:
self._uid = data['mid']
return True
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_uid() failed:', self._tmp_room_id)
return False
def _get_buvid(self):
cookies = self._session.cookie_jar.filter_cookies(yarl.URL(BUVID_INIT_URL))
buvid_cookie = cookies.get('buvid3', None)
if buvid_cookie is None:
return ''
return buvid_cookie.value
async def _init_buvid(self):
try:
async with self._session.get(
BUVID_INIT_URL,
headers={'User-Agent': USER_AGENT},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_buvid() status error, status=%d, reason=%s',
self._tmp_room_id, res.status, res.reason)
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_buvid() exception:', self._tmp_room_id)
return self._get_buvid() != ''
async def _init_room_id_and_owner(self):
try:
async with self._session.get(
ROOM_INIT_URL,
headers={'User-Agent': USER_AGENT},
params={
'room_id': self._tmp_room_id
},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room=%d _init_room_id_and_owner() failed, message=%s', self._tmp_room_id,
data['message'])
return False
if not self._parse_room_init(data['data']):
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_room_id_and_owner() failed:', self._tmp_room_id)
return False
return True
def _parse_room_init(self, data):
room_info = data['room_info']
self._room_id = room_info['room_id']
self._room_short_id = room_info['short_id']
self._room_owner_uid = room_info['uid']
return True
async def _init_host_server(self):
try:
async with self._session.get(
DANMAKU_SERVER_CONF_URL,
headers={'User-Agent': USER_AGENT},
params={
'id': self._room_id,
'type': 0
},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room=%d _init_host_server() failed, message=%s', self._room_id, data['message'])
return False
if not self._parse_danmaku_server_conf(data['data']):
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_host_server() failed:', self._room_id)
return False
return True
def _parse_danmaku_server_conf(self, data):
self._host_server_list = data['host_list']
self._host_server_token = data['token']
if not self._host_server_list:
logger.warning('room=%d _parse_danmaku_server_conf() failed: host_server_list is empty', self._room_id)
return False
return True
async def _on_network_coroutine_start(self):
"""
在_network_coroutine开头运行可以用来初始化房间
"""
# 如果之前未初始化则初始化
if self._host_server_token is None:
if not await self.init_room():
raise ws_base.InitError('init_room() failed')
def _get_ws_url(self, retry_count) -> str:
"""
返回WebSocket连接的URL可以在这里做故障转移和负载均衡
"""
host_server = self._host_server_list[retry_count % len(self._host_server_list)]
return f"wss://{host_server['host']}:{host_server['wss_port']}/sub"
async def _send_auth(self):
"""
发送认证包
"""
auth_params = {
'uid': self._uid,
'roomid': self._room_id,
'protover': 3,
'platform': 'web',
'type': 2,
'buvid': self._get_buvid(),
}
if self._host_server_token is not None:
auth_params['key'] = self._host_server_token
await self._websocket.send_bytes(self._make_packet(auth_params, ws_base.Operation.AUTH))

View File

@ -10,13 +10,8 @@ from typing import *
import aiohttp
import brotli
import yarl
from . import handlers
__all__ = (
'BLiveClient',
)
from .. import handlers
logger = logging.getLogger('blivedm')
@ -24,14 +19,6 @@ USER_AGENT = (
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'
)
UID_INIT_URL = 'https://api.bilibili.com/x/web-interface/nav'
BUVID_INIT_URL = 'https://data.bilibili.com/v/'
ROOM_INIT_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom'
DANMAKU_SERVER_CONF_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo'
DEFAULT_DANMAKU_SERVER_LIST = [
{'host': 'broadcastlv.chat.bilibili.com', 'port': 2243, 'wss_port': 443, 'ws_port': 2244}
]
HEADER_STRUCT = struct.Struct('>I2H2I')
@ -90,12 +77,10 @@ class AuthError(Exception):
"""认证失败"""
class BLiveClient:
class WebSocketClientBase:
"""
B站直播弹幕客户端负责连接房间
基于WebSocket的客户端
:param room_id: URL中的房间ID可以用短ID
:param uid: B站用户ID0表示未登录None表示自动获取
:param session: cookie连接池
:param heartbeat_interval: 发送心跳包的间隔时间
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
@ -103,16 +88,10 @@ class BLiveClient:
def __init__(
self,
room_id,
uid=None,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30,
heartbeat_interval: float = 30,
ssl: Union[bool, ssl_.SSLContext] = True,
):
self._tmp_room_id = room_id
"""用来init_room的临时房间ID可以用短ID"""
self._uid = uid
if session is None:
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
self._own_session = True
@ -122,6 +101,7 @@ class BLiveClient:
assert self._session.loop is asyncio.get_event_loop() # noqa
self._heartbeat_interval = heartbeat_interval
# TODO 移除SSL配置
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
# TODO 没必要支持多个handler改成单个吧
@ -129,19 +109,7 @@ class BLiveClient:
"""消息处理器,可动态增删"""
# 在调用init_room后初始化的字段
self._room_id = None
"""真实房间ID"""
self._room_short_id = None
"""房间短ID没有则为0"""
self._room_owner_uid = None
"""主播用户ID"""
self._host_server_list: Optional[List[dict]] = None
"""
弹幕服务器列表
[{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...]
"""
self._host_server_token = None
"""连接弹幕服务器用的token"""
self._room_id: Optional[int] = None
# 在运行时初始化的字段
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
@ -165,27 +133,6 @@ class BLiveClient:
"""
return self._room_id
@property
def room_short_id(self) -> Optional[int]:
"""
房间短ID没有则为0调用init_room后初始化
"""
return self._room_short_id
@property
def room_owner_uid(self) -> Optional[int]:
"""
主播用户ID调用init_room后初始化
"""
return self._room_owner_uid
@property
def uid(self) -> Optional[int]:
"""
当前登录的用户ID未登录则为0调用init_room后初始化
"""
return self._uid
def add_handler(self, handler: 'handlers.HandlerInterface'):
"""
添加消息处理器
@ -258,154 +205,13 @@ class BLiveClient:
if self._own_session:
await self._session.close()
async def init_room(self):
async def init_room(self) -> bool:
"""
初始化连接房间需要的字段
:return: True代表没有降级如果需要降级后还可用重载这个函数返回True
"""
if self._uid is None:
if not await self._init_uid():
logger.warning('room=%d _init_uid() failed', self._tmp_room_id)
self._uid = 0
if self._get_buvid() == '':
if not await self._init_buvid():
logger.warning('room=%d _init_buvid() failed', self._tmp_room_id)
res = True
if not await self._init_room_id_and_owner():
res = False
# 失败了则降级
self._room_id = self._room_short_id = self._tmp_room_id
self._room_owner_uid = 0
if not await self._init_host_server():
res = False
# 失败了则降级
self._host_server_list = DEFAULT_DANMAKU_SERVER_LIST
self._host_server_token = None
return res
async def _init_uid(self):
try:
async with self._session.get(
UID_INIT_URL,
headers={'User-Agent': USER_AGENT},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_uid() failed, status=%d, reason=%s', self._tmp_room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
if data['code'] == -101:
# 未登录
self._uid = 0
return True
logger.warning('room=%d _init_uid() failed, message=%s', self._tmp_room_id,
data['message'])
return False
data = data['data']
if not data['isLogin']:
# 未登录
self._uid = 0
else:
self._uid = data['mid']
return True
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_uid() failed:', self._tmp_room_id)
return False
def _get_buvid(self):
cookies = self._session.cookie_jar.filter_cookies(yarl.URL(BUVID_INIT_URL))
buvid_cookie = cookies.get('buvid3', None)
if buvid_cookie is None:
return ''
return buvid_cookie.value
async def _init_buvid(self):
try:
async with self._session.get(
BUVID_INIT_URL,
headers={'User-Agent': USER_AGENT},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_buvid() status error, status=%d, reason=%s',
self._tmp_room_id, res.status, res.reason)
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_buvid() exception:', self._tmp_room_id)
return self._get_buvid() != ''
async def _init_room_id_and_owner(self):
try:
async with self._session.get(
ROOM_INIT_URL,
headers={'User-Agent': USER_AGENT},
params={
'room_id': self._tmp_room_id
},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room=%d _init_room_id_and_owner() failed, message=%s', self._tmp_room_id,
data['message'])
return False
if not self._parse_room_init(data['data']):
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_room_id_and_owner() failed:', self._tmp_room_id)
return False
return True
def _parse_room_init(self, data):
room_info = data['room_info']
self._room_id = room_info['room_id']
self._room_short_id = room_info['short_id']
self._room_owner_uid = room_info['uid']
return True
async def _init_host_server(self):
try:
async with self._session.get(
DANMAKU_SERVER_CONF_URL,
headers={'User-Agent': USER_AGENT},
params={
'id': self._room_id,
'type': 0
},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room=%d _init_host_server() failed, message=%s', self._room_id, data['message'])
return False
if not self._parse_danmaku_server_conf(data['data']):
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_host_server() failed:', self._room_id)
return False
return True
def _parse_danmaku_server_conf(self, data):
self._host_server_list = data['host_list']
self._host_server_token = data['token']
if not self._host_server_list:
logger.warning('room=%d _parse_danmaku_server_conf() failed: host_server_list is empty', self._room_id)
return False
return True
raise NotImplementedError
@staticmethod
def _make_packet(data: dict, operation: int) -> bytes:
@ -445,19 +251,14 @@ class BLiveClient:
"""
网络协程负责连接服务器接收消息解包
"""
# 如果之前未初始化则初始化
if self._host_server_token is None:
if not await self.init_room():
raise InitError('init_room() failed')
await self._on_network_coroutine_start()
retry_count = 0
while True:
try:
# 连接
host_server = self._host_server_list[retry_count % len(self._host_server_list)]
async with self._session.ws_connect(
f"wss://{host_server['host']}:{host_server['wss_port']}/sub",
headers={'User-Agent': USER_AGENT},
self._get_ws_url(retry_count),
receive_timeout=self._heartbeat_interval + 5,
ssl=self._ssl
) as websocket:
@ -491,6 +292,17 @@ class BLiveClient:
logger.warning('room=%d is reconnecting, retry_count=%d', self.room_id, retry_count)
await asyncio.sleep(1)
async def _on_network_coroutine_start(self):
"""
在_network_coroutine开头运行可以用来初始化房间
"""
def _get_ws_url(self, retry_count) -> str:
"""
返回WebSocket连接的URL可以在这里做故障转移和负载均衡
"""
raise NotImplementedError
async def _on_ws_connect(self):
"""
WebSocket连接成功
@ -512,17 +324,7 @@ class BLiveClient:
"""
发送认证包
"""
auth_params = {
'uid': self._uid,
'roomid': self._room_id,
'protover': 3,
'platform': 'web',
'type': 2,
'buvid': self._get_buvid(),
}
if self._host_server_token is not None:
auth_params['key'] = self._host_server_token
await self._websocket.send_bytes(self._make_packet(auth_params, Operation.AUTH))
raise NotImplementedError
def _on_send_heartbeat(self):
"""

View File

@ -2,7 +2,7 @@
import logging
from typing import *
from . import client as client_
from .clients import ws_base
from . import models
__all__ = (
@ -48,7 +48,7 @@ class HandlerInterface:
直播消息处理器接口
"""
async def handle(self, client: client_.BLiveClient, command: dict):
async def handle(self, client: ws_base.WebSocketClientBase, command: dict):
raise NotImplementedError
# TODO 加个异常停止的回调
@ -59,28 +59,28 @@ class BaseHandler(HandlerInterface):
一个简单的消息处理器实现带消息分发和消息类型转换继承并重写_on_xxx方法即可实现自己的处理器
"""
def __heartbeat_callback(self, client: client_.BLiveClient, command: dict):
def __heartbeat_callback(self, client: ws_base.WebSocketClientBase, command: dict):
return self._on_heartbeat(client, models.HeartbeatMessage.from_command(command['data']))
def __danmu_msg_callback(self, client: client_.BLiveClient, command: dict):
def __danmu_msg_callback(self, client: ws_base.WebSocketClientBase, command: dict):
return self._on_danmaku(client, models.DanmakuMessage.from_command(command['info'], command.get('dm_v2', '')))
def __send_gift_callback(self, client: client_.BLiveClient, command: dict):
def __send_gift_callback(self, client: ws_base.WebSocketClientBase, command: dict):
return self._on_gift(client, models.GiftMessage.from_command(command['data']))
def __guard_buy_callback(self, client: client_.BLiveClient, command: dict):
def __guard_buy_callback(self, client: ws_base.WebSocketClientBase, command: dict):
return self._on_buy_guard(client, models.GuardBuyMessage.from_command(command['data']))
def __super_chat_message_callback(self, client: client_.BLiveClient, command: dict):
def __super_chat_message_callback(self, client: ws_base.WebSocketClientBase, command: dict):
return self._on_super_chat(client, models.SuperChatMessage.from_command(command['data']))
def __super_chat_message_delete_callback(self, client: client_.BLiveClient, command: dict):
def __super_chat_message_delete_callback(self, client: ws_base.WebSocketClientBase, command: dict):
return self._on_super_chat_delete(client, models.SuperChatDeleteMessage.from_command(command['data']))
_CMD_CALLBACK_DICT: Dict[
str,
Optional[Callable[
['BaseHandler', client_.BLiveClient, dict],
['BaseHandler', ws_base.WebSocketClientBase, dict],
Awaitable
]]
] = {
@ -104,7 +104,7 @@ class BaseHandler(HandlerInterface):
_CMD_CALLBACK_DICT[cmd] = None
del cmd
async def handle(self, client: client_.BLiveClient, command: dict):
async def handle(self, client: ws_base.WebSocketClientBase, command: dict):
cmd = command.get('cmd', '')
pos = cmd.find(':') # 2019-5-29 B站弹幕升级新增了参数
if pos != -1:
@ -121,32 +121,32 @@ class BaseHandler(HandlerInterface):
if callback is not None:
await callback(self, client, command)
async def _on_heartbeat(self, client: client_.BLiveClient, message: models.HeartbeatMessage):
async def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: models.HeartbeatMessage):
"""
收到心跳包人气值
"""
async def _on_danmaku(self, client: client_.BLiveClient, message: models.DanmakuMessage):
async def _on_danmaku(self, client: ws_base.WebSocketClientBase, message: models.DanmakuMessage):
"""
收到弹幕
"""
async def _on_gift(self, client: client_.BLiveClient, message: models.GiftMessage):
async def _on_gift(self, client: ws_base.WebSocketClientBase, message: models.GiftMessage):
"""
收到礼物
"""
async def _on_buy_guard(self, client: client_.BLiveClient, message: models.GuardBuyMessage):
async def _on_buy_guard(self, client: ws_base.WebSocketClientBase, message: models.GuardBuyMessage):
"""
有人上舰
"""
async def _on_super_chat(self, client: client_.BLiveClient, message: models.SuperChatMessage):
async def _on_super_chat(self, client: ws_base.WebSocketClientBase, message: models.SuperChatMessage):
"""
醒目留言
"""
async def _on_super_chat_delete(self, client: client_.BLiveClient, message: models.SuperChatDeleteMessage):
async def _on_super_chat_delete(self, client: ws_base.WebSocketClientBase, message: models.SuperChatDeleteMessage):
"""
删除醒目留言
"""

View File

@ -24,7 +24,7 @@ class HeartbeatMessage:
"""
popularity: int = 0
"""人气值"""
"""人气值,已废弃"""
@classmethod
def from_command(cls, data: dict):

View File

@ -2,7 +2,6 @@
import asyncio
import blivedm
import blivedm.open_live_client as open_live_client
ACCESS_KEY = ''
ACCESS_SECRET = ''
@ -18,7 +17,7 @@ async def run_single_client():
"""
演示监听一个直播间
"""
client = open_live_client.OpenLiveClient(
client = blivedm.OpenLiveClient(
access_key=ACCESS_KEY,
access_secret=ACCESS_SECRET,
app_id=APP_ID,
@ -39,7 +38,7 @@ async def run_single_client():
class MyHandler(blivedm.HandlerInterface):
async def handle(self, client: open_live_client.OpenLiveClient, command: dict):
async def handle(self, client: blivedm.OpenLiveClient, command: dict):
print(command)