添加开放平台接口的代理

This commit is contained in:
John Smith 2023-09-08 20:53:04 +08:00
parent 07569033fa
commit de0bac3119
7 changed files with 296 additions and 22 deletions

View File

@ -1,13 +1,14 @@
# -*- coding: utf-8 -*-
import json
from typing import *
import tornado.web
class ApiHandler(tornado.web.RequestHandler): # noqa
class ApiHandler(tornado.web.RequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.json_args = None
self.json_args: Optional[dict] = None
def prepare(self):
self.set_header('Cache-Control', 'no-cache')

View File

@ -118,7 +118,7 @@ def make_translation_message_data(msg_id, translation):
]
class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
class ChatHandler(tornado.websocket.WebSocketHandler):
HEARTBEAT_INTERVAL = 10
RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5
@ -317,7 +317,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
self.send_cmd_data(Command.ADD_GIFT, gift_data)
class RoomInfoHandler(api.base.ApiHandler): # noqa
class RoomInfoHandler(api.base.ApiHandler):
async def get(self):
room_id = int(self.get_query_argument('roomId'))
logger.info('client=%s getting room info, room=%d', self.request.remote_ip, room_id)
@ -367,7 +367,7 @@ class RoomInfoHandler(api.base.ApiHandler): # noqa
return room_info['room_id'], room_info['uid']
class AvatarHandler(api.base.ApiHandler): # noqa
class AvatarHandler(api.base.ApiHandler):
async def get(self):
uid = int(self.get_query_argument('uid'))
avatar_url = await services.avatar.get_avatar_url_or_none(uid)
@ -381,3 +381,10 @@ class AvatarHandler(api.base.ApiHandler): # noqa
self.write({
'avatarUrl': avatar_url
})
ROUTES = [
(r'/api/chat', ChatHandler),
(r'/api/room_info', RoomInfoHandler),
(r'/api/avatar_url', AvatarHandler),
]

View File

@ -16,7 +16,7 @@ EMOTICON_UPLOAD_PATH = os.path.join(config.DATA_PATH, 'emoticons')
EMOTICON_BASE_URL = '/emoticons'
class MainHandler(tornado.web.StaticFileHandler): # noqa
class MainHandler(tornado.web.StaticFileHandler):
"""为了使用Vue Router的history模式把不存在的文件请求转发到index.html"""
async def get(self, path, include_body=True):
if path == '':
@ -37,7 +37,7 @@ class MainHandler(tornado.web.StaticFileHandler): # noqa
await super().get('index.html', include_body)
class ServerInfoHandler(api.base.ApiHandler): # noqa
class ServerInfoHandler(api.base.ApiHandler):
async def get(self):
cfg = config.get_config()
self.write({
@ -50,7 +50,7 @@ class ServerInfoHandler(api.base.ApiHandler): # noqa
})
class UploadEmoticonHandler(api.base.ApiHandler): # noqa
class UploadEmoticonHandler(api.base.ApiHandler):
async def post(self):
cfg = config.get_config()
if not cfg.enable_upload_file:
@ -85,3 +85,14 @@ class UploadEmoticonHandler(api.base.ApiHandler): # noqa
os.replace(tmp_path, path)
return f'{EMOTICON_BASE_URL}/{filename}'
ROUTES = [
(r'/api/server_info', ServerInfoHandler),
(r'/api/emoticon', UploadEmoticonHandler),
]
# 通配的放在最后
LAST_ROUTES = [
(rf'{EMOTICON_BASE_URL}/(.*)', tornado.web.StaticFileHandler, {'path': EMOTICON_UPLOAD_PATH}),
(r'/(.*)', MainHandler, {'path': config.WEB_ROOT}),
]

202
api/open_live.py Normal file
View File

@ -0,0 +1,202 @@
# -*- coding: utf-8 -*-
import asyncio
import datetime
import hashlib
import hmac
import json
import logging
import random
from typing import *
import aiohttp
import tornado.web
import api.base
import config
import utils.request
logger = logging.getLogger(__name__)
START_GAME_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/start'
END_GAME_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/end'
GAME_HEARTBEAT_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
COMMON_SERVER_BASE_URL = 'https://chat.bilisc.com'
START_GAME_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/start_game'
END_GAME_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/end_game'
GAME_HEARTBEAT_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/game_heartbeat'
class TransportError(Exception):
"""网络错误或HTTP状态码错误"""
class BusinessError(Exception):
"""业务返回码错误"""
def __init__(self, data: dict):
super().__init__(f"message={data['message']}, request_id={data['request_id']}")
self.data = data
@property
def code(self) -> int:
return self.data['code']
async def request_open_live_or_common_server(open_live_url, common_server_url, body: Union[dict, str, bytes]) -> dict:
"""如果配置了开放平台,则直接请求,否则转发请求到公共服务器的内部接口"""
cfg = config.get_config()
if cfg.is_open_live_configured:
return await _request_open_live(open_live_url, body)
post_params = {'headers': {'Content-Type': 'application/json'}}
if isinstance(body, dict):
post_params['json'] = body
else:
post_params['data'] = body
req_ctx_mgr = utils.request.http_session.post(common_server_url, **post_params)
try:
return await _read_response(req_ctx_mgr)
except TransportError:
logger.exception('Request common server failed:')
raise
except BusinessError as e:
logger.warning('Request common server failed: %s', e)
raise
async def _request_open_live(url, body: Union[dict, str, bytes]) -> dict:
cfg = config.get_config()
assert cfg.is_open_live_configured
if isinstance(body, dict):
body_bytes = json.dumps(body).encode('utf-8')
elif isinstance(body, str):
body_bytes = body.encode('utf-8')
else:
body_bytes = body
headers = {
'x-bili-accesskeyid': cfg.open_live_access_key_id,
'x-bili-content-md5': hashlib.md5(body_bytes).hexdigest(),
'x-bili-signature-method': 'HMAC-SHA256',
'x-bili-signature-nonce': str(random.randint(0, 999999999)),
'x-bili-signature-version': '1.0',
'x-bili-timestamp': str(int(datetime.datetime.now().timestamp())),
}
str_to_sign = '\n'.join(
f'{key}:{value}'
for key, value in headers.items()
)
signature = hmac.new(
cfg.open_live_access_key_secret.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256
).hexdigest()
headers['Authorization'] = signature
headers['Content-Type'] = 'application/json'
headers['Accept'] = 'application/json'
req_ctx_mgr = utils.request.http_session.post(url, headers=headers, data=body_bytes)
try:
return await _read_response(req_ctx_mgr)
except TransportError:
logger.exception('Request open live failed:')
raise
except BusinessError as e:
logger.warning('Request open live failed: %s', e)
raise
async def _read_response(req_ctx_mgr: AsyncContextManager[aiohttp.ClientResponse]) -> dict:
try:
async with req_ctx_mgr as r:
r.raise_for_status()
data = await r.json()
code = data['code']
if code != 0:
raise BusinessError(data)
return data
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
raise TransportError(f'{type(e).__name__}: {e}')
class _OpenLiveHandlerBase(api.base.ApiHandler):
def prepare(self):
super().prepare()
# 做一些简单的检查
if not isinstance(self.json_args, dict):
raise tornado.web.MissingArgumentError('body')
logger.info('client=%s requesting open live, cls=%s', self.request.remote_ip, type(self).__name__)
class _PublicHandlerBase(_OpenLiveHandlerBase):
"""外部接口,如果配置了开放平台,则直接请求,否则转发请求到公共服务器的内部接口"""
_OPEN_LIVE_URL: str
_COMMON_SERVER_URL: str
async def post(self):
try:
res = await request_open_live_or_common_server(
self._OPEN_LIVE_URL, self._COMMON_SERVER_URL, self.request.body
)
except TransportError:
raise tornado.web.HTTPError(500)
except BusinessError as e:
res = e.data
self.write(res)
class _PrivateHandlerBase(_OpenLiveHandlerBase):
"""内部接口,如果配置了开放平台,则直接请求,否则响应错误"""
_OPEN_LIVE_URL: str
async def post(self):
cfg = config.get_config()
if not cfg.is_open_live_configured:
raise tornado.web.HTTPError(501)
try:
res = await _request_open_live(self._OPEN_LIVE_URL, self.request.body)
except TransportError:
raise tornado.web.HTTPError(500)
except BusinessError as e:
res = e.data
self.write(res)
class StartGamePublicHandler(_PublicHandlerBase):
_OPEN_LIVE_URL = START_GAME_OPEN_LIVE_URL
_COMMON_SERVER_URL = START_GAME_COMMON_SERVER_URL
class StartGamePrivateHandler(_PrivateHandlerBase):
_OPEN_LIVE_URL = START_GAME_OPEN_LIVE_URL
class EndGamePublicHandler(_PublicHandlerBase):
_OPEN_LIVE_URL = END_GAME_OPEN_LIVE_URL
_COMMON_SERVER_URL = END_GAME_COMMON_SERVER_URL
class EndGamePrivateHandler(_PrivateHandlerBase):
_OPEN_LIVE_URL = END_GAME_OPEN_LIVE_URL
class GameHeartbeatPublicHandler(_PublicHandlerBase):
_OPEN_LIVE_URL = GAME_HEARTBEAT_OPEN_LIVE_URL
_COMMON_SERVER_URL = GAME_HEARTBEAT_COMMON_SERVER_URL
class GameHeartbeatPrivateHandler(_PrivateHandlerBase):
_OPEN_LIVE_URL = GAME_HEARTBEAT_OPEN_LIVE_URL
ROUTES = [
(r'/api/open_live/start_game', StartGamePublicHandler),
(r'/api/internal/open_live/start_game', StartGamePrivateHandler),
(r'/api/open_live/end_game', EndGamePublicHandler),
(r'/api/internal/open_live/end_game', EndGamePrivateHandler),
(r'/api/open_live/game_heartbeat', GameHeartbeatPublicHandler),
(r'/api/internal/open_live/game_heartbeat', GameHeartbeatPrivateHandler),
]

14
main.py
View File

@ -13,6 +13,7 @@ import tornado.web
import api.chat
import api.main
import api.open_live
import config
import models.database
import services.avatar
@ -24,15 +25,10 @@ import utils.request
logger = logging.getLogger(__name__)
ROUTES = [
(r'/api/server_info', api.main.ServerInfoHandler),
(r'/api/emoticon', api.main.UploadEmoticonHandler),
(r'/api/chat', api.chat.ChatHandler),
(r'/api/room_info', api.chat.RoomInfoHandler),
(r'/api/avatar_url', api.chat.AvatarHandler),
(rf'{api.main.EMOTICON_BASE_URL}/(.*)', tornado.web.StaticFileHandler, {'path': api.main.EMOTICON_UPLOAD_PATH}),
(r'/(.*)', api.main.MainHandler, {'path': config.WEB_ROOT}),
*api.main.ROUTES,
*api.chat.ROUTES,
*api.open_live.ROUTES,
*api.main.LAST_ROUTES,
]
server: Optional[tornado.httpserver.HTTPServer] = None

View File

@ -9,6 +9,7 @@ import uuid
from typing import *
import api.chat
import api.open_live as api_open_live
import blivedm.blivedm as blivedm
import blivedm.blivedm.models.web as dm_web_models
import blivedm.blivedm.models.pb as dm_pb_models
@ -114,9 +115,6 @@ class LiveClientManager:
client_room_manager.del_room(room_key)
def get_live_client(self, room_key: RoomKey):
return self._live_clients.get(room_key, None)
class WebLiveClient(blivedm.BLiveClient):
HEARTBEAT_INTERVAL = 10
@ -171,7 +169,67 @@ class OpenLiveClient(blivedm.OpenLiveClient):
logger.info('room=%s live client init failed', self.room_key)
return res
# TODO 如果没有配置access_key则请求公共服务器
async def _start_game(self):
try:
data = await api_open_live.request_open_live_or_common_server(
api_open_live.START_GAME_OPEN_LIVE_URL,
api_open_live.START_GAME_COMMON_SERVER_URL,
{'code': self._room_owner_auth_code, 'app_id': self._app_id}
)
except api_open_live.TransportError:
logger.error('_start_game() failed')
return False
except api_open_live.BusinessError:
logger.warning('_start_game() failed')
return False
return self._parse_start_game(data['data'])
async def _end_game(self):
if self._game_id in (None, ''):
return True
try:
await api_open_live.request_open_live_or_common_server(
api_open_live.END_GAME_OPEN_LIVE_URL,
api_open_live.END_GAME_COMMON_SERVER_URL,
{'app_id': self._app_id, 'game_id': self._game_id}
)
except api_open_live.TransportError:
logger.error('room=%d _end_game() failed', self.room_id)
return False
except api_open_live.BusinessError as e:
if e.code in (7000, 7003):
# 项目已经关闭了也算成功
return True
logger.warning('room=%d _end_game() failed', self.room_id)
return False
return True
async def _send_game_heartbeat(self):
if self._game_id in (None, ''):
logger.warning('game=%d _send_game_heartbeat() failed, game_id not found', self._game_id)
return False
# 保存一下防止await之后game_id改变
game_id = self._game_id
try:
await api_open_live.request_open_live_or_common_server(
api_open_live.GAME_HEARTBEAT_OPEN_LIVE_URL,
api_open_live.GAME_HEARTBEAT_COMMON_SERVER_URL,
{'game_id': game_id}
)
except api_open_live.TransportError:
logger.error('room=%d _send_game_heartbeat() failed', self.room_id)
return False
except api_open_live.BusinessError as e:
logger.warning('room=%d _send_game_heartbeat() failed', self.room_id)
if e.code == 7003 and self._game_id == game_id:
# 项目异常关闭,可能是心跳超时,需要重新开启项目
self._need_init_room = True
if self._websocket is not None and not self._websocket.closed:
await self._websocket.close()
return False
return True
class ClientRoomManager:

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
import asyncio
from typing import *
import aiohttp