mirror of
https://github.com/xfgryujk/blivechat.git
synced 2025-04-03 16:10:35 +08:00
防止队列中出现相同uid时重复获取头像
This commit is contained in:
parent
00e1f31f84
commit
49cdc56cff
@ -21,9 +21,12 @@ _main_event_loop = asyncio.get_event_loop()
|
|||||||
_http_session = aiohttp.ClientSession()
|
_http_session = aiohttp.ClientSession()
|
||||||
# user_id -> avatar_url
|
# user_id -> avatar_url
|
||||||
_avatar_url_cache: Dict[int, str] = {}
|
_avatar_url_cache: Dict[int, str] = {}
|
||||||
# (user_id, future)
|
# 正在获取头像的Future,user_id -> Future
|
||||||
_fetch_task_queue = asyncio.Queue(15)
|
_uid_fetch_future_map: Dict[int, asyncio.Future] = {}
|
||||||
_last_fetch_failed_time: Optional[datetime.datetime] = None
|
# 正在获取头像的user_id队列
|
||||||
|
_uid_queue_to_fetch = asyncio.Queue(15)
|
||||||
|
# 上次被B站ban时间
|
||||||
|
_last_fetch_banned_time: Optional[datetime.datetime] = None
|
||||||
|
|
||||||
|
|
||||||
def init():
|
def init():
|
||||||
@ -64,10 +67,7 @@ def _do_get_avatar_url_from_database(user_id):
|
|||||||
# 如果离上次更新太久就更新所有缓存
|
# 如果离上次更新太久就更新所有缓存
|
||||||
if (datetime.datetime.now() - user.update_time).days >= 3:
|
if (datetime.datetime.now() - user.update_time).days >= 3:
|
||||||
def refresh_cache():
|
def refresh_cache():
|
||||||
try:
|
_avatar_url_cache.pop(user_id, None)
|
||||||
del _avatar_url_cache[user_id]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
get_avatar_url_from_web(user_id)
|
get_avatar_url_from_web(user_id)
|
||||||
|
|
||||||
_main_event_loop.call_soon(refresh_cache)
|
_main_event_loop.call_soon(refresh_cache)
|
||||||
@ -84,9 +84,15 @@ def _do_get_avatar_url_from_database(user_id):
|
|||||||
|
|
||||||
|
|
||||||
def get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]:
|
def get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]:
|
||||||
future = _main_event_loop.create_future()
|
# 如果已有正在获取的future则返回,防止重复获取同一个uid
|
||||||
|
future = _uid_fetch_future_map.get(user_id, None)
|
||||||
|
if future is not None:
|
||||||
|
return future
|
||||||
|
# 否则创建一个获取任务
|
||||||
|
_uid_fetch_future_map[user_id] = future = _main_event_loop.create_future()
|
||||||
|
future.add_done_callback(lambda _future: _uid_fetch_future_map.pop(user_id, None))
|
||||||
try:
|
try:
|
||||||
_fetch_task_queue.put_nowait((user_id, future))
|
_uid_queue_to_fetch.put_nowait(user_id)
|
||||||
except asyncio.QueueFull:
|
except asyncio.QueueFull:
|
||||||
future.set_result(None)
|
future.set_result(None)
|
||||||
return future
|
return future
|
||||||
@ -95,24 +101,21 @@ def get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]:
|
|||||||
async def _get_avatar_url_from_web_consumer():
|
async def _get_avatar_url_from_web_consumer():
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
user_id, future = await _fetch_task_queue.get()
|
user_id = await _uid_queue_to_fetch.get()
|
||||||
|
future = _uid_fetch_future_map.get(user_id, None)
|
||||||
# 先查缓存,防止队列中出现相同uid时重复获取
|
if future is None:
|
||||||
avatar_url = get_avatar_url_from_memory(user_id)
|
|
||||||
if avatar_url is not None:
|
|
||||||
future.set_result(avatar_url)
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 防止在被ban的时候获取
|
# 防止在被ban的时候获取
|
||||||
global _last_fetch_failed_time
|
global _last_fetch_banned_time
|
||||||
if _last_fetch_failed_time is not None:
|
if _last_fetch_banned_time is not None:
|
||||||
cur_time = datetime.datetime.now()
|
cur_time = datetime.datetime.now()
|
||||||
if (cur_time - _last_fetch_failed_time).total_seconds() < 3 * 60 + 3:
|
if (cur_time - _last_fetch_banned_time).total_seconds() < 3 * 60 + 3:
|
||||||
# 3分钟以内被ban,解封大约要15分钟
|
# 3分钟以内被ban,解封大约要15分钟
|
||||||
future.set_result(None)
|
future.set_result(None)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
_last_fetch_failed_time = None
|
_last_fetch_banned_time = None
|
||||||
|
|
||||||
asyncio.ensure_future(_get_avatar_url_from_web_coroutine(user_id, future))
|
asyncio.ensure_future(_get_avatar_url_from_web_coroutine(user_id, future))
|
||||||
|
|
||||||
@ -127,8 +130,8 @@ async def _get_avatar_url_from_web_coroutine(user_id, future):
|
|||||||
avatar_url = await _do_get_avatar_url_from_web(user_id)
|
avatar_url = await _do_get_avatar_url_from_web(user_id)
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
future.set_exception(e)
|
future.set_exception(e)
|
||||||
return
|
else:
|
||||||
future.set_result(avatar_url)
|
future.set_result(avatar_url)
|
||||||
|
|
||||||
|
|
||||||
async def _do_get_avatar_url_from_web(user_id):
|
async def _do_get_avatar_url_from_web(user_id):
|
||||||
@ -139,8 +142,8 @@ async def _do_get_avatar_url_from_web(user_id):
|
|||||||
logger.warning('Failed to fetch avatar: status=%d %s uid=%d', r.status, r.reason, user_id)
|
logger.warning('Failed to fetch avatar: status=%d %s uid=%d', r.status, r.reason, user_id)
|
||||||
if r.status == 412:
|
if r.status == 412:
|
||||||
# 被B站ban了
|
# 被B站ban了
|
||||||
global _last_fetch_failed_time
|
global _last_fetch_banned_time
|
||||||
_last_fetch_failed_time = datetime.datetime.now()
|
_last_fetch_banned_time = datetime.datetime.now()
|
||||||
return None
|
return None
|
||||||
data = await r.json()
|
data = await r.json()
|
||||||
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
||||||
|
Loading…
Reference in New Issue
Block a user