移除了一堆没用的特性

消息处理接口改成同步函数
消息处理器从数组改成单个对象
移除web端短ID属性
移除SSL配置
消息处理器添加客户端异常停止的回调
This commit is contained in:
John Smith 2023-09-03 18:09:12 +08:00
parent 74b9cdc100
commit 300840fd65
6 changed files with 75 additions and 103 deletions

View File

@ -6,7 +6,6 @@ import hmac
import json
import logging
import random
import ssl as ssl_
from typing import *
import aiohttp
@ -37,7 +36,6 @@ class OpenLiveClient(ws_base.WebSocketClientBase):
:param session: cookie连接池
:param heartbeat_interval: 发送连接心跳包的间隔时间
:param game_heartbeat_interval: 发送项目心跳包的间隔时间
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
"""
def __init__(
@ -50,9 +48,8 @@ class OpenLiveClient(ws_base.WebSocketClientBase):
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30,
game_heartbeat_interval=20,
ssl: Union[bool, ssl_.SSLContext] = True,
):
super().__init__(session, heartbeat_interval, ssl)
super().__init__(session, heartbeat_interval)
self._access_key = access_key
self._access_secret = access_secret
@ -144,7 +141,7 @@ class OpenLiveClient(ws_base.WebSocketClientBase):
headers['Content-Type'] = 'application/json'
headers['Accept'] = 'application/json'
return self._session.post(url, headers=headers, data=body_bytes, ssl=self._ssl)
return self._session.post(url, headers=headers, data=body_bytes)
async def init_room(self):
"""
@ -194,7 +191,7 @@ class OpenLiveClient(ws_base.WebSocketClientBase):
async def _end_game(self):
"""
关闭项目互动玩法类项目建议断开连接时保证调用到这个函数close会调用否则短时间内无法重复连接同一个房间
关闭项目互动玩法类项目建议断开连接时保证调用到这个函数close会调用否则可能短时间内无法重复连接同一个房间
"""
if self._game_id in (None, ''):
return True

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
import asyncio
import logging
import ssl as ssl_
from typing import *
import aiohttp
@ -33,7 +32,6 @@ class BLiveClient(ws_base.WebSocketClientBase):
:param uid: B站用户ID0表示未登录None表示自动获取
:param session: cookie连接池
:param heartbeat_interval: 发送心跳包的间隔时间
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
"""
def __init__(
@ -43,18 +41,14 @@ class BLiveClient(ws_base.WebSocketClientBase):
uid: Optional[int] = None,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30,
ssl: Union[bool, ssl_.SSLContext] = True,
):
super().__init__(session, heartbeat_interval, ssl)
super().__init__(session, heartbeat_interval)
self._tmp_room_id = room_id
"""用来init_room的临时房间ID可以用短ID"""
self._uid = uid
# 在调用init_room后初始化的字段
# TODO 移除短ID
self._room_short_id: Optional[int] = None
"""房间短ID没有则为0"""
self._room_owner_uid: Optional[int] = None
"""主播用户ID"""
self._host_server_list: Optional[List[dict]] = None
@ -67,11 +61,11 @@ class BLiveClient(ws_base.WebSocketClientBase):
"""连接弹幕服务器用的token"""
@property
def room_short_id(self) -> Optional[int]:
def tmp_room_id(self) -> int:
"""
房间短ID没有则为0调用init_room后初始化
构造时传进来的room_id参数
"""
return self._room_short_id
return self._tmp_room_id
@property
def room_owner_uid(self) -> Optional[int]:
@ -106,7 +100,7 @@ class BLiveClient(ws_base.WebSocketClientBase):
if not await self._init_room_id_and_owner():
res = False
# 失败了则降级
self._room_id = self._room_short_id = self._tmp_room_id
self._room_id = self._tmp_room_id
self._room_owner_uid = 0
if not await self._init_host_server():
@ -128,7 +122,6 @@ class BLiveClient(ws_base.WebSocketClientBase):
async with self._session.get(
UID_INIT_URL,
headers={'User-Agent': utils.USER_AGENT},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_uid() failed, status=%d, reason=%s', self._tmp_room_id,
@ -167,7 +160,6 @@ class BLiveClient(ws_base.WebSocketClientBase):
async with self._session.get(
BUVID_INIT_URL,
headers={'User-Agent': utils.USER_AGENT},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_buvid() status error, status=%d, reason=%s',
@ -184,7 +176,6 @@ class BLiveClient(ws_base.WebSocketClientBase):
params={
'room_id': self._tmp_room_id
},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id,
@ -205,7 +196,6 @@ class BLiveClient(ws_base.WebSocketClientBase):
def _parse_room_init(self, data):
room_info = data['room_info']
self._room_id = room_info['room_id']
self._room_short_id = room_info['short_id']
self._room_owner_uid = room_info['uid']
return True
@ -218,7 +208,6 @@ class BLiveClient(ws_base.WebSocketClientBase):
'id': self._room_id,
'type': 0
},
ssl=self._ssl
) as res:
if res.status != 200:
logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id,

View File

@ -3,7 +3,6 @@ import asyncio
import enum
import json
import logging
import ssl as ssl_
import struct
import zlib
from typing import *
@ -83,14 +82,12 @@ class WebSocketClientBase:
:param session: cookie连接池
:param heartbeat_interval: 发送心跳包的间隔时间
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
"""
def __init__(
self,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval: float = 30,
ssl: Union[bool, ssl_.SSLContext] = True,
):
if session is None:
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
@ -101,12 +98,9 @@ class WebSocketClientBase:
assert self._session.loop is asyncio.get_event_loop() # noqa
self._heartbeat_interval = heartbeat_interval
# TODO 移除SSL配置
self._ssl = ssl if ssl else ssl_._create_unverified_context() # noqa
# TODO 没必要支持多个handler改成单个吧
self._handlers: List[handlers.HandlerInterface] = []
"""消息处理器,可动态增删"""
self._handler: Optional[handlers.HandlerInterface] = None
"""消息处理器"""
# 在调用init_room后初始化的字段
self._room_id: Optional[int] = None
@ -133,27 +127,16 @@ class WebSocketClientBase:
"""
return self._room_id
def add_handler(self, handler: 'handlers.HandlerInterface'):
def set_handler(self, handler: Optional['handlers.HandlerInterface']):
"""
添加消息处理器
注意多个处理器是并发处理的不要依赖处理的顺序
消息处理器和接收消息运行在同一协程如果处理消息耗时太长会阻塞接收消息这种情况建议将消息推到队列让另一个协程处理
设置消息处理器
注意消息处理器和网络协程运行在同一个协程如果处理消息耗时太长会阻塞接收消息如果是CPU密集型的任务建议将消息推到线程池处理
如果是IO密集型的任务应该使用async函数并且在handler里使用create_task创建新的协程
:param handler: 消息处理器
"""
if handler not in self._handlers:
self._handlers.append(handler)
def remove_handler(self, handler: 'handlers.HandlerInterface'):
"""
移除消息处理器
:param handler: 消息处理器
"""
try:
self._handlers.remove(handler)
except ValueError:
pass
self._handler = handler
def start(self):
"""
@ -236,17 +219,22 @@ class WebSocketClientBase:
"""
负责处理网络协程的异常网络协程具体逻辑在_network_coroutine里
"""
exc = None
try:
await self._network_coroutine()
except asyncio.CancelledError:
# 正常停止
pass
except Exception: # noqa
except Exception as e:
logger.exception('room=%s _network_coroutine() finished with exception:', self.room_id)
exc = e
finally:
logger.debug('room=%s _network_coroutine() finished', self.room_id)
self._network_future = None
if exc is not None and self._handler is not None:
self._handler.on_stopped_by_exception(self, exc)
async def _network_coroutine(self):
"""
网络协程负责连接服务器接收消息解包
@ -261,7 +249,6 @@ class WebSocketClientBase:
self._get_ws_url(retry_count),
headers={'User-Agent': utils.USER_AGENT}, # web端的token也会签名UA
receive_timeout=self._heartbeat_interval + 5,
ssl=self._ssl
) as websocket:
self._websocket = websocket
await self._on_ws_connect()
@ -281,9 +268,6 @@ class WebSocketClientBase:
logger.exception('room=%d auth failed, trying init_room() again', self.room_id)
if not await self.init_room():
raise InitError('init_room() failed')
except ssl_.SSLError:
logger.error('room=%d a SSLError happened, cannot reconnect', self.room_id)
raise
finally:
self._websocket = None
await self._on_ws_close()
@ -414,7 +398,7 @@ class WebSocketClientBase:
'popularity': popularity
}
}
await self._handle_command(body)
self._handle_command(body)
else:
# 未知消息
@ -441,7 +425,7 @@ class WebSocketClientBase:
if len(body) != 0:
try:
body = json.loads(body.decode('utf-8'))
await self._handle_command(body)
self._handle_command(body)
except asyncio.CancelledError:
raise
except Exception:
@ -464,19 +448,17 @@ class WebSocketClientBase:
logger.warning('room=%d unknown message operation=%d, header=%s, body=%s', self.room_id,
header.operation, header, body)
async def _handle_command(self, command: dict):
def _handle_command(self, command: dict):
"""
解析并处理业务消息
处理业务消息
:param command: 业务消息
"""
# TODO 考虑解析完整个WS包后再一次处理所有消息。另外用call_soon就不会阻塞网络协程了也不用加shield
# 外部代码可能不能正常处理取消所以这里加shield
results = await asyncio.shield(
asyncio.gather(
*(handler.handle(self, command) for handler in self._handlers), return_exceptions=True
)
)
for res in results:
if isinstance(res, Exception):
logger.exception('room=%d _handle_command() failed, command=%s', self.room_id, command, exc_info=res)
try:
# 为什么不做成异步的:
# 1. 为了保持处理消息的顺序这里不使用call_soon、create_task等方法延迟处理
# 2. 如果支持handle使用async函数用户可能会在里面处理耗时很长的异步操作导致网络协程阻塞
# 这里做成同步的强制用户使用create_task或消息队列处理异步操作这样就不会阻塞网络协程
self._handler.handle(self, command)
except Exception as e:
logger.exception('room=%d _handle_command() failed, command=%s', self.room_id, command, exc_info=e)

View File

@ -45,10 +45,13 @@ class HandlerInterface:
直播消息处理器接口
"""
async def handle(self, client: ws_base.WebSocketClientBase, command: dict):
def handle(self, client: ws_base.WebSocketClientBase, command: dict):
raise NotImplementedError
# TODO 加个异常停止的回调
def on_stopped_by_exception(self, client: ws_base.WebSocketClientBase, exception: Exception):
"""
当客户端被异常停止时调用可以在这里close或者重新start
"""
def _make_msg_callback(method_name, message_cls):
@ -72,7 +75,7 @@ class BaseHandler(HandlerInterface):
str,
Optional[Callable[
['BaseHandler', ws_base.WebSocketClientBase, dict],
Awaitable
Any
]]
] = {
# 收到心跳包这是blivedm自造的消息原本的心跳包格式不一样
@ -110,7 +113,7 @@ class BaseHandler(HandlerInterface):
}
"""cmd -> 处理回调"""
async def handle(self, client: ws_base.WebSocketClientBase, command: dict):
def handle(self, client: ws_base.WebSocketClientBase, command: dict):
cmd = command.get('cmd', '')
pos = cmd.find(':') # 2019-5-29 B站弹幕升级新增了参数
if pos != -1:
@ -125,34 +128,34 @@ class BaseHandler(HandlerInterface):
callback = self._CMD_CALLBACK_DICT[cmd]
if callback is not None:
await callback(self, client, command)
callback(self, client, command)
async def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: web_models.HeartbeatMessage):
def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: web_models.HeartbeatMessage):
"""
收到心跳包
"""
async def _on_danmaku(self, client: ws_base.WebSocketClientBase, message: web_models.DanmakuMessage):
def _on_danmaku(self, client: ws_base.WebSocketClientBase, message: web_models.DanmakuMessage):
"""
收到弹幕
"""
async def _on_gift(self, client: ws_base.WebSocketClientBase, message: web_models.GiftMessage):
def _on_gift(self, client: ws_base.WebSocketClientBase, message: web_models.GiftMessage):
"""
收到礼物
"""
async def _on_buy_guard(self, client: ws_base.WebSocketClientBase, message: web_models.GuardBuyMessage):
def _on_buy_guard(self, client: ws_base.WebSocketClientBase, message: web_models.GuardBuyMessage):
"""
有人上舰
"""
async def _on_super_chat(self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatMessage):
def _on_super_chat(self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatMessage):
"""
醒目留言
"""
async def _on_super_chat_delete(
def _on_super_chat_delete(
self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatDeleteMessage
):
"""
@ -163,36 +166,36 @@ class BaseHandler(HandlerInterface):
# 开放平台消息
#
async def _on_open_live_danmaku(self, client: ws_base.WebSocketClientBase, message: open_models.DanmakuMessage):
def _on_open_live_danmaku(self, client: ws_base.WebSocketClientBase, message: open_models.DanmakuMessage):
"""
收到弹幕
"""
async def _on_open_live_gift(self, client: ws_base.WebSocketClientBase, message: open_models.GiftMessage):
def _on_open_live_gift(self, client: ws_base.WebSocketClientBase, message: open_models.GiftMessage):
"""
收到礼物
"""
async def _on_open_live_buy_guard(self, client: ws_base.WebSocketClientBase, message: open_models.GuardBuyMessage):
def _on_open_live_buy_guard(self, client: ws_base.WebSocketClientBase, message: open_models.GuardBuyMessage):
"""
有人上舰
"""
async def _on_open_live_super_chat(
def _on_open_live_super_chat(
self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatMessage
):
"""
醒目留言
"""
async def _on_open_live_super_chat_delete(
def _on_open_live_super_chat_delete(
self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatDeleteMessage
):
"""
删除醒目留言
"""
async def _on_open_live_like(self, client: ws_base.WebSocketClientBase, message: open_models.LikeMessage):
def _on_open_live_like(self, client: ws_base.WebSocketClientBase, message: open_models.LikeMessage):
"""
点赞
"""

View File

@ -29,7 +29,7 @@ async def run_single_client():
room_owner_auth_code=ROOM_OWNER_AUTH_CODE,
)
handler = MyHandler()
client.add_handler(handler)
client.set_handler(handler)
client.start()
try:
@ -43,31 +43,31 @@ async def run_single_client():
class MyHandler(blivedm.BaseHandler):
async def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage):
def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage):
print(f'[{client.room_id}] 心跳')
async def _on_open_live_danmaku(self, client: blivedm.OpenLiveClient, message: open_models.DanmakuMessage):
def _on_open_live_danmaku(self, client: blivedm.OpenLiveClient, message: open_models.DanmakuMessage):
print(f'[{message.room_id}] {message.uname}{message.msg}')
async def _on_open_live_gift(self, client: blivedm.OpenLiveClient, message: open_models.GiftMessage):
def _on_open_live_gift(self, client: blivedm.OpenLiveClient, message: open_models.GiftMessage):
coin_type = '金瓜子' if message.paid else '银瓜子'
print(f'[{message.room_id}] {message.uname} 赠送{message.gift_name}x{message.gift_num}'
f' {coin_type}x{message.price}')
async def _on_open_live_buy_guard(self, client: blivedm.OpenLiveClient, message: open_models.GuardBuyMessage):
def _on_open_live_buy_guard(self, client: blivedm.OpenLiveClient, message: open_models.GuardBuyMessage):
print(f'[{message.room_id}] {message.user_info.uname} 购买 大航海等级={message.guard_level}')
async def _on_open_live_super_chat(
def _on_open_live_super_chat(
self, client: blivedm.OpenLiveClient, message: open_models.SuperChatMessage
):
print(f'[{message.room_id}] 醒目留言 ¥{message.rmb} {message.uname}{message.message}')
async def _on_open_live_super_chat_delete(
def _on_open_live_super_chat_delete(
self, client: blivedm.OpenLiveClient, message: open_models.SuperChatDeleteMessage
):
print(f'[{message.room_id}] 删除醒目留言 message_ids={message.message_ids}')
async def _on_open_live_like(self, client: blivedm.OpenLiveClient, message: open_models.LikeMessage):
def _on_open_live_like(self, client: blivedm.OpenLiveClient, message: open_models.LikeMessage):
print(f'[{message.room_id}] {message.uname} 点赞')

View File

@ -18,6 +18,9 @@ TEST_ROOM_IDS = [
23105590,
]
# 这里填一个已登录账号的cookie。不填cookie也可以连接但是收到弹幕的用户名会打码UID会变成0
SESSDATA = ''
session: Optional[aiohttp.ClientSession] = None
@ -31,9 +34,8 @@ async def main():
def init_session():
# 这里填一个已登录账号的cookie。不填cookie也可以连接但是收到弹幕的用户名会打码UID会变成0
cookies = http.cookies.SimpleCookie()
cookies['SESSDATA'] = ''
cookies['SESSDATA'] = SESSDATA
cookies['SESSDATA']['domain'] = 'bilibili.com'
global session
@ -46,10 +48,9 @@ async def run_single_client():
演示监听一个直播间
"""
room_id = random.choice(TEST_ROOM_IDS)
# 如果SSL验证失败就把ssl设为FalseB站真的有过忘续证书的情况
client = blivedm.BLiveClient(room_id, session=session, ssl=True)
client = blivedm.BLiveClient(room_id, session=session)
handler = MyHandler()
client.add_handler(handler)
client.set_handler(handler)
client.start()
try:
@ -69,7 +70,7 @@ async def run_multi_clients():
clients = [blivedm.BLiveClient(room_id, session=session) for room_id in TEST_ROOM_IDS]
handler = MyHandler()
for client in clients:
client.add_handler(handler)
client.set_handler(handler)
client.start()
try:
@ -87,25 +88,25 @@ class MyHandler(blivedm.BaseHandler):
# _CMD_CALLBACK_DICT = blivedm.BaseHandler._CMD_CALLBACK_DICT.copy()
#
# # 入场消息回调
# async def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict):
# def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict):
# print(f"[{client.room_id}] INTERACT_WORD: self_type={type(self).__name__}, room_id={client.room_id},"
# f" uname={command['data']['uname']}")
# _CMD_CALLBACK_DICT['INTERACT_WORD'] = __interact_word_callback # noqa
async def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage):
def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage):
print(f'[{client.room_id}] 心跳')
async def _on_danmaku(self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage):
def _on_danmaku(self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage):
print(f'[{client.room_id}] {message.uname}{message.msg}')
async def _on_gift(self, client: blivedm.BLiveClient, message: web_models.GiftMessage):
def _on_gift(self, client: blivedm.BLiveClient, message: web_models.GiftMessage):
print(f'[{client.room_id}] {message.uname} 赠送{message.gift_name}x{message.num}'
f' {message.coin_type}瓜子x{message.total_coin}')
async def _on_buy_guard(self, client: blivedm.BLiveClient, message: web_models.GuardBuyMessage):
def _on_buy_guard(self, client: blivedm.BLiveClient, message: web_models.GuardBuyMessage):
print(f'[{client.room_id}] {message.username} 购买{message.gift_name}')
async def _on_super_chat(self, client: blivedm.BLiveClient, message: web_models.SuperChatMessage):
def _on_super_chat(self, client: blivedm.BLiveClient, message: web_models.SuperChatMessage):
print(f'[{client.room_id}] 醒目留言 ¥{message.price} {message.uname}{message.message}')