移除从用户中心获取头像的接口

This commit is contained in:
John Smith 2023-09-10 00:03:29 +08:00
parent a50461d4e6
commit 72c11d60da

View File

@ -2,10 +2,8 @@
import asyncio
import dataclasses
import datetime
import hashlib
import logging
import re
import urllib.parse
from typing import *
import aiohttp
@ -47,7 +45,6 @@ def init():
async def _do_init():
fetchers = [
UserSpaceAvatarFetcher(5.5),
MedalAnchorAvatarFetcher(3),
UserCardAvatarFetcher(3),
GameUserCenterAvatarFetcher(3),
@ -306,131 +303,6 @@ class AvatarFetcher:
self._on_availability_change()
class UserSpaceAvatarFetcher(AvatarFetcher):
# wbi密码表
WBI_KEY_INDEX_TABLE = [
46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35,
27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13
]
def __init__(self, query_interval):
super().__init__(query_interval)
# wbi鉴权口令
self._wbi_key = ''
async def _do_fetch(self, user_id) -> Optional[str]:
if self._wbi_key == '':
await self._refresh_wbi_key()
if self._wbi_key == '':
return None
try:
async with utils.request.http_session.get(
'https://api.bilibili.com/x/space/wbi/acc/info',
headers={
**utils.request.BILIBILI_COMMON_HEADERS,
'Origin': 'https://space.bilibili.com',
'Referer': f'https://space.bilibili.com/{user_id}/'
},
params=self._add_wbi_sign({'mid': user_id}),
) as r:
if r.status != 200:
logger.warning(
'UserSpaceAvatarFetcher failed to fetch avatar: status=%d %s uid=%d',
r.status, r.reason, user_id
)
if r.status == 412:
# 被B站ban了
self._cool_down(3 * 60)
await self._refresh_wbi_key()
return None
data = await r.json()
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
return None
code = data['code']
if code != 0:
logger.info(
'UserSpaceAvatarFetcher failed to fetch avatar: code=%d %s uid=%d',
code, data['message'], user_id
)
if code == -401:
# 被B站ban了
self._cool_down(3 * 60)
await self._refresh_wbi_key()
elif code == -403:
# 签名错误
self._wbi_key = ''
await self._refresh_wbi_key()
return None
return process_avatar_url(data['data']['face'])
async def _refresh_wbi_key(self):
wbi_key = await self._get_wbi_key()
if wbi_key != '':
self._wbi_key = wbi_key
async def _get_wbi_key(self):
try:
async with utils.request.http_session.get(
'https://api.bilibili.com/nav',
headers=utils.request.BILIBILI_COMMON_HEADERS,
) as r:
if r.status != 200:
logger.warning('UserSpaceAvatarFetcher failed to get wbi key: status=%d %s', r.status, r.reason)
return ''
data = await r.json()
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('UserSpaceAvatarFetcher failed to get wbi key:')
return ''
try:
wbi_img = data['data']['wbi_img']
img_key = wbi_img['img_url'].rpartition('/')[2].partition('.')[0]
sub_key = wbi_img['sub_url'].rpartition('/')[2].partition('.')[0]
except KeyError:
logger.warning('UserSpaceAvatarFetcher failed to get wbi key: data=%s', data)
return ''
shuffled_key = img_key + sub_key
wbi_key = []
for index in self.WBI_KEY_INDEX_TABLE:
if index < len(shuffled_key):
wbi_key.append(shuffled_key[index])
return ''.join(wbi_key)
def _add_wbi_sign(self, params: dict):
if self._wbi_key == '':
return params
wts = str(int(datetime.datetime.now().timestamp()))
params_to_sign = {**params, 'wts': wts}
# 按key字典序排序
params_to_sign = {
key: params_to_sign[key]
for key in sorted(params_to_sign.keys())
}
# 过滤一些字符
for key, value in params_to_sign.items():
value = ''.join(
ch
for ch in str(value)
if ch not in "!'()*"
)
params_to_sign[key] = value
str_to_sign = urllib.parse.urlencode(params_to_sign) + self._wbi_key
w_rid = hashlib.md5(str_to_sign.encode('utf-8')).hexdigest()
return {
**params,
'wts': wts,
'w_rid': w_rid
}
class MedalAnchorAvatarFetcher(AvatarFetcher):
async def _do_fetch(self, user_id) -> Optional[str]:
try: