使用LRU缓存、头像优先使用Protobuf协议中的、优化获取头像代码

This commit is contained in:
John Smith 2023-07-30 22:22:47 +08:00
parent aea6f12bd2
commit 8e7d9b266b
4 changed files with 137 additions and 88 deletions

View File

@ -1,4 +1,5 @@
-r blivedm/requirements.txt
cachetools==5.3.1
pycryptodome==3.10.1
sqlalchemy==2.0.19
tornado==6.3.2

View File

@ -8,6 +8,7 @@ import urllib.parse
from typing import *
import aiohttp
import cachetools
import sqlalchemy.exc
import config
@ -21,7 +22,7 @@ logger = logging.getLogger(__name__)
DEFAULT_AVATAR_URL = '//static.hdslb.com/images/member/noface.gif'
# user_id -> avatar_url
_avatar_url_cache: Dict[int, str] = {}
_avatar_url_cache: Optional[cachetools.TTLCache] = None
# 正在获取头像的Futureuser_id -> Future
_uid_fetch_future_map: Dict[int, asyncio.Future] = {}
# 正在获取头像的user_id队列
@ -36,45 +37,110 @@ WBI_KEY_INDEX_TABLE = [
]
# wbi鉴权口令
_wbi_key = ''
# 正在获取wbi_key的Future
_refresh_wbi_key_future: Optional[asyncio.Future] = None
def init():
cfg = config.get_config()
global _uid_queue_to_fetch
global _avatar_url_cache, _uid_queue_to_fetch
_avatar_url_cache = cachetools.TTLCache(cfg.avatar_cache_size, 10 * 60)
_uid_queue_to_fetch = asyncio.Queue(cfg.fetch_avatar_max_queue_size)
asyncio.get_event_loop().create_task(_get_avatar_url_from_web_consumer())
async def get_avatar_url(user_id):
async def get_avatar_url(user_id) -> str:
avatar_url = await get_avatar_url_or_none(user_id)
if avatar_url is None:
avatar_url = DEFAULT_AVATAR_URL
return avatar_url
async def get_avatar_url_or_none(user_id):
async def get_avatar_url_or_none(user_id) -> Optional[str]:
if user_id == 0:
return None
avatar_url = get_avatar_url_from_memory(user_id)
# 查内存
avatar_url = _get_avatar_url_from_memory(user_id)
if avatar_url is not None:
return avatar_url
avatar_url = await get_avatar_url_from_database(user_id)
if avatar_url is not None:
# 查数据库
user = await _get_avatar_url_from_database(user_id)
if user is not None:
avatar_url = user.avatar_url
_update_avatar_cache_in_memory(user_id, avatar_url)
# 如果距离数据库上次更新太久,则在后台从接口获取,并更新所有缓存
if (datetime.datetime.now() - user.update_time).days >= 1:
asyncio.create_task(_refresh_avatar_cache_from_web(user_id))
return avatar_url
return await get_avatar_url_from_web(user_id)
# 从接口获取
avatar_url = await _get_avatar_url_from_web(user_id)
if avatar_url is not None:
update_avatar_cache(user_id, avatar_url)
return avatar_url
return None
def get_avatar_url_from_memory(user_id):
async def _refresh_avatar_cache_from_web(user_id):
avatar_url = await _get_avatar_url_from_web(user_id)
if avatar_url is None:
return
update_avatar_cache(user_id, avatar_url)
def update_avatar_cache(user_id, avatar_url):
_update_avatar_cache_in_memory(user_id, avatar_url)
_update_avatar_cache_in_database(user_id, avatar_url)
def update_avatar_cache_if_expired(user_id, avatar_url):
# 内存缓存过期了才更新,减少写入数据库的频率
if _get_avatar_url_from_memory(user_id) is None:
update_avatar_cache(user_id, avatar_url)
def _get_avatar_url_from_memory(user_id) -> Optional[str]:
return _avatar_url_cache.get(user_id, None)
def get_avatar_url_from_database(user_id) -> Awaitable[Optional[str]]:
def _update_avatar_cache_in_memory(user_id, avatar_url):
_avatar_url_cache[user_id] = avatar_url
def _get_avatar_url_from_database(user_id) -> Awaitable[Optional[bl_models.BilibiliUser]]:
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, _do_get_avatar_url_from_database, user_id, loop)
return loop.run_in_executor(None, _do_get_avatar_url_from_database, user_id)
def _do_get_avatar_url_from_database(user_id, loop: asyncio.AbstractEventLoop):
def _do_get_avatar_url_from_database(user_id) -> Optional[bl_models.BilibiliUser]:
try:
with models.database.get_session() as session:
user: bl_models.BilibiliUser = session.scalars(
sqlalchemy.select(bl_models.BilibiliUser).filter(
bl_models.BilibiliUser.uid == user_id
)
).one_or_none()
if user is None:
return None
return user
except sqlalchemy.exc.OperationalError:
# SQLite会锁整个文件忽略就行
return None
except sqlalchemy.exc.SQLAlchemyError:
logger.exception('_do_get_avatar_url_from_database failed:')
return None
def _update_avatar_cache_in_database(user_id, avatar_url) -> Awaitable[None]:
return asyncio.get_running_loop().run_in_executor(
None, _do_update_avatar_cache_in_database, user_id, avatar_url
)
def _do_update_avatar_cache_in_database(user_id, avatar_url):
try:
with models.database.get_session() as session:
user = session.scalars(
@ -83,29 +149,21 @@ def _do_get_avatar_url_from_database(user_id, loop: asyncio.AbstractEventLoop):
)
).one_or_none()
if user is None:
return None
avatar_url = user.avatar_url
# 如果离上次更新太久就更新所有缓存
if (datetime.datetime.now() - user.update_time).days >= 3:
def refresh_cache():
_avatar_url_cache.pop(user_id, None)
get_avatar_url_from_web(user_id)
loop.call_soon_threadsafe(refresh_cache)
else:
# 否则只更新内存缓存
_update_avatar_cache_in_memory(user_id, avatar_url)
except sqlalchemy.exc.OperationalError:
# SQLite会锁整个文件忽略就行
return None
user = bl_models.BilibiliUser(
uid=user_id
)
session.add(user)
user.avatar_url = avatar_url
user.update_time = datetime.datetime.now()
session.commit()
except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.IntegrityError):
# SQLite会锁整个文件忽略就行。另外还有多线程导致ID重复的问题这里对一致性要求不高就没加for update
pass
except sqlalchemy.exc.SQLAlchemyError:
logger.exception('_do_get_avatar_url_from_database failed:')
return None
return avatar_url
logger.exception('_do_update_avatar_cache_in_database failed:')
def get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]:
def _get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]:
# 如果已有正在获取的future则返回防止重复获取同一个uid
future = _uid_fetch_future_map.get(user_id, None)
if future is not None:
@ -139,7 +197,7 @@ async def _get_avatar_url_from_web_consumer():
else:
_last_fetch_banned_time = None
asyncio.create_task(_get_avatar_url_from_web_coroutine(user_id, future))
asyncio.create_task(_get_avatar_url_from_web_wrapper(user_id, future))
# 限制频率防止被B站ban
cfg = config.get_config()
@ -148,7 +206,7 @@ async def _get_avatar_url_from_web_consumer():
logger.exception('_get_avatar_url_from_web_consumer error:')
async def _get_avatar_url_from_web_coroutine(user_id, future):
async def _get_avatar_url_from_web_wrapper(user_id, future):
try:
avatar_url = await _do_get_avatar_url_from_web(user_id)
except BaseException as e:
@ -157,11 +215,12 @@ async def _get_avatar_url_from_web_coroutine(user_id, future):
future.set_result(avatar_url)
async def _do_get_avatar_url_from_web(user_id):
global _wbi_key
async def _do_get_avatar_url_from_web(user_id) -> Optional[str]:
global _wbi_key, _refresh_wbi_key_future
if _wbi_key == '':
# TODO 判断一下是否正在获取
_wbi_key = await _get_wbi_key()
if _refresh_wbi_key_future is None:
_refresh_wbi_key_future = asyncio.create_task(_refresh_wbi_key())
await _refresh_wbi_key_future
try:
async with utils.request.http_session.get(
@ -191,9 +250,15 @@ async def _do_get_avatar_url_from_web(user_id):
_wbi_key = ''
return None
avatar_url = process_avatar_url(data['data']['face'])
update_avatar_cache(user_id, avatar_url)
return avatar_url
return process_avatar_url(data['data']['face'])
async def _refresh_wbi_key():
global _wbi_key, _refresh_wbi_key_future
try:
_wbi_key = await _get_wbi_key()
finally:
_refresh_wbi_key_future = None
async def _get_wbi_key():
@ -265,40 +330,3 @@ def process_avatar_url(avatar_url):
if not avatar_url.endswith('noface.gif'):
avatar_url += '@48w_48h'
return avatar_url
def update_avatar_cache(user_id, avatar_url):
_update_avatar_cache_in_memory(user_id, avatar_url)
asyncio.get_running_loop().run_in_executor(
None, _update_avatar_cache_in_database, user_id, avatar_url
)
def _update_avatar_cache_in_memory(user_id, avatar_url):
_avatar_url_cache[user_id] = avatar_url
cfg = config.get_config()
while len(_avatar_url_cache) > cfg.avatar_cache_size:
_avatar_url_cache.pop(next(iter(_avatar_url_cache)), None)
def _update_avatar_cache_in_database(user_id, avatar_url):
try:
with models.database.get_session() as session:
user = session.scalars(
sqlalchemy.select(bl_models.BilibiliUser).filter(
bl_models.BilibiliUser.uid == user_id
)
).one_or_none()
if user is None:
user = bl_models.BilibiliUser(
uid=user_id
)
session.add(user)
user.avatar_url = avatar_url
user.update_time = datetime.datetime.now()
session.commit()
except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.IntegrityError):
# SQLite会锁整个文件忽略就行另外还有多线程导致ID重复的问题
pass
except sqlalchemy.exc.SQLAlchemyError:
logger.exception('_update_avatar_cache_in_database failed:')

View File

@ -1,11 +1,14 @@
# -*- coding: utf-8 -*-
import asyncio
import base64
import binascii
import logging
import uuid
from typing import *
import api.chat
import blivedm.blivedm as blivedm
import blivedm.blivedm.models.pb as blivedm_pb
import config
import services.avatar
import services.translate
@ -203,6 +206,19 @@ class LiveMsgHandler(blivedm.BaseHandler):
# 重新定义XXX_callback是为了减少对字段名的依赖防止B站改字段名
def __danmu_msg_callback(self, client: LiveClient, command: dict):
info = command['info']
dm_v2 = command.get('dm_v2', '')
proto: Optional[blivedm_pb.SimpleDm] = None
if dm_v2 != '':
try:
proto = blivedm_pb.SimpleDm.loads(base64.b64decode(dm_v2))
except (binascii.Error, KeyError, TypeError, ValueError):
pass
if proto is not None:
face = proto.user.face
else:
face = ''
if len(info[3]) != 0:
medal_level = info[3][0]
medal_room_id = info[3][3]
@ -220,6 +236,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
uid=info[2][0],
uname=info[2][1],
face=face,
admin=info[2][2],
urank=info[2][5],
mobile_verify=info[2][6],
@ -282,8 +299,12 @@ class LiveMsgHandler(blivedm.BaseHandler):
asyncio.create_task(self.__on_danmaku(client, message))
async def __on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage):
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
avatar_url = await services.avatar.get_avatar_url(message.uid)
avatar_url = message.face
if avatar_url != '':
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)
else:
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
avatar_url = await services.avatar.get_avatar_url(message.uid)
room = client_room_manager.get_room(client.tmp_room_id)
if room is None:
@ -342,8 +363,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
async def _on_gift(self, client: LiveClient, message: blivedm.GiftMessage):
avatar_url = services.avatar.process_avatar_url(message.face)
# 服务器白给的头像URL直接缓存
services.avatar.update_avatar_cache(message.uid, avatar_url)
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)
# 丢人
if message.coin_type != 'gold':
@ -385,8 +405,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
async def _on_super_chat(self, client: LiveClient, message: blivedm.SuperChatMessage):
avatar_url = services.avatar.process_avatar_url(message.face)
# 服务器白给的头像URL直接缓存
services.avatar.update_avatar_cache(message.uid, avatar_url)
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)
room = client_room_manager.get_room(client.tmp_room_id)
if room is None:

View File

@ -14,6 +14,7 @@ from typing import *
import Crypto.Cipher.AES as cry_aes # noqa
import Crypto.Util.Padding as cry_pad # noqa
import aiohttp
import cachetools
import config
import utils.request
@ -27,12 +28,15 @@ NO_TRANSLATE_TEXTS = {
_translate_providers: List['TranslateProvider'] = []
# text -> res
_translate_cache: Dict[str, str] = {}
_translate_cache: Optional[cachetools.LRUCache] = None
# 正在翻译的Futuretext -> Future
_text_future_map: Dict[str, asyncio.Future] = {}
def init():
cfg = config.get_config()
global _translate_cache
_translate_cache = cachetools.LRUCache(cfg.translation_cache_size)
asyncio.get_event_loop().create_task(_do_init())
@ -140,9 +144,6 @@ def _on_translate_done(key, future):
if res is None:
return
_translate_cache[key] = res
cfg = config.get_config()
while len(_translate_cache) > cfg.translation_cache_size:
_translate_cache.pop(next(iter(_translate_cache)), None)
class TranslateProvider: