blivechat/api/chat.py

465 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import asyncio
import enum
import json
import logging
import random
import time
import uuid
from typing import *
import aiohttp
import tornado.websocket
import yarl
import api.base
import blivedm.blivedm.clients.web as dm_web_cli
import config
import services.avatar
import services.chat
import services.translate
import utils.async_io
import utils.request
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Command(enum.IntEnum):
    """Integer command codes carried in the ``cmd`` field of a chat message."""

    # Keep-alive; sent periodically by the server and accepted from clients.
    HEARTBEAT = 0
    # Client request to join a room.
    JOIN_ROOM = 1

    # Content messages.
    ADD_TEXT = 2
    ADD_GIFT = 3
    ADD_MEMBER = 4
    ADD_SUPER_CHAT = 5
    DEL_SUPER_CHAT = 6
    UPDATE_TRANSLATION = 7

    FATAL_ERROR = 8
class ContentType(enum.IntEnum):
    """Kinds of content a text message can carry."""

    TEXT = 0
    EMOTICON = 1
class FatalErrorType(enum.IntEnum):
    """Integer codes identifying the kind of a fatal error."""

    AUTH_CODE_ERROR = 1
    TOO_MANY_RETRIES = 2
    TOO_MANY_CONNECTIONS = 3
def make_message_body(cmd, data):
    """Serialize a command and its payload into a UTF-8 encoded JSON message.

    :param cmd: command code, stored under the ``cmd`` key
    :param data: JSON-serializable payload, stored under the ``data`` key
    :return: the JSON document as ``bytes``
    """
    envelope = {'cmd': cmd, 'data': data}
    serialized = json.dumps(envelope)
    return serialized.encode('utf-8')
def make_text_message_data(
    avatar_url: str = services.avatar.DEFAULT_AVATAR_URL,
    timestamp: Optional[int] = None,
    author_name: str = '',
    author_type: int = 0,
    content: str = '',
    privilege_type: int = 0,
    is_gift_danmaku: bool = False,
    author_level: int = 1,
    is_newbie: bool = False,
    is_mobile_verified: bool = True,
    medal_level: int = 0,
    id_: Optional[str] = None,
    translation: str = '',
    content_type: int = ContentType.TEXT,
    content_type_params: Optional[list] = None,
    uid: str = '',
    medal_name: str = '',
) -> List[Any]:
    """Build the positional payload of a text message (sent with ``Command.ADD_TEXT``).

    The payload is a list rather than a dict to save bandwidth, so every
    field is identified by its index; the indices are noted inline below.

    :param timestamp: message time; defaults to the current ``int(time.time())``
    :param id_: message id; defaults to a fresh ``uuid.uuid4().hex``
    :param content_type_params: extra parameters for ``content_type``;
        defaults to a new empty list
    :return: the 18-element list described by the inline index comments
    """
    if timestamp is None:
        timestamp = int(time.time())
    if id_ is None:
        id_ = uuid.uuid4().hex
    if content_type_params is None:
        content_type_params = []

    return [
        # 0: avatarUrl
        avatar_url,
        # 1: timestamp
        timestamp,
        # 2: authorName
        author_name,
        # 3: authorType
        author_type,
        # 4: content
        content,
        # 5: privilegeType
        privilege_type,
        # 6: isGiftDanmaku (booleans are sent as 0/1)
        1 if is_gift_danmaku else 0,
        # 7: authorLevel
        author_level,
        # 8: isNewbie
        1 if is_newbie else 0,
        # 9: isMobileVerified
        1 if is_mobile_verified else 0,
        # 10: medalLevel
        medal_level,
        # 11: id
        id_,
        # 12: translation
        translation,
        # 13: contentType
        content_type,
        # 14: contentTypeParams
        content_type_params,
        # 15: textEmoticons
        [],  # deprecated, kept as a placeholder
        # 16: uid
        uid,
        # 17: medalName
        medal_name,
    ]
def make_emoticon_params(url):
    """Build the positional parameter list for emoticon content.

    :param url: stored at index 0
    :return: a single-element list containing ``url``
    """
    params = [url]  # 0: url
    return params
def make_translation_message_data(msg_id, translation):
    """Build the positional payload of a translation message.

    :param msg_id: id of the message, stored at index 0
    :param translation: translated text, stored at index 1
    :return: ``[msg_id, translation]``
    """
    payload = []
    payload.append(msg_id)       # 0: id
    payload.append(translation)  # 1: translation
    return payload
class ChatHandler(tornado.websocket.WebSocketHandler):
    """WebSocket handler through which a client joins a chat room.

    A heartbeat is sent to the client every ``HEARTBEAT_INTERVAL`` seconds,
    and the connection is closed when the receive-timeout timer is not
    refreshed within ``RECEIVE_TIMEOUT`` seconds.
    """

    HEARTBEAT_INTERVAL = 10
    RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Timer handles returned by ``loop.call_later``; None while not armed
        self._heartbeat_timer_handle = None
        self._receive_timeout_timer_handle = None

        # Set once the client has sent a join-room request
        self.room_key: Optional[services.chat.RoomKey] = None
        self.auto_translate = False

    def open(self):
        """Log the new connection and arm the heartbeat and receive-timeout timers."""
        logger.info('client=%s connected', self.request.remote_ip)
        loop = asyncio.get_running_loop()
        self._heartbeat_timer_handle = loop.call_later(
            self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
        )
        self._refresh_receive_timeout_timer()

    def _on_send_heartbeat(self):
        """Send one heartbeat, then schedule the next one."""
        self.send_cmd_data(Command.HEARTBEAT, {})
        loop = asyncio.get_running_loop()
        self._heartbeat_timer_handle = loop.call_later(
            self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
        )

    def _refresh_receive_timeout_timer(self):
        """Restart the receive-timeout countdown, cancelling any pending one."""
        previous = self._receive_timeout_timer_handle
        if previous is not None:
            previous.cancel()
        loop = asyncio.get_running_loop()
        self._receive_timeout_timer_handle = loop.call_later(
            self.RECEIVE_TIMEOUT, self._on_receive_timeout
        )

    def _on_receive_timeout(self):
        """Close the connection after the receive timeout has elapsed."""
        logger.info('client=%s timed out', self.request.remote_ip)
        self._receive_timeout_timer_handle = None
        self.close()

    def on_close(self):
        """Remove the client from its room (if joined) and cancel both timers."""
        logger.info('client=%s disconnected, room=%s', self.request.remote_ip, self.room_key)
        if self.has_joined_room:
            services.chat.client_room_manager.del_client(self.room_key, self)

        for attr_name in ('_heartbeat_timer_handle', '_receive_timeout_timer_handle'):
            handle = getattr(self, attr_name)
            if handle is not None:
                handle.cancel()
                setattr(self, attr_name, None)

    def on_message(self, message):
        """Parse an incoming JSON message and dispatch on its ``cmd`` field.

        Any exception raised while handling the message is logged and swallowed.
        """
        try:
            body = json.loads(message)
            cmd = int(body['cmd'])
            if cmd == Command.JOIN_ROOM:
                self._on_join_room_req(body)
            elif cmd == Command.HEARTBEAT:
                # Clients that never joined a room do not get their timeout
                # refreshed, so they are disconnected on timeout as well
                if self.has_joined_room:
                    self._refresh_receive_timeout_timer()
            else:
                logger.warning('client=%s unknown cmd=%d, body=%s', self.request.remote_ip, cmd, body)
        except Exception:  # noqa
            logger.exception('client=%s on_message error, message=%s', self.request.remote_ip, message)

    def _on_join_room_req(self, body: dict):
        """Handle a join-room request; ignored when a room was already joined."""
        if self.has_joined_room:
            return

        payload = body['data']
        key_dict = payload.get('roomKey')
        if key_dict is None:
            # Backward compatibility with old clients. TODO: remove after a few versions
            self.room_key = services.chat.RoomKey(
                services.chat.RoomKeyType.ROOM_ID, int(payload['roomId'])
            )
        else:
            self.room_key = services.chat.RoomKey.from_dict(key_dict)
        logger.info('client=%s joining room %s', self.request.remote_ip, self.room_key)

        # The client config is optional; keep the default when a key is missing
        try:
            self.auto_translate = bool(payload['config']['autoTranslate'])
        except KeyError:
            pass

        services.chat.client_room_manager.add_client(self.room_key, self)
        utils.async_io.create_task_with_ref(self._on_joined_room())

        self._refresh_receive_timeout_timer()

    # For cross-origin testing
    def check_origin(self, origin):
        """Accept every origin in debug mode; otherwise defer to the base class."""
        if config.get_config().debug:
            return True
        return super().check_origin(origin)

    @property
    def has_joined_room(self):
        """Whether a room key has been assigned to this connection."""
        return self.room_key is not None

    def send_cmd_data(self, cmd, data):
        """Serialize ``cmd`` and ``data`` and send them to the client."""
        encoded = make_message_body(cmd, data)
        self.send_body_no_raise(encoded)

    def send_body_no_raise(self, body: Union[bytes, str, Dict[str, Any]]):
        """Write ``body`` to the socket; close instead of raising if it is already closed."""
        try:
            self.write_message(body)
        except tornado.websocket.WebSocketClosedError:
            self.close()

    async def _on_joined_room(self):
        """Send the post-join messages: test data in debug mode, and a notice
        when auto translation was requested for a room that is not allowed."""
        cfg = config.get_config()
        if cfg.debug:
            await self._send_test_message()

        # Notice that auto translation is not allowed
        if not self.auto_translate:
            return
        allowed_rooms = cfg.allow_translate_rooms
        if not allowed_rooms:
            return
        # Auth codes are ignored here; with a correct configuration this
        # notice is never shown anyway
        if self.room_key.type != services.chat.RoomKeyType.ROOM_ID:
            return
        if self.room_key.value in allowed_rooms:
            return

        notice = make_text_message_data(
            author_name='blivechat',
            author_type=2,
            content='Translation is not allowed in this room. Please download to use translation',
            author_level=60,
        )
        self.send_cmd_data(Command.ADD_TEXT, notice)

    # For testing
    async def _send_test_message(self):
        """Send a fixed sequence of sample text, member, super chat and gift messages."""
        avatar_url = await services.avatar.get_avatar_url(300474, 'xfgryujk')
        now = int(time.time())
        common = {
            'avatarUrl': avatar_url,
            'timestamp': now,
            'authorName': 'xfgryujk',
        }
        text = make_text_message_data(
            avatar_url=common['avatarUrl'],
            timestamp=common['timestamp'],
            author_name=common['authorName'],
            content='我能吞下玻璃而不伤身体',
            author_level=60,
        )
        member = dict(common, id=uuid.uuid4().hex, privilegeType=3)
        gift = dict(
            common,
            id=uuid.uuid4().hex,
            totalCoin=450000,
            giftName='摩天大楼',
            num=1,
        )
        super_chat = dict(
            common,
            id=str(random.randint(1, 65535)),
            price=30,
            content='The quick brown fox jumps over the lazy dog',
            translation='',
        )

        # Text messages: the same list is mutated and re-sent with a new id
        self.send_cmd_data(Command.ADD_TEXT, text)
        text[4] = 'te[dog]st'  # 4: content
        text[11] = uuid.uuid4().hex  # 11: id
        self.send_cmd_data(Command.ADD_TEXT, text)
        # 2: authorName, 3: authorType, 4: content
        text[2:5] = ['主播', 3, "I can eat glass, it doesn't hurt me."]
        text[11] = uuid.uuid4().hex
        self.send_cmd_data(Command.ADD_TEXT, text)

        self.send_cmd_data(Command.ADD_MEMBER, member)

        # Super chats
        self.send_cmd_data(Command.ADD_SUPER_CHAT, super_chat)
        super_chat.update(
            id=str(random.randint(1, 65535)),
            price=100,
            content='敏捷的棕色狐狸跳过了懒狗',
        )
        self.send_cmd_data(Command.ADD_SUPER_CHAT, super_chat)
        # self.send_cmd_data(Command.DEL_SUPER_CHAT, {'ids': [sc_data['id']]})

        # Gifts
        self.send_cmd_data(Command.ADD_GIFT, gift)
        gift.update(
            id=uuid.uuid4().hex,
            totalCoin=1245000,
            giftName='小电视飞船',
        )
        self.send_cmd_data(Command.ADD_GIFT, gift)
        gift.update(
            id=uuid.uuid4().hex,
            totalCoin=0,
            totalFreeCoin=1000,
            giftName='辣条',
            num=10,
        )
        self.send_cmd_data(Command.ADD_GIFT, gift)
class RoomInfoHandler(api.base.ApiHandler):
    """HTTP handler returning the room info a client needs to connect to a room."""

    async def get(self):
        """Respond with the room id, owner uid, danmaku server list/token and buvid.

        The ``roomId`` query argument is required and must parse as an integer.
        The three upstream lookups run concurrently.
        """
        room_id = int(self.get_query_argument('roomId'))
        logger.info('client=%s getting room info, room=%d', self.request.remote_ip, room_id)

        (room_id, owner_uid), (host_server_list, host_server_token), buvid = await asyncio.gather(
            self._get_room_info(room_id),
            self._get_server_host_list_and_token(room_id),
            self._get_buvid()
        )

        # Cache for 1 minute
        self.set_header('Cache-Control', 'private, max-age=60')
        self.write({
            'roomId': room_id,
            'ownerUid': owner_uid,
            'hostServerList': host_server_list,
            'hostServerToken': host_server_token,
            # Not really useful, but included to be on the safe side
            'buvid': buvid,
        })

    @staticmethod
    async def _get_room_info(room_id) -> Tuple[int, int]:
        """Fetch the room id and owner uid reported by the room-init endpoint.

        :return: ``(room_id, owner_uid)``; on any failure the given ``room_id``
            is returned unchanged together with an owner uid of 0
        """
        try:
            async with utils.request.http_session.get(
                dm_web_cli.ROOM_INIT_URL,
                headers={
                    **utils.request.BILIBILI_COMMON_HEADERS,
                    'Origin': 'https://live.bilibili.com',
                    'Referer': f'https://live.bilibili.com/{room_id}'
                },
                params={
                    'room_id': room_id
                }
            ) as res:
                if res.status != 200:
                    logger.warning('room=%d _get_room_info failed: %d %s', room_id,
                                   res.status, res.reason)
                    return room_id, 0
                data = await res.json()
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            logger.exception('room=%d _get_room_info failed', room_id)
            return room_id, 0

        if data['code'] != 0:
            logger.warning('room=%d _get_room_info failed: %s', room_id, data['message'])
            return room_id, 0

        room_info = data['data']['room_info']
        return room_info['room_id'], room_info['uid']

    async def _get_server_host_list_and_token(self, room_id) -> Tuple[dict, Optional[str]]:
        """Fetch the danmaku server host list and token for a room.

        :return: ``(host_server_list, host_server_token)``; on any failure
            ``(dm_web_cli.DEFAULT_DANMAKU_SERVER_LIST, None)`` is returned
        """
        try:
            async with utils.request.http_session.get(
                dm_web_cli.DANMAKU_SERVER_CONF_URL,
                headers={
                    # The token is signed against the UA, so use the same UA as the client
                    'User-Agent': self.request.headers.get('User-Agent', '')
                },
                params={
                    'id': room_id,
                    'type': 0
                }
            ) as res:
                if res.status != 200:
                    logger.warning('room %d _get_server_host_list failed: %d %s', room_id,
                                   res.status, res.reason)
                    return dm_web_cli.DEFAULT_DANMAKU_SERVER_LIST, None
                data = await res.json()
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            logger.exception('room %d _get_server_host_list failed', room_id)
            return dm_web_cli.DEFAULT_DANMAKU_SERVER_LIST, None

        if data['code'] != 0:
            logger.warning('room %d _get_server_host_list failed: %s', room_id, data['message'])
            return dm_web_cli.DEFAULT_DANMAKU_SERVER_LIST, None

        data = data['data']
        host_server_list = data['host_list']
        if not host_server_list:
            # room_id must be passed here: logging only applies %-formatting when
            # arguments are supplied, so without it a literal "%d" would be logged
            logger.warning('room %d _get_server_host_list failed: host_server_list is empty', room_id)
            return dm_web_cli.DEFAULT_DANMAKU_SERVER_LIST, None

        host_server_token = data.get('token', None)
        return host_server_list, host_server_token

    async def _get_buvid(self):
        """Return the ``buvid3`` cookie value, requesting the init URL once if it is missing.

        :return: the cookie value, or ``''`` when it is still unavailable
        """
        buvid = self._do_get_buvid()
        if buvid != '':
            return buvid

        # The response body is not needed; the request is made only so the
        # session's cookie jar can pick up cookies. Failures are best-effort.
        try:
            async with utils.request.http_session.get(dm_web_cli.BUVID_INIT_URL):
                pass
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            pass
        return self._do_get_buvid()

    @staticmethod
    def _do_get_buvid():
        """Read the ``buvid3`` cookie for the init URL from the shared session's cookie jar.

        :return: the cookie value, or ``''`` when the cookie is absent
        """
        cookies = utils.request.http_session.cookie_jar.filter_cookies(yarl.URL(dm_web_cli.BUVID_INIT_URL))
        buvid_cookie = cookies.get('buvid3', None)
        if buvid_cookie is None:
            return ''
        return buvid_cookie.value
class AvatarHandler(api.base.ApiHandler):
    """HTTP handler returning the avatar URL of a user."""

    async def get(self):
        """Respond with ``{'avatarUrl': ...}`` for the ``uid`` query argument.

        When no avatar URL is found, a default one is computed from ``uid``
        and the optional ``username`` argument.
        """
        # uid is almost always 0 nowadays: the frontend computes the default avatar
        # URL itself, and this endpoint is kept only for compatibility with old versions
        uid = int(self.get_query_argument('uid'))
        username = self.get_query_argument('username', '')

        cache_time = 86400
        avatar_url = await services.avatar.get_avatar_url_or_none(uid)
        if avatar_url is None:
            avatar_url = services.avatar.get_default_avatar_url(uid, username)
            # A fallback for a real uid is cached only briefly
            if uid != 0:
                cache_time = 180

        self.set_header('Cache-Control', f'private, max-age={cache_time}')
        self.write({'avatarUrl': avatar_url})
class TextEmoticonMappingsHandler(api.base.ApiHandler):
    """HTTP handler returning the configured text emoticon mappings."""

    async def get(self):
        """Respond with ``{'textEmoticons': ...}`` taken from the configuration."""
        # Cache for 1 day
        self.set_header('Cache-Control', 'private, max-age=86400')
        mappings = config.get_config().text_emoticons
        self.write({'textEmoticons': mappings})
# (URL pattern, handler class) pairs for the handlers defined in this module.
ROUTES = [
    (r'/api/chat', ChatHandler),
    (r'/api/room_info', RoomInfoHandler),
    (r'/api/avatar_url', AvatarHandler),
    (r'/api/text_emoticon_mappings', TextEmoticonMappingsHandler),
]