mirror of
https://github.com/xfgryujk/blivechat.git
synced 2025-04-12 12:31:15 +08:00
通过服务器转发时支持开放平台接口
This commit is contained in:
parent
72841ab46e
commit
07569033fa
75
api/chat.py
75
api/chat.py
@ -127,7 +127,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
||||
self._heartbeat_timer_handle = None
|
||||
self._receive_timeout_timer_handle = None
|
||||
|
||||
self.room_id = None
|
||||
self.room_key: Optional[services.chat.RoomKey] = None
|
||||
self.auto_translate = False
|
||||
|
||||
def open(self):
|
||||
@ -156,9 +156,9 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
||||
self.close()
|
||||
|
||||
def on_close(self):
|
||||
logger.info('client=%s disconnected, room=%s', self.request.remote_ip, str(self.room_id))
|
||||
logger.info('client=%s disconnected, room=%s', self.request.remote_ip, self.room_key)
|
||||
if self.has_joined_room:
|
||||
services.chat.client_room_manager.del_client(self.room_id, self)
|
||||
services.chat.client_room_manager.del_client(self.room_key, self)
|
||||
if self._heartbeat_timer_handle is not None:
|
||||
self._heartbeat_timer_handle.cancel()
|
||||
self._heartbeat_timer_handle = None
|
||||
@ -169,7 +169,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
||||
def on_message(self, message):
|
||||
try:
|
||||
body = json.loads(message)
|
||||
cmd = body['cmd']
|
||||
cmd = int(body['cmd'])
|
||||
|
||||
if cmd == Command.HEARTBEAT:
|
||||
# 超时没有加入房间也断开
|
||||
@ -177,20 +177,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
||||
self._refresh_receive_timeout_timer()
|
||||
|
||||
elif cmd == Command.JOIN_ROOM:
|
||||
if self.has_joined_room:
|
||||
return
|
||||
self._refresh_receive_timeout_timer()
|
||||
|
||||
self.room_id = int(body['data']['roomId'])
|
||||
logger.info('client=%s joining room %d', self.request.remote_ip, self.room_id)
|
||||
try:
|
||||
cfg = body['data']['config']
|
||||
self.auto_translate = bool(cfg['autoTranslate'])
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
services.chat.client_room_manager.add_client(self.room_id, self)
|
||||
asyncio.create_task(self._on_joined_room())
|
||||
self._on_join_room_req(body)
|
||||
|
||||
else:
|
||||
logger.warning('client=%s unknown cmd=%d, body=%s', self.request.remote_ip, cmd, body)
|
||||
@ -198,6 +185,41 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
||||
except Exception: # noqa
|
||||
logger.exception('client=%s on_message error, message=%s', self.request.remote_ip, message)
|
||||
|
||||
def _on_join_room_req(self, body: dict):
|
||||
if self.has_joined_room:
|
||||
return
|
||||
data = body['data']
|
||||
|
||||
room_key_dict = data.get('roomKey', None)
|
||||
if room_key_dict is not None:
|
||||
room_key_type = services.chat.RoomKeyType(room_key_dict['type'])
|
||||
room_key_value = room_key_dict['value']
|
||||
if room_key_type == services.chat.RoomKeyType.ROOM_ID:
|
||||
if not isinstance(room_key_value, int):
|
||||
raise TypeError(f'Room key value type error, value={room_key_value}')
|
||||
elif room_key_type == services.chat.RoomKeyType.AUTH_CODE:
|
||||
if not isinstance(room_key_value, str):
|
||||
raise TypeError(f'Room key value type error, value={room_key_value}')
|
||||
else:
|
||||
raise ValueError(f'Unknown RoomKeyType={room_key_type}')
|
||||
else:
|
||||
# 兼容旧版客户端 TODO 过几个版本可以移除
|
||||
room_key_type = services.chat.RoomKeyType.ROOM_ID
|
||||
room_key_value = int(data['roomId'])
|
||||
self.room_key = services.chat.RoomKey(room_key_type, room_key_value)
|
||||
logger.info('client=%s joining room %s', self.request.remote_ip, self.room_key)
|
||||
|
||||
try:
|
||||
cfg = data['config']
|
||||
self.auto_translate = bool(cfg['autoTranslate'])
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
services.chat.client_room_manager.add_client(self.room_key, self)
|
||||
asyncio.create_task(self._on_joined_room())
|
||||
|
||||
self._refresh_receive_timeout_timer()
|
||||
|
||||
# 跨域测试用
|
||||
def check_origin(self, origin):
|
||||
if self.application.settings['debug']:
|
||||
@ -206,7 +228,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
||||
|
||||
@property
|
||||
def has_joined_room(self):
|
||||
return self.room_id is not None
|
||||
return self.room_key is not None
|
||||
|
||||
def send_cmd_data(self, cmd, data):
|
||||
self.send_body_no_raise(make_message_body(cmd, data))
|
||||
@ -224,7 +246,12 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
||||
# 不允许自动翻译的提示
|
||||
if self.auto_translate:
|
||||
cfg = config.get_config()
|
||||
if cfg.allow_translate_rooms and self.room_id not in cfg.allow_translate_rooms:
|
||||
if (
|
||||
cfg.allow_translate_rooms
|
||||
# 身份码就不管了吧,反正配置正确的情况下不会看到这个提示
|
||||
and self.room_key.type == services.chat.RoomKeyType.ROOM_ID
|
||||
and self.room_key.value not in cfg.allow_translate_rooms
|
||||
):
|
||||
self.send_cmd_data(Command.ADD_TEXT, make_text_message_data(
|
||||
author_name='blivechat',
|
||||
author_type=2,
|
||||
@ -295,7 +322,8 @@ class RoomInfoHandler(api.base.ApiHandler): # noqa
|
||||
room_id = int(self.get_query_argument('roomId'))
|
||||
logger.info('client=%s getting room info, room=%d', self.request.remote_ip, room_id)
|
||||
room_id, owner_uid = await self._get_room_info(room_id)
|
||||
host_server_list = await self._get_server_host_list(room_id)
|
||||
# 连接其他host必须要key
|
||||
host_server_list = dm_web_cli.DEFAULT_DANMAKU_SERVER_LIST
|
||||
if owner_uid == 0:
|
||||
# 缓存3分钟
|
||||
self.set_header('Cache-Control', 'private, max-age=180')
|
||||
@ -338,11 +366,6 @@ class RoomInfoHandler(api.base.ApiHandler): # noqa
|
||||
room_info = data['data']['room_info']
|
||||
return room_info['room_id'], room_info['uid']
|
||||
|
||||
@staticmethod
|
||||
async def _get_server_host_list(_room_id):
|
||||
# 连接其他host必须要key
|
||||
return dm_web_cli.DEFAULT_DANMAKU_SERVER_LIST
|
||||
|
||||
|
||||
class AvatarHandler(api.base.ApiHandler): # noqa
|
||||
async def get(self):
|
||||
|
36
config.py
36
config.py
@ -57,15 +57,25 @@ class AppConfig:
|
||||
self.open_browser_at_startup = True
|
||||
self.enable_upload_file = True
|
||||
|
||||
self.fetch_avatar_max_queue_size = 1
|
||||
self.fetch_avatar_max_queue_size = 4
|
||||
self.avatar_cache_size = 10000
|
||||
|
||||
self.open_live_access_key_id = ''
|
||||
self.open_live_access_key_secret = ''
|
||||
self.open_live_app_id = 0
|
||||
|
||||
self.enable_translate = True
|
||||
self.allow_translate_rooms = set()
|
||||
self.translate_max_queue_size = 10
|
||||
self.translation_cache_size = 50000
|
||||
self.translator_configs = []
|
||||
|
||||
@property
|
||||
def is_open_live_configured(self):
|
||||
return (
|
||||
self.open_live_access_key_id != '' and self.open_live_access_key_secret != '' and self.open_live_app_id != 0
|
||||
)
|
||||
|
||||
def load(self, path):
|
||||
try:
|
||||
config = configparser.ConfigParser()
|
||||
@ -81,19 +91,25 @@ class AppConfig:
|
||||
def _load_app_config(self, config: configparser.ConfigParser):
|
||||
app_section = config['app']
|
||||
self.host = app_section.get('host', self.host)
|
||||
self.port = app_section.getint('port', fallback=self.port)
|
||||
self.port = app_section.getint('port', self.port)
|
||||
self.database_url = app_section.get('database_url', self.database_url)
|
||||
self.tornado_xheaders = app_section.getboolean('tornado_xheaders', fallback=self.tornado_xheaders)
|
||||
self.tornado_xheaders = app_section.getboolean('tornado_xheaders', self.tornado_xheaders)
|
||||
self.loader_url = app_section.get('loader_url', self.loader_url)
|
||||
self.open_browser_at_startup = app_section.getboolean('open_browser_at_startup',
|
||||
fallback=self.open_browser_at_startup)
|
||||
self.enable_upload_file = app_section.getboolean('enable_upload_file', fallback=self.enable_upload_file)
|
||||
self.open_browser_at_startup = app_section.getboolean('open_browser_at_startup', self.open_browser_at_startup)
|
||||
self.enable_upload_file = app_section.getboolean('enable_upload_file', self.enable_upload_file)
|
||||
|
||||
self.fetch_avatar_max_queue_size = app_section.getint('fetch_avatar_max_queue_size',
|
||||
fallback=self.fetch_avatar_max_queue_size)
|
||||
self.avatar_cache_size = app_section.getint('avatar_cache_size', fallback=self.avatar_cache_size)
|
||||
self.fetch_avatar_max_queue_size = app_section.getint(
|
||||
'fetch_avatar_max_queue_size', self.fetch_avatar_max_queue_size
|
||||
)
|
||||
self.avatar_cache_size = app_section.getint('avatar_cache_size', self.avatar_cache_size)
|
||||
|
||||
self.enable_translate = app_section.getboolean('enable_translate', fallback=self.enable_translate)
|
||||
self.open_live_access_key_id = app_section.get('open_live_access_key_id', self.open_live_access_key_id)
|
||||
self.open_live_access_key_secret = app_section.get(
|
||||
'open_live_access_key_secret', self.open_live_access_key_secret
|
||||
)
|
||||
self.open_live_app_id = app_section.getint('open_live_app_id', self.open_live_app_id)
|
||||
|
||||
self.enable_translate = app_section.getboolean('enable_translate', self.enable_translate)
|
||||
self.allow_translate_rooms = _str_to_list(app_section.get('allow_translate_rooms', ''), int, set)
|
||||
self.translate_max_queue_size = app_section.getint('translate_max_queue_size', self.translate_max_queue_size)
|
||||
self.translation_cache_size = app_section.getint('translation_cache_size', self.translation_cache_size)
|
||||
|
@ -61,6 +61,13 @@ translation_cache_size = 50000
|
||||
# **The following is for translation team. Leave it default if you don't know its meaning**
|
||||
# -------------------------------------------------------------------------------------------------
|
||||
|
||||
# 在B站直播开放平台申请的开发者密钥,如果不填,会把请求转发到作者的服务器
|
||||
open_live_access_key_id =
|
||||
open_live_access_key_secret =
|
||||
# 在B站直播开放平台创建的项目ID,如果不填,会把请求转发到作者的服务器
|
||||
open_live_app_id =
|
||||
|
||||
|
||||
# 翻译器配置,索引到下面的配置节。可以以逗号分隔配置多个翻译器,翻译时会自动负载均衡
|
||||
# 配置多个翻译器可以增加额度、增加QPS、容灾
|
||||
# 不同配置可以使用同一个类型,但要使用不同的账号,否则还是会遇到额度、调用频率限制
|
||||
|
261
services/chat.py
261
services/chat.py
@ -2,6 +2,7 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import binascii
|
||||
import enum
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
@ -18,6 +19,29 @@ import utils.request
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RoomKeyType(enum.IntEnum):
|
||||
ROOM_ID = 1
|
||||
AUTH_CODE = 2
|
||||
|
||||
|
||||
class RoomKey(NamedTuple):
|
||||
"""内部用来标识一个房间,由客户端加入房间时传入"""
|
||||
type: RoomKeyType
|
||||
value: Union[int, str]
|
||||
|
||||
def __str__(self):
|
||||
res = str(self.value)
|
||||
if self.type == RoomKeyType.AUTH_CODE:
|
||||
# 身份码要脱敏
|
||||
res = '***' + res[-3:]
|
||||
return res
|
||||
__repr__ = __str__
|
||||
|
||||
|
||||
# 用于类型标注的类型别名
|
||||
LiveClientType = Union['WebLiveClient', 'OpenLiveClient']
|
||||
|
||||
# 到B站的连接管理
|
||||
_live_client_manager: Optional['LiveClientManager'] = None
|
||||
# 到客户端的连接管理
|
||||
@ -43,137 +67,198 @@ async def shut_down():
|
||||
class LiveClientManager:
|
||||
"""管理到B站的连接"""
|
||||
def __init__(self):
|
||||
self._live_clients: Dict[int, WebLiveClient] = {}
|
||||
self._live_clients: Dict[RoomKey, LiveClientType] = {}
|
||||
self._close_client_futures: Set[asyncio.Future] = set()
|
||||
|
||||
async def shut_down(self):
|
||||
while len(self._live_clients) != 0:
|
||||
room_id = next(iter(self._live_clients))
|
||||
self.del_live_client(room_id)
|
||||
room_key = next(iter(self._live_clients))
|
||||
self.del_live_client(room_key)
|
||||
|
||||
await asyncio.gather(*self._close_client_futures, return_exceptions=True)
|
||||
|
||||
def add_live_client(self, room_id):
|
||||
if room_id in self._live_clients:
|
||||
def add_live_client(self, room_key: RoomKey):
|
||||
if room_key in self._live_clients:
|
||||
return
|
||||
logger.info('room=%d creating live client', room_id)
|
||||
self._live_clients[room_id] = live_client = WebLiveClient(room_id)
|
||||
|
||||
logger.info('room=%s creating live client', room_key)
|
||||
|
||||
self._live_clients[room_key] = live_client = self._create_live_client(room_key)
|
||||
live_client.set_handler(_live_msg_handler)
|
||||
# 直接启动吧,这里不用管init_room失败的情况,万一失败了会在on_client_stopped里删除掉这个客户端
|
||||
live_client.start()
|
||||
logger.info('room=%d live client created, %d live clients', room_id, len(self._live_clients))
|
||||
|
||||
def del_live_client(self, room_id):
|
||||
live_client = self._live_clients.pop(room_id, None)
|
||||
logger.info('room=%s live client created, %d live clients', room_key, len(self._live_clients))
|
||||
|
||||
@staticmethod
|
||||
def _create_live_client(room_key: RoomKey):
|
||||
if room_key.type == RoomKeyType.ROOM_ID:
|
||||
return WebLiveClient(room_key)
|
||||
elif room_key.type == RoomKeyType.AUTH_CODE:
|
||||
return OpenLiveClient(room_key)
|
||||
raise ValueError(f'Unknown RoomKeyType={room_key.type}')
|
||||
|
||||
def del_live_client(self, room_key: RoomKey):
|
||||
live_client = self._live_clients.pop(room_key, None)
|
||||
if live_client is None:
|
||||
return
|
||||
logger.info('room=%d removing live client', room_id)
|
||||
live_client.set_handler(None)
|
||||
|
||||
logger.info('room=%s removing live client', room_key)
|
||||
|
||||
live_client.set_handler(None)
|
||||
future = asyncio.create_task(live_client.stop_and_close())
|
||||
self._close_client_futures.add(future)
|
||||
future.add_done_callback(lambda _future: self._close_client_futures.discard(future))
|
||||
|
||||
logger.info('room=%d live client removed, %d live clients', room_id, len(self._live_clients))
|
||||
logger.info('room=%s live client removed, %d live clients', room_key, len(self._live_clients))
|
||||
|
||||
client_room_manager.del_room(room_id)
|
||||
client_room_manager.del_room(room_key)
|
||||
|
||||
def get_live_client(self, room_key: RoomKey):
|
||||
return self._live_clients.get(room_key, None)
|
||||
|
||||
|
||||
class WebLiveClient(blivedm.BLiveClient):
|
||||
HEARTBEAT_INTERVAL = 10
|
||||
|
||||
def __init__(self, room_id):
|
||||
super().__init__(room_id, uid=0, session=utils.request.http_session, heartbeat_interval=self.HEARTBEAT_INTERVAL)
|
||||
def __init__(self, room_key: RoomKey):
|
||||
assert room_key.type == RoomKeyType.ROOM_ID
|
||||
super().__init__(
|
||||
room_key.value,
|
||||
uid=0,
|
||||
session=utils.request.http_session,
|
||||
heartbeat_interval=self.HEARTBEAT_INTERVAL,
|
||||
)
|
||||
|
||||
@property
|
||||
def room_key(self):
|
||||
return RoomKey(RoomKeyType.ROOM_ID, self.tmp_room_id)
|
||||
|
||||
async def init_room(self):
|
||||
await super().init_room()
|
||||
res = await super().init_room()
|
||||
if res:
|
||||
logger.info('room=%s live client init succeeded, room_id=%d', self.room_key, self.room_id)
|
||||
else:
|
||||
logger.info('room=%s live client init with a downgrade, room_id=%d', self.room_key, self.room_id)
|
||||
# 允许降级
|
||||
return True
|
||||
|
||||
|
||||
class OpenLiveClient(blivedm.OpenLiveClient):
|
||||
HEARTBEAT_INTERVAL = 10
|
||||
|
||||
def __init__(self, room_key: RoomKey):
|
||||
assert room_key.type == RoomKeyType.AUTH_CODE
|
||||
cfg = config.get_config()
|
||||
super().__init__(
|
||||
access_key_id=cfg.open_live_access_key_id,
|
||||
access_key_secret=cfg.open_live_access_key_secret,
|
||||
app_id=cfg.open_live_app_id,
|
||||
room_owner_auth_code=room_key.value,
|
||||
session=utils.request.http_session,
|
||||
heartbeat_interval=self.HEARTBEAT_INTERVAL,
|
||||
)
|
||||
|
||||
@property
|
||||
def room_key(self):
|
||||
return RoomKey(RoomKeyType.AUTH_CODE, self.room_owner_auth_code)
|
||||
|
||||
async def init_room(self):
|
||||
res = await super().init_room()
|
||||
if res:
|
||||
logger.info('room=%s live client init succeeded, room_id=%d', self.room_key, self.room_id)
|
||||
else:
|
||||
logger.info('room=%s live client init failed', self.room_key)
|
||||
return res
|
||||
|
||||
# TODO 如果没有配置access_key,则请求公共服务器
|
||||
|
||||
|
||||
class ClientRoomManager:
|
||||
"""管理到客户端的连接"""
|
||||
# 房间没有客户端后延迟多久删除房间,不立即删除防止短时间后重连
|
||||
DELAY_DEL_ROOM_TIMEOUT = 10
|
||||
|
||||
def __init__(self):
|
||||
self._rooms: Dict[int, ClientRoom] = {}
|
||||
# room_id -> timer_handle
|
||||
self._delay_del_timer_handles: Dict[int, asyncio.TimerHandle] = {}
|
||||
self._rooms: Dict[RoomKey, ClientRoom] = {}
|
||||
self._delay_del_timer_handles: Dict[RoomKey, asyncio.TimerHandle] = {}
|
||||
|
||||
def shut_down(self):
|
||||
while len(self._rooms) != 0:
|
||||
room_id = next(iter(self._rooms))
|
||||
self.del_room(room_id)
|
||||
room_key = next(iter(self._rooms))
|
||||
self.del_room(room_key)
|
||||
|
||||
for timer_handle in self._delay_del_timer_handles.values():
|
||||
timer_handle.cancel()
|
||||
self._delay_del_timer_handles.clear()
|
||||
|
||||
def add_client(self, room_id, client: 'api.chat.ChatHandler'):
|
||||
room = self._get_or_add_room(room_id)
|
||||
def add_client(self, room_key: RoomKey, client: 'api.chat.ChatHandler'):
|
||||
room = self._get_or_add_room(room_key)
|
||||
room.add_client(client)
|
||||
|
||||
self._clear_delay_del_timer(room_id)
|
||||
self._clear_delay_del_timer(room_key)
|
||||
|
||||
def del_client(self, room_id, client: 'api.chat.ChatHandler'):
|
||||
room = self.get_room(room_id)
|
||||
def del_client(self, room_key: RoomKey, client: 'api.chat.ChatHandler'):
|
||||
room = self.get_room(room_key)
|
||||
if room is None:
|
||||
return
|
||||
|
||||
room.del_client(client)
|
||||
|
||||
if room.client_count == 0:
|
||||
self.delay_del_room(room_id, self.DELAY_DEL_ROOM_TIMEOUT)
|
||||
self.delay_del_room(room_key, self.DELAY_DEL_ROOM_TIMEOUT)
|
||||
|
||||
def get_room(self, room_id):
|
||||
return self._rooms.get(room_id, None)
|
||||
def get_room(self, room_key: RoomKey):
|
||||
return self._rooms.get(room_key, None)
|
||||
|
||||
def _get_or_add_room(self, room_id):
|
||||
room = self._rooms.get(room_id, None)
|
||||
def _get_or_add_room(self, room_key: RoomKey):
|
||||
room = self._rooms.get(room_key, None)
|
||||
if room is None:
|
||||
logger.info('room=%d creating client room', room_id)
|
||||
self._rooms[room_id] = room = ClientRoom(room_id)
|
||||
logger.info('room=%d client room created, %d client rooms', room_id, len(self._rooms))
|
||||
logger.info('room=%s creating client room', room_key)
|
||||
self._rooms[room_key] = room = ClientRoom(room_key)
|
||||
logger.info('room=%s client room created, %d client rooms', room_key, len(self._rooms))
|
||||
|
||||
_live_client_manager.add_live_client(room_id)
|
||||
_live_client_manager.add_live_client(room_key)
|
||||
return room
|
||||
|
||||
def del_room(self, room_id):
|
||||
self._clear_delay_del_timer(room_id)
|
||||
def del_room(self, room_key: RoomKey):
|
||||
self._clear_delay_del_timer(room_key)
|
||||
|
||||
room = self._rooms.pop(room_id, None)
|
||||
room = self._rooms.pop(room_key, None)
|
||||
if room is None:
|
||||
return
|
||||
logger.info('room=%d removing client room', room_id)
|
||||
|
||||
logger.info('room=%s removing client room', room_key)
|
||||
room.clear_clients()
|
||||
logger.info('room=%d client room removed, %d client rooms', room_id, len(self._rooms))
|
||||
logger.info('room=%s client room removed, %d client rooms', room_key, len(self._rooms))
|
||||
|
||||
_live_client_manager.del_live_client(room_id)
|
||||
_live_client_manager.del_live_client(room_key)
|
||||
|
||||
def delay_del_room(self, room_id, timeout):
|
||||
self._clear_delay_del_timer(room_id)
|
||||
self._delay_del_timer_handles[room_id] = asyncio.get_running_loop().call_later(
|
||||
timeout, self._on_delay_del_room, room_id
|
||||
def delay_del_room(self, room_key: RoomKey, timeout):
|
||||
self._clear_delay_del_timer(room_key)
|
||||
self._delay_del_timer_handles[room_key] = asyncio.get_running_loop().call_later(
|
||||
timeout, self._on_delay_del_room, room_key
|
||||
)
|
||||
|
||||
def _clear_delay_del_timer(self, room_id):
|
||||
timer_handle = self._delay_del_timer_handles.pop(room_id, None)
|
||||
def _clear_delay_del_timer(self, room_key: RoomKey):
|
||||
timer_handle = self._delay_del_timer_handles.pop(room_key, None)
|
||||
if timer_handle is not None:
|
||||
timer_handle.cancel()
|
||||
|
||||
def _on_delay_del_room(self, room_id):
|
||||
self._delay_del_timer_handles.pop(room_id, None)
|
||||
self.del_room(room_id)
|
||||
def _on_delay_del_room(self, room_key: RoomKey):
|
||||
self._delay_del_timer_handles.pop(room_key, None)
|
||||
self.del_room(room_key)
|
||||
|
||||
|
||||
class ClientRoom:
|
||||
def __init__(self, room_id):
|
||||
self._room_id = room_id
|
||||
def __init__(self, room_key: RoomKey):
|
||||
self._room_key = room_key
|
||||
self._clients: List[api.chat.ChatHandler] = []
|
||||
self._auto_translate_count = 0
|
||||
|
||||
@property
|
||||
def room_id(self):
|
||||
return self._room_id
|
||||
def room_key(self) -> RoomKey:
|
||||
return self._room_key
|
||||
|
||||
@property
|
||||
def client_count(self):
|
||||
@ -184,11 +269,13 @@ class ClientRoom:
|
||||
return self._auto_translate_count > 0
|
||||
|
||||
def add_client(self, client: 'api.chat.ChatHandler'):
|
||||
logger.info('room=%d addding client %s', self._room_id, client.request.remote_ip)
|
||||
logger.info('room=%s addding client %s', self._room_key, client.request.remote_ip)
|
||||
|
||||
self._clients.append(client)
|
||||
if client.auto_translate:
|
||||
self._auto_translate_count += 1
|
||||
logger.info('room=%d added client %s, %d clients', self._room_id, client.request.remote_ip,
|
||||
|
||||
logger.info('room=%s added client %s, %d clients', self._room_key, client.request.remote_ip,
|
||||
self.client_count)
|
||||
|
||||
def del_client(self, client: 'api.chat.ChatHandler'):
|
||||
@ -199,11 +286,13 @@ class ClientRoom:
|
||||
return
|
||||
if client.auto_translate:
|
||||
self._auto_translate_count -= 1
|
||||
logger.info('room=%d removed client %s, %d clients', self._room_id, client.request.remote_ip,
|
||||
|
||||
logger.info('room=%s removed client %s, %d clients', self._room_key, client.request.remote_ip,
|
||||
self.client_count)
|
||||
|
||||
def clear_clients(self):
|
||||
logger.info('room=%d clearing %d clients', self._room_id, self.client_count)
|
||||
logger.info('room=%s clearing %d clients', self._room_key, self.client_count)
|
||||
|
||||
for client in self._clients:
|
||||
client.close()
|
||||
self._clients.clear()
|
||||
@ -222,7 +311,7 @@ class ClientRoom:
|
||||
|
||||
class LiveMsgHandler(blivedm.BaseHandler):
|
||||
# 重新定义XXX_callback是为了减少对字段名的依赖,防止B站改字段名
|
||||
def __danmu_msg_callback(self, client: WebLiveClient, command: dict):
|
||||
def __danmu_msg_callback(self, client: LiveClientType, command: dict):
|
||||
info = command['info']
|
||||
dm_v2 = command.get('dm_v2', '')
|
||||
|
||||
@ -269,7 +358,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
)
|
||||
return self._on_danmaku(client, message)
|
||||
|
||||
def __send_gift_callback(self, client: WebLiveClient, command: dict):
|
||||
def __send_gift_callback(self, client: LiveClientType, command: dict):
|
||||
data = command['data']
|
||||
message = dm_web_models.GiftMessage(
|
||||
gift_name=data['giftName'],
|
||||
@ -283,7 +372,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
)
|
||||
return self._on_gift(client, message)
|
||||
|
||||
def __guard_buy_callback(self, client: WebLiveClient, command: dict):
|
||||
def __guard_buy_callback(self, client: LiveClientType, command: dict):
|
||||
data = command['data']
|
||||
message = dm_web_models.GuardBuyMessage(
|
||||
uid=data['uid'],
|
||||
@ -293,7 +382,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
)
|
||||
return self._on_buy_guard(client, message)
|
||||
|
||||
def __super_chat_message_callback(self, client: WebLiveClient, command: dict):
|
||||
def __super_chat_message_callback(self, client: LiveClientType, command: dict):
|
||||
data = command['data']
|
||||
message = dm_web_models.SuperChatMessage(
|
||||
price=data['price'],
|
||||
@ -314,13 +403,13 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
'SUPER_CHAT_MESSAGE': __super_chat_message_callback
|
||||
}
|
||||
|
||||
def on_client_stopped(self, client: WebLiveClient, exception: Optional[Exception]):
|
||||
_live_client_manager.del_live_client(client.tmp_room_id)
|
||||
def on_client_stopped(self, client: LiveClientType, exception: Optional[Exception]):
|
||||
_live_client_manager.del_live_client(client.room_key)
|
||||
|
||||
def _on_danmaku(self, client: WebLiveClient, message: dm_web_models.DanmakuMessage):
|
||||
def _on_danmaku(self, client: LiveClientType, message: dm_web_models.DanmakuMessage):
|
||||
asyncio.create_task(self.__on_danmaku(client, message))
|
||||
|
||||
async def __on_danmaku(self, client: WebLiveClient, message: dm_web_models.DanmakuMessage):
|
||||
async def __on_danmaku(self, client: LiveClientType, message: dm_web_models.DanmakuMessage):
|
||||
avatar_url = message.face
|
||||
if avatar_url != '':
|
||||
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)
|
||||
@ -328,7 +417,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
|
||||
avatar_url = await services.avatar.get_avatar_url(message.uid)
|
||||
|
||||
room = client_room_manager.get_room(client.tmp_room_id)
|
||||
room = client_room_manager.get_room(client.room_key)
|
||||
if room is None:
|
||||
return
|
||||
|
||||
@ -352,7 +441,9 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
|
||||
text_emoticons = self._parse_text_emoticons(message)
|
||||
|
||||
need_translate = content_type != api.chat.ContentType.EMOTICON and self._need_translate(message.msg, room)
|
||||
need_translate = (
|
||||
content_type != api.chat.ContentType.EMOTICON and self._need_translate(message.msg, room, client)
|
||||
)
|
||||
if need_translate:
|
||||
translation = services.translate.get_translation_from_cache(message.msg)
|
||||
if translation is None:
|
||||
@ -384,7 +475,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
))
|
||||
|
||||
if need_translate:
|
||||
await self._translate_and_response(message.msg, room.room_id, msg_id)
|
||||
await self._translate_and_response(message.msg, room.room_key, msg_id)
|
||||
|
||||
@staticmethod
|
||||
def _parse_text_emoticons(message: dm_web_models.DanmakuMessage):
|
||||
@ -403,7 +494,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
except (json.JSONDecodeError, TypeError, KeyError):
|
||||
return []
|
||||
|
||||
def _on_gift(self, client: WebLiveClient, message: dm_web_models.GiftMessage):
|
||||
def _on_gift(self, client: LiveClientType, message: dm_web_models.GiftMessage):
|
||||
avatar_url = services.avatar.process_avatar_url(message.face)
|
||||
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)
|
||||
|
||||
@ -411,7 +502,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
if message.coin_type != 'gold':
|
||||
return
|
||||
|
||||
room = client_room_manager.get_room(client.tmp_room_id)
|
||||
room = client_room_manager.get_room(client.room_key)
|
||||
if room is None:
|
||||
return
|
||||
|
||||
@ -425,15 +516,15 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
'num': message.num
|
||||
})
|
||||
|
||||
def _on_buy_guard(self, client: WebLiveClient, message: dm_web_models.GuardBuyMessage):
|
||||
def _on_buy_guard(self, client: LiveClientType, message: dm_web_models.GuardBuyMessage):
|
||||
asyncio.create_task(self.__on_buy_guard(client, message))
|
||||
|
||||
@staticmethod
|
||||
async def __on_buy_guard(client: WebLiveClient, message: dm_web_models.GuardBuyMessage):
|
||||
async def __on_buy_guard(client: LiveClientType, message: dm_web_models.GuardBuyMessage):
|
||||
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
|
||||
avatar_url = await services.avatar.get_avatar_url(message.uid)
|
||||
|
||||
room = client_room_manager.get_room(client.tmp_room_id)
|
||||
room = client_room_manager.get_room(client.room_key)
|
||||
if room is None:
|
||||
return
|
||||
|
||||
@ -445,15 +536,15 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
'privilegeType': message.guard_level
|
||||
})
|
||||
|
||||
def _on_super_chat(self, client: WebLiveClient, message: dm_web_models.SuperChatMessage):
|
||||
def _on_super_chat(self, client: LiveClientType, message: dm_web_models.SuperChatMessage):
|
||||
avatar_url = services.avatar.process_avatar_url(message.face)
|
||||
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)
|
||||
|
||||
room = client_room_manager.get_room(client.tmp_room_id)
|
||||
room = client_room_manager.get_room(client.room_key)
|
||||
if room is None:
|
||||
return
|
||||
|
||||
need_translate = self._need_translate(message.message, room)
|
||||
need_translate = self._need_translate(message.message, room, client)
|
||||
if need_translate:
|
||||
translation = services.translate.get_translation_from_cache(message.message)
|
||||
if translation is None:
|
||||
@ -477,11 +568,11 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
|
||||
if need_translate:
|
||||
asyncio.create_task(self._translate_and_response(
|
||||
message.message, room.room_id, msg_id, services.translate.Priority.HIGH
|
||||
message.message, room.room_key, msg_id, services.translate.Priority.HIGH
|
||||
))
|
||||
|
||||
def _on_super_chat_delete(self, client: WebLiveClient, message: dm_web_models.SuperChatDeleteMessage):
|
||||
room = client_room_manager.get_room(client.tmp_room_id)
|
||||
def _on_super_chat_delete(self, client: LiveClientType, message: dm_web_models.SuperChatDeleteMessage):
|
||||
room = client_room_manager.get_room(client.room_key)
|
||||
if room is None:
|
||||
return
|
||||
|
||||
@ -490,22 +581,22 @@ class LiveMsgHandler(blivedm.BaseHandler):
|
||||
})
|
||||
|
||||
@staticmethod
|
||||
def _need_translate(text, room: ClientRoom):
|
||||
def _need_translate(text, room: ClientRoom, client: LiveClientType):
|
||||
cfg = config.get_config()
|
||||
return (
|
||||
cfg.enable_translate
|
||||
and room.need_translate
|
||||
and (not cfg.allow_translate_rooms or room.room_id in cfg.allow_translate_rooms)
|
||||
and (not cfg.allow_translate_rooms or client.room_id in cfg.allow_translate_rooms)
|
||||
and services.translate.need_translate(text)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
async def _translate_and_response(text, room_id, msg_id, priority=services.translate.Priority.NORMAL):
|
||||
async def _translate_and_response(text, room_key: RoomKey, msg_id, priority=services.translate.Priority.NORMAL):
|
||||
translation = await services.translate.translate(text, priority)
|
||||
if translation is None:
|
||||
return
|
||||
|
||||
room = client_room_manager.get_room(room_id)
|
||||
room = client_room_manager.get_room(room_key)
|
||||
if room is None:
|
||||
return
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user