From 8e7d9b266bf2ee1eac5c6b5e9d097cceb91cb3ae Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 30 Jul 2023 22:22:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8LRU=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E3=80=81=E5=A4=B4=E5=83=8F=E4=BC=98=E5=85=88=E4=BD=BF=E7=94=A8?= =?UTF-8?q?Protobuf=E5=8D=8F=E8=AE=AE=E4=B8=AD=E7=9A=84=E3=80=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E8=8E=B7=E5=8F=96=E5=A4=B4=E5=83=8F=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | 1 + services/avatar.py | 184 ++++++++++++++++++++++++------------------ services/chat.py | 31 +++++-- services/translate.py | 9 ++- 4 files changed, 137 insertions(+), 88 deletions(-) diff --git a/requirements.txt b/requirements.txt index 8290d85..0704c4c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ -r blivedm/requirements.txt +cachetools==5.3.1 pycryptodome==3.10.1 sqlalchemy==2.0.19 tornado==6.3.2 diff --git a/services/avatar.py b/services/avatar.py index 64eeaf5..c071e7a 100644 --- a/services/avatar.py +++ b/services/avatar.py @@ -8,6 +8,7 @@ import urllib.parse from typing import * import aiohttp +import cachetools import sqlalchemy.exc import config @@ -21,7 +22,7 @@ logger = logging.getLogger(__name__) DEFAULT_AVATAR_URL = '//static.hdslb.com/images/member/noface.gif' # user_id -> avatar_url -_avatar_url_cache: Dict[int, str] = {} +_avatar_url_cache: Optional[cachetools.TTLCache] = None # 正在获取头像的Future,user_id -> Future _uid_fetch_future_map: Dict[int, asyncio.Future] = {} # 正在获取头像的user_id队列 @@ -36,45 +37,110 @@ WBI_KEY_INDEX_TABLE = [ ] # wbi鉴权口令 _wbi_key = '' +# 正在获取wbi_key的Future +_refresh_wbi_key_future: Optional[asyncio.Future] = None def init(): cfg = config.get_config() - global _uid_queue_to_fetch + global _avatar_url_cache, _uid_queue_to_fetch + _avatar_url_cache = cachetools.TTLCache(cfg.avatar_cache_size, 10 * 60) _uid_queue_to_fetch = asyncio.Queue(cfg.fetch_avatar_max_queue_size) asyncio.get_event_loop().create_task(_get_avatar_url_from_web_consumer()) -async def get_avatar_url(user_id): +async def get_avatar_url(user_id) -> str: avatar_url = await get_avatar_url_or_none(user_id) if avatar_url is None: avatar_url = DEFAULT_AVATAR_URL return avatar_url -async def get_avatar_url_or_none(user_id): +async def get_avatar_url_or_none(user_id) -> Optional[str]: if user_id == 0: return None - avatar_url = get_avatar_url_from_memory(user_id) + # 查内存 + avatar_url = _get_avatar_url_from_memory(user_id) if avatar_url is not None: return avatar_url - avatar_url = await get_avatar_url_from_database(user_id) - if avatar_url is not None: + + # 查数据库 + user = await _get_avatar_url_from_database(user_id) + if user is not None: + avatar_url = user.avatar_url + _update_avatar_cache_in_memory(user_id, avatar_url) + # 如果距离数据库上次更新太久,则在后台从接口获取,并更新所有缓存 + if (datetime.datetime.now() - user.update_time).days >= 1: + asyncio.create_task(_refresh_avatar_cache_from_web(user_id)) return avatar_url - return await get_avatar_url_from_web(user_id) + + # 从接口获取 + avatar_url = await _get_avatar_url_from_web(user_id) + if avatar_url is not None: + update_avatar_cache(user_id, avatar_url) + return avatar_url + + return None -def get_avatar_url_from_memory(user_id): +async def _refresh_avatar_cache_from_web(user_id): + avatar_url = await _get_avatar_url_from_web(user_id) + if avatar_url is None: + return + update_avatar_cache(user_id, avatar_url) + + +def update_avatar_cache(user_id, avatar_url): + _update_avatar_cache_in_memory(user_id, avatar_url) + _update_avatar_cache_in_database(user_id, avatar_url) + + +def update_avatar_cache_if_expired(user_id, avatar_url): + # 内存缓存过期了才更新,减少写入数据库的频率 + if _get_avatar_url_from_memory(user_id) is None: + update_avatar_cache(user_id, avatar_url) + + +def _get_avatar_url_from_memory(user_id) -> Optional[str]: return _avatar_url_cache.get(user_id, None) -def get_avatar_url_from_database(user_id) -> Awaitable[Optional[str]]: +def _update_avatar_cache_in_memory(user_id, avatar_url): + _avatar_url_cache[user_id] = avatar_url + + +def _get_avatar_url_from_database(user_id) -> Awaitable[Optional[bl_models.BilibiliUser]]: loop = asyncio.get_running_loop() - return loop.run_in_executor(None, _do_get_avatar_url_from_database, user_id, loop) + return loop.run_in_executor(None, _do_get_avatar_url_from_database, user_id) -def _do_get_avatar_url_from_database(user_id, loop: asyncio.AbstractEventLoop): +def _do_get_avatar_url_from_database(user_id) -> Optional[bl_models.BilibiliUser]: + try: + with models.database.get_session() as session: + user: bl_models.BilibiliUser = session.scalars( + sqlalchemy.select(bl_models.BilibiliUser).filter( + bl_models.BilibiliUser.uid == user_id + ) + ).one_or_none() + if user is None: + return None + return user + except sqlalchemy.exc.OperationalError: + # SQLite会锁整个文件,忽略就行 + return None + except sqlalchemy.exc.SQLAlchemyError: + logger.exception('_do_get_avatar_url_from_database failed:') + return None + + +def _update_avatar_cache_in_database(user_id, avatar_url) -> Awaitable[None]: + return asyncio.get_running_loop().run_in_executor( + None, _do_update_avatar_cache_in_database, user_id, avatar_url + ) + + +def _do_update_avatar_cache_in_database(user_id, avatar_url): try: with models.database.get_session() as session: user = session.scalars( @@ -83,29 +149,21 @@ def _do_get_avatar_url_from_database(user_id, loop: asyncio.AbstractEventLoop): ) ).one_or_none() if user is None: - return None - avatar_url = user.avatar_url - - # 如果离上次更新太久就更新所有缓存 - if (datetime.datetime.now() - user.update_time).days >= 3: - def refresh_cache(): - _avatar_url_cache.pop(user_id, None) - get_avatar_url_from_web(user_id) - - loop.call_soon_threadsafe(refresh_cache) - else: - # 否则只更新内存缓存 - _update_avatar_cache_in_memory(user_id, avatar_url) - except sqlalchemy.exc.OperationalError: - # SQLite会锁整个文件,忽略就行 - return None + user = bl_models.BilibiliUser( + uid=user_id + ) + session.add(user) + user.avatar_url = avatar_url + user.update_time = datetime.datetime.now() + session.commit() + except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.IntegrityError): + # SQLite会锁整个文件,忽略就行。另外还有多线程导致ID重复的问题,这里对一致性要求不高就没加for update + pass except sqlalchemy.exc.SQLAlchemyError: - logger.exception('_do_get_avatar_url_from_database failed:') - return None - return avatar_url + logger.exception('_do_update_avatar_cache_in_database failed:') -def get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]: +def _get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]: # 如果已有正在获取的future则返回,防止重复获取同一个uid future = _uid_fetch_future_map.get(user_id, None) if future is not None: @@ -139,7 +197,7 @@ async def _get_avatar_url_from_web_consumer(): else: _last_fetch_banned_time = None - asyncio.create_task(_get_avatar_url_from_web_coroutine(user_id, future)) + asyncio.create_task(_get_avatar_url_from_web_wrapper(user_id, future)) # 限制频率,防止被B站ban cfg = config.get_config() @@ -148,7 +206,7 @@ async def _get_avatar_url_from_web_consumer(): logger.exception('_get_avatar_url_from_web_consumer error:') -async def _get_avatar_url_from_web_coroutine(user_id, future): +async def _get_avatar_url_from_web_wrapper(user_id, future): try: avatar_url = await _do_get_avatar_url_from_web(user_id) except BaseException as e: @@ -157,11 +215,12 @@ async def _get_avatar_url_from_web_coroutine(user_id, future): future.set_result(avatar_url) -async def _do_get_avatar_url_from_web(user_id): - global _wbi_key +async def _do_get_avatar_url_from_web(user_id) -> Optional[str]: + global _wbi_key, _refresh_wbi_key_future if _wbi_key == '': - # TODO 判断一下是否正在获取 - _wbi_key = await _get_wbi_key() + if _refresh_wbi_key_future is None: + _refresh_wbi_key_future = asyncio.create_task(_refresh_wbi_key()) + await _refresh_wbi_key_future try: async with utils.request.http_session.get( @@ -191,9 +250,15 @@ async def _do_get_avatar_url_from_web(user_id): _wbi_key = '' return None - avatar_url = process_avatar_url(data['data']['face']) - update_avatar_cache(user_id, avatar_url) - return avatar_url + return process_avatar_url(data['data']['face']) + + +async def _refresh_wbi_key(): + global _wbi_key, _refresh_wbi_key_future + try: + _wbi_key = await _get_wbi_key() + finally: + _refresh_wbi_key_future = None async def _get_wbi_key(): @@ -265,40 +330,3 @@ def process_avatar_url(avatar_url): if not avatar_url.endswith('noface.gif'): avatar_url += '@48w_48h' return avatar_url - - -def update_avatar_cache(user_id, avatar_url): - _update_avatar_cache_in_memory(user_id, avatar_url) - asyncio.get_running_loop().run_in_executor( - None, _update_avatar_cache_in_database, user_id, avatar_url - ) - - -def _update_avatar_cache_in_memory(user_id, avatar_url): - _avatar_url_cache[user_id] = avatar_url - cfg = config.get_config() - while len(_avatar_url_cache) > cfg.avatar_cache_size: - _avatar_url_cache.pop(next(iter(_avatar_url_cache)), None) - - -def _update_avatar_cache_in_database(user_id, avatar_url): - try: - with models.database.get_session() as session: - user = session.scalars( - sqlalchemy.select(bl_models.BilibiliUser).filter( - bl_models.BilibiliUser.uid == user_id - ) - ).one_or_none() - if user is None: - user = bl_models.BilibiliUser( - uid=user_id - ) - session.add(user) - user.avatar_url = avatar_url - user.update_time = datetime.datetime.now() - session.commit() - except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.IntegrityError): - # SQLite会锁整个文件,忽略就行,另外还有多线程导致ID重复的问题 - pass - except sqlalchemy.exc.SQLAlchemyError: - logger.exception('_update_avatar_cache_in_database failed:') diff --git a/services/chat.py b/services/chat.py index 4915c8c..5d6a5c3 100644 --- a/services/chat.py +++ b/services/chat.py @@ -1,11 +1,14 @@ # -*- coding: utf-8 -*- import asyncio +import base64 +import binascii import logging import uuid from typing import * import api.chat import blivedm.blivedm as blivedm +import blivedm.blivedm.models.pb as blivedm_pb import config import services.avatar import services.translate @@ -203,6 +206,19 @@ class LiveMsgHandler(blivedm.BaseHandler): # 重新定义XXX_callback是为了减少对字段名的依赖,防止B站改字段名 def __danmu_msg_callback(self, client: LiveClient, command: dict): info = command['info'] + dm_v2 = command.get('dm_v2', '') + + proto: Optional[blivedm_pb.SimpleDm] = None + if dm_v2 != '': + try: + proto = blivedm_pb.SimpleDm.loads(base64.b64decode(dm_v2)) + except (binascii.Error, KeyError, TypeError, ValueError): + pass + if proto is not None: + face = proto.user.face + else: + face = '' + if len(info[3]) != 0: medal_level = info[3][0] medal_room_id = info[3][3] @@ -220,6 +236,7 @@ class LiveMsgHandler(blivedm.BaseHandler): uid=info[2][0], uname=info[2][1], + face=face, admin=info[2][2], urank=info[2][5], mobile_verify=info[2][6], @@ -282,8 +299,12 @@ class LiveMsgHandler(blivedm.BaseHandler): asyncio.create_task(self.__on_danmaku(client, message)) async def __on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage): - # 先异步调用再获取房间,因为返回时房间可能已经不存在了 - avatar_url = await services.avatar.get_avatar_url(message.uid) + avatar_url = message.face + if avatar_url != '': + services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url) + else: + # 先异步调用再获取房间,因为返回时房间可能已经不存在了 + avatar_url = await services.avatar.get_avatar_url(message.uid) room = client_room_manager.get_room(client.tmp_room_id) if room is None: @@ -342,8 +363,7 @@ class LiveMsgHandler(blivedm.BaseHandler): async def _on_gift(self, client: LiveClient, message: blivedm.GiftMessage): avatar_url = services.avatar.process_avatar_url(message.face) - # 服务器白给的头像URL,直接缓存 - services.avatar.update_avatar_cache(message.uid, avatar_url) + services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url) # 丢人 if message.coin_type != 'gold': @@ -385,8 +405,7 @@ class LiveMsgHandler(blivedm.BaseHandler): async def _on_super_chat(self, client: LiveClient, message: blivedm.SuperChatMessage): avatar_url = services.avatar.process_avatar_url(message.face) - # 服务器白给的头像URL,直接缓存 - services.avatar.update_avatar_cache(message.uid, avatar_url) + services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url) room = client_room_manager.get_room(client.tmp_room_id) if room is None: diff --git a/services/translate.py b/services/translate.py index 5366678..bd514d2 100644 --- a/services/translate.py +++ b/services/translate.py @@ -14,6 +14,7 @@ from typing import * import Crypto.Cipher.AES as cry_aes # noqa import Crypto.Util.Padding as cry_pad # noqa import aiohttp +import cachetools import config import utils.request @@ -27,12 +28,15 @@ NO_TRANSLATE_TEXTS = { _translate_providers: List['TranslateProvider'] = [] # text -> res -_translate_cache: Dict[str, str] = {} +_translate_cache: Optional[cachetools.LRUCache] = None # 正在翻译的Future,text -> Future _text_future_map: Dict[str, asyncio.Future] = {} def init(): + cfg = config.get_config() + global _translate_cache + _translate_cache = cachetools.LRUCache(cfg.translation_cache_size) asyncio.get_event_loop().create_task(_do_init()) @@ -140,9 +144,6 @@ def _on_translate_done(key, future): if res is None: return _translate_cache[key] = res - cfg = config.get_config() - while len(_translate_cache) > cfg.translation_cache_size: - _translate_cache.pop(next(iter(_translate_cache)), None) class TranslateProvider: