blivechat/services/chat.py

478 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import asyncio
import base64
import binascii
import logging
import uuid
from typing import *
import api.chat
import blivedm.blivedm as blivedm
import blivedm.blivedm.models.pb as blivedm_pb
import config
import services.avatar
import services.translate
import utils.request
logger = logging.getLogger(__name__)
# Manages the connections to Bilibili (set by init())
_live_client_manager: Optional['LiveClientManager'] = None
# Manages the connections to clients (set by init())
client_room_manager: Optional['ClientRoomManager'] = None
# Handler for live messages (set by init())
_live_msg_handler: Optional['LiveMsgHandler'] = None
def init():
    """Create the module-level singletons of the chat service.

    Instantiates, in this order, the Bilibili connection manager, the client
    room manager and the live message handler, and binds each of them to its
    module global.
    """
    global _live_client_manager, client_room_manager, _live_msg_handler

    # Connections to Bilibili
    _live_client_manager = LiveClientManager()

    # Connections to clients
    client_room_manager = ClientRoomManager()

    # Handler attached to every live client
    _live_msg_handler = LiveMsgHandler()
class LiveClientManager:
    """Manages the connections to Bilibili.

    Holds at most one ``LiveClient`` per room, keyed by the room ID that was
    passed to ``add_live_client``.
    """

    def __init__(self):
        self._live_clients: Dict[int, LiveClient] = {}
        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references to tasks, so a task nobody references may be garbage
        # collected before it has finished.
        self._background_tasks: Set[asyncio.Task] = set()

    def _run_in_background(self, coro):
        """Schedule ``coro`` as a task and keep it referenced until it is done.

        :param coro: the coroutine object to run
        :return: the created ``asyncio.Task``
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def add_live_client(self, room_id):
        """Create a live client for ``room_id`` and initialize it in the background.

        Does nothing if a live client for this room already exists.
        """
        if room_id in self._live_clients:
            return
        logger.info('room=%d creating live client', room_id)

        self._live_clients[room_id] = live_client = LiveClient(room_id)
        live_client.add_handler(_live_msg_handler)
        self._run_in_background(self._init_live_client(live_client))

        logger.info('room=%d live client created, %d live clients', room_id, len(self._live_clients))

    async def _init_live_client(self, live_client: 'LiveClient'):
        """Initialize the room of ``live_client`` and start it on success.

        On failure the client is removed again. If the client was removed (or
        replaced by a newer one for the same room) while the initialization was
        awaited, nothing is started or removed here.
        """
        init_succeeded = await live_client.init_room()
        room_id = live_client.tmp_room_id

        if self._live_clients.get(room_id) is not live_client:
            # del_live_client() already dropped this client and scheduled its
            # shutdown. Starting it now would leak a running connection, and
            # deleting by room ID could remove a newer client for the same room.
            logger.info('room=%d live client was removed during init, not starting it', room_id)
            return

        if not init_succeeded:
            logger.warning('room=%d live client init failed', room_id)
            self.del_live_client(room_id)
            return

        logger.info('room=%d (%d) live client init succeeded', room_id, live_client.room_id)
        live_client.start()

    def del_live_client(self, room_id):
        """Remove the live client of ``room_id``, shut it down and delete the client room.

        Does nothing if there is no live client for this room.
        """
        live_client = self._live_clients.pop(room_id, None)
        if live_client is None:
            return
        logger.info('room=%d removing live client', room_id)

        live_client.remove_handler(_live_msg_handler)
        # Shutting down is asynchronous, so it runs in the background
        self._run_in_background(live_client.stop_and_close())

        logger.info('room=%d live client removed, %d live clients', room_id, len(self._live_clients))

        # The client room cannot receive messages without a live client
        client_room_manager.del_room(room_id)
class LiveClient(blivedm.BLiveClient):
    """``BLiveClient`` configured with the HTTP session from ``utils.request``."""

    # Passed to the base client as heartbeat_interval (presumably seconds -- confirm in blivedm)
    HEARTBEAT_INTERVAL = 10

    def __init__(self, room_id):
        super().__init__(
            room_id,
            session=utils.request.http_session,
            heartbeat_interval=self.HEARTBEAT_INTERVAL,
        )

    @property
    def tmp_room_id(self):
        """The room ID passed at construction; ``room_id`` may change, this one does not."""
        return self._tmp_room_id

    async def init_room(self):
        """Run the base-class room initialization and always report success.

        The result of the base-class call is discarded.
        """
        await super().init_room()
        return True
class ClientRoomManager:
    """Manages the connections to clients, grouped into one ``ClientRoom`` per room ID."""

    # Delay (passed to loop.call_later) before a room without clients is deleted.
    # The room is not deleted immediately, in case a client reconnects shortly after.
    DELAY_DEL_ROOM_TIMEOUT = 10

    def __init__(self):
        self._rooms: Dict[int, ClientRoom] = {}
        # room_id -> timer_handle
        self._delay_del_timer_handles: Dict[int, asyncio.TimerHandle] = {}

    def add_client(self, room_id, client: 'api.chat.ChatHandler'):
        """Add ``client`` to the room, creating the room if needed, and cancel any pending deletion."""
        self._get_or_add_room(room_id).add_client(client)
        self._clear_delay_del_timer(room_id)

    def del_client(self, room_id, client: 'api.chat.ChatHandler'):
        """Remove ``client`` from the room and schedule the room's deletion once it is empty."""
        room = self.get_room(room_id)
        if room is not None:
            room.del_client(client)
            if room.client_count == 0:
                self.delay_del_room(room_id, self.DELAY_DEL_ROOM_TIMEOUT)

    def get_room(self, room_id):
        """Return the room for ``room_id``, or ``None`` if it does not exist."""
        return self._rooms.get(room_id)

    def _get_or_add_room(self, room_id):
        """Return the room for ``room_id``; create it and its live client if it is missing."""
        existing_room = self._rooms.get(room_id)
        if existing_room is not None:
            return existing_room

        logger.info('room=%d creating client room', room_id)
        new_room = ClientRoom(room_id)
        self._rooms[room_id] = new_room
        logger.info('room=%d client room created, %d client rooms', room_id, len(self._rooms))

        _live_client_manager.add_live_client(room_id)
        return new_room

    def del_room(self, room_id):
        """Cancel any pending deletion, then drop the room, its clients and its live client."""
        self._clear_delay_del_timer(room_id)

        removed_room = self._rooms.pop(room_id, None)
        if removed_room is None:
            return

        logger.info('room=%d removing client room', room_id)
        removed_room.clear_clients()
        logger.info('room=%d client room removed, %d client rooms', room_id, len(self._rooms))

        _live_client_manager.del_live_client(room_id)

    def delay_del_room(self, room_id, timeout):
        """Schedule ``del_room(room_id)`` after ``timeout``, replacing any earlier schedule."""
        self._clear_delay_del_timer(room_id)
        loop = asyncio.get_running_loop()
        self._delay_del_timer_handles[room_id] = loop.call_later(
            timeout, self._on_delay_del_room, room_id
        )

    def _clear_delay_del_timer(self, room_id):
        """Cancel and forget the pending deletion timer of ``room_id``, if there is one."""
        pending_handle = self._delay_del_timer_handles.pop(room_id, None)
        if pending_handle is None:
            return
        pending_handle.cancel()

    def _on_delay_del_room(self, room_id):
        """Timer callback: forget the fired timer handle and delete the room."""
        self._delay_del_timer_handles.pop(room_id, None)
        self.del_room(room_id)
class ClientRoom:
    """The clients connected to one room, plus a count of those with auto translate enabled."""

    def __init__(self, room_id):
        self._room_id = room_id
        self._clients: List[api.chat.ChatHandler] = []
        # Number of clients in self._clients whose auto_translate was truthy when counted
        self._auto_translate_count = 0

    @property
    def room_id(self):
        """The room ID this room was created with."""
        return self._room_id

    @property
    def client_count(self):
        """Number of clients currently in the room."""
        return len(self._clients)

    @property
    def need_translate(self):
        """Whether at least one client is counted as wanting auto translation."""
        return self._auto_translate_count > 0

    def add_client(self, client: 'api.chat.ChatHandler'):
        """Append ``client`` to the room and count it if it wants auto translation."""
        logger.info('room=%d addding client %s', self._room_id, client.request.remote_ip)

        self._clients.append(client)
        self._auto_translate_count += 1 if client.auto_translate else 0

        logger.info('room=%d added client %s, %d clients', self._room_id, client.request.remote_ip,
                    self.client_count)

    def del_client(self, client: 'api.chat.ChatHandler'):
        """Close ``client`` and, if it is in the room, remove and uncount it."""
        # The client is closed even when it turns out not to be in this room
        client.close()

        if client not in self._clients:
            return
        self._clients.remove(client)

        if client.auto_translate:
            self._auto_translate_count -= 1

        logger.info('room=%d removed client %s, %d clients', self._room_id, client.request.remote_ip,
                    self.client_count)

    def clear_clients(self):
        """Close every client, then empty the room and reset the auto translate count."""
        logger.info('room=%d clearing %d clients', self._room_id, self.client_count)

        for member in self._clients:
            member.close()

        self._clients.clear()
        self._auto_translate_count = 0

    def send_cmd_data(self, cmd, data):
        """Build one message body from ``cmd`` and ``data`` and send it to every client."""
        payload = api.chat.make_message_body(cmd, data)
        for member in self._clients:
            member.send_body_no_raise(payload)

    def send_cmd_data_if(self, filterer: Callable[['api.chat.ChatHandler'], bool], cmd, data):
        """Like ``send_cmd_data``, but only for clients for which ``filterer`` returns a truthy value."""
        payload = api.chat.make_message_body(cmd, data)
        for member in self._clients:
            if filterer(member):
                member.send_body_no_raise(payload)
class LiveMsgHandler(blivedm.BaseHandler):
    """Turns messages received from a ``LiveClient`` into commands sent to the clients of its room."""

    # Strong references to fire-and-forget tasks. The event loop only keeps weak
    # references to tasks, so a task nobody references may be garbage collected
    # before it has finished. Shared by all instances of this class.
    _background_tasks: Set[asyncio.Task] = set()

    @classmethod
    def _run_in_background(cls, coro):
        """Schedule ``coro`` as a task and keep it referenced until it is done.

        :param coro: the coroutine object to run
        :return: the created ``asyncio.Task``
        """
        task = asyncio.create_task(coro)
        cls._background_tasks.add(task)
        task.add_done_callback(cls._background_tasks.discard)
        return task

    # The XXX_callback methods are redefined here to reduce the dependency on
    # field names, in case Bilibili renames fields

    def __danmu_msg_callback(self, client: LiveClient, command: dict):
        """Build a ``DanmakuMessage`` from a raw DANMU_MSG command and pass it to ``_on_danmaku``."""
        info = command['info']
        dm_v2 = command.get('dm_v2', '')

        proto: Optional[blivedm_pb.SimpleDm] = None
        if dm_v2 != '':
            try:
                proto = blivedm_pb.SimpleDm.loads(base64.b64decode(dm_v2))
            except (binascii.Error, KeyError, TypeError, ValueError):
                # Best effort: without a decodable dm_v2 the face is left empty
                pass
        if proto is not None:
            face = proto.user.face
        else:
            face = ''

        if len(info[3]) != 0:
            medal_level = info[3][0]
            medal_room_id = info[3][3]
        else:
            medal_level = 0
            medal_room_id = 0

        message = blivedm.DanmakuMessage(
            timestamp=info[0][4],
            msg_type=info[0][9],
            dm_type=info[0][12],
            emoticon_options=info[0][13],

            msg=info[1],

            uid=info[2][0],
            uname=info[2][1],
            face=face,
            admin=info[2][2],
            urank=info[2][5],
            mobile_verify=info[2][6],

            medal_level=medal_level,
            medal_room_id=medal_room_id,

            user_level=info[4][0],

            privilege_type=info[7],
        )
        return self._on_danmaku(client, message)

    def __send_gift_callback(self, client: LiveClient, command: dict):
        """Build a ``GiftMessage`` from a raw SEND_GIFT command and pass it to ``_on_gift``."""
        data = command['data']
        message = blivedm.GiftMessage(
            gift_name=data['giftName'],
            num=data['num'],
            uname=data['uname'],
            face=data['face'],
            uid=data['uid'],
            timestamp=data['timestamp'],
            coin_type=data['coin_type'],
            total_coin=data['total_coin'],
        )
        return self._on_gift(client, message)

    def __guard_buy_callback(self, client: LiveClient, command: dict):
        """Build a ``GuardBuyMessage`` from a raw GUARD_BUY command and pass it to ``_on_buy_guard``."""
        data = command['data']
        message = blivedm.GuardBuyMessage(
            uid=data['uid'],
            username=data['username'],
            guard_level=data['guard_level'],
            start_time=data['start_time'],
        )
        return self._on_buy_guard(client, message)

    def __super_chat_message_callback(self, client: LiveClient, command: dict):
        """Build a ``SuperChatMessage`` from a raw SUPER_CHAT_MESSAGE command and pass it to ``_on_super_chat``."""
        data = command['data']
        message = blivedm.SuperChatMessage(
            price=data['price'],
            message=data['message'],
            start_time=data['start_time'],
            id=data['id'],
            uid=data['uid'],
            uname=data['user_info']['uname'],
            face=data['user_info']['face'],
        )
        return self._on_super_chat(client, message)

    # The base-class table with the four commands above replaced by the callbacks of this class
    _CMD_CALLBACK_DICT = {
        **blivedm.BaseHandler._CMD_CALLBACK_DICT,
        'DANMU_MSG': __danmu_msg_callback,
        'SEND_GIFT': __send_gift_callback,
        'GUARD_BUY': __guard_buy_callback,
        'SUPER_CHAT_MESSAGE': __super_chat_message_callback
    }

    async def _on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage):
        """Handle a danmaku message in a background task so the caller is not blocked."""
        self._run_in_background(self.__on_danmaku(client, message))

    async def __on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage):
        """Send an ADD_TEXT command for ``message`` to the room, then translate it if needed."""
        avatar_url = message.face
        if avatar_url != '':
            services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)
        else:
            # Do the async call first and look up the room afterwards, because
            # the room may no longer exist by the time the call returns
            avatar_url = await services.avatar.get_avatar_url(message.uid)

        room = client_room_manager.get_room(client.tmp_room_id)
        if room is None:
            return

        if message.uid == client.room_owner_uid:
            author_type = 3  # streamer
        elif message.admin:
            author_type = 2  # room moderator
        elif message.privilege_type != 0:  # 1: governor, 2: admiral, 3: captain
            author_type = 1  # guard member
        else:
            author_type = 0

        if message.dm_type == 1:
            content_type = api.chat.ContentType.EMOTICON
            content_type_params = api.chat.make_emoticon_params(
                message.emoticon_options_dict['url'],
            )
        else:
            content_type = api.chat.ContentType.TEXT
            content_type_params = None

        need_translate = self._need_translate(message.msg, room)
        if need_translate:
            translation = services.translate.get_translation_from_cache(message.msg)
            if translation is None:
                # Not cached: translate asynchronously later and notify the clients then
                translation = ''
            else:
                need_translate = False
        else:
            translation = ''

        msg_id = uuid.uuid4().hex
        room.send_cmd_data(api.chat.Command.ADD_TEXT, api.chat.make_text_message_data(
            avatar_url=avatar_url,
            timestamp=int(message.timestamp / 1000),
            author_name=message.uname,
            author_type=author_type,
            content=message.msg,
            privilege_type=message.privilege_type,
            is_gift_danmaku=bool(message.msg_type),
            author_level=message.user_level,
            is_newbie=message.urank < 10000,
            is_mobile_verified=bool(message.mobile_verify),
            # A medal from another room is reported as level 0
            medal_level=0 if message.medal_room_id != client.room_id else message.medal_level,
            id_=msg_id,
            translation=translation,
            content_type=content_type,
            content_type_params=content_type_params,
        ))

        if need_translate:
            # Already running in a background task, so awaiting here is fine
            await self._translate_and_response(message.msg, room.room_id, msg_id)

    async def _on_gift(self, client: LiveClient, message: blivedm.GiftMessage):
        """Update the avatar cache and send an ADD_GIFT command for gifts whose coin type is 'gold'."""
        avatar_url = services.avatar.process_avatar_url(message.face)
        services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)

        # Gifts with any other coin type are not forwarded to the clients
        if message.coin_type != 'gold':
            return

        room = client_room_manager.get_room(client.tmp_room_id)
        if room is None:
            return

        room.send_cmd_data(api.chat.Command.ADD_GIFT, {
            'id': uuid.uuid4().hex,
            'avatarUrl': avatar_url,
            'timestamp': message.timestamp,
            'authorName': message.uname,
            'totalCoin': message.total_coin,
            'giftName': message.gift_name,
            'num': message.num
        })

    async def _on_buy_guard(self, client: LiveClient, message: blivedm.GuardBuyMessage):
        """Handle a guard purchase in a background task so the caller is not blocked."""
        self._run_in_background(self.__on_buy_guard(client, message))

    @staticmethod
    async def __on_buy_guard(client: LiveClient, message: blivedm.GuardBuyMessage):
        """Fetch the buyer's avatar and send an ADD_MEMBER command to the room."""
        # Do the async call first and look up the room afterwards, because
        # the room may no longer exist by the time the call returns
        avatar_url = await services.avatar.get_avatar_url(message.uid)

        room = client_room_manager.get_room(client.tmp_room_id)
        if room is None:
            return

        room.send_cmd_data(api.chat.Command.ADD_MEMBER, {
            'id': uuid.uuid4().hex,
            'avatarUrl': avatar_url,
            'timestamp': message.start_time,
            'authorName': message.username,
            'privilegeType': message.guard_level
        })

    async def _on_super_chat(self, client: LiveClient, message: blivedm.SuperChatMessage):
        """Send an ADD_SUPER_CHAT command to the room and translate the text in the background if needed."""
        avatar_url = services.avatar.process_avatar_url(message.face)
        services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)

        room = client_room_manager.get_room(client.tmp_room_id)
        if room is None:
            return

        need_translate = self._need_translate(message.message, room)
        if need_translate:
            translation = services.translate.get_translation_from_cache(message.message)
            if translation is None:
                # Not cached: translate asynchronously later and notify the clients then
                translation = ''
            else:
                need_translate = False
        else:
            translation = ''

        msg_id = str(message.id)
        room.send_cmd_data(api.chat.Command.ADD_SUPER_CHAT, {
            'id': msg_id,
            'avatarUrl': avatar_url,
            'timestamp': message.start_time,
            'authorName': message.uname,
            'price': message.price,
            'content': message.message,
            'translation': translation
        })

        if need_translate:
            self._run_in_background(self._translate_and_response(
                message.message, room.room_id, msg_id, services.translate.Priority.HIGH
            ))

    async def _on_super_chat_delete(self, client: LiveClient, message: blivedm.SuperChatDeleteMessage):
        """Send a DEL_SUPER_CHAT command with the deleted IDs (as strings) to the room."""
        room = client_room_manager.get_room(client.tmp_room_id)
        if room is None:
            return

        room.send_cmd_data(api.chat.Command.DEL_SUPER_CHAT, {
            'ids': list(map(str, message.ids))
        })

    @staticmethod
    def _need_translate(text, room: ClientRoom):
        """Return whether ``text`` should be translated for ``room``.

        Requires translation to be enabled in the config, at least one client in
        the room wanting it, the room being allowed (an empty allow list allows
        every room), and the translate service reporting the text as needing it.
        """
        cfg = config.get_config()
        return (
            cfg.enable_translate
            and room.need_translate
            and (not cfg.allow_translate_rooms or room.room_id in cfg.allow_translate_rooms)
            and services.translate.need_translate(text)
        )

    @staticmethod
    async def _translate_and_response(text, room_id, msg_id, priority=services.translate.Priority.NORMAL):
        """Translate ``text`` and send an UPDATE_TRANSLATION command to the auto translate clients.

        Nothing is sent if the translation is ``None`` or the room no longer
        exists once the translation has returned.
        """
        translation = await services.translate.translate(text, priority)
        if translation is None:
            return

        room = client_room_manager.get_room(room_id)
        if room is None:
            return

        room.send_cmd_data_if(
            lambda client: client.auto_translate,
            api.chat.Command.UPDATE_TRANSLATION,
            api.chat.make_translation_message_data(
                msg_id,
                translation
            )
        )