From c3566823c19b8e596da5d456f2917a7e8745f7ba Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 24 Aug 2023 21:52:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=8E=B7=E5=8F=96=E5=A4=B4=E5=83=8F=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E5=B0=81=E8=A3=85=E6=88=90=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.py | 2 - data/config.example.ini | 8 +- services/avatar.py | 411 ++++++++++++++++++++++++---------------- 3 files changed, 246 insertions(+), 175 deletions(-) diff --git a/config.py b/config.py index 8dbe3fe..48d7751 100644 --- a/config.py +++ b/config.py @@ -55,7 +55,6 @@ class AppConfig: self.open_browser_at_startup = True self.enable_upload_file = True - self.fetch_avatar_interval = 5.5 self.fetch_avatar_max_queue_size = 1 self.avatar_cache_size = 10000 @@ -86,7 +85,6 @@ class AppConfig: fallback=self.open_browser_at_startup) self.enable_upload_file = app_section.getboolean('enable_upload_file', fallback=self.enable_upload_file) - self.fetch_avatar_interval = app_section.getfloat('fetch_avatar_interval', fallback=self.fetch_avatar_interval) self.fetch_avatar_max_queue_size = app_section.getint('fetch_avatar_max_queue_size', fallback=self.fetch_avatar_max_queue_size) self.avatar_cache_size = app_section.getint('avatar_cache_size', fallback=self.avatar_cache_size) diff --git a/data/config.example.ini b/data/config.example.ini index 2297f0d..3878072 100644 --- a/data/config.example.ini +++ b/data/config.example.ini @@ -24,13 +24,9 @@ open_browser_at_startup = true enable_upload_file = true -# 获取头像间隔时间(秒)。如果小于5秒有很大概率被服务器拉黑 -# Interval between fetching avatars (seconds). At least 3 seconds is recommended -fetch_avatar_interval = 5.5 - -# 获取头像最大队列长度,注意最长等待时间等于 最大队列长度 * 请求间隔时间 +# 获取头像最大队列长度 # Maximum queue length for fetching avatar -fetch_avatar_max_queue_size = 1 +fetch_avatar_max_queue_size = 3 # 内存中头像缓存数量 # Number of avatar caches in memory diff --git a/services/avatar.py b/services/avatar.py index f7bb907..bf779de 100644 --- a/services/avatar.py +++ b/services/avatar.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import asyncio +import dataclasses import datetime import hashlib import logging @@ -21,32 +22,36 @@ logger = logging.getLogger(__name__) DEFAULT_AVATAR_URL = '//static.hdslb.com/images/member/noface.gif' +_avatar_fetchers: List['AvatarFetcher'] = [] # user_id -> avatar_url _avatar_url_cache: Optional[cachetools.TTLCache] = None # 正在获取头像的Future,user_id -> Future _uid_fetch_future_map: Dict[int, asyncio.Future] = {} -# 正在获取头像的user_id队列 -_uid_queue_to_fetch: Optional[asyncio.Queue] = None -# 上次被B站ban时间 -_last_fetch_banned_time: Optional[datetime.datetime] = None +# 正在获取头像的任务队列 +_task_queue: Optional['asyncio.Queue[FetchTask]'] = None -# wbi密码表 -WBI_KEY_INDEX_TABLE = [ - 46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, - 27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13 -] -# wbi鉴权口令 -_wbi_key = '' -# 正在获取wbi_key的Future -_refresh_wbi_key_future: Optional[asyncio.Future] = None + +@dataclasses.dataclass +class FetchTask: + user_id: int + future: 'asyncio.Future[Optional[str]]' def init(): cfg = config.get_config() - global _avatar_url_cache, _uid_queue_to_fetch + global _avatar_url_cache, _task_queue _avatar_url_cache = cachetools.TTLCache(cfg.avatar_cache_size, 10 * 60) - _uid_queue_to_fetch = asyncio.Queue(cfg.fetch_avatar_max_queue_size) - asyncio.get_event_loop().create_task(_get_avatar_url_from_web_consumer()) + _task_queue = asyncio.Queue(cfg.fetch_avatar_max_queue_size) + asyncio.get_event_loop().create_task(_do_init()) + + +async def _do_init(): + fetchers = [ + UserSpaceAvatarFetcher(5.5), + ] + await asyncio.gather(*(fetcher.init() for fetcher in fetchers)) + global _avatar_fetchers + _avatar_fetchers = fetchers async def get_avatar_url(user_id) -> str: @@ -102,6 +107,17 @@ def update_avatar_cache_if_expired(user_id, avatar_url): update_avatar_cache(user_id, avatar_url) +def process_avatar_url(avatar_url): + # 去掉协议,兼容HTTP、HTTPS + m = re.fullmatch(r'(?:https?:)?(.*)', avatar_url) + if m is not None: + avatar_url = m[1] + # 缩小图片加快传输 + if not avatar_url.endswith('noface.gif'): + avatar_url += '@48w_48h' + return avatar_url + + def _get_avatar_url_from_memory(user_id) -> Optional[str]: return _avatar_url_cache.get(user_id, None) @@ -169,167 +185,228 @@ def _get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]: if future is not None: return future # 否则创建一个获取任务 - _uid_fetch_future_map[user_id] = future = asyncio.get_running_loop().create_future() - future.add_done_callback(lambda _future: _uid_fetch_future_map.pop(user_id, None)) - try: - _uid_queue_to_fetch.put_nowait(user_id) - except asyncio.QueueFull: + future = asyncio.get_running_loop().create_future() + + task = FetchTask( + user_id=user_id, + future=future + ) + if not _push_task(task): future.set_result(None) + return future + + _uid_fetch_future_map[user_id] = future + future.add_done_callback(lambda _future: _uid_fetch_future_map.pop(user_id, None)) return future -async def _get_avatar_url_from_web_consumer(): - while True: +def _push_task(task: FetchTask): + if not _has_available_avatar_fetcher(): + return False + + try: + _task_queue.put_nowait(task) + return True + except asyncio.QueueFull: + return False + + +def _pop_task() -> Awaitable[FetchTask]: + return _task_queue.get() + + +def _cancel_all_tasks_if_no_available_avatar_fetcher(): + if _has_available_avatar_fetcher(): + return + + logger.warning('No available avatar fetcher') + while not _task_queue.empty(): + task = _task_queue.get_nowait() + task.future.set_result(None) + + +def _has_available_avatar_fetcher(): + return any(fetcher.is_available for fetcher in _avatar_fetchers) + + +class AvatarFetcher: + def __init__(self, query_interval): + self._query_interval = query_interval + self._be_available_event = asyncio.Event() + self._be_available_event.set() + + self._cool_down_timer_handle = None + + async def init(self): + asyncio.create_task(self._fetch_consumer()) + return True + + @property + def is_available(self): + return self._cool_down_timer_handle is None + + def _on_availability_change(self): + if self.is_available: + self._be_available_event.set() + else: + self._be_available_event.clear() + _cancel_all_tasks_if_no_available_avatar_fetcher() + + async def _fetch_consumer(self): + cls_name = type(self).__name__ + while True: + try: + if not self.is_available: + logger.info('%s waiting to become available', cls_name) + await self._be_available_event.wait() + logger.info('%s became available', cls_name) + + task = await _pop_task() + # 为了简化代码,约定只会在_fetch_wrapper里变成不可用,所以获取task之后这里还是可用的 + assert self.is_available + + start_time = datetime.datetime.now() + await self._fetch_wrapper(task) + cost_time = (datetime.datetime.now() - start_time).total_seconds() + + # 限制频率,防止被B站ban + await asyncio.sleep(self._query_interval - cost_time) + except Exception: # noqa + logger.exception('%s error:', cls_name) + + async def _fetch_wrapper(self, task: FetchTask) -> Optional[str]: try: - user_id = await _uid_queue_to_fetch.get() - future = _uid_fetch_future_map.get(user_id, None) - if future is None: - continue + avatar_url = await self._do_fetch(task.user_id) + except BaseException as e: + task.future.set_exception(e) + return None - # 防止在被ban的时候获取 - global _last_fetch_banned_time - if _last_fetch_banned_time is not None: - cur_time = datetime.datetime.now() - if (cur_time - _last_fetch_banned_time).total_seconds() < 3 * 60 + 3: - # 3分钟以内被ban - future.set_result(None) - continue - else: - _last_fetch_banned_time = None + task.future.set_result(avatar_url) + return avatar_url - asyncio.create_task(_get_avatar_url_from_web_wrapper(user_id, future)) + async def _do_fetch(self, user_id) -> Optional[str]: + raise NotImplementedError - # 限制频率,防止被B站ban - cfg = config.get_config() - await asyncio.sleep(cfg.fetch_avatar_interval) - except Exception: # noqa - logger.exception('_get_avatar_url_from_web_consumer error:') + def _cool_down(self, sleep_time): + if self._cool_down_timer_handle is not None: + return - -async def _get_avatar_url_from_web_wrapper(user_id, future): - try: - avatar_url = await _do_get_avatar_url_from_web(user_id) - except BaseException as e: - future.set_exception(e) - else: - future.set_result(avatar_url) - - -async def _do_get_avatar_url_from_web(user_id) -> Optional[str]: - global _wbi_key, _refresh_wbi_key_future, _last_fetch_banned_time - if _wbi_key == '': - if _refresh_wbi_key_future is None: - _refresh_wbi_key_future = asyncio.create_task(_refresh_wbi_key()) - await _refresh_wbi_key_future - - try: - async with utils.request.http_session.get( - 'https://api.bilibili.com/x/space/wbi/acc/info', - headers={ - **utils.request.BILIBILI_COMMON_HEADERS, - 'Origin': 'https://space.bilibili.com', - 'Referer': f'https://space.bilibili.com/{user_id}/' - }, - params=_add_wbi_sign({'mid': user_id}), - ) as r: - if r.status != 200: - logger.warning('Failed to fetch avatar: status=%d %s uid=%d', r.status, r.reason, user_id) - if r.status == 412: - # 被B站ban了 - _last_fetch_banned_time = datetime.datetime.now() - return None - data = await r.json() - except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - return None - - code = data['code'] - if code != 0: - logger.info('Failed to fetch avatar: code=%d %s uid=%d', code, data['message'], user_id) - if code == -401: - # 被B站ban了 - _last_fetch_banned_time = datetime.datetime.now() - elif code == -403: - # 签名错误 - _wbi_key = '' - return None - - return process_avatar_url(data['data']['face']) - - -async def _refresh_wbi_key(): - global _wbi_key, _refresh_wbi_key_future - try: - _wbi_key = await _get_wbi_key() - finally: - _refresh_wbi_key_future = None - - -async def _get_wbi_key(): - try: - async with utils.request.http_session.get( - 'https://api.bilibili.com/nav', - headers=utils.request.BILIBILI_COMMON_HEADERS, - ) as r: - if r.status != 200: - logger.warning('Failed to get wbi key: status=%d %s', r.status, r.reason) - return '' - data = await r.json() - except (aiohttp.ClientConnectionError, asyncio.TimeoutError): - logger.exception('Failed to get wbi key:') - return '' - - try: - wbi_img = data['data']['wbi_img'] - img_key = wbi_img['img_url'].rpartition('/')[2].partition('.')[0] - sub_key = wbi_img['sub_url'].rpartition('/')[2].partition('.')[0] - except KeyError: - logger.warning('Failed to get wbi key: data=%s', data) - return '' - - shuffled_key = img_key + sub_key - wbi_key = [] - for index in WBI_KEY_INDEX_TABLE: - if index < len(shuffled_key): - wbi_key.append(shuffled_key[index]) - return ''.join(wbi_key) - - -def _add_wbi_sign(params: dict): - if _wbi_key == '': - return params - - wts = str(int(datetime.datetime.now().timestamp())) - params_to_sign = {**params, 'wts': wts} - - # 按key字典序排序 - params_to_sign = { - key: params_to_sign[key] - for key in sorted(params_to_sign.keys()) - } - # 过滤一些字符 - for key, value in params_to_sign.items(): - value = ''.join( - ch - for ch in str(value) - if ch not in "!'()*" + self._cool_down_timer_handle = asyncio.get_running_loop().call_later( + sleep_time, self._on_cool_down_timeout ) - params_to_sign[key] = value + self._on_availability_change() - str_to_sign = urllib.parse.urlencode(params_to_sign) + _wbi_key - w_rid = hashlib.md5(str_to_sign.encode('utf-8')).hexdigest() - return { - **params, - 'wts': wts, - 'w_rid': w_rid - } + def _on_cool_down_timeout(self): + self._cool_down_timer_handle = None + self._on_availability_change() -def process_avatar_url(avatar_url): - # 去掉协议,兼容HTTP、HTTPS - m = re.fullmatch(r'(?:https?:)?(.*)', avatar_url) - if m is not None: - avatar_url = m[1] - # 缩小图片加快传输 - if not avatar_url.endswith('noface.gif'): - avatar_url += '@48w_48h' - return avatar_url +class UserSpaceAvatarFetcher(AvatarFetcher): + # wbi密码表 + WBI_KEY_INDEX_TABLE = [ + 46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, + 27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13 + ] + + def __init__(self, query_interval): + super().__init__(query_interval) + + # wbi鉴权口令 + self._wbi_key = '' + + async def _do_fetch(self, user_id) -> Optional[str]: + if self._wbi_key == '': + self._wbi_key = await self._get_wbi_key() + if self._wbi_key == '': + return None + + try: + async with utils.request.http_session.get( + 'https://api.bilibili.com/x/space/wbi/acc/info', + headers={ + **utils.request.BILIBILI_COMMON_HEADERS, + 'Origin': 'https://space.bilibili.com', + 'Referer': f'https://space.bilibili.com/{user_id}/' + }, + params=self._add_wbi_sign({'mid': user_id}), + ) as r: + if r.status != 200: + logger.warning('Failed to fetch avatar: status=%d %s uid=%d', r.status, r.reason, user_id) + if r.status == 412: + # 被B站ban了 + self._cool_down(3 * 60) + return None + data = await r.json() + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + return None + + code = data['code'] + if code != 0: + logger.info('Failed to fetch avatar: code=%d %s uid=%d', code, data['message'], user_id) + if code == -401: + # 被B站ban了 + self._cool_down(3 * 60) + elif code == -403: + # 签名错误 + self._wbi_key = '' + return None + + return process_avatar_url(data['data']['face']) + + async def _get_wbi_key(self): + try: + async with utils.request.http_session.get( + 'https://api.bilibili.com/nav', + headers=utils.request.BILIBILI_COMMON_HEADERS, + ) as r: + if r.status != 200: + logger.warning('Failed to get wbi key: status=%d %s', r.status, r.reason) + return '' + data = await r.json() + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + logger.exception('Failed to get wbi key:') + return '' + + try: + wbi_img = data['data']['wbi_img'] + img_key = wbi_img['img_url'].rpartition('/')[2].partition('.')[0] + sub_key = wbi_img['sub_url'].rpartition('/')[2].partition('.')[0] + except KeyError: + logger.warning('Failed to get wbi key: data=%s', data) + return '' + + shuffled_key = img_key + sub_key + wbi_key = [] + for index in self.WBI_KEY_INDEX_TABLE: + if index < len(shuffled_key): + wbi_key.append(shuffled_key[index]) + return ''.join(wbi_key) + + def _add_wbi_sign(self, params: dict): + if self._wbi_key == '': + return params + + wts = str(int(datetime.datetime.now().timestamp())) + params_to_sign = {**params, 'wts': wts} + + # 按key字典序排序 + params_to_sign = { + key: params_to_sign[key] + for key in sorted(params_to_sign.keys()) + } + # 过滤一些字符 + for key, value in params_to_sign.items(): + value = ''.join( + ch + for ch in str(value) + if ch not in "!'()*" + ) + params_to_sign[key] = value + + str_to_sign = urllib.parse.urlencode(params_to_sign) + self._wbi_key + w_rid = hashlib.md5(str_to_sign.encode('utf-8')).hexdigest() + return { + **params, + 'wts': wts, + 'w_rid': w_rid + }