From 65a9efa37e0f97a0c76c0bbb74af96d03e25c878 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 26 Nov 2023 12:18:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=80=E6=94=BE=E5=B9=B3=E5=8F=B0=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE=E5=BF=83=E8=B7=B3=E6=94=B9=E6=88=90=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E5=BF=83=E8=B7=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/open_live.py | 82 ++++++++++++++++++++++++++++------ main.py | 2 + services/chat.py | 6 +-- services/open_live.py | 100 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 172 insertions(+), 18 deletions(-) create mode 100644 services/open_live.py diff --git a/api/open_live.py b/api/open_live.py index ba6bbe7..e0e052e 100644 --- a/api/open_live.py +++ b/api/open_live.py @@ -15,13 +15,16 @@ import tornado.web import api.base import config +import services.open_live import utils.request logger = logging.getLogger(__name__) -START_GAME_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/start' -END_GAME_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/end' -GAME_HEARTBEAT_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/heartbeat' +OPEN_LIVE_BASE_URL = 'https://live-open.biliapi.com' +START_GAME_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/start' +END_GAME_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/end' +GAME_HEARTBEAT_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/heartbeat' +GAME_BATCH_HEARTBEAT_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/batchHeartbeat' COMMON_SERVER_BASE_URL = 'https://chat.bilisc.com' START_GAME_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/start_game' @@ -50,7 +53,7 @@ async def request_open_live_or_common_server(open_live_url, common_server_url, b """如果配置了开放平台,则直接请求,否则转发请求到公共服务器的内部接口""" cfg = config.get_config() if cfg.is_open_live_configured: - return await _request_open_live(open_live_url, body) + return await request_open_live(open_live_url, body) try: req_ctx_mgr = utils.request.http_session.post(common_server_url, json=body) @@ -63,7 +66,7 @@ async def request_open_live_or_common_server(open_live_url, common_server_url, b raise -async def _request_open_live(url, body: dict) -> dict: +async def request_open_live(url, body: dict) -> dict: cfg = config.get_config() assert cfg.is_open_live_configured @@ -180,7 +183,7 @@ class _PrivateHandlerBase(_OpenLiveHandlerBase): raise tornado.web.HTTPError(501) try: - self.res = await _request_open_live(self._OPEN_LIVE_URL, self.json_args) + self.res = await request_open_live(self._OPEN_LIVE_URL, self.json_args) except TransportError: raise tornado.web.HTTPError(500) except BusinessError as e: @@ -202,11 +205,17 @@ class _StartGameMixin(_OpenLiveHandlerBase): except (TypeError, KeyError): room_id = None code = self.res['code'] - logger.info('room_id=%s start game res: %s %s', room_id, code, self.res['message']) + logger.info( + 'client=%s room_id=%s start game res: %s %s, game_id=%s', self.request.remote_ip, room_id, + code, self.res['message'], self.res['data']['game_info']['game_id'] + ) if code == 7007: # 身份码错误 # 让我看看是哪个混蛋把房间ID、UID当做身份码 - logger.info('Auth code error! auth_code=%s', self.json_args.get('code', None)) + logger.info( + 'client=%s auth code error! auth_code=%s', self.request.remote_ip, + self.json_args.get('code', None) + ) class StartGamePublicHandler(_StartGameMixin, _PublicHandlerBase): @@ -226,13 +235,60 @@ class EndGamePrivateHandler(_PrivateHandlerBase): _OPEN_LIVE_URL = END_GAME_OPEN_LIVE_URL -class GameHeartbeatPublicHandler(_PublicHandlerBase): - _OPEN_LIVE_URL = GAME_HEARTBEAT_OPEN_LIVE_URL - _COMMON_SERVER_URL = GAME_HEARTBEAT_COMMON_SERVER_URL +class GameHeartbeatPublicHandler(_OpenLiveHandlerBase): + async def post(self): + game_id = self.json_args.get('game_id', None) + if not isinstance(game_id, str) or game_id == '': + raise tornado.web.MissingArgumentError('game_id') + + try: + self.res = await send_game_heartbeat_by_service_or_common_server(game_id) + except TransportError as e: + logger.error( + 'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e + ) + raise tornado.web.HTTPError(500) + except BusinessError as e: + logger.info( + 'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e + ) + self.res = e.data + self.write(self.res) -class GameHeartbeatPrivateHandler(_PrivateHandlerBase): - _OPEN_LIVE_URL = GAME_HEARTBEAT_OPEN_LIVE_URL +async def send_game_heartbeat_by_service_or_common_server(game_id): + cfg = config.get_config() + if cfg.is_open_live_configured: + return await services.open_live.send_game_heartbeat(game_id) + # 这里GAME_HEARTBEAT_OPEN_LIVE_URL没用,因为一定是请求公共服务器 + return await request_open_live_or_common_server( + GAME_HEARTBEAT_OPEN_LIVE_URL, GAME_HEARTBEAT_COMMON_SERVER_URL, {'game_id': game_id} + ) + + +class GameHeartbeatPrivateHandler(_OpenLiveHandlerBase): + async def post(self): + cfg = config.get_config() + if not cfg.is_open_live_configured: + raise tornado.web.HTTPError(501) + + game_id = self.json_args.get('game_id', None) + if not isinstance(game_id, str) or game_id == '': + raise tornado.web.MissingArgumentError('game_id') + + try: + self.res = await services.open_live.send_game_heartbeat(game_id) + except TransportError as e: + logger.error( + 'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e + ) + raise tornado.web.HTTPError(500) + except BusinessError as e: + logger.info( + 'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e + ) + self.res = e.data + self.write(self.res) ROUTES = [ diff --git a/main.py b/main.py index 9f4a629..ca31a59 100644 --- a/main.py +++ b/main.py @@ -19,6 +19,7 @@ import config import models.database import services.avatar import services.chat +import services.open_live import services.translate import update import utils.request @@ -61,6 +62,7 @@ def init(): services.avatar.init() services.translate.init() + services.open_live.init() services.chat.init() update.check_update() diff --git a/services/chat.py b/services/chat.py index b9c6475..f164e6e 100644 --- a/services/chat.py +++ b/services/chat.py @@ -228,11 +228,7 @@ class OpenLiveClient(blivedm.OpenLiveClient): # 保存一下,防止await之后game_id改变 game_id = self._game_id try: - await api_open_live.request_open_live_or_common_server( - api_open_live.GAME_HEARTBEAT_OPEN_LIVE_URL, - api_open_live.GAME_HEARTBEAT_COMMON_SERVER_URL, - {'game_id': game_id} - ) + await api_open_live.send_game_heartbeat_by_service_or_common_server(game_id) except api_open_live.TransportError: logger.error('room=%d _send_game_heartbeat() failed', self.room_id) return False diff --git a/services/open_live.py b/services/open_live.py new file mode 100644 index 0000000..bc3214c --- /dev/null +++ b/services/open_live.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +import asyncio +import dataclasses +import datetime +import logging +from typing import * + +import api.open_live +import config + +logger = logging.getLogger(__name__) + +# 正在等待发送的心跳任务,game_id -> HeartbeatTask +_game_id_heart_task_map: Dict[str, 'HeartbeatTask'] = {} + + +@dataclasses.dataclass +class HeartbeatTask: + game_id: str + future: 'asyncio.Future[dict]' + + +def init(): + cfg = config.get_config() + # 批量心跳只支持配置了开放平台的公共服务器,私有服务器用的人少,意义不大 + if cfg.is_open_live_configured: + asyncio.create_task(_game_heartbeat_consumer()) + + +async def send_game_heartbeat(game_id) -> dict: + """发送项目心跳。成功则返回符合开放平台格式的结果,失败则抛出异常""" + assert config.get_config().is_open_live_configured + if game_id in (None, ''): + raise api.open_live.BusinessError({'code': 4000, 'message': '参数错误', 'request_id': '0', 'data': None}) + + task = _game_id_heart_task_map.get(game_id, None) + if task is None: + task = HeartbeatTask( + game_id=game_id, + future=asyncio.get_running_loop().create_future(), + ) + + _game_id_heart_task_map[game_id] = task + # 限制一次发送的数量,数量太多了就立即发送 + if len(_game_id_heart_task_map) >= 95: + await _flush_game_heartbeat_tasks() + + return await task.future + + +async def _game_heartbeat_consumer(): + while True: + try: + start_time = datetime.datetime.now() + await _flush_game_heartbeat_tasks() + cost_time = (datetime.datetime.now() - start_time).total_seconds() + + # 如果等待时间太短,请求频率会太高;如果等待时间太长,前端请求、项目心跳会超时 + await asyncio.sleep(5 - cost_time) + except Exception: # noqa + logger.exception('_heartbeat_consumer error:') + + +async def _flush_game_heartbeat_tasks(): + global _game_id_heart_task_map + if not _game_id_heart_task_map: + return + game_id_task_map = _game_id_heart_task_map + _game_id_heart_task_map = {} + + game_ids = list(game_id_task_map.keys()) + logger.info('Sending game batch heartbeat for %d games', len(game_ids)) + try: + res = await api.open_live.request_open_live( + api.open_live.GAME_BATCH_HEARTBEAT_OPEN_LIVE_URL, + {'game_ids': game_ids} + ) + failed_game_ids = res['data']['failed_game_ids'] + if failed_game_ids is None: # 哪个SB后端给数组传null的 + failed_game_ids = set() + else: + failed_game_ids = set(failed_game_ids) + request_id = res['request_id'] + except Exception as e: + for task in game_id_task_map.values(): + task.future.set_exception(e) + return + if failed_game_ids: + logger.info( + 'Game batch heartbeat res: %d succeeded, %d failed, request_id=%s', + len(game_ids) - len(failed_game_ids), len(failed_game_ids), request_id + ) + + for task in game_id_task_map.values(): + if task.game_id in failed_game_ids: + task.future.set_exception(api.open_live.BusinessError( + {'code': 7003, 'message': '心跳过期或GameId错误', 'request_id': request_id, 'data': None} + )) + else: + task.future.set_result({'code': 0, 'message': '0', 'request_id': request_id, 'data': None})