diff --git a/api/base.py b/api/base.py index 3d092ce..81e1e72 100644 --- a/api/base.py +++ b/api/base.py @@ -1,12 +1,10 @@ # -*- coding: utf-8 -*- - import json import tornado.web -# noinspection PyAbstractClass -class ApiHandler(tornado.web.RequestHandler): +class ApiHandler(tornado.web.RequestHandler): # noqa def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.json_args = None diff --git a/api/chat.py b/api/chat.py index 86dcbea..ea0406d 100644 --- a/api/chat.py +++ b/api/chat.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - import asyncio import enum import json @@ -7,17 +6,17 @@ import logging import random import time import uuid -from typing import * import aiohttp import tornado.websocket import api.base -import blivedm.blivedm as blivedm import blivedm.blivedm.client as blivedm_client import config -import models.avatar -import models.translate +import services.avatar +import services.chat +import services.translate +import utils.request logger = logging.getLogger(__name__) @@ -38,264 +37,6 @@ class ContentType(enum.IntEnum): EMOTICON = 1 -_http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) - -room_manager: Optional['RoomManager'] = None - - -def init(): - global room_manager - room_manager = RoomManager() - - -class Room(blivedm.BLiveClient, blivedm.BaseHandler): - HEARTBEAT_INTERVAL = 10 - - # 重新定义XXX_callback是为了减少对字段名的依赖,防止B站改字段名 - def __danmu_msg_callback(self, client: blivedm.BLiveClient, command: dict): - info = command['info'] - if len(info[3]) != 0: - medal_level = info[3][0] - medal_room_id = info[3][3] - else: - medal_level = 0 - medal_room_id = 0 - - message = blivedm.DanmakuMessage( - timestamp=info[0][4], - msg_type=info[0][9], - dm_type=info[0][12], - emoticon_options=info[0][13], - - msg=info[1], - - uid=info[2][0], - uname=info[2][1], - admin=info[2][2], - urank=info[2][5], - mobile_verify=info[2][6], - - medal_level=medal_level, - medal_room_id=medal_room_id, - - user_level=info[4][0], - - privilege_type=info[7], - ) - return self._on_danmaku(client, message) - - def __send_gift_callback(self, client: blivedm.BLiveClient, command: dict): - data = command['data'] - message = blivedm.GiftMessage( - gift_name=data['giftName'], - num=data['num'], - uname=data['uname'], - face=data['face'], - uid=data['uid'], - timestamp=data['timestamp'], - coin_type=data['coin_type'], - total_coin=data['total_coin'], - ) - return self._on_gift(client, message) - - def __guard_buy_callback(self, client: blivedm.BLiveClient, command: dict): - data = command['data'] - message = blivedm.GuardBuyMessage( - uid=data['uid'], - username=data['username'], - guard_level=data['guard_level'], - start_time=data['start_time'], - ) - return self._on_buy_guard(client, message) - - def __super_chat_message_callback(self, client: blivedm.BLiveClient, command: dict): - data = command['data'] - message = blivedm.SuperChatMessage( - price=data['price'], - message=data['message'], - start_time=data['start_time'], - id_=data['id'], - uid=data['uid'], - uname=data['user_info']['uname'], - face=data['user_info']['face'], - ) - return self._on_super_chat(client, message) - - _CMD_CALLBACK_DICT = { - **blivedm.BaseHandler._CMD_CALLBACK_DICT, - 'DANMU_MSG': __danmu_msg_callback, - 'SEND_GIFT': __send_gift_callback, - 'GUARD_BUY': __guard_buy_callback, - 'SUPER_CHAT_MESSAGE': __super_chat_message_callback - } - - def __init__(self, room_id): - super().__init__(room_id, session=_http_session, heartbeat_interval=self.HEARTBEAT_INTERVAL) - self.add_handler(self) - self.clients: List['ChatHandler'] = [] - self.auto_translate_count = 0 - - async def init_room(self): - await super().init_room() - return True - - def send_message(self, cmd, data): - body = json.dumps({'cmd': cmd, 'data': data}) - for client in self.clients: - try: - client.write_message(body) - except tornado.websocket.WebSocketClosedError: - room_manager.del_client(self.room_id, client) - - def send_message_if(self, can_send_func: Callable[['ChatHandler'], bool], cmd, data): - body = json.dumps({'cmd': cmd, 'data': data}) - for client in filter(can_send_func, self.clients): - try: - client.write_message(body) - except tornado.websocket.WebSocketClosedError: - room_manager.del_client(self.room_id, client) - - async def _on_danmaku(self, client: blivedm.BLiveClient, message: blivedm.DanmakuMessage): - asyncio.ensure_future(self.__on_danmaku(message)) - - async def __on_danmaku(self, message: blivedm.DanmakuMessage): - if message.uid == self.room_owner_uid: - author_type = 3 # 主播 - elif message.admin: - author_type = 2 # 房管 - elif message.privilege_type != 0: # 1总督,2提督,3舰长 - author_type = 1 # 舰队 - else: - author_type = 0 - - if message.dm_type == 1: - content_type = ContentType.EMOTICON - content_type_params = make_emoticon_params( - message.emoticon_options_dict['url'], - ) - else: - content_type = ContentType.TEXT - content_type_params = None - - need_translate = self._need_translate(message.msg) - if need_translate: - translation = models.translate.get_translation_from_cache(message.msg) - if translation is None: - # 没有缓存,需要后面异步翻译后通知 - translation = '' - else: - need_translate = False - else: - translation = '' - - id_ = uuid.uuid4().hex - # 为了节省带宽用list而不是dict - self.send_message(Command.ADD_TEXT, make_text_message( - avatar_url=await models.avatar.get_avatar_url(message.uid), - timestamp=int(message.timestamp / 1000), - author_name=message.uname, - author_type=author_type, - content=message.msg, - privilege_type=message.privilege_type, - is_gift_danmaku=message.msg_type, - author_level=message.user_level, - is_newbie=message.urank < 10000, - is_mobile_verified=message.mobile_verify, - medal_level=0 if message.medal_room_id != self.room_id else message.medal_level, - id_=id_, - translation=translation, - content_type=content_type, - content_type_params=content_type_params, - )) - - if need_translate: - await self._translate_and_response(message.msg, id_) - - async def _on_gift(self, client: blivedm.BLiveClient, message: blivedm.GiftMessage): - avatar_url = models.avatar.process_avatar_url(message.face) - models.avatar.update_avatar_cache(message.uid, avatar_url) - if message.coin_type != 'gold': # 丢人 - return - id_ = uuid.uuid4().hex - self.send_message(Command.ADD_GIFT, { - 'id': id_, - 'avatarUrl': avatar_url, - 'timestamp': message.timestamp, - 'authorName': message.uname, - 'totalCoin': message.total_coin, - 'giftName': message.gift_name, - 'num': message.num - }) - - async def _on_buy_guard(self, client: blivedm.BLiveClient, message: blivedm.GuardBuyMessage): - asyncio.ensure_future(self.__on_buy_guard(message)) - - async def __on_buy_guard(self, message: blivedm.GuardBuyMessage): - id_ = uuid.uuid4().hex - self.send_message(Command.ADD_MEMBER, { - 'id': id_, - 'avatarUrl': await models.avatar.get_avatar_url(message.uid), - 'timestamp': message.start_time, - 'authorName': message.username, - 'privilegeType': message.guard_level - }) - - async def _on_super_chat(self, client: blivedm.BLiveClient, message: blivedm.SuperChatMessage): - avatar_url = models.avatar.process_avatar_url(message.face) - models.avatar.update_avatar_cache(message.uid, avatar_url) - - need_translate = self._need_translate(message.message) - if need_translate: - translation = models.translate.get_translation_from_cache(message.message) - if translation is None: - # 没有缓存,需要后面异步翻译后通知 - translation = '' - else: - need_translate = False - else: - translation = '' - - id_ = str(message.id) - self.send_message(Command.ADD_SUPER_CHAT, { - 'id': id_, - 'avatarUrl': avatar_url, - 'timestamp': message.start_time, - 'authorName': message.uname, - 'price': message.price, - 'content': message.message, - 'translation': translation - }) - - if need_translate: - asyncio.ensure_future(self._translate_and_response(message.message, id_)) - - async def _on_super_chat_delete(self, client: blivedm.BLiveClient, message: blivedm.SuperChatDeleteMessage): - self.send_message(Command.ADD_SUPER_CHAT, { - 'ids': list(map(str, message.ids)) - }) - - def _need_translate(self, text): - cfg = config.get_config() - return ( - cfg.enable_translate - and (not cfg.allow_translate_rooms or self.room_id in cfg.allow_translate_rooms) - and self.auto_translate_count > 0 - and models.translate.need_translate(text) - ) - - async def _translate_and_response(self, text, msg_id): - translation = await models.translate.translate(text) - if translation is None: - return - self.send_message_if( - lambda client: client.auto_translate, - Command.UPDATE_TRANSLATION, make_translation_message( - msg_id, - translation - ) - ) - - def make_text_message(avatar_url, timestamp, author_name, author_type, content, privilege_type, is_gift_danmaku, author_level, is_newbie, is_mobile_verified, medal_level, id_, translation, content_type, content_type_params): @@ -349,71 +90,7 @@ def make_translation_message(msg_id, translation): ] -class RoomManager: - def __init__(self): - self._rooms: Dict[int, Room] = {} - - async def add_client(self, room_id, client: 'ChatHandler'): - if room_id not in self._rooms: - if not await self._add_room(room_id): - client.close() - return - room = self._rooms.get(room_id, None) - if room is None: - return - - room.clients.append(client) - logger.info('%d clients in room %s', len(room.clients), room_id) - if client.auto_translate: - room.auto_translate_count += 1 - - await client.on_join_room() - - def del_client(self, room_id, client: 'ChatHandler'): - room = self._rooms.get(room_id, None) - if room is None: - return - - try: - room.clients.remove(client) - except ValueError: - # _add_room未完成,没有执行到room.clients.append - pass - else: - logger.info('%d clients in room %s', len(room.clients), room_id) - if client.auto_translate: - room.auto_translate_count = max(0, room.auto_translate_count - 1) - - if not room.clients: - self._del_room(room_id) - - async def _add_room(self, room_id): - if room_id in self._rooms: - return True - logger.info('Creating room %d', room_id) - self._rooms[room_id] = room = Room(room_id) - if await room.init_room(): - room.start() - logger.info('%d rooms', len(self._rooms)) - return True - else: - self._del_room(room_id) - return False - - def _del_room(self, room_id): - room = self._rooms.get(room_id, None) - if room is None: - return - logger.info('Removing room %d', room_id) - for client in room.clients: - client.close() - asyncio.ensure_future(room.stop_and_close()) - self._rooms.pop(room_id, None) - logger.info('%d rooms', len(self._rooms)) - - -# noinspection PyAbstractClass -class ChatHandler(tornado.websocket.WebSocketHandler): +class ChatHandler(tornado.websocket.WebSocketHandler): # noqa HEARTBEAT_INTERVAL = 10 RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5 @@ -453,7 +130,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): def on_close(self): logger.info('Websocket disconnected %s room: %s', self.request.remote_ip, str(self.room_id)) if self.has_joined_room: - room_manager.del_client(self.room_id, self) + services.chat.room_manager.del_client(self.room_id, self) if self._heartbeat_timer_handle is not None: self._heartbeat_timer_handle.cancel() self._heartbeat_timer_handle = None @@ -484,10 +161,10 @@ class ChatHandler(tornado.websocket.WebSocketHandler): except KeyError: pass - asyncio.ensure_future(room_manager.add_client(self.room_id, self)) + asyncio.ensure_future(services.chat.room_manager.add_client(self.room_id, self)) else: logger.warning('Unknown cmd, client: %s, cmd: %d, body: %s', self.request.remote_ip, cmd, body) - except Exception: + except Exception: # noqa logger.exception('on_message error, client: %s, message: %s', self.request.remote_ip, message) # 跨域测试用 @@ -516,7 +193,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): cfg = config.get_config() if cfg.allow_translate_rooms and self.room_id not in cfg.allow_translate_rooms: self.send_message(Command.ADD_TEXT, make_text_message( - avatar_url=models.avatar.DEFAULT_AVATAR_URL, + avatar_url=services.avatar.DEFAULT_AVATAR_URL, timestamp=int(time.time()), author_name='blivechat', author_type=2, @@ -536,7 +213,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # 测试用 async def send_test_message(self): base_data = { - 'avatarUrl': await models.avatar.get_avatar_url(300474), + 'avatarUrl': await services.avatar.get_avatar_url(300474), 'timestamp': int(time.time()), 'authorName': 'xfgryujk', } @@ -596,8 +273,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): self.send_message(Command.ADD_GIFT, gift_data) -# noinspection PyAbstractClass -class RoomInfoHandler(api.base.ApiHandler): +class RoomInfoHandler(api.base.ApiHandler): # noqa _host_server_list_cache = blivedm_client.DEFAULT_DANMAKU_SERVER_LIST async def get(self): @@ -620,8 +296,9 @@ class RoomInfoHandler(api.base.ApiHandler): @staticmethod async def _get_room_info(room_id): try: - async with _http_session.get(blivedm_client.ROOM_INIT_URL, params={'room_id': room_id} - ) as res: + async with utils.request.http_session.get( + blivedm_client.ROOM_INIT_URL, params={'room_id': room_id} + ) as res: if res.status != 200: logger.warning('room %d _get_room_info failed: %d %s', room_id, res.status, res.reason) @@ -668,13 +345,12 @@ class RoomInfoHandler(api.base.ApiHandler): # return host_server_list -# noinspection PyAbstractClass -class AvatarHandler(api.base.ApiHandler): +class AvatarHandler(api.base.ApiHandler): # noqa async def get(self): uid = int(self.get_query_argument('uid')) - avatar_url = await models.avatar.get_avatar_url_or_none(uid) + avatar_url = await services.avatar.get_avatar_url_or_none(uid) if avatar_url is None: - avatar_url = models.avatar.DEFAULT_AVATAR_URL + avatar_url = services.avatar.DEFAULT_AVATAR_URL # 缓存3分钟 self.set_header('Cache-Control', 'private, max-age=180') else: diff --git a/api/main.py b/api/main.py index 114a340..1a91da8 100644 --- a/api/main.py +++ b/api/main.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - import tornado.web import api.base @@ -7,8 +6,7 @@ import config import update -# noinspection PyAbstractClass -class MainHandler(tornado.web.StaticFileHandler): +class MainHandler(tornado.web.StaticFileHandler): # noqa """为了使用Vue Router的history模式,把不存在的文件请求转发到index.html""" async def get(self, path, include_body=True): try: @@ -20,8 +18,7 @@ class MainHandler(tornado.web.StaticFileHandler): await super().get('index.html', include_body) -# noinspection PyAbstractClass -class ServerInfoHandler(api.base.ApiHandler): +class ServerInfoHandler(api.base.ApiHandler): # noqa async def get(self): cfg = config.get_config() self.write({ diff --git a/config.py b/config.py index cbb77a7..5a6378c 100644 --- a/config.py +++ b/config.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - import configparser import logging import os @@ -66,7 +65,7 @@ class AppConfig: self._load_app_config(config) self._load_translator_configs(config) - except Exception: + except Exception: # noqa logger.exception('Failed to load config:') return False return True diff --git a/main.py b/main.py index 3ca0e51..659f860 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - import argparse import logging import logging.handlers @@ -12,9 +11,10 @@ import tornado.web import api.chat import api.main import config -import models.avatar import models.database -import models.translate +import services.avatar +import services.chat +import services.translate import update logger = logging.getLogger(__name__) @@ -39,9 +39,9 @@ def main(): init_logging(args.debug) config.init() models.database.init(args.debug) - models.avatar.init() - models.translate.init() - api.chat.init() + services.avatar.init() + services.translate.init() + services.chat.init() update.check_update() run_server(args.host, args.port, args.debug) @@ -60,7 +60,6 @@ def init_logging(debug): file_handler = logging.handlers.TimedRotatingFileHandler( LOG_FILE_NAME, encoding='utf-8', when='midnight', backupCount=7, delay=True ) - # noinspection PyArgumentList logging.basicConfig( format='{asctime} {levelname} [{name}]: {message}', datefmt='%Y-%m-%d %H:%M:%S', diff --git a/models/bilibili.py b/models/bilibili.py new file mode 100644 index 0000000..28a38e4 --- /dev/null +++ b/models/bilibili.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- +import sqlalchemy + +import models.database + + +class BilibiliUser(models.database.OrmBase): + __tablename__ = 'bilibili_users' + uid = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + avatar_url = sqlalchemy.Column(sqlalchemy.String(100)) + update_time = sqlalchemy.Column(sqlalchemy.DateTime) diff --git a/models/database.py b/models/database.py index 42228c0..ab358a6 100644 --- a/models/database.py +++ b/models/database.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - import contextlib from typing import * @@ -9,23 +8,23 @@ import sqlalchemy.orm import config OrmBase = sqlalchemy.ext.declarative.declarative_base() -engine = None -DbSession: Optional[Type[sqlalchemy.orm.Session]] = None +_engine = None +_DbSession: Optional[Type[sqlalchemy.orm.Session]] = None def init(_debug): cfg = config.get_config() - global engine, DbSession + global _engine, _DbSession # engine = sqlalchemy.create_engine(cfg.database_url, echo=debug) - engine = sqlalchemy.create_engine(cfg.database_url) - DbSession = sqlalchemy.orm.sessionmaker(bind=engine) + _engine = sqlalchemy.create_engine(cfg.database_url) + _DbSession = sqlalchemy.orm.sessionmaker(bind=_engine) - OrmBase.metadata.create_all(engine) + OrmBase.metadata.create_all(_engine) @contextlib.contextmanager -def get_session(): - session = DbSession() +def get_session() -> ContextManager[sqlalchemy.orm.Session]: + session = _DbSession() try: yield session except BaseException: diff --git a/models/avatar.py b/services/avatar.py similarity index 86% rename from models/avatar.py rename to services/avatar.py index 7865860..9ce1117 100644 --- a/models/avatar.py +++ b/services/avatar.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - import asyncio import datetime import logging @@ -11,7 +10,9 @@ import sqlalchemy import sqlalchemy.exc import config +import models.bilibili as bl_models import models.database +import utils.request logger = logging.getLogger(__name__) @@ -19,7 +20,6 @@ logger = logging.getLogger(__name__) DEFAULT_AVATAR_URL = '//static.hdslb.com/images/member/noface.gif' _main_event_loop = asyncio.get_event_loop() -_http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) # user_id -> avatar_url _avatar_url_cache: Dict[int, str] = {} # 正在获取头像的Future,user_id -> Future @@ -67,7 +67,9 @@ def get_avatar_url_from_database(user_id) -> Awaitable[Optional[str]]: def _do_get_avatar_url_from_database(user_id): try: with models.database.get_session() as session: - user = session.query(BilibiliUser).filter(BilibiliUser.uid == user_id).one_or_none() + user = session.query(bl_models.BilibiliUser).filter( + bl_models.BilibiliUser.uid == user_id + ).one_or_none() if user is None: return None avatar_url = user.avatar_url @@ -130,7 +132,7 @@ async def _get_avatar_url_from_web_consumer(): # 限制频率,防止被B站ban cfg = config.get_config() await asyncio.sleep(cfg.fetch_avatar_interval) - except Exception: + except Exception: # noqa logger.exception('_get_avatar_url_from_web_consumer error:') @@ -145,8 +147,9 @@ async def _get_avatar_url_from_web_coroutine(user_id, future): async def _do_get_avatar_url_from_web(user_id): try: - async with _http_session.get('https://api.bilibili.com/x/space/acc/info', - params={'mid': user_id}) as r: + async with utils.request.http_session.get( + 'https://api.bilibili.com/x/space/acc/info', params={'mid': user_id} + ) as r: if r.status != 200: logger.warning('Failed to fetch avatar: status=%d %s uid=%d', r.status, r.reason, user_id) if r.status == 412: @@ -191,24 +194,19 @@ def _update_avatar_cache_in_memory(user_id, avatar_url): def _update_avatar_cache_in_database(user_id, avatar_url): try: with models.database.get_session() as session: - user = session.query(BilibiliUser).filter(BilibiliUser.uid == user_id).one_or_none() + user = session.query(bl_models.BilibiliUser).filter( + bl_models.BilibiliUser.uid == user_id + ).one_or_none() if user is None: - user = BilibiliUser(uid=user_id, avatar_url=avatar_url, - update_time=datetime.datetime.now()) + user = bl_models.BilibiliUser( + uid=user_id + ) session.add(user) - else: - user.avatar_url = avatar_url - user.update_time = datetime.datetime.now() + user.avatar_url = avatar_url + user.update_time = datetime.datetime.now() session.commit() except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.IntegrityError): # SQLite会锁整个文件,忽略就行,另外还有多线程导致ID重复的问题 pass except sqlalchemy.exc.SQLAlchemyError: logger.exception('_update_avatar_cache_in_database failed:') - - -class BilibiliUser(models.database.OrmBase): - __tablename__ = 'bilibili_users' - uid = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) - avatar_url = sqlalchemy.Column(sqlalchemy.String(100)) - update_time = sqlalchemy.Column(sqlalchemy.DateTime) diff --git a/services/chat.py b/services/chat.py new file mode 100644 index 0000000..11a9db5 --- /dev/null +++ b/services/chat.py @@ -0,0 +1,336 @@ +# -*- coding: utf-8 -*- +import asyncio +import json +import logging +import uuid +from typing import * + +import tornado.websocket + +import api.chat +import blivedm.blivedm as blivedm +import config +import services.avatar +import services.translate +import utils.request + +logger = logging.getLogger(__name__) + +room_manager: Optional['RoomManager'] = None + + +def init(): + global room_manager + room_manager = RoomManager() + + +class Room(blivedm.BLiveClient, blivedm.BaseHandler): + HEARTBEAT_INTERVAL = 10 + + # 重新定义XXX_callback是为了减少对字段名的依赖,防止B站改字段名 + def __danmu_msg_callback(self, client: blivedm.BLiveClient, command: dict): + info = command['info'] + if len(info[3]) != 0: + medal_level = info[3][0] + medal_room_id = info[3][3] + else: + medal_level = 0 + medal_room_id = 0 + + message = blivedm.DanmakuMessage( + timestamp=info[0][4], + msg_type=info[0][9], + dm_type=info[0][12], + emoticon_options=info[0][13], + + msg=info[1], + + uid=info[2][0], + uname=info[2][1], + admin=info[2][2], + urank=info[2][5], + mobile_verify=info[2][6], + + medal_level=medal_level, + medal_room_id=medal_room_id, + + user_level=info[4][0], + + privilege_type=info[7], + ) + return self._on_danmaku(client, message) + + def __send_gift_callback(self, client: blivedm.BLiveClient, command: dict): + data = command['data'] + message = blivedm.GiftMessage( + gift_name=data['giftName'], + num=data['num'], + uname=data['uname'], + face=data['face'], + uid=data['uid'], + timestamp=data['timestamp'], + coin_type=data['coin_type'], + total_coin=data['total_coin'], + ) + return self._on_gift(client, message) + + def __guard_buy_callback(self, client: blivedm.BLiveClient, command: dict): + data = command['data'] + message = blivedm.GuardBuyMessage( + uid=data['uid'], + username=data['username'], + guard_level=data['guard_level'], + start_time=data['start_time'], + ) + return self._on_buy_guard(client, message) + + def __super_chat_message_callback(self, client: blivedm.BLiveClient, command: dict): + data = command['data'] + message = blivedm.SuperChatMessage( + price=data['price'], + message=data['message'], + start_time=data['start_time'], + id_=data['id'], + uid=data['uid'], + uname=data['user_info']['uname'], + face=data['user_info']['face'], + ) + return self._on_super_chat(client, message) + + _CMD_CALLBACK_DICT = { + **blivedm.BaseHandler._CMD_CALLBACK_DICT, + 'DANMU_MSG': __danmu_msg_callback, + 'SEND_GIFT': __send_gift_callback, + 'GUARD_BUY': __guard_buy_callback, + 'SUPER_CHAT_MESSAGE': __super_chat_message_callback + } + + def __init__(self, room_id): + super().__init__(room_id, session=utils.request.http_session, heartbeat_interval=self.HEARTBEAT_INTERVAL) + self.add_handler(self) + self.clients: List[api.chat.ChatHandler] = [] + self.auto_translate_count = 0 + + async def init_room(self): + await super().init_room() + return True + + def send_message(self, cmd, data): + body = json.dumps({'cmd': cmd, 'data': data}) + for client in self.clients: + try: + client.write_message(body) + except tornado.websocket.WebSocketClosedError: + room_manager.del_client(self.room_id, client) + + def send_message_if(self, can_send_func: Callable[['api.chat.ChatHandler'], bool], cmd, data): + body = json.dumps({'cmd': cmd, 'data': data}) + for client in filter(can_send_func, self.clients): + try: + client.write_message(body) + except tornado.websocket.WebSocketClosedError: + room_manager.del_client(self.room_id, client) + + async def _on_danmaku(self, client: blivedm.BLiveClient, message: blivedm.DanmakuMessage): + asyncio.ensure_future(self.__on_danmaku(message)) + + async def __on_danmaku(self, message: blivedm.DanmakuMessage): + if message.uid == self.room_owner_uid: + author_type = 3 # 主播 + elif message.admin: + author_type = 2 # 房管 + elif message.privilege_type != 0: # 1总督,2提督,3舰长 + author_type = 1 # 舰队 + else: + author_type = 0 + + if message.dm_type == 1: + content_type = api.chat.ContentType.EMOTICON + content_type_params = api.chat.make_emoticon_params( + message.emoticon_options_dict['url'], + ) + else: + content_type = api.chat.ContentType.TEXT + content_type_params = None + + need_translate = self._need_translate(message.msg) + if need_translate: + translation = services.translate.get_translation_from_cache(message.msg) + if translation is None: + # 没有缓存,需要后面异步翻译后通知 + translation = '' + else: + need_translate = False + else: + translation = '' + + id_ = uuid.uuid4().hex + # 为了节省带宽用list而不是dict + self.send_message(api.chat.Command.ADD_TEXT, api.chat.make_text_message( + avatar_url=await services.avatar.get_avatar_url(message.uid), + timestamp=int(message.timestamp / 1000), + author_name=message.uname, + author_type=author_type, + content=message.msg, + privilege_type=message.privilege_type, + is_gift_danmaku=message.msg_type, + author_level=message.user_level, + is_newbie=message.urank < 10000, + is_mobile_verified=message.mobile_verify, + medal_level=0 if message.medal_room_id != self.room_id else message.medal_level, + id_=id_, + translation=translation, + content_type=content_type, + content_type_params=content_type_params, + )) + + if need_translate: + await self._translate_and_response(message.msg, id_) + + async def _on_gift(self, client: blivedm.BLiveClient, message: blivedm.GiftMessage): + avatar_url = services.avatar.process_avatar_url(message.face) + services.avatar.update_avatar_cache(message.uid, avatar_url) + if message.coin_type != 'gold': # 丢人 + return + id_ = uuid.uuid4().hex + self.send_message(api.chat.Command.ADD_GIFT, { + 'id': id_, + 'avatarUrl': avatar_url, + 'timestamp': message.timestamp, + 'authorName': message.uname, + 'totalCoin': message.total_coin, + 'giftName': message.gift_name, + 'num': message.num + }) + + async def _on_buy_guard(self, client: blivedm.BLiveClient, message: blivedm.GuardBuyMessage): + asyncio.ensure_future(self.__on_buy_guard(message)) + + async def __on_buy_guard(self, message: blivedm.GuardBuyMessage): + id_ = uuid.uuid4().hex + self.send_message(api.chat.Command.ADD_MEMBER, { + 'id': id_, + 'avatarUrl': await services.avatar.get_avatar_url(message.uid), + 'timestamp': message.start_time, + 'authorName': message.username, + 'privilegeType': message.guard_level + }) + + async def _on_super_chat(self, client: blivedm.BLiveClient, message: blivedm.SuperChatMessage): + avatar_url = services.avatar.process_avatar_url(message.face) + services.avatar.update_avatar_cache(message.uid, avatar_url) + + need_translate = self._need_translate(message.message) + if need_translate: + translation = services.translate.get_translation_from_cache(message.message) + if translation is None: + # 没有缓存,需要后面异步翻译后通知 + translation = '' + else: + need_translate = False + else: + translation = '' + + id_ = str(message.id) + self.send_message(api.chat.Command.ADD_SUPER_CHAT, { + 'id': id_, + 'avatarUrl': avatar_url, + 'timestamp': message.start_time, + 'authorName': message.uname, + 'price': message.price, + 'content': message.message, + 'translation': translation + }) + + if need_translate: + asyncio.ensure_future(self._translate_and_response(message.message, id_)) + + async def _on_super_chat_delete(self, client: blivedm.BLiveClient, message: blivedm.SuperChatDeleteMessage): + self.send_message(api.chat.Command.ADD_SUPER_CHAT, { + 'ids': list(map(str, message.ids)) + }) + + def _need_translate(self, text): + cfg = config.get_config() + return ( + cfg.enable_translate + and (not cfg.allow_translate_rooms or self.room_id in cfg.allow_translate_rooms) + and self.auto_translate_count > 0 + and services.translate.need_translate(text) + ) + + async def _translate_and_response(self, text, msg_id): + translation = await services.translate.translate(text) + if translation is None: + return + self.send_message_if( + lambda client: client.auto_translate, + api.chat.Command.UPDATE_TRANSLATION, + api.chat.make_translation_message( + msg_id, + translation + ) + ) + + +class RoomManager: + def __init__(self): + self._rooms: Dict[int, Room] = {} + + async def add_client(self, room_id, client: 'api.chat.ChatHandler'): + if room_id not in self._rooms: + if not await self._add_room(room_id): + client.close() + return + room = self._rooms.get(room_id, None) + if room is None: + return + + room.clients.append(client) + logger.info('%d clients in room %s', len(room.clients), room_id) + if client.auto_translate: + room.auto_translate_count += 1 + + await client.on_join_room() + + def del_client(self, room_id, client: 'api.chat.ChatHandler'): + room = self._rooms.get(room_id, None) + if room is None: + return + + try: + room.clients.remove(client) + except ValueError: + # _add_room未完成,没有执行到room.clients.append + pass + else: + logger.info('%d clients in room %s', len(room.clients), room_id) + if client.auto_translate: + room.auto_translate_count = max(0, room.auto_translate_count - 1) + + if not room.clients: + self._del_room(room_id) + + async def _add_room(self, room_id): + if room_id in self._rooms: + return True + logger.info('Creating room %d', room_id) + self._rooms[room_id] = room = Room(room_id) + if await room.init_room(): + room.start() + logger.info('%d rooms', len(self._rooms)) + return True + else: + self._del_room(room_id) + return False + + def _del_room(self, room_id): + room = self._rooms.get(room_id, None) + if room is None: + return + logger.info('Removing room %d', room_id) + for client in room.clients: + client.close() + asyncio.ensure_future(room.stop_and_close()) + self._rooms.pop(room_id, None) + logger.info('%d rooms', len(self._rooms)) diff --git a/models/translate.py b/services/translate.py similarity index 96% rename from models/translate.py rename to services/translate.py index 060e361..38d7cfc 100644 --- a/models/translate.py +++ b/services/translate.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - import asyncio import base64 import datetime @@ -12,11 +11,12 @@ import random import re from typing import * -import Crypto.Cipher.AES as cry_aes -import Crypto.Util.Padding as cry_pad +import Crypto.Cipher.AES as cry_aes # noqa +import Crypto.Util.Padding as cry_pad # noqa import aiohttp import config +import utils.request logger = logging.getLogger(__name__) @@ -26,7 +26,6 @@ NO_TRANSLATE_TEXTS = { } _main_event_loop = asyncio.get_event_loop() -_http_session: Optional[aiohttp.ClientSession] = None _translate_providers: List['TranslateProvider'] = [] # text -> res _translate_cache: Dict[str, str] = {} @@ -39,9 +38,6 @@ def init(): async def _do_init(): - global _http_session - _http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) - cfg = config.get_config() if not cfg.enable_translate: return @@ -142,7 +138,7 @@ def _on_translate_done(key, future): # 缓存 try: res = future.result() - except Exception: + except Exception: # noqa return if res is None: return @@ -199,7 +195,7 @@ class FlowControlTranslateProvider(TranslateProvider): asyncio.ensure_future(self._translate_coroutine(text, future)) # 频率限制 await asyncio.sleep(self._query_interval) - except Exception: + except Exception: # noqa logger.exception('FlowControlTranslateProvider error:') async def _translate_coroutine(self, text, future): @@ -238,7 +234,7 @@ class TencentTranslateFree(FlowControlTranslateProvider): async def _do_init(self): try: - async with _http_session.get('https://fanyi.qq.com/') as r: + async with utils.request.http_session.get('https://fanyi.qq.com/') as r: if r.status != 200: logger.warning('TencentTranslateFree init request failed: status=%d %s', r.status, r.reason) return False @@ -280,7 +276,7 @@ class TencentTranslateFree(FlowControlTranslateProvider): # 获取token try: - async with _http_session.post('https://fanyi.qq.com/api/' + reauthuri) as r: + async with utils.request.http_session.post('https://fanyi.qq.com/api/' + reauthuri) as r: if r.status != 200: logger.warning('TencentTranslateFree init request failed: reauthuri=%s, status=%d %s', reauthuri, r.status, r.reason) @@ -333,7 +329,7 @@ class TencentTranslateFree(FlowControlTranslateProvider): async def _do_translate(self, text): try: - async with _http_session.post( + async with utils.request.http_session.post( 'https://fanyi.qq.com/api/translate', headers={ 'Referer': 'https://fanyi.qq.com/', @@ -427,7 +423,7 @@ class BilibiliTranslateFree(FlowControlTranslateProvider): async def _do_translate(self, text): try: - async with _http_session.get( + async with utils.request.http_session.get( 'https://api.live.bilibili.com/av/v1/SuperChat/messageTranslate', params={ 'room_id': '21396545', @@ -527,7 +523,7 @@ class TencentTranslate(FlowControlTranslateProvider): 'X-TC-Region': self._region } - return _http_session.post('https://tmt.tencentcloudapi.com/', headers=headers, data=body_bytes) + return utils.request.http_session.post('https://tmt.tencentcloudapi.com/', headers=headers, data=body_bytes) def _on_fail(self, code): if self._cool_down_timer_handle is not None: @@ -575,7 +571,7 @@ class BaiduTranslate(FlowControlTranslateProvider): async def _do_translate(self, text): try: - async with _http_session.post( + async with utils.request.http_session.post( 'https://fanyi-api.baidu.com/api/trans/vip/translate', data=self._add_sign({ 'q': text, diff --git a/update.py b/update.py index 86b9f31..ff7d180 100644 --- a/update.py +++ b/update.py @@ -1,9 +1,10 @@ # -*- coding: utf-8 -*- - import asyncio import aiohttp +import utils.request + VERSION = 'v1.5.3' @@ -13,15 +14,16 @@ def check_update(): async def _do_check_update(): try: - async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session: - async with session.get('https://api.github.com/repos/xfgryujk/blivechat/releases/latest') as r: - data = await r.json() - if data['name'] != VERSION: - print('---------------------------------------------') - print('New version available:', data['name']) - print(data['body']) - print('Download:', data['html_url']) - print('---------------------------------------------') + async with utils.request.http_session.get( + 'https://api.github.com/repos/xfgryujk/blivechat/releases/latest' + ) as r: + data = await r.json() + if data['name'] != VERSION: + print('---------------------------------------------') + print('New version available:', data['name']) + print(data['body']) + print('Download:', data['html_url']) + print('---------------------------------------------') except aiohttp.ClientConnectionError: print('Failed to check update: connection failed') except asyncio.TimeoutError: diff --git a/utils/request.py b/utils/request.py new file mode 100644 index 0000000..d2fe098 --- /dev/null +++ b/utils/request.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +import aiohttp + +http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))