开放平台项目心跳改成批量心跳

This commit is contained in:
John Smith 2023-11-26 12:18:14 +08:00
parent dbf4de5fa5
commit 65a9efa37e
4 changed files with 172 additions and 18 deletions

View File

@ -15,13 +15,16 @@ import tornado.web
import api.base
import config
import services.open_live
import utils.request
logger = logging.getLogger(__name__)
START_GAME_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/start'
END_GAME_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/end'
GAME_HEARTBEAT_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
OPEN_LIVE_BASE_URL = 'https://live-open.biliapi.com'
START_GAME_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/start'
END_GAME_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/end'
GAME_HEARTBEAT_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/heartbeat'
GAME_BATCH_HEARTBEAT_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/batchHeartbeat'
COMMON_SERVER_BASE_URL = 'https://chat.bilisc.com'
START_GAME_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/start_game'
@ -50,7 +53,7 @@ async def request_open_live_or_common_server(open_live_url, common_server_url, b
"""如果配置了开放平台,则直接请求,否则转发请求到公共服务器的内部接口"""
cfg = config.get_config()
if cfg.is_open_live_configured:
return await _request_open_live(open_live_url, body)
return await request_open_live(open_live_url, body)
try:
req_ctx_mgr = utils.request.http_session.post(common_server_url, json=body)
@ -63,7 +66,7 @@ async def request_open_live_or_common_server(open_live_url, common_server_url, b
raise
async def _request_open_live(url, body: dict) -> dict:
async def request_open_live(url, body: dict) -> dict:
cfg = config.get_config()
assert cfg.is_open_live_configured
@ -180,7 +183,7 @@ class _PrivateHandlerBase(_OpenLiveHandlerBase):
raise tornado.web.HTTPError(501)
try:
self.res = await _request_open_live(self._OPEN_LIVE_URL, self.json_args)
self.res = await request_open_live(self._OPEN_LIVE_URL, self.json_args)
except TransportError:
raise tornado.web.HTTPError(500)
except BusinessError as e:
@ -202,11 +205,17 @@ class _StartGameMixin(_OpenLiveHandlerBase):
except (TypeError, KeyError):
room_id = None
code = self.res['code']
logger.info('room_id=%s start game res: %s %s', room_id, code, self.res['message'])
logger.info(
'client=%s room_id=%s start game res: %s %s, game_id=%s', self.request.remote_ip, room_id,
code, self.res['message'], self.res['data']['game_info']['game_id']
)
if code == 7007:
# 身份码错误
# 让我看看是哪个混蛋把房间ID、UID当做身份码
logger.info('Auth code error! auth_code=%s', self.json_args.get('code', None))
logger.info(
'client=%s auth code error! auth_code=%s', self.request.remote_ip,
self.json_args.get('code', None)
)
class StartGamePublicHandler(_StartGameMixin, _PublicHandlerBase):
@ -226,13 +235,60 @@ class EndGamePrivateHandler(_PrivateHandlerBase):
_OPEN_LIVE_URL = END_GAME_OPEN_LIVE_URL
class GameHeartbeatPublicHandler(_PublicHandlerBase):
_OPEN_LIVE_URL = GAME_HEARTBEAT_OPEN_LIVE_URL
_COMMON_SERVER_URL = GAME_HEARTBEAT_COMMON_SERVER_URL
class GameHeartbeatPublicHandler(_OpenLiveHandlerBase):
async def post(self):
game_id = self.json_args.get('game_id', None)
if not isinstance(game_id, str) or game_id == '':
raise tornado.web.MissingArgumentError('game_id')
try:
self.res = await send_game_heartbeat_by_service_or_common_server(game_id)
except TransportError as e:
logger.error(
'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e
)
raise tornado.web.HTTPError(500)
except BusinessError as e:
logger.info(
'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e
)
self.res = e.data
self.write(self.res)
class GameHeartbeatPrivateHandler(_PrivateHandlerBase):
_OPEN_LIVE_URL = GAME_HEARTBEAT_OPEN_LIVE_URL
async def send_game_heartbeat_by_service_or_common_server(game_id):
cfg = config.get_config()
if cfg.is_open_live_configured:
return await services.open_live.send_game_heartbeat(game_id)
# 这里GAME_HEARTBEAT_OPEN_LIVE_URL没用因为一定是请求公共服务器
return await request_open_live_or_common_server(
GAME_HEARTBEAT_OPEN_LIVE_URL, GAME_HEARTBEAT_COMMON_SERVER_URL, {'game_id': game_id}
)
class GameHeartbeatPrivateHandler(_OpenLiveHandlerBase):
async def post(self):
cfg = config.get_config()
if not cfg.is_open_live_configured:
raise tornado.web.HTTPError(501)
game_id = self.json_args.get('game_id', None)
if not isinstance(game_id, str) or game_id == '':
raise tornado.web.MissingArgumentError('game_id')
try:
self.res = await services.open_live.send_game_heartbeat(game_id)
except TransportError as e:
logger.error(
'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e
)
raise tornado.web.HTTPError(500)
except BusinessError as e:
logger.info(
'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e
)
self.res = e.data
self.write(self.res)
ROUTES = [

View File

@ -19,6 +19,7 @@ import config
import models.database
import services.avatar
import services.chat
import services.open_live
import services.translate
import update
import utils.request
@ -61,6 +62,7 @@ def init():
services.avatar.init()
services.translate.init()
services.open_live.init()
services.chat.init()
update.check_update()

View File

@ -228,11 +228,7 @@ class OpenLiveClient(blivedm.OpenLiveClient):
# 保存一下防止await之后game_id改变
game_id = self._game_id
try:
await api_open_live.request_open_live_or_common_server(
api_open_live.GAME_HEARTBEAT_OPEN_LIVE_URL,
api_open_live.GAME_HEARTBEAT_COMMON_SERVER_URL,
{'game_id': game_id}
)
await api_open_live.send_game_heartbeat_by_service_or_common_server(game_id)
except api_open_live.TransportError:
logger.error('room=%d _send_game_heartbeat() failed', self.room_id)
return False

100
services/open_live.py Normal file
View File

@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
import asyncio
import dataclasses
import datetime
import logging
from typing import *
import api.open_live
import config
logger = logging.getLogger(__name__)
# 正在等待发送的心跳任务game_id -> HeartbeatTask
_game_id_heart_task_map: Dict[str, 'HeartbeatTask'] = {}
@dataclasses.dataclass
class HeartbeatTask:
game_id: str
future: 'asyncio.Future[dict]'
def init():
cfg = config.get_config()
# 批量心跳只支持配置了开放平台的公共服务器,私有服务器用的人少,意义不大
if cfg.is_open_live_configured:
asyncio.create_task(_game_heartbeat_consumer())
async def send_game_heartbeat(game_id) -> dict:
"""发送项目心跳。成功则返回符合开放平台格式的结果,失败则抛出异常"""
assert config.get_config().is_open_live_configured
if game_id in (None, ''):
raise api.open_live.BusinessError({'code': 4000, 'message': '参数错误', 'request_id': '0', 'data': None})
task = _game_id_heart_task_map.get(game_id, None)
if task is None:
task = HeartbeatTask(
game_id=game_id,
future=asyncio.get_running_loop().create_future(),
)
_game_id_heart_task_map[game_id] = task
# 限制一次发送的数量,数量太多了就立即发送
if len(_game_id_heart_task_map) >= 95:
await _flush_game_heartbeat_tasks()
return await task.future
async def _game_heartbeat_consumer():
while True:
try:
start_time = datetime.datetime.now()
await _flush_game_heartbeat_tasks()
cost_time = (datetime.datetime.now() - start_time).total_seconds()
# 如果等待时间太短,请求频率会太高;如果等待时间太长,前端请求、项目心跳会超时
await asyncio.sleep(5 - cost_time)
except Exception: # noqa
logger.exception('_heartbeat_consumer error:')
async def _flush_game_heartbeat_tasks():
global _game_id_heart_task_map
if not _game_id_heart_task_map:
return
game_id_task_map = _game_id_heart_task_map
_game_id_heart_task_map = {}
game_ids = list(game_id_task_map.keys())
logger.info('Sending game batch heartbeat for %d games', len(game_ids))
try:
res = await api.open_live.request_open_live(
api.open_live.GAME_BATCH_HEARTBEAT_OPEN_LIVE_URL,
{'game_ids': game_ids}
)
failed_game_ids = res['data']['failed_game_ids']
if failed_game_ids is None: # 哪个SB后端给数组传null的
failed_game_ids = set()
else:
failed_game_ids = set(failed_game_ids)
request_id = res['request_id']
except Exception as e:
for task in game_id_task_map.values():
task.future.set_exception(e)
return
if failed_game_ids:
logger.info(
'Game batch heartbeat res: %d succeeded, %d failed, request_id=%s',
len(game_ids) - len(failed_game_ids), len(failed_game_ids), request_id
)
for task in game_id_task_map.values():
if task.game_id in failed_game_ids:
task.future.set_exception(api.open_live.BusinessError(
{'code': 7003, 'message': '心跳过期或GameId错误', 'request_id': request_id, 'data': None}
))
else:
task.future.set_result({'code': 0, 'message': '0', 'request_id': request_id, 'data': None})