mirror of
https://github.com/xfgryujk/blivechat.git
synced 2024-12-26 21:00:15 +08:00
538 lines
18 KiB
Python
538 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
import asyncio
|
||
import datetime
|
||
import functools
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import logging
|
||
import random
|
||
import re
|
||
from typing import *
|
||
|
||
import aiohttp
|
||
|
||
import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Exact texts (compared after stripping whitespace) for which need_translate()
# returns False.
NO_TRANSLATE_TEXTS = {
    '草', '草草', '草草草', '草生', '大草原', '上手', '上手上手', '理解', '理解理解', '天才', '天才天才',
    '强', '余裕', '余裕余裕', '大丈夫', '再放送', '放送事故', '清楚', '清楚清楚'
}

# Event loop captured at import time; translate() creates its futures on it.
_main_event_loop = asyncio.get_event_loop()
# Shared HTTP session used by the providers; None until _do_init() creates it.
_http_session: Optional[aiohttp.ClientSession] = None
# Providers that translate() chooses between; replaced by _do_init().
_translate_providers: List['TranslateProvider'] = []
# Normalized text (stripped, lower-cased) -> translation result
_translate_cache: Dict[str, str] = {}
# Futures of translations currently in progress, normalized text -> Future
_text_future_map: Dict[str, asyncio.Future] = {}
|
||
|
||
|
||
def init():
    """Schedule the module's asynchronous start-up without waiting for it.

    The start-up coroutine is handed to ``asyncio.ensure_future``; the
    resulting task is neither stored nor awaited here.
    """
    startup_coro = _do_init()
    asyncio.ensure_future(startup_coro)
|
||
|
||
|
||
async def _do_init():
    """Create the shared HTTP session and, if enabled, the translate providers.

    The session is always created (10 second total timeout). When translation
    is disabled in the configuration nothing else happens. Otherwise one
    provider is built per entry of ``translator_configs`` (entries with an
    unknown type are dropped), every provider's ``init()`` is awaited
    concurrently, and the list is published as ``_translate_providers``.
    """
    global _http_session, _translate_providers

    _http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))

    settings = config.get_config()
    if not settings.enable_translate:
        return

    active = []
    for entry in settings.translator_configs:
        candidate = create_translate_provider(entry)
        if candidate is None:
            continue
        active.append(candidate)

    await asyncio.gather(*[candidate.init() for candidate in active])
    _translate_providers = active
|
||
|
||
|
||
def create_translate_provider(cfg):
    """Build the provider described by one translator configuration mapping.

    :param cfg: mapping with a ``'type'`` key plus the keys required by that
        provider type (a missing key raises ``KeyError``)
    :return: the new provider instance, or ``None`` if ``'type'`` is not one
        of the known provider names
    """
    kind = cfg['type']

    if kind == 'TencentTranslateFree':
        return TencentTranslateFree(
            query_interval=cfg['query_interval'],
            max_queue_size=cfg['max_queue_size'],
            source_language=cfg['source_language'],
            target_language=cfg['target_language'],
        )

    if kind == 'BilibiliTranslateFree':
        return BilibiliTranslateFree(
            query_interval=cfg['query_interval'],
            max_queue_size=cfg['max_queue_size'],
        )

    if kind == 'TencentTranslate':
        return TencentTranslate(
            query_interval=cfg['query_interval'],
            max_queue_size=cfg['max_queue_size'],
            source_language=cfg['source_language'],
            target_language=cfg['target_language'],
            secret_id=cfg['secret_id'],
            secret_key=cfg['secret_key'],
            region=cfg['region'],
        )

    if kind == 'BaiduTranslate':
        return BaiduTranslate(
            query_interval=cfg['query_interval'],
            max_queue_size=cfg['max_queue_size'],
            source_language=cfg['source_language'],
            target_language=cfg['target_language'],
            app_id=cfg['app_id'],
            secret=cfg['secret'],
        )

    # Unknown provider type
    return None
|
||
|
||
|
||
def need_translate(text):
    """Decide whether a message should be sent for translation.

    :param text: the message; leading/trailing whitespace is ignored
    :return: ``False`` if the text has no character in U+4E00..U+9FFF, has a
        character in U+3040..U+30FF (Japanese kana), contains ``'【'``, or is
        listed in ``NO_TRANSLATE_TEXTS``; ``True`` otherwise
    """
    stripped = text.strip()
    code_points = [ord(ch) for ch in stripped]

    # No Chinese characters; characters one cannot normally type are ignored
    has_cjk = any(0x4E00 <= cp <= 0x9FFF for cp in code_points)
    if not has_cjk:
        return False

    # Contains Japanese kana
    has_kana = any(0x3040 <= cp <= 0x30FF for cp in code_points)
    if has_kana:
        return False

    # Live interpretation posted as a danmaku
    if '【' in stripped:
        return False

    # Texts that read the same in Chinese and Japanese
    return stripped not in NO_TRANSLATE_TEXTS
|
||
|
||
|
||
def get_translation_from_cache(text):
    """Look up a cached translation without starting a new one.

    :param text: the original text; it is stripped and lower-cased to form
        the cache key
    :return: the cached translation, or ``None`` if there is no entry
    """
    normalized = text.strip().lower()
    try:
        return _translate_cache[normalized]
    except KeyError:
        return None
|
||
|
||
|
||
def translate(text) -> Awaitable[Optional[str]]:
    """Return a future that resolves to the translation of ``text``.

    :param text: the text to translate; the stripped, lower-cased form is
        used as the key for the cache and for in-flight de-duplication
    :return: a future whose result is the translation, or ``None`` when no
        provider is available
    """
    cache_key = text.strip().lower()

    # Reuse the future of an in-flight translation to avoid duplicate work
    pending = _text_future_map.get(cache_key)
    if pending is not None:
        return pending

    # Otherwise create a new translation task
    result_future = _main_event_loop.create_future()

    # Check the cache
    cached = _translate_cache.get(cache_key)
    if cached is not None:
        result_future.set_result(cached)
        return result_future

    # Load balancing: pick the available provider with the shortest wait
    # (the first one wins on ties)
    candidates = (provider for provider in _translate_providers if provider.is_available)
    chosen = min(candidates, key=lambda provider: provider.wait_time, default=None)

    # Nothing is available
    if chosen is None:
        result_future.set_result(None)
        return result_future

    _text_future_map[cache_key] = result_future
    result_future.add_done_callback(functools.partial(_on_translate_done, cache_key))
    chosen.translate(text, result_future)
    return result_future
|
||
|
||
|
||
def _on_translate_done(key, future):
    """Done-callback of a translation future: unregister it and cache the result.

    :param key: the normalized text the future was registered under
    :param future: the completed (or cancelled) translation future
    """
    _text_future_map.pop(key, None)

    # A cancelled future raises CancelledError from result(); since Python 3.8
    # that is a BaseException and would escape the `except Exception` below,
    # making this callback itself fail. Nothing to cache in that case.
    if future.cancelled():
        return

    # Cache the result
    try:
        res = future.result()
    except Exception:
        return
    if res is None:
        return
    _translate_cache[key] = res

    # Evict the oldest entries (dict insertion order) while over the limit.
    # The emptiness check keeps a non-positive configured size from raising
    # StopIteration once the cache has been drained.
    cfg = config.get_config()
    while _translate_cache and len(_translate_cache) > cfg.translation_cache_size:
        _translate_cache.pop(next(iter(_translate_cache)), None)
|
||
|
||
|
||
class TranslateProvider:
    """Interface shared by all translation back-ends.

    The defaults describe a provider that needs no set-up, is always
    available and has no queueing delay; ``translate`` must be overridden.
    """

    async def init(self):
        """Prepare the provider. Returns ``True`` here unconditionally."""
        return True

    @property
    def is_available(self):
        """Whether the provider can accept a request right now."""
        return True

    @property
    def wait_time(self):
        """Wait estimate compared across providers by the caller; 0 here."""
        return 0

    def translate(self, text, future):
        """Translate ``text`` and deliver the outcome through ``future``.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError
|
||
|
||
|
||
class FlowControlTranslateProvider(TranslateProvider):
    """Provider base class that queues requests and starts them at a fixed rate.

    Requests are put on a bounded queue. A consumer task takes one request at
    a time, starts its translation as a separate task and then sleeps for
    ``query_interval`` seconds before taking the next one. Subclasses
    implement ``_do_translate``.
    """

    def __init__(self, query_interval, max_queue_size):
        """
        :param query_interval: seconds to sleep between starting two requests
        :param max_queue_size: capacity passed to ``asyncio.Queue``
        """
        self._query_interval = query_interval
        # Items are (text, future) pairs
        self._text_queue = asyncio.Queue(max_queue_size)

    async def init(self):
        """Start the queue consumer task. Always returns ``True``."""
        asyncio.ensure_future(self._translate_consumer())
        return True

    @property
    def is_available(self):
        """``True`` while the request queue is not full."""
        return not self._text_queue.full()

    @property
    def wait_time(self):
        """Queued request count multiplied by the query interval."""
        return self._text_queue.qsize() * self._query_interval

    def translate(self, text, future):
        """Queue ``text`` for translation; resolve ``future`` to ``None`` if the queue is full."""
        try:
            self._text_queue.put_nowait((text, future))
        except asyncio.QueueFull:
            if not future.done():
                future.set_result(None)

    async def _translate_consumer(self):
        """Take queued requests forever, starting one translation per interval."""
        while True:
            try:
                text, future = await self._text_queue.get()
                asyncio.ensure_future(self._translate_coroutine(text, future))
                # Rate limiting
                await asyncio.sleep(self._query_interval)
            except Exception:
                logger.exception('FlowControlTranslateProvider error:')

    async def _translate_coroutine(self, text, future):
        """Run one translation and deliver its result or exception to ``future``.

        The future may already be done (for example cancelled by the caller)
        by the time the request finishes. Setting a result or exception on it
        then would raise ``InvalidStateError`` inside this task, so the
        outcome is dropped instead.
        """
        try:
            res = await self._do_translate(text)
        except BaseException as e:
            if not future.done():
                future.set_exception(e)
        else:
            if not future.done():
                future.set_result(res)

    async def _do_translate(self, text):
        """Perform the actual translation request; must be overridden.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError
|
||
|
||
|
||
class TencentTranslateFree(FlowControlTranslateProvider):
    """Provider using the fanyi.qq.com web endpoints.

    The endpoint needs two tokens (``qtv`` and ``qtk``) that are scraped by
    ``_do_init`` and refreshed every 30 seconds. While either token is empty
    the provider reports itself as unavailable.
    """

    def __init__(self, query_interval, max_queue_size, source_language, target_language):
        """
        :param query_interval: seconds between starting two requests
        :param max_queue_size: capacity of the request queue
        :param source_language: value sent as ``source``
        :param target_language: value sent as ``target``
        """
        super().__init__(query_interval, max_queue_size)
        self._source_language = source_language
        self._target_language = target_language

        # Tokens obtained by _do_init(); an empty string means "not usable"
        self._qtv = ''
        self._qtk = ''
        self._reinit_future = None
        # Number of consecutive failures
        self._fail_count = 0

    async def init(self):
        """Start the consumer, fetch the tokens and start the refresh task.

        :return: ``False`` if the base initialisation or the first token
            fetch fails (the refresh task is not started then), else ``True``
        """
        if not await super().init():
            return False
        if not await self._do_init():
            return False
        self._reinit_future = asyncio.ensure_future(self._reinit_coroutine())
        return True

    async def _do_init(self):
        """Fetch fresh ``qtv``/``qtk`` tokens.

        Loads the home page, extracts the ``reauthuri`` path from it, posts to
        that path and stores the two tokens from the JSON response.

        :return: ``True`` if both tokens were stored, ``False`` on any failure
        """
        try:
            async with _http_session.get('https://fanyi.qq.com/') as r:
                if r.status != 200:
                    logger.warning('TencentTranslateFree init request failed: status=%d %s', r.status, r.reason)
                    return False
                html = await r.text()

            m = re.search(r"""\breauthuri\s*=\s*['"](.+?)['"]""", html)
            if m is None:
                # No exception is being handled here, so logger.exception would
                # only append a meaningless "NoneType: None" traceback.
                logger.warning('TencentTranslateFree init failed: reauthuri not found')
                return False
            reauthuri = m[1]

            async with _http_session.post('https://fanyi.qq.com/api/' + reauthuri) as r:
                if r.status != 200:
                    logger.warning('TencentTranslateFree init request failed: reauthuri=%s, status=%d %s',
                                   reauthuri, r.status, r.reason)
                    return False
                data = await r.json()
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            logger.exception('TencentTranslateFree init error:')
            return False

        qtv = data.get('qtv', None)
        if qtv is None:
            logger.warning('TencentTranslateFree init failed: qtv not found')
            return False
        qtk = data.get('qtk', None)
        if qtk is None:
            logger.warning('TencentTranslateFree init failed: qtk not found')
            return False

        self._qtv = qtv
        self._qtk = qtk
        return True

    async def _reinit_coroutine(self):
        """Start a token refresh every 30 seconds until cancelled."""
        try:
            while True:
                await asyncio.sleep(30)
                logger.debug('TencentTranslateFree reinit')
                asyncio.ensure_future(self._do_init())
        except asyncio.CancelledError:
            pass

    @property
    def is_available(self):
        """``True`` when both tokens are set and the request queue is not full."""
        return self._qtv != '' and self._qtk != '' and super().is_available

    async def _translate_coroutine(self, text, future):
        """Run one translation, deliver the outcome and track consecutive failures.

        The future may already be done (for example cancelled by the caller);
        setting a result or exception on it then would raise
        ``InvalidStateError``, so delivery is skipped. Failure counting is
        unaffected by that.
        """
        try:
            res = await self._do_translate(text)
        except BaseException as e:
            if not future.done():
                future.set_exception(e)
            self._on_fail()
            return
        if not future.done():
            future.set_result(res)
        if res is None:
            self._on_fail()
        else:
            self._fail_count = 0

    async def _do_translate(self, text):
        """Request a translation of ``text``.

        :return: the concatenated ``targetText`` of all records, or ``None``
            on a non-200 status, a connection error or timeout, a non-zero
            ``errCode``, or an empty result for non-blank input
        """
        try:
            async with _http_session.post(
                'https://fanyi.qq.com/api/translate',
                headers={
                    'Referer': 'https://fanyi.qq.com/'
                },
                data={
                    'source': self._source_language,
                    'target': self._target_language,
                    'sourceText': text,
                    'qtv': self._qtv,
                    'qtk': self._qtk
                }
            ) as r:
                if r.status != 200:
                    logger.warning('TencentTranslateFree request failed: status=%d %s', r.status, r.reason)
                    return None
                data = await r.json()
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            return None
        if data['errCode'] != 0:
            logger.warning('TencentTranslateFree failed: %d %s', data['errCode'], data['errMsg'])
            return None
        res = ''.join(record['targetText'] for record in data['translate']['records'])
        if res == '' and text.strip() != '':
            # qtv / qtk have expired
            logger.warning('TencentTranslateFree result is empty %s', data)
            return None
        return res

    def _on_fail(self):
        """Count one failure and cool down after 20 consecutive failures."""
        self._fail_count += 1
        # No ban has been observed in testing so far; for reliability, cool
        # down after 20 consecutive failures until the next re-init
        if self._fail_count >= 20:
            self._cool_down()

    def _cool_down(self):
        """Make the provider unavailable by clearing both tokens."""
        logger.info('TencentTranslateFree is cooling down')
        # Recovers after the next _do_init
        self._qtv = self._qtk = ''
        self._fail_count = 0
|
||
|
||
|
||
class BilibiliTranslateFree(FlowControlTranslateProvider):
    """Provider using the Bilibili live SuperChat message translation endpoint."""

    def __init__(self, query_interval, max_queue_size):
        """
        :param query_interval: seconds between starting two requests
        :param max_queue_size: capacity of the request queue
        """
        super().__init__(query_interval, max_queue_size)

    async def _do_translate(self, text):
        """Request a translation of ``text``.

        :return: ``data.message_trans`` from the response, or ``None`` on a
            non-200 status, a connection error or timeout, or a non-zero
            ``code`` in the response
        """
        query = {
            'room_id': '21396545',
            'ruid': '407106379',
            'parent_area_id': '9',
            'area_id': '371',
            'msg': text
        }
        try:
            async with _http_session.get(
                'https://api.live.bilibili.com/av/v1/SuperChat/messageTranslate',
                params=query
            ) as response:
                if response.status != 200:
                    logger.warning('BilibiliTranslateFree request failed: status=%d %s',
                                   response.status, response.reason)
                    return None
                payload = await response.json()
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            return None

        if payload['code'] == 0:
            return payload['data']['message_trans']
        logger.warning('BilibiliTranslateFree failed: %d %s', payload['code'], payload['msg'])
        return None
|
||
|
||
|
||
class TencentTranslate(FlowControlTranslateProvider):
    """Provider using the Tencent Cloud ``TextTranslate`` API with TC3-HMAC-SHA256 signing.

    Certain API error codes put the provider into a cool-down, during which
    it reports itself as unavailable.
    """

    def __init__(self, query_interval, max_queue_size, source_language, target_language,
                 secret_id, secret_key, region):
        """
        :param query_interval: seconds between starting two requests
        :param max_queue_size: capacity of the request queue
        :param source_language: value sent as ``Source``
        :param target_language: value sent as ``Target``
        :param secret_id: credential id placed in the Authorization header
        :param secret_key: key used to derive the request signature
        :param region: value sent as ``X-TC-Region``
        """
        super().__init__(query_interval, max_queue_size)
        self._source_language = source_language
        self._target_language = target_language
        self._secret_id = secret_id
        self._secret_key = secret_key
        self._region = region

        # Timer handle of the running cool-down; None when not cooling down
        self._cool_down_timer_handle = None

    @property
    def is_available(self):
        """``True`` when not cooling down and the request queue is not full."""
        return self._cool_down_timer_handle is None and super().is_available

    async def _do_translate(self, text):
        """Request a translation of ``text``.

        :return: ``TargetText`` from the response, or ``None`` on a non-200
            status, a connection error or timeout, or an API error (which is
            also reported to ``_on_fail``)
        """
        request_body = {
            'SourceText': text,
            'Source': self._source_language,
            'Target': self._target_language,
            'ProjectId': 0
        }
        try:
            async with self._request_tencent_cloud(
                'TextTranslate', '2018-03-21', request_body
            ) as response:
                if response.status != 200:
                    logger.warning('TencentTranslate request failed: status=%d %s',
                                   response.status, response.reason)
                    return None
                data = (await response.json())['Response']
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            return None

        error = data.get('Error', None)
        if error is None:
            return data['TargetText']

        logger.warning('TencentTranslate failed: %s %s, RequestId=%s', error['Code'],
                       error['Message'], data['RequestId'])
        self._on_fail(error['Code'])
        return None

    def _request_tencent_cloud(self, action, version, body):
        """Build a signed POST request to ``tmt.tencentcloudapi.com``.

        :param action: value of the ``X-TC-Action`` header
        :param version: value of the ``X-TC-Version`` header
        :param body: JSON-serialisable request body
        :return: whatever ``_http_session.post`` returns, for use with ``async with``
        """
        body_bytes = json.dumps(body).encode('utf-8')

        # Step 1: canonical request
        canonical_headers = 'content-type:application/json; charset=utf-8\nhost:tmt.tencentcloudapi.com\n'
        signed_headers = 'content-type;host'
        hashed_request_payload = hashlib.sha256(body_bytes).hexdigest()
        canonical_request = f'POST\n/\n\n{canonical_headers}\n{signed_headers}\n{hashed_request_payload}'

        # Step 2: string to sign
        request_timestamp = int(datetime.datetime.now().timestamp())
        date = datetime.datetime.utcfromtimestamp(request_timestamp).strftime('%Y-%m-%d')
        credential_scope = f'{date}/tmt/tc3_request'
        hashed_canonical_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
        string_to_sign = f'TC3-HMAC-SHA256\n{request_timestamp}\n{credential_scope}\n{hashed_canonical_request}'

        # Step 3: derive the signing key by chaining HMACs over the date,
        # the service name and the terminator, then sign
        signing_key = ('TC3' + self._secret_key).encode('utf-8')
        for part in (date, 'tmt', 'tc3_request'):
            signing_key = hmac.new(signing_key, part.encode('utf-8'), hashlib.sha256).digest()
        signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

        # Step 4: headers
        authorization = (
            f'TC3-HMAC-SHA256 Credential={self._secret_id}/{credential_scope}, '
            f'SignedHeaders={signed_headers}, Signature={signature}'
        )
        headers = {
            'Authorization': authorization,
            'Content-Type': 'application/json; charset=utf-8',
            'X-TC-Action': action,
            'X-TC-Version': version,
            'X-TC-Timestamp': str(request_timestamp),
            'X-TC-Region': self._region
        }

        return _http_session.post('https://tmt.tencentcloudapi.com/', headers=headers, data=body_bytes)

    def _on_fail(self, code):
        """Start a cool-down for API error codes that call for one.

        :param code: the ``Code`` of the API error
        """
        if self._cool_down_timer_handle is not None:
            # Already cooling down
            return

        if code == 'FailedOperation.NoFreeAmount':
            # Free quota is restored next month
            now = datetime.datetime.now()
            if now.month + 1 > 12:
                target_year, target_month = now.year + 1, 1
            else:
                target_year, target_month = now.year, now.month + 1
            resume_at = datetime.datetime(target_year, target_month, 1, minute=5)
            # Before Python 3.8 the delay cannot exceed one day
            sleep_time = min((resume_at - now).total_seconds(), 24 * 60 * 60 - 1)
        elif code in ('FailedOperation.ServiceIsolate', 'LimitExceeded'):
            # Needs manual handling; wait 5 minutes
            sleep_time = 5 * 60
        else:
            sleep_time = 0

        if sleep_time == 0:
            return
        self._cool_down_timer_handle = asyncio.get_event_loop().call_later(
            sleep_time, self._on_cool_down_timeout
        )

    def _on_cool_down_timeout(self):
        """End the cool-down so the provider becomes available again."""
        self._cool_down_timer_handle = None
|
||
|
||
|
||
class BaiduTranslate(FlowControlTranslateProvider):
    """Provider using the Baidu translate API (``/api/trans/vip/translate``).

    Error code ``'54004'`` puts the provider into a five-minute cool-down,
    during which it reports itself as unavailable.
    """

    def __init__(self, query_interval, max_queue_size, source_language, target_language,
                 app_id, secret):
        """
        :param query_interval: seconds between starting two requests
        :param max_queue_size: capacity of the request queue
        :param source_language: value sent as ``from``
        :param target_language: value sent as ``to``
        :param app_id: value sent as ``appid`` and used in the signature
        :param secret: secret appended when computing the signature
        """
        super().__init__(query_interval, max_queue_size)
        self._source_language = source_language
        self._target_language = target_language
        self._app_id = app_id
        self._secret = secret

        # Timer handle of the running cool-down; None when not cooling down
        self._cool_down_timer_handle = None

    @property
    def is_available(self):
        """``True`` when not cooling down and the request queue is not full."""
        return self._cool_down_timer_handle is None and super().is_available

    async def _do_translate(self, text):
        """Request a translation of ``text``.

        :return: the concatenated ``dst`` of all ``trans_result`` entries, or
            ``None`` on a non-200 status, a connection error or timeout, or
            an API error (which is also reported to ``_on_fail``)
        """
        form = self._add_sign({
            'q': text,
            'from': self._source_language,
            'to': self._target_language,
            'appid': self._app_id,
            'salt': random.randint(1, 999999999)
        })
        try:
            async with _http_session.post(
                'https://fanyi-api.baidu.com/api/trans/vip/translate',
                data=form
            ) as response:
                if response.status != 200:
                    logger.warning('BaiduTranslate request failed: status=%d %s',
                                   response.status, response.reason)
                    return None
                payload = await response.json()
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            return None

        error_code = payload.get('error_code', None)
        if error_code is None:
            return ''.join(item['dst'] for item in payload['trans_result'])

        logger.warning('BaiduTranslate failed: %s %s', error_code, payload['error_msg'])
        self._on_fail(error_code)
        return None

    def _add_sign(self, data):
        """Return a copy of ``data`` with the ``sign`` field added.

        The signature is the MD5 hex digest of the app id, ``data['q']``,
        ``data['salt']`` and the secret concatenated in that order.
        """
        str_to_sign = f"{self._app_id}{data['q']}{data['salt']}{self._secret}"
        signed = dict(data)
        signed['sign'] = hashlib.md5(str_to_sign.encode('utf-8')).hexdigest()
        return signed

    def _on_fail(self, code):
        """Start a cool-down for API error codes that call for one.

        :param code: the ``error_code`` of the API response
        """
        if self._cool_down_timer_handle is not None:
            # Already cooling down
            return
        if code != '54004':
            return

        # Insufficient account balance; needs manual handling; wait 5 minutes
        self._cool_down_timer_handle = asyncio.get_event_loop().call_later(
            5 * 60, self._on_cool_down_timeout
        )

    def _on_cool_down_timeout(self):
        """End the cool-down so the provider becomes available again."""
        self._cool_down_timer_handle = None
|