From 49cdc56cff5d8918047882583e5b31ee850a8810 Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 5 Feb 2020 12:58:26 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=B2=E6=AD=A2=E9=98=9F=E5=88=97=E4=B8=AD?= =?UTF-8?q?=E5=87=BA=E7=8E=B0=E7=9B=B8=E5=90=8Cuid=E6=97=B6=E9=87=8D?= =?UTF-8?q?=E5=A4=8D=E8=8E=B7=E5=8F=96=E5=A4=B4=E5=83=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/avatar.py | 49 +++++++++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/models/avatar.py b/models/avatar.py index 68937c1..95ae866 100644 --- a/models/avatar.py +++ b/models/avatar.py @@ -21,9 +21,12 @@ _main_event_loop = asyncio.get_event_loop() _http_session = aiohttp.ClientSession() # user_id -> avatar_url _avatar_url_cache: Dict[int, str] = {} -# (user_id, future) -_fetch_task_queue = asyncio.Queue(15) -_last_fetch_failed_time: Optional[datetime.datetime] = None +# 正在获取头像的Future,user_id -> Future +_uid_fetch_future_map: Dict[int, asyncio.Future] = {} +# 正在获取头像的user_id队列 +_uid_queue_to_fetch = asyncio.Queue(15) +# 上次被B站ban时间 +_last_fetch_banned_time: Optional[datetime.datetime] = None def init(): @@ -64,10 +67,7 @@ def _do_get_avatar_url_from_database(user_id): # 如果离上次更新太久就更新所有缓存 if (datetime.datetime.now() - user.update_time).days >= 3: def refresh_cache(): - try: - del _avatar_url_cache[user_id] - except KeyError: - pass + _avatar_url_cache.pop(user_id, None) get_avatar_url_from_web(user_id) _main_event_loop.call_soon(refresh_cache) @@ -84,9 +84,15 @@ def _do_get_avatar_url_from_database(user_id): def get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]: - future = _main_event_loop.create_future() + # 如果已有正在获取的future则返回,防止重复获取同一个uid + future = _uid_fetch_future_map.get(user_id, None) + if future is not None: + return future + # 否则创建一个获取任务 + _uid_fetch_future_map[user_id] = future = _main_event_loop.create_future() + future.add_done_callback(lambda _future: _uid_fetch_future_map.pop(user_id, None)) try: - _fetch_task_queue.put_nowait((user_id, future)) + _uid_queue_to_fetch.put_nowait(user_id) except asyncio.QueueFull: future.set_result(None) return future @@ -95,24 +101,21 @@ def get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]: async def _get_avatar_url_from_web_consumer(): while True: try: - user_id, future = await _fetch_task_queue.get() - - # 先查缓存,防止队列中出现相同uid时重复获取 - avatar_url = get_avatar_url_from_memory(user_id) - if avatar_url is not None: - future.set_result(avatar_url) + user_id = await _uid_queue_to_fetch.get() + future = _uid_fetch_future_map.get(user_id, None) + if future is None: continue # 防止在被ban的时候获取 - global _last_fetch_failed_time - if _last_fetch_failed_time is not None: + global _last_fetch_banned_time + if _last_fetch_banned_time is not None: cur_time = datetime.datetime.now() - if (cur_time - _last_fetch_failed_time).total_seconds() < 3 * 60 + 3: + if (cur_time - _last_fetch_banned_time).total_seconds() < 3 * 60 + 3: # 3分钟以内被ban,解封大约要15分钟 future.set_result(None) continue else: - _last_fetch_failed_time = None + _last_fetch_banned_time = None asyncio.ensure_future(_get_avatar_url_from_web_coroutine(user_id, future)) @@ -127,8 +130,8 @@ async def _get_avatar_url_from_web_coroutine(user_id, future): avatar_url = await _do_get_avatar_url_from_web(user_id) except BaseException as e: future.set_exception(e) - return - future.set_result(avatar_url) + else: + future.set_result(avatar_url) async def _do_get_avatar_url_from_web(user_id): @@ -139,8 +142,8 @@ async def _do_get_avatar_url_from_web(user_id): logger.warning('Failed to fetch avatar: status=%d %s uid=%d', r.status, r.reason, user_id) if r.status == 412: # 被B站ban了 - global _last_fetch_failed_time - _last_fetch_failed_time = datetime.datetime.now() + global _last_fetch_banned_time + _last_fetch_banned_time = datetime.datetime.now() return None data = await r.json() except (aiohttp.ClientConnectionError, asyncio.TimeoutError):