各种方便的小修改

This commit is contained in:
John Smith 2019-06-06 21:50:51 +08:00
parent 10a6a91e74
commit fe5b4ca2c9
2 changed files with 284 additions and 72 deletions

View File

@ -8,6 +8,7 @@ from collections import namedtuple
from enum import IntEnum
# noinspection PyProtectedMember
from ssl import _create_unverified_context
from typing import *
import aiohttp
@ -45,16 +46,189 @@ class Operation(IntEnum):
# MaxBusinessOp = 10000
class DanmakuMessage:
def __init__(self, mode, font_size, color, timestamp, rnd, uid_crc32, msg_type, bubble,
msg,
uid, uname, admin, vip, svip, urank, mobile_verify, uname_color,
medal_level, medal_name, runame, room_id, mcolor, special_medal,
user_level, ulevel_color, ulevel_rank,
old_title, title,
privilege_type):
"""
:param mode: 弹幕显示模式滚动顶部底部
:param font_size: 字体尺寸
:param color: 颜色
:param timestamp: 时间戳
:param rnd: 随机数
:param uid_crc32: 用户ID文本的CRC32
:param msg_type: 是否礼物弹幕节奏风暴
:param bubble: 右侧评论栏气泡
:param msg: 弹幕内容
:param uid: 用户ID
:param uname: 用户名
:param admin: 是否房管
:param vip: 是否月费老爷
:param svip: 是否年费老爷
:param urank: 用户身份用来判断是否正式会员猜测非正式会员为5000正式会员为10000
:param mobile_verify: 是否绑定手机
:param uname_color: 用户名颜色
:param medal_level: 勋章等级
:param medal_name: 勋章名
:param runame: 勋章房间主播名
:param room_id: 勋章房间ID
:param mcolor: 勋章颜色
:param special_medal: 特殊勋章
:param user_level: 用户等级
:param ulevel_color: 用户等级颜色
:param ulevel_rank: 用户等级排名>50000时为'>50000'
:param old_title: 旧头衔
:param title: 头衔
:param privilege_type: 舰队类型0非舰队1总督2提督3舰长
"""
self.mode = mode
self.font_size = font_size
self.color = color
self.timestamp = timestamp
self.rnd = rnd
self.uid_crc32 = uid_crc32
self.msg_type = msg_type
self.bubble = bubble
self.msg = msg
self.uid = uid
self.uname = uname
self.admin = admin
self.vip = vip
self.svip = svip
self.urank = urank
self.mobile_verify = mobile_verify
self.uname_color = uname_color
self.medal_level = medal_level
self.medal_name = medal_name
self.runame = runame
self.room_id = room_id
self.mcolor = mcolor
self.special_medal = special_medal
self.user_level = user_level
self.ulevel_color = ulevel_color
self.ulevel_rank = ulevel_rank
self.old_title = old_title
self.title = title
self.privilege_type = privilege_type
@classmethod
def from_command(cls, info: dict):
return cls(
info[0][1], info[0][2], info[0][3], info[0][4], info[0][5], info[0][7], info[0][9], info[0][10],
info[1],
*info[2],
*(info[3] or (0, '', '', 0, 0, 0)),
info[4][0], info[4][2], info[4][3],
*info[5],
info[7]
)
class GiftMessage:
def __init__(self, gift_name, num, uname, face, guard_level, uid, timestamp, gift_id,
gift_type, action, price, rnd, coin_type, total_coin):
"""
:param gift_name: 礼物名
:param num: 礼物数量
:param uname: 用户名
:param face: 用户头像URL
:param guard_level: 舰队等级0非舰队1总督2提督3舰长
:param uid: 用户ID
:param timestamp: 时间戳
:param gift_id: 礼物ID
:param gift_type: 礼物类型未知
:param action: 目前遇到的有'喂食''赠送'
:param price: 礼物单价瓜子数
:param rnd: 随机数
:param coin_type: 瓜子类型'silver''gold'
:param total_coin: 总瓜子数
"""
self.gift_name = gift_name
self.num = num
self.uname = uname
self.face = face
self.guard_level = guard_level
self.uid = uid
self.timestamp = timestamp
self.gift_id = gift_id
self.gift_type = gift_type
self.action = action
self.price = price
self.rnd = rnd
self.coin_type = coin_type
self.total_coin = total_coin
@classmethod
def from_command(cls, data: dict):
return cls(
data['giftName'], data['num'], data['uname'], data['face'], data['guard_level'],
data['uid'], data['timestamp'], data['giftId'], data['giftType'],
data['action'], data['price'], data['rnd'], data['coin_type'], data['total_coin']
)
class GuardBuyMessage:
def __init__(self, uid, username, guard_level, num, price, gift_id, gift_name,
start_time, end_time):
"""
:param uid: 用户ID
:param username: 用户名
:param guard_level: 舰队等级0非舰队1总督2提督3舰长
:param num: 数量
:param price: 单价金瓜子数
:param gift_id: 礼物ID
:param gift_name: 礼物名
:param start_time: 开始时间戳
:param end_time: 结束时间戳
"""
self.uid = uid
self.username = username
self.guard_level = guard_level
self.num = num
self.price = price
self.gift_id = gift_id
self.gift_name = gift_name
self.start_time = start_time
self.end_time = end_time
@classmethod
def from_command(cls, data: dict):
return cls(
data['uid'], data['username'], data['guard_level'], data['num'], data['price'],
data['gift_id'], data['gift_name'], data['start_time'], data['end_time']
)
class BLiveClient:
_COMMAND_HANDLERS = {
_COMMAND_HANDLERS: Dict[str, Optional[Callable[['BLiveClient', dict], Awaitable]]] = {
# 收到弹幕
# go-common\app\service\live\live-dm\service\v1\send.go
'DANMU_MSG': lambda client, command: client._on_get_danmaku(
command['info'][1], command['info'][2][1]
'DANMU_MSG': lambda client, command: client._on_receive_danmaku(
DanmakuMessage.from_command(command['info'])
),
# 有人送礼
'SEND_GIFT': lambda client, command: client._on_gift(
command['data']['giftName'], command['data']['num'], command['data']['uname']
'SEND_GIFT': lambda client, command: client._on_receive_gift(
GiftMessage.from_command(command['data'])
),
# 有人上舰
'GUARD_BUY': lambda client, command: client._on_buy_guard(
GuardBuyMessage.from_command(command['data'])
)
}
for cmd in ( # 其他已知命令
@ -63,22 +237,25 @@ class BLiveClient:
'BLOCK', 'ROUND', 'WELCOME', 'REFRESH', 'ACTIVITY_RED_PACKET', 'ROOM_LIMIT',
'PK_PRE', 'PK_END', 'PK_SETTLE', 'PK_MIC_END',
# 其他遇到的
'COMBO_SEND', 'COMBO_END', 'ROOM_RANK', 'NOTICE_MSG', 'WELCOME_GUARD', 'GUARD_BUY',
'WISH_BOTTLE', 'RAFFLE_START', 'ENTRY_EFFECT', 'ROOM_REAL_TIME_MESSAGE_UPDATE'
'COMBO_SEND', 'COMBO_END', 'ROOM_RANK', 'NOTICE_MSG', 'WELCOME_GUARD',
'WISH_BOTTLE', 'RAFFLE_START', 'ENTRY_EFFECT', 'ROOM_REAL_TIME_MESSAGE_UPDATE',
'USER_TOAST_MSG', 'GUARD_LOTTERY_START'
):
_COMMAND_HANDLERS[cmd] = None
def __init__(self, room_id, ssl=True, loop=None, session: aiohttp.ClientSession=None,
uid=0):
"""
:param room_id: URL中的房间ID
:param room_id: URL中的房间ID可以为短ID
:param ssl: True表示用默认的SSLContext验证False表示不验证也可以传入SSLContext
:param loop: 协程事件循环
:param session: cookie连接池
:param uid: B站用户ID0表示未登录
"""
self._short_id = room_id
self._room_id = None
# 用来init_room的临时房间ID
self._tmp_room_id = room_id
# 调用init_room后初始化
self._room_id = self._room_short_id = self._room_owner_uid = None
self._uid = uid
if loop is not None:
@ -87,7 +264,7 @@ class BLiveClient:
self._loop = session.loop
else:
self._loop = asyncio.get_event_loop()
self._is_running = False
self._future = None
if session is None:
self._session = aiohttp.ClientSession(loop=self._loop)
@ -102,7 +279,28 @@ class BLiveClient:
@property
def is_running(self):
return self._is_running
return self._future is not None
@property
def room_id(self):
"""
房间ID调用init_room后初始化
"""
return self._room_id
@property
def room_short_id(self):
"""
房间短ID没有则为0调用init_room后初始化
"""
return self._room_short_id
@property
def room_owner_uid(self):
"""
主播ID调用init_room后初始化
"""
return self._room_owner_uid
async def close(self):
"""
@ -111,28 +309,43 @@ class BLiveClient:
if self._own_session:
await self._session.close()
def run(self):
def start(self):
"""
创建相关的协程不会执行事件循环
:return: 协程的future
"""
if self._is_running:
if self._future is not None:
raise RuntimeError('This client is already running')
self._is_running = True
return asyncio.ensure_future(self._message_loop(), loop=self._loop)
self._future = asyncio.ensure_future(self._message_loop(), loop=self._loop)
return self._future
async def _get_room_id(self):
def stop(self):
"""
停止相关的协程
:return: 协程的future
"""
if self._future is None:
raise RuntimeError('This client is not running')
self._future.cancel()
return self._future
async def init_room(self):
async with self._session.get(ROOM_INIT_URL,
params={'id': self._short_id},
params={'id': self._tmp_room_id},
ssl=self._ssl) as res:
if res.status == 200:
data = await res.json()
if data['code'] == 0:
self._room_id = data['data']['room_id']
else:
raise ConnectionAbortedError('获取房间ID失败' + data['msg'])
else:
raise ConnectionAbortedError('获取房间ID失败' + res.reason)
if res.status != 200:
raise ConnectionAbortedError(f'room {self._tmp_room_id} init_room失败'
f'{res.status} {res.reason}')
data = await res.json()
if data['code'] != 0:
raise ConnectionAbortedError(f'room {self._tmp_room_id} init_room失败'
f'{data["msg"]}')
self._parse_room_init(data['data'])
def _parse_room_init(self, data):
self._room_id = data['room_id']
self._room_short_id = data['short_id']
self._room_owner_uid = data['uid']
def _make_packet(self, data, operation):
body = json.dumps(data).encode('utf-8')
@ -158,7 +371,7 @@ class BLiveClient:
async def _message_loop(self):
# 获取房间ID
if self._room_id is None:
await self._get_room_id()
await self.init_room()
while True:
heartbeat_future = None
@ -175,17 +388,18 @@ class BLiveClient:
if message.type == aiohttp.WSMsgType.BINARY:
await self._handle_message(message.data)
else:
logger.warning('未知的websocket消息type=%s %s', message.type, message.data)
logger.warning('room %d 未知的websocket消息type=%s %s', self.room_id,
message.type, message.data)
except asyncio.CancelledError:
break
except (aiohttp.ClientConnectorError, asyncio.TimeoutError):
# 重连
logger.warning('掉线重连中')
try:
await asyncio.sleep(5)
except asyncio.CancelledError:
break
logger.warning('room %d 掉线重连中', self.room_id)
# try:
# await asyncio.sleep(5)
# except asyncio.CancelledError:
# break
finally:
if heartbeat_future is not None:
heartbeat_future.cancel()
@ -195,7 +409,7 @@ class BLiveClient:
break
self._websocket = None
self._is_running = False
self._future = None
async def _heartbeat_loop(self):
while True:
@ -218,7 +432,7 @@ class BLiveClient:
popularity = int.from_bytes(message[offset + HEADER_STRUCT.size:
offset + HEADER_STRUCT.size + 4],
'big')
await self._on_get_popularity(popularity)
await self._on_receive_popularity(popularity)
elif header.operation == Operation.SEND_MSG_REPLY:
body = message[offset + HEADER_STRUCT.size: offset + header.pack_len]
@ -230,7 +444,8 @@ class BLiveClient:
else:
body = message[offset + HEADER_STRUCT.size: offset + header.pack_len]
logger.warning('未知包类型operation=%d %s%s', header.operation, header, body)
logger.warning('room %d 未知包类型operation=%d %s%s', self.room_id,
header.operation, header, body)
offset += header.pack_len
@ -249,28 +464,28 @@ class BLiveClient:
if handler is not None:
await handler(self, command)
else:
logger.warning('未知命令cmd=%s %s', cmd, command)
logger.warning('room %d 未知命令cmd=%s %s', self.room_id, cmd, command)
async def _on_get_popularity(self, popularity):
async def _on_receive_popularity(self, popularity: int):
"""
获取到人气值
:param popularity: 人气值
收到人气值
"""
pass
async def _on_get_danmaku(self, content, user_name):
async def _on_receive_danmaku(self, danmaku: DanmakuMessage):
"""
获取到弹幕
:param content: 弹幕内容
:param user_name: 弹幕作者
收到弹幕
"""
pass
async def _on_gift(self, gift_name, gift_num, user_name):
async def _on_receive_gift(self, gift: GiftMessage):
"""
有人送礼
:param gift_name: 礼物名
:param gift_num: 礼物数
:param user_name: 送礼人
收到礼物
"""
pass
async def _on_buy_guard(self, message: GuardBuyMessage):
"""
有人上舰
"""
pass

View File

@ -2,35 +2,40 @@
import asyncio
from blivedm import BLiveClient
import blivedm
class MyBLiveClient(BLiveClient):
class MyBLiveClient(blivedm.BLiveClient):
# 演示如何自定义handler
_COMMAND_HANDLERS = BLiveClient._COMMAND_HANDLERS.copy()
_COMMAND_HANDLERS['SEND_GIFT'] = lambda client, command: client._my_on_gift(
command['data']['giftName'], command['data']['num'], command['data']['uname'],
command['data']['coin_type'], command['data']['total_coin']
)
_COMMAND_HANDLERS = blivedm.BLiveClient._COMMAND_HANDLERS.copy()
async def _on_get_popularity(self, popularity):
async def __on_vip_enter(self, command):
print(command)
_COMMAND_HANDLERS['WELCOME'] = __on_vip_enter # 老爷入场
async def _on_receive_popularity(self, popularity: int):
print(f'当前人气值:{popularity}')
async def _on_get_danmaku(self, content, user_name):
print(f'{user_name}{content}')
async def _on_receive_danmaku(self, danmaku: blivedm.DanmakuMessage):
print(f'{danmaku.uname}{danmaku.msg}')
async def _my_on_gift(self, gift_name, gift_num, user_name, coin_type, total_coin):
print(f'{user_name} 赠送{gift_name}x{gift_num} {coin_type}币x{total_coin}')
async def _on_receive_gift(self, gift: blivedm.GiftMessage):
print(f'{gift.uname} 赠送{gift.gift_name}x{gift.num} {gift.coin_type}币x{gift.total_coin}')
async def _on_buy_guard(self, message: blivedm.GuardBuyMessage):
print(f'{message.username} 购买{message.gift_name}')
async def async_main():
async def main():
# 139是黑桐谷歌的直播间
# 如果SSL验证失败就把第二个参数设为False
client = MyBLiveClient(139, True)
future = client.run()
future = client.start()
try:
# 5秒后停止测试用
# await asyncio.sleep(5)
# future = client.stop()
# 或者
# future.cancel()
await future
@ -38,13 +43,5 @@ async def async_main():
await client.close()
def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(async_main())
finally:
loop.close()
if __name__ == '__main__':
main()
asyncio.get_event_loop().run_until_complete(main())