后端动态获取公共服务器地址、支持故障转移

This commit is contained in:
John Smith 2024-11-04 22:52:49 +08:00
parent 132d2a9f71
commit 99d3b567da
3 changed files with 149 additions and 21 deletions

View File

@ -27,10 +27,9 @@ END_GAME_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/end'
GAME_HEARTBEAT_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/heartbeat'
GAME_BATCH_HEARTBEAT_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/batchHeartbeat'
COMMON_SERVER_BASE_URL = 'https://chat.bilisc.com'
START_GAME_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/start_game'
END_GAME_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/end_game'
GAME_HEARTBEAT_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/game_heartbeat'
START_GAME_COMMON_SERVER_URL = '/api/internal/open_live/start_game'
END_GAME_COMMON_SERVER_URL = '/api/internal/open_live/end_game'
GAME_HEARTBEAT_COMMON_SERVER_URL = '/api/internal/open_live/game_heartbeat'
_error_auth_code_cache = cachetools.LRUCache(256)
# 应B站要求抓一下刷请求的人不会用于其他用途
@ -54,24 +53,15 @@ class BusinessError(Exception):
return self.data['code']
async def request_open_live_or_common_server(open_live_url, common_server_url, body: dict) -> dict:
async def request_open_live_or_common_server(open_live_url, common_server_url, body: dict, **kwargs) -> dict:
"""如果配置了开放平台,则直接请求,否则转发请求到公共服务器的内部接口"""
cfg = config.get_config()
if cfg.is_open_live_configured:
return await request_open_live(open_live_url, body)
try:
req_ctx_mgr = utils.request.http_session.post(common_server_url, json=body)
return await _read_response(req_ctx_mgr, is_common_server=True)
except TransportError:
logger.exception('Request common server failed:')
raise
except BusinessError as e:
logger.warning('Request common server failed: %s', e)
raise
return await request_open_live(open_live_url, body, **kwargs)
return await request_common_server(common_server_url, body, **kwargs)
async def request_open_live(url, body: dict, *, ignore_rate_limit=False) -> dict:
async def request_open_live(url, body: dict, *, ignore_rate_limit=False, **kwargs) -> dict:
cfg = config.get_config()
assert cfg.is_open_live_configured
@ -109,7 +99,7 @@ async def request_open_live(url, body: dict, *, ignore_rate_limit=False) -> dict
headers['Accept'] = 'application/json'
try:
req_ctx_mgr = utils.request.http_session.post(url, headers=headers, data=body_bytes)
req_ctx_mgr = utils.request.http_session.post(url, headers=headers, data=body_bytes, **kwargs)
return await _read_response(req_ctx_mgr)
except TransportError:
logger.exception('Request open live failed:')
@ -126,6 +116,25 @@ async def request_open_live(url, body: dict, *, ignore_rate_limit=False) -> dict
raise
async def request_common_server(rel_url, body: dict, **kwargs) -> dict:
base_url, breaker = utils.request.get_common_server_base_url_and_circuit_breaker()
if base_url is None:
logger.error('No available common server endpoint')
raise TransportError('No available common server endpoint')
url = base_url + rel_url
with breaker:
try:
req_ctx_mgr = utils.request.http_session.post(url, json=body, **kwargs)
return await _read_response(req_ctx_mgr, is_common_server=True)
except TransportError:
logger.exception('Request common server failed:')
raise
except BusinessError as e:
logger.warning('Request common server failed: %s', e)
raise
async def _read_response(req_ctx_mgr: AsyncContextManager[aiohttp.ClientResponse], is_common_server=False) -> dict:
try:
async with req_ctx_mgr as r:
@ -298,9 +307,8 @@ async def send_game_heartbeat_by_service_or_common_server(game_id):
cfg = config.get_config()
if cfg.is_open_live_configured:
return await services.open_live.send_game_heartbeat(game_id)
# 这里GAME_HEARTBEAT_OPEN_LIVE_URL没用因为一定是请求公共服务器
return await request_open_live_or_common_server(
GAME_HEARTBEAT_OPEN_LIVE_URL, GAME_HEARTBEAT_COMMON_SERVER_URL, {'game_id': game_id}
return await request_common_server(
GAME_HEARTBEAT_COMMON_SERVER_URL, {'game_id': game_id}, timeout=aiohttp.ClientTimeout(total=15)
)

View File

@ -1,5 +1,6 @@
-r blivedm/requirements.txt
cachetools==5.3.1
circuitbreaker==2.0.0
pycryptodome==3.19.1
sqlalchemy==2.0.19
tornado==6.4.1

View File

@ -1,8 +1,17 @@
# -*- coding: utf-8 -*-
import asyncio
import datetime
import logging
from typing import *
import aiohttp
import circuitbreaker
import api.open_live
import config
import utils.async_io
logger = logging.getLogger(__name__)
# 不带这堆头部有时候也能成功请求,但是带上后成功的概率更高
BILIBILI_COMMON_HEADERS = {
@ -14,6 +23,18 @@ BILIBILI_COMMON_HEADERS = {
http_session: Optional[aiohttp.ClientSession] = None
_COMMON_SERVER_DISCOVERY_URLS = [
'https://api1.blive.chat/api/endpoints',
'https://api2.blive.chat/api/endpoints',
]
_last_update_common_server_time: Optional[datetime.datetime] = None
_common_server_base_urls = [
'https://api1.blive.chat',
'https://api2.blive.chat',
]
_cur_common_server_base_url: Optional[str] = None
_common_server_base_url_to_circuit_breaker: Dict[str, circuitbreaker.CircuitBreaker] = {}
def init():
global http_session
@ -22,6 +43,10 @@ def init():
timeout=aiohttp.ClientTimeout(total=10),
)
cfg = config.get_config()
if not cfg.is_open_live_configured:
_update_common_server_base_urls()
async def shut_down():
if http_session is not None:
@ -35,3 +60,97 @@ class CustomClientResponse(aiohttp.ClientResponse):
return await super()._wait_released()
except asyncio.CancelledError as e:
raise aiohttp.ClientConnectionError('Connection released') from e
def _update_common_server_base_urls():
global _last_update_common_server_time
cur_time = datetime.datetime.now()
if (
_last_update_common_server_time is not None
and cur_time - _last_update_common_server_time < datetime.timedelta(minutes=3)
):
return
_last_update_common_server_time = cur_time
utils.async_io.create_task_with_ref(_do_update_common_server_base_urls())
async def _do_update_common_server_base_urls():
global _last_update_common_server_time
_last_update_common_server_time = datetime.datetime.now()
async def request_get_urls(discovery_url):
async with http_session.get(discovery_url) as res:
res.raise_for_status()
data = await res.json()
return data['endpoints']
common_server_base_urls = []
futures = [
asyncio.create_task(request_get_urls(url))
for url in _COMMON_SERVER_DISCOVERY_URLS
]
for future in asyncio.as_completed(futures):
try:
common_server_base_urls = await future
break
except Exception as e:
logger.warning('Failed to discover common server endpoints from one source: %s', e)
for future in futures:
future.cancel()
if not common_server_base_urls:
logger.error('Failed to discover common server endpoints from any source')
return
# 按响应时间排序
sorted_common_server_base_urls = []
error_base_urls = []
async def test_endpoint(base_url):
try:
url = base_url + '/api/server_info'
async with http_session.get(url, timeout=aiohttp.ClientTimeout(total=3)) as res:
res.raise_for_status()
sorted_common_server_base_urls.append(base_url)
except Exception: # noqa
error_base_urls.append(base_url)
await asyncio.gather(*(test_endpoint(base_url) for base_url in common_server_base_urls))
sorted_common_server_base_urls.extend(error_base_urls)
global _common_server_base_urls, _cur_common_server_base_url
_common_server_base_urls = sorted_common_server_base_urls
if _cur_common_server_base_url not in _common_server_base_urls:
_cur_common_server_base_url = None
logger.info('Found common server endpoints: %s', _common_server_base_urls)
def get_common_server_base_url_and_circuit_breaker() -> Tuple[Optional[str], Optional[circuitbreaker.CircuitBreaker]]:
_update_common_server_base_urls()
global _cur_common_server_base_url
if _cur_common_server_base_url is not None:
breaker = _get_or_add_common_server_circuit_breaker(_cur_common_server_base_url)
if breaker.state != circuitbreaker.STATE_OPEN:
return _cur_common_server_base_url, breaker
_cur_common_server_base_url = None
# 找第一个未熔断的
for base_url in _common_server_base_urls:
breaker = _get_or_add_common_server_circuit_breaker(base_url)
if breaker.state != circuitbreaker.STATE_OPEN:
_cur_common_server_base_url = base_url
logger.info('Switch common server endpoint to %s', _cur_common_server_base_url)
return _cur_common_server_base_url, breaker
return None, None
def _get_or_add_common_server_circuit_breaker(base_url):
breaker = _common_server_base_url_to_circuit_breaker.get(base_url, None)
if breaker is None:
breaker = _common_server_base_url_to_circuit_breaker[base_url] = circuitbreaker.CircuitBreaker(
failure_threshold=3,
recovery_timeout=60,
expected_exception=api.open_live.TransportError,
)
return breaker