From c87325e5e0b0f3109100eff47b6d6348ecb44217 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 19 Feb 2022 14:58:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=B0B=E7=AB=99=E7=9A=84=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E5=92=8C=E5=88=B0=E5=AE=A2=E6=88=B7=E7=AB=AF=E7=9A=84=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E8=A7=A3=E8=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/chat.py | 113 +++++++-------- services/chat.py | 371 +++++++++++++++++++++++++++++------------------ 2 files changed, 285 insertions(+), 199 deletions(-) diff --git a/api/chat.py b/api/chat.py index ea0406d..5ac38dd 100644 --- a/api/chat.py +++ b/api/chat.py @@ -6,6 +6,7 @@ import logging import random import time import uuid +from typing import * import aiohttp import tornado.websocket @@ -37,9 +38,18 @@ class ContentType(enum.IntEnum): EMOTICON = 1 -def make_text_message(avatar_url, timestamp, author_name, author_type, content, privilege_type, - is_gift_danmaku, author_level, is_newbie, is_mobile_verified, medal_level, - id_, translation, content_type, content_type_params): +def make_message_body(cmd, data): + return json.dumps( + { + 'cmd': cmd, + 'data': data + } + ).encode('utf-8') + + +def make_text_message_data(avatar_url, timestamp, author_name, author_type, content, privilege_type, + is_gift_danmaku, author_level, is_newbie, is_mobile_verified, medal_level, + id_, translation, content_type, content_type_params): return [ # 0: avatarUrl avatar_url, @@ -81,7 +91,7 @@ def make_emoticon_params(url): ] -def make_translation_message(msg_id, translation): +def make_translation_message_data(msg_id, translation): return [ # 0: id msg_id, @@ -103,14 +113,14 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa self.auto_translate = False def open(self): - logger.info('Websocket connected %s', self.request.remote_ip) + logger.info('client=%s connected', self.request.remote_ip) self._heartbeat_timer_handle = asyncio.get_event_loop().call_later( self.HEARTBEAT_INTERVAL, self._on_send_heartbeat ) self._refresh_receive_timeout_timer() def _on_send_heartbeat(self): - self.send_message(Command.HEARTBEAT, {}) + self.send_cmd_data(Command.HEARTBEAT, {}) self._heartbeat_timer_handle = asyncio.get_event_loop().call_later( self.HEARTBEAT_INTERVAL, self._on_send_heartbeat ) @@ -123,14 +133,14 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa ) def _on_receive_timeout(self): - logger.warning('Client %s timed out', self.request.remote_ip) + logger.warning('client=%s timed out', self.request.remote_ip) self._receive_timeout_timer_handle = None self.close() def on_close(self): - logger.info('Websocket disconnected %s room: %s', self.request.remote_ip, str(self.room_id)) + logger.info('client=%s disconnected, room=%s', self.request.remote_ip, str(self.room_id)) if self.has_joined_room: - services.chat.room_manager.del_client(self.room_id, self) + services.chat.client_room_manager.del_client(self.room_id, self) if self._heartbeat_timer_handle is not None: self._heartbeat_timer_handle.cancel() self._heartbeat_timer_handle = None @@ -146,26 +156,31 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa body = json.loads(message) cmd = body['cmd'] + if cmd == Command.HEARTBEAT: pass + elif cmd == Command.JOIN_ROOM: if self.has_joined_room: return self._refresh_receive_timeout_timer() self.room_id = int(body['data']['roomId']) - logger.info('Client %s is joining room %d', self.request.remote_ip, self.room_id) + logger.info('client=%s joining room %d', self.request.remote_ip, self.room_id) try: cfg = body['data']['config'] - self.auto_translate = cfg['autoTranslate'] + self.auto_translate = bool(cfg['autoTranslate']) except KeyError: pass - asyncio.ensure_future(services.chat.room_manager.add_client(self.room_id, self)) + services.chat.client_room_manager.add_client(self.room_id, self) + asyncio.ensure_future(self._on_joined_room()) + else: - logger.warning('Unknown cmd, client: %s, cmd: %d, body: %s', self.request.remote_ip, cmd, body) + logger.warning('client=%s unknown cmd=%d, body=%s', self.request.remote_ip, cmd, body) + except Exception: # noqa - logger.exception('on_message error, client: %s, message: %s', self.request.remote_ip, message) + logger.exception('client=%s on_message error, message=%s', self.request.remote_ip, message) # 跨域测试用 def check_origin(self, origin): @@ -177,22 +192,24 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa def has_joined_room(self): return self.room_id is not None - def send_message(self, cmd, data): - body = json.dumps({'cmd': cmd, 'data': data}) + def send_cmd_data(self, cmd, data): + self.send_body_no_raise(make_message_body(cmd, data)) + + def send_body_no_raise(self, body: Union[bytes, str, Dict[str, Any]]): try: self.write_message(body) except tornado.websocket.WebSocketClosedError: self.close() - async def on_join_room(self): + async def _on_joined_room(self): if self.application.settings['debug']: - await self.send_test_message() + await self._send_test_message() # 不允许自动翻译的提示 if self.auto_translate: cfg = config.get_config() if cfg.allow_translate_rooms and self.room_id not in cfg.allow_translate_rooms: - self.send_message(Command.ADD_TEXT, make_text_message( + self.send_cmd_data(Command.ADD_TEXT, make_text_message_data( avatar_url=services.avatar.DEFAULT_AVATAR_URL, timestamp=int(time.time()), author_name='blivechat', @@ -211,13 +228,13 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa )) # 测试用 - async def send_test_message(self): + async def _send_test_message(self): base_data = { 'avatarUrl': await services.avatar.get_avatar_url(300474), 'timestamp': int(time.time()), 'authorName': 'xfgryujk', } - text_data = make_text_message( + text_data = make_text_message_data( avatar_url=base_data['avatarUrl'], timestamp=base_data['timestamp'], author_name=base_data['authorName'], @@ -253,32 +270,30 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa 'content': 'The quick brown fox jumps over the lazy dog', 'translation': '' } - self.send_message(Command.ADD_TEXT, text_data) + self.send_cmd_data(Command.ADD_TEXT, text_data) text_data[2] = '主播' text_data[3] = 3 text_data[4] = "I can eat glass, it doesn't hurt me." text_data[11] = uuid.uuid4().hex - self.send_message(Command.ADD_TEXT, text_data) - self.send_message(Command.ADD_MEMBER, member_data) - self.send_message(Command.ADD_SUPER_CHAT, sc_data) + self.send_cmd_data(Command.ADD_TEXT, text_data) + self.send_cmd_data(Command.ADD_MEMBER, member_data) + self.send_cmd_data(Command.ADD_SUPER_CHAT, sc_data) sc_data['id'] = str(random.randint(1, 65535)) sc_data['price'] = 100 sc_data['content'] = '敏捷的棕色狐狸跳过了懒狗' - self.send_message(Command.ADD_SUPER_CHAT, sc_data) + self.send_cmd_data(Command.ADD_SUPER_CHAT, sc_data) # self.send_message(Command.DEL_SUPER_CHAT, {'ids': [sc_data['id']]}) - self.send_message(Command.ADD_GIFT, gift_data) + self.send_cmd_data(Command.ADD_GIFT, gift_data) gift_data['id'] = uuid.uuid4().hex gift_data['totalCoin'] = 1245000 gift_data['giftName'] = '小电视飞船' - self.send_message(Command.ADD_GIFT, gift_data) + self.send_cmd_data(Command.ADD_GIFT, gift_data) class RoomInfoHandler(api.base.ApiHandler): # noqa - _host_server_list_cache = blivedm_client.DEFAULT_DANMAKU_SERVER_LIST - async def get(self): room_id = int(self.get_query_argument('roomId')) - logger.info('Client %s is getting room info %d', self.request.remote_ip, room_id) + logger.info('client=%s getting room info, room=%d', self.request.remote_ip, room_id) room_id, owner_uid = await self._get_room_info(room_id) host_server_list = await self._get_server_host_list(room_id) if owner_uid == 0: @@ -300,49 +315,25 @@ class RoomInfoHandler(api.base.ApiHandler): # noqa blivedm_client.ROOM_INIT_URL, params={'room_id': room_id} ) as res: if res.status != 200: - logger.warning('room %d _get_room_info failed: %d %s', room_id, + logger.warning('room=%d _get_room_info failed: %d %s', room_id, res.status, res.reason) return room_id, 0 data = await res.json() except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('room %d _get_room_info failed', room_id) + logger.exception('room=%d _get_room_info failed', room_id) return room_id, 0 if data['code'] != 0: - logger.warning('room %d _get_room_info failed: %s', room_id, data['message']) + logger.warning('room=%d _get_room_info failed: %s', room_id, data['message']) return room_id, 0 room_info = data['data']['room_info'] return room_info['room_id'], room_info['uid'] - @classmethod - async def _get_server_host_list(cls, _room_id): - return cls._host_server_list_cache - + @staticmethod + async def _get_server_host_list(_room_id): # 连接其他host必须要key - # try: - # async with _http_session.get(blivedm.DANMAKU_SERVER_CONF_URL, params={'id': room_id, 'type': 0} - # ) as res: - # if res.status != 200: - # logger.warning('room %d _get_server_host_list failed: %d %s', room_id, - # res.status, res.reason) - # return cls._host_server_list_cache - # data = await res.json() - # except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - # logger.exception('room %d _get_server_host_list failed', room_id) - # return cls._host_server_list_cache - # - # if data['code'] != 0: - # logger.warning('room %d _get_server_host_list failed: %s', room_id, data['message']) - # return cls._host_server_list_cache - # - # host_server_list = data['data']['host_list'] - # if not host_server_list: - # logger.warning('room %d _get_server_host_list failed: host_server_list is empty') - # return cls._host_server_list_cache - # - # cls._host_server_list_cache = host_server_list - # return host_server_list + return blivedm_client.DEFAULT_DANMAKU_SERVER_LIST class AvatarHandler(api.base.ApiHandler): # noqa diff --git a/services/chat.py b/services/chat.py index 11a9db5..e02a657 100644 --- a/services/chat.py +++ b/services/chat.py @@ -1,12 +1,9 @@ # -*- coding: utf-8 -*- import asyncio -import json import logging import uuid from typing import * -import tornado.websocket - import api.chat import blivedm.blivedm as blivedm import config @@ -16,19 +13,169 @@ import utils.request logger = logging.getLogger(__name__) -room_manager: Optional['RoomManager'] = None +# 到B站的连接管理 +_live_client_manager: Optional['LiveClientManager'] = None +# 到客户端的连接管理 +client_room_manager: Optional['ClientRoomManager'] = None +# 直播消息处理器 +_live_msg_handler: Optional['LiveMsgHandler'] = None def init(): - global room_manager - room_manager = RoomManager() + global _live_client_manager, client_room_manager, _live_msg_handler + _live_client_manager = LiveClientManager() + client_room_manager = ClientRoomManager() + _live_msg_handler = LiveMsgHandler() -class Room(blivedm.BLiveClient, blivedm.BaseHandler): - HEARTBEAT_INTERVAL = 10 +class LiveClientManager: + """管理到B站的连接""" + def __init__(self): + self._live_clients: Dict[int, LiveClient] = {} + def add_live_client(self, room_id): + if room_id in self._live_clients: + return + logger.info('room=%d creating live client', room_id) + self._live_clients[room_id] = live_client = LiveClient(room_id) + live_client.add_handler(_live_msg_handler) + asyncio.ensure_future(self._init_live_client(live_client)) + logger.info('room=%d live client created, %d live clients', room_id, len(self._live_clients)) + + async def _init_live_client(self, live_client: 'LiveClient'): + if not await live_client.init_room(): + logger.warning('room=%d live client init failed', live_client.tmp_room_id) + self.del_live_client(live_client.tmp_room_id) + return + logger.info('room=%d (%d) live client init succeeded', live_client.tmp_room_id, live_client.room_id) + live_client.start() + + def del_live_client(self, room_id): + live_client = self._live_clients.pop(room_id, None) + if live_client is None: + return + logger.info('room=%d removing live client', room_id) + live_client.remove_handler(_live_msg_handler) + asyncio.ensure_future(live_client.stop_and_close()) + logger.info('room=%d live client removed, %d live clients', room_id, len(self._live_clients)) + + client_room_manager.del_room(room_id) + + +class LiveClient(blivedm.BLiveClient): + def __init__(self, room_id): + super().__init__(room_id, session=utils.request.http_session, heartbeat_interval=10) + + @property + def tmp_room_id(self): + """初始化参数传入的房间ID,room_id可能改变,这个不会变""" + return self._tmp_room_id + + async def init_room(self): + await super().init_room() + return True + + +class ClientRoomManager: + """管理到客户端的连接""" + def __init__(self): + self._rooms: Dict[int, ClientRoom] = {} + + def add_client(self, room_id, client: 'api.chat.ChatHandler'): + room = self.get_or_add_room(room_id) + room.add_client(client) + + def del_client(self, room_id, client: 'api.chat.ChatHandler'): + room = self.get_room(room_id) + if room is None: + return + room.del_client(client) + + if room.client_count == 0: + self.del_room(room_id) + + def get_room(self, room_id): + return self._rooms.get(room_id, None) + + def get_or_add_room(self, room_id): + room = self._rooms.get(room_id, None) + if room is None: + logger.info('room=%d creating client room', room_id) + self._rooms[room_id] = room = ClientRoom(room_id) + logger.info('room=%d client room created, %d client rooms', room_id, len(self._rooms)) + + _live_client_manager.add_live_client(room_id) + return room + + def del_room(self, room_id): + room = self._rooms.pop(room_id, None) + if room is None: + return + logger.info('room=%d removing client room', room_id) + room.clear_clients() + logger.info('room=%d client room removed, %d client rooms', room_id, len(self._rooms)) + + _live_client_manager.del_live_client(room_id) + + +class ClientRoom: + def __init__(self, room_id): + self._room_id = room_id + self._clients: List[api.chat.ChatHandler] = [] + self._auto_translate_count = 0 + + @property + def room_id(self): + return self._room_id + + @property + def client_count(self): + return len(self._clients) + + @property + def need_translate(self): + return self._auto_translate_count > 0 + + def add_client(self, client: 'api.chat.ChatHandler'): + logger.info('room=%d addding client %s', self._room_id, client.request.remote_ip) + self._clients.append(client) + if client.auto_translate: + self._auto_translate_count += 1 + logger.info('room=%d added client %s, %d clients', self._room_id, client.request.remote_ip, + self.client_count) + + def del_client(self, client: 'api.chat.ChatHandler'): + client.close() + try: + self._clients.remove(client) + except ValueError: + return + if client.auto_translate: + self._auto_translate_count -= 1 + logger.info('room=%d removed client %s, %d clients', self._room_id, client.request.remote_ip, + self.client_count) + + def clear_clients(self): + logger.info('room=%d clearing %d clients', self._room_id, self.client_count) + for client in self._clients: + client.close() + self._clients.clear() + self._auto_translate_count = 0 + + def send_cmd_data(self, cmd, data): + body = api.chat.make_message_body(cmd, data) + for client in self._clients: + client.send_body_no_raise(body) + + def send_cmd_data_if(self, filterer: Callable[['api.chat.ChatHandler'], bool], cmd, data): + body = api.chat.make_message_body(cmd, data) + for client in filter(filterer, self._clients): + client.send_body_no_raise(body) + + +class LiveMsgHandler(blivedm.BaseHandler): # 重新定义XXX_callback是为了减少对字段名的依赖,防止B站改字段名 - def __danmu_msg_callback(self, client: blivedm.BLiveClient, command: dict): + def __danmu_msg_callback(self, client: LiveClient, command: dict): info = command['info'] if len(info[3]) != 0: medal_level = info[3][0] @@ -60,7 +207,7 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): ) return self._on_danmaku(client, message) - def __send_gift_callback(self, client: blivedm.BLiveClient, command: dict): + def __send_gift_callback(self, client: LiveClient, command: dict): data = command['data'] message = blivedm.GiftMessage( gift_name=data['giftName'], @@ -74,7 +221,7 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): ) return self._on_gift(client, message) - def __guard_buy_callback(self, client: blivedm.BLiveClient, command: dict): + def __guard_buy_callback(self, client: LiveClient, command: dict): data = command['data'] message = blivedm.GuardBuyMessage( uid=data['uid'], @@ -84,7 +231,7 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): ) return self._on_buy_guard(client, message) - def __super_chat_message_callback(self, client: blivedm.BLiveClient, command: dict): + def __super_chat_message_callback(self, client: LiveClient, command: dict): data = command['data'] message = blivedm.SuperChatMessage( price=data['price'], @@ -105,37 +252,18 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): 'SUPER_CHAT_MESSAGE': __super_chat_message_callback } - def __init__(self, room_id): - super().__init__(room_id, session=utils.request.http_session, heartbeat_interval=self.HEARTBEAT_INTERVAL) - self.add_handler(self) - self.clients: List[api.chat.ChatHandler] = [] - self.auto_translate_count = 0 + async def _on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage): + asyncio.ensure_future(self.__on_danmaku(client, message)) - async def init_room(self): - await super().init_room() - return True + async def __on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage): + # 先异步调用再获取房间,因为返回时房间可能已经不存在了 + avatar_url = await services.avatar.get_avatar_url(message.uid) - def send_message(self, cmd, data): - body = json.dumps({'cmd': cmd, 'data': data}) - for client in self.clients: - try: - client.write_message(body) - except tornado.websocket.WebSocketClosedError: - room_manager.del_client(self.room_id, client) + room = client_room_manager.get_room(client.tmp_room_id) + if room is None: + return - def send_message_if(self, can_send_func: Callable[['api.chat.ChatHandler'], bool], cmd, data): - body = json.dumps({'cmd': cmd, 'data': data}) - for client in filter(can_send_func, self.clients): - try: - client.write_message(body) - except tornado.websocket.WebSocketClosedError: - room_manager.del_client(self.room_id, client) - - async def _on_danmaku(self, client: blivedm.BLiveClient, message: blivedm.DanmakuMessage): - asyncio.ensure_future(self.__on_danmaku(message)) - - async def __on_danmaku(self, message: blivedm.DanmakuMessage): - if message.uid == self.room_owner_uid: + if message.uid == client.room_owner_uid: author_type = 3 # 主播 elif message.admin: author_type = 2 # 房管 @@ -153,7 +281,7 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): content_type = api.chat.ContentType.TEXT content_type_params = None - need_translate = self._need_translate(message.msg) + need_translate = self._need_translate(message.msg, room) if need_translate: translation = services.translate.get_translation_from_cache(message.msg) if translation is None: @@ -164,10 +292,10 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): else: translation = '' - id_ = uuid.uuid4().hex + msg_id = uuid.uuid4().hex # 为了节省带宽用list而不是dict - self.send_message(api.chat.Command.ADD_TEXT, api.chat.make_text_message( - avatar_url=await services.avatar.get_avatar_url(message.uid), + room.send_cmd_data(api.chat.Command.ADD_TEXT, api.chat.make_text_message_data( + avatar_url=avatar_url, timestamp=int(message.timestamp / 1000), author_name=message.uname, author_type=author_type, @@ -177,24 +305,31 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): author_level=message.user_level, is_newbie=message.urank < 10000, is_mobile_verified=message.mobile_verify, - medal_level=0 if message.medal_room_id != self.room_id else message.medal_level, - id_=id_, + medal_level=0 if message.medal_room_id != client.room_id else message.medal_level, + id_=msg_id, translation=translation, content_type=content_type, content_type_params=content_type_params, )) if need_translate: - await self._translate_and_response(message.msg, id_) + await self._translate_and_response(message.msg, room.room_id, msg_id) - async def _on_gift(self, client: blivedm.BLiveClient, message: blivedm.GiftMessage): + async def _on_gift(self, client: LiveClient, message: blivedm.GiftMessage): avatar_url = services.avatar.process_avatar_url(message.face) + # 服务器白给的头像URL,直接缓存 services.avatar.update_avatar_cache(message.uid, avatar_url) - if message.coin_type != 'gold': # 丢人 + + # 丢人 + if message.coin_type != 'gold': return - id_ = uuid.uuid4().hex - self.send_message(api.chat.Command.ADD_GIFT, { - 'id': id_, + + room = client_room_manager.get_room(client.tmp_room_id) + if room is None: + return + + room.send_cmd_data(api.chat.Command.ADD_GIFT, { + 'id': uuid.uuid4().hex, 'avatarUrl': avatar_url, 'timestamp': message.timestamp, 'authorName': message.uname, @@ -203,24 +338,36 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): 'num': message.num }) - async def _on_buy_guard(self, client: blivedm.BLiveClient, message: blivedm.GuardBuyMessage): - asyncio.ensure_future(self.__on_buy_guard(message)) + async def _on_buy_guard(self, client: LiveClient, message: blivedm.GuardBuyMessage): + asyncio.ensure_future(self.__on_buy_guard(client, message)) - async def __on_buy_guard(self, message: blivedm.GuardBuyMessage): - id_ = uuid.uuid4().hex - self.send_message(api.chat.Command.ADD_MEMBER, { - 'id': id_, - 'avatarUrl': await services.avatar.get_avatar_url(message.uid), + @staticmethod + async def __on_buy_guard(client: LiveClient, message: blivedm.GuardBuyMessage): + # 先异步调用再获取房间,因为返回时房间可能已经不存在了 + avatar_url = await services.avatar.get_avatar_url(message.uid) + + room = client_room_manager.get_room(client.tmp_room_id) + if room is None: + return + + room.send_cmd_data(api.chat.Command.ADD_MEMBER, { + 'id': uuid.uuid4().hex, + 'avatarUrl': avatar_url, 'timestamp': message.start_time, 'authorName': message.username, 'privilegeType': message.guard_level }) - async def _on_super_chat(self, client: blivedm.BLiveClient, message: blivedm.SuperChatMessage): + async def _on_super_chat(self, client: LiveClient, message: blivedm.SuperChatMessage): avatar_url = services.avatar.process_avatar_url(message.face) + # 服务器白给的头像URL,直接缓存 services.avatar.update_avatar_cache(message.uid, avatar_url) - need_translate = self._need_translate(message.message) + room = client_room_manager.get_room(client.tmp_room_id) + if room is None: + return + + need_translate = self._need_translate(message.message, room) if need_translate: translation = services.translate.get_translation_from_cache(message.message) if translation is None: @@ -231,9 +378,9 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): else: translation = '' - id_ = str(message.id) - self.send_message(api.chat.Command.ADD_SUPER_CHAT, { - 'id': id_, + msg_id = str(message.id) + room.send_cmd_data(api.chat.Command.ADD_SUPER_CHAT, { + 'id': msg_id, 'avatarUrl': avatar_url, 'timestamp': message.start_time, 'authorName': message.uname, @@ -243,94 +390,42 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler): }) if need_translate: - asyncio.ensure_future(self._translate_and_response(message.message, id_)) + asyncio.ensure_future(self._translate_and_response(message.message, room.room_id, msg_id)) - async def _on_super_chat_delete(self, client: blivedm.BLiveClient, message: blivedm.SuperChatDeleteMessage): - self.send_message(api.chat.Command.ADD_SUPER_CHAT, { + async def _on_super_chat_delete(self, client: LiveClient, message: blivedm.SuperChatDeleteMessage): + room = client_room_manager.get_room(client.tmp_room_id) + if room is None: + return + + room.send_cmd_data(api.chat.Command.ADD_SUPER_CHAT, { 'ids': list(map(str, message.ids)) }) - def _need_translate(self, text): + @staticmethod + def _need_translate(text, room: ClientRoom): cfg = config.get_config() return ( - cfg.enable_translate - and (not cfg.allow_translate_rooms or self.room_id in cfg.allow_translate_rooms) - and self.auto_translate_count > 0 - and services.translate.need_translate(text) + cfg.enable_translate + and room.need_translate + and (not cfg.allow_translate_rooms or room.room_id in cfg.allow_translate_rooms) + and services.translate.need_translate(text) ) - async def _translate_and_response(self, text, msg_id): + @staticmethod + async def _translate_and_response(text, room_id, msg_id): translation = await services.translate.translate(text) if translation is None: return - self.send_message_if( + + room = client_room_manager.get_room(room_id) + if room is None: + return + + room.send_cmd_data_if( lambda client: client.auto_translate, api.chat.Command.UPDATE_TRANSLATION, - api.chat.make_translation_message( + api.chat.make_translation_message_data( msg_id, translation ) ) - - -class RoomManager: - def __init__(self): - self._rooms: Dict[int, Room] = {} - - async def add_client(self, room_id, client: 'api.chat.ChatHandler'): - if room_id not in self._rooms: - if not await self._add_room(room_id): - client.close() - return - room = self._rooms.get(room_id, None) - if room is None: - return - - room.clients.append(client) - logger.info('%d clients in room %s', len(room.clients), room_id) - if client.auto_translate: - room.auto_translate_count += 1 - - await client.on_join_room() - - def del_client(self, room_id, client: 'api.chat.ChatHandler'): - room = self._rooms.get(room_id, None) - if room is None: - return - - try: - room.clients.remove(client) - except ValueError: - # _add_room未完成,没有执行到room.clients.append - pass - else: - logger.info('%d clients in room %s', len(room.clients), room_id) - if client.auto_translate: - room.auto_translate_count = max(0, room.auto_translate_count - 1) - - if not room.clients: - self._del_room(room_id) - - async def _add_room(self, room_id): - if room_id in self._rooms: - return True - logger.info('Creating room %d', room_id) - self._rooms[room_id] = room = Room(room_id) - if await room.init_room(): - room.start() - logger.info('%d rooms', len(self._rooms)) - return True - else: - self._del_room(room_id) - return False - - def _del_room(self, room_id): - room = self._rooms.get(room_id, None) - if room is None: - return - logger.info('Removing room %d', room_id) - for client in room.clients: - client.close() - asyncio.ensure_future(room.stop_and_close()) - self._rooms.pop(room_id, None) - logger.info('%d rooms', len(self._rooms))