mirror of
https://github.com/xfgryujk/blivechat.git
synced 2025-04-03 16:10:35 +08:00
到B站的连接和到客户端的连接解耦
This commit is contained in:
parent
e40f1511ed
commit
c87325e5e0
113
api/chat.py
113
api/chat.py
@ -6,6 +6,7 @@ import logging
|
|||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
from typing import *
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import tornado.websocket
|
import tornado.websocket
|
||||||
@ -37,9 +38,18 @@ class ContentType(enum.IntEnum):
|
|||||||
EMOTICON = 1
|
EMOTICON = 1
|
||||||
|
|
||||||
|
|
||||||
def make_text_message(avatar_url, timestamp, author_name, author_type, content, privilege_type,
|
def make_message_body(cmd, data):
|
||||||
is_gift_danmaku, author_level, is_newbie, is_mobile_verified, medal_level,
|
return json.dumps(
|
||||||
id_, translation, content_type, content_type_params):
|
{
|
||||||
|
'cmd': cmd,
|
||||||
|
'data': data
|
||||||
|
}
|
||||||
|
).encode('utf-8')
|
||||||
|
|
||||||
|
|
||||||
|
def make_text_message_data(avatar_url, timestamp, author_name, author_type, content, privilege_type,
|
||||||
|
is_gift_danmaku, author_level, is_newbie, is_mobile_verified, medal_level,
|
||||||
|
id_, translation, content_type, content_type_params):
|
||||||
return [
|
return [
|
||||||
# 0: avatarUrl
|
# 0: avatarUrl
|
||||||
avatar_url,
|
avatar_url,
|
||||||
@ -81,7 +91,7 @@ def make_emoticon_params(url):
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def make_translation_message(msg_id, translation):
|
def make_translation_message_data(msg_id, translation):
|
||||||
return [
|
return [
|
||||||
# 0: id
|
# 0: id
|
||||||
msg_id,
|
msg_id,
|
||||||
@ -103,14 +113,14 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
|||||||
self.auto_translate = False
|
self.auto_translate = False
|
||||||
|
|
||||||
def open(self):
|
def open(self):
|
||||||
logger.info('Websocket connected %s', self.request.remote_ip)
|
logger.info('client=%s connected', self.request.remote_ip)
|
||||||
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
|
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
|
||||||
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
|
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
|
||||||
)
|
)
|
||||||
self._refresh_receive_timeout_timer()
|
self._refresh_receive_timeout_timer()
|
||||||
|
|
||||||
def _on_send_heartbeat(self):
|
def _on_send_heartbeat(self):
|
||||||
self.send_message(Command.HEARTBEAT, {})
|
self.send_cmd_data(Command.HEARTBEAT, {})
|
||||||
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
|
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
|
||||||
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
|
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
|
||||||
)
|
)
|
||||||
@ -123,14 +133,14 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
|||||||
)
|
)
|
||||||
|
|
||||||
def _on_receive_timeout(self):
|
def _on_receive_timeout(self):
|
||||||
logger.warning('Client %s timed out', self.request.remote_ip)
|
logger.warning('client=%s timed out', self.request.remote_ip)
|
||||||
self._receive_timeout_timer_handle = None
|
self._receive_timeout_timer_handle = None
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
def on_close(self):
|
def on_close(self):
|
||||||
logger.info('Websocket disconnected %s room: %s', self.request.remote_ip, str(self.room_id))
|
logger.info('client=%s disconnected, room=%s', self.request.remote_ip, str(self.room_id))
|
||||||
if self.has_joined_room:
|
if self.has_joined_room:
|
||||||
services.chat.room_manager.del_client(self.room_id, self)
|
services.chat.client_room_manager.del_client(self.room_id, self)
|
||||||
if self._heartbeat_timer_handle is not None:
|
if self._heartbeat_timer_handle is not None:
|
||||||
self._heartbeat_timer_handle.cancel()
|
self._heartbeat_timer_handle.cancel()
|
||||||
self._heartbeat_timer_handle = None
|
self._heartbeat_timer_handle = None
|
||||||
@ -146,26 +156,31 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
|||||||
|
|
||||||
body = json.loads(message)
|
body = json.loads(message)
|
||||||
cmd = body['cmd']
|
cmd = body['cmd']
|
||||||
|
|
||||||
if cmd == Command.HEARTBEAT:
|
if cmd == Command.HEARTBEAT:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
elif cmd == Command.JOIN_ROOM:
|
elif cmd == Command.JOIN_ROOM:
|
||||||
if self.has_joined_room:
|
if self.has_joined_room:
|
||||||
return
|
return
|
||||||
self._refresh_receive_timeout_timer()
|
self._refresh_receive_timeout_timer()
|
||||||
|
|
||||||
self.room_id = int(body['data']['roomId'])
|
self.room_id = int(body['data']['roomId'])
|
||||||
logger.info('Client %s is joining room %d', self.request.remote_ip, self.room_id)
|
logger.info('client=%s joining room %d', self.request.remote_ip, self.room_id)
|
||||||
try:
|
try:
|
||||||
cfg = body['data']['config']
|
cfg = body['data']['config']
|
||||||
self.auto_translate = cfg['autoTranslate']
|
self.auto_translate = bool(cfg['autoTranslate'])
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
asyncio.ensure_future(services.chat.room_manager.add_client(self.room_id, self))
|
services.chat.client_room_manager.add_client(self.room_id, self)
|
||||||
|
asyncio.ensure_future(self._on_joined_room())
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.warning('Unknown cmd, client: %s, cmd: %d, body: %s', self.request.remote_ip, cmd, body)
|
logger.warning('client=%s unknown cmd=%d, body=%s', self.request.remote_ip, cmd, body)
|
||||||
|
|
||||||
except Exception: # noqa
|
except Exception: # noqa
|
||||||
logger.exception('on_message error, client: %s, message: %s', self.request.remote_ip, message)
|
logger.exception('client=%s on_message error, message=%s', self.request.remote_ip, message)
|
||||||
|
|
||||||
# 跨域测试用
|
# 跨域测试用
|
||||||
def check_origin(self, origin):
|
def check_origin(self, origin):
|
||||||
@ -177,22 +192,24 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
|||||||
def has_joined_room(self):
|
def has_joined_room(self):
|
||||||
return self.room_id is not None
|
return self.room_id is not None
|
||||||
|
|
||||||
def send_message(self, cmd, data):
|
def send_cmd_data(self, cmd, data):
|
||||||
body = json.dumps({'cmd': cmd, 'data': data})
|
self.send_body_no_raise(make_message_body(cmd, data))
|
||||||
|
|
||||||
|
def send_body_no_raise(self, body: Union[bytes, str, Dict[str, Any]]):
|
||||||
try:
|
try:
|
||||||
self.write_message(body)
|
self.write_message(body)
|
||||||
except tornado.websocket.WebSocketClosedError:
|
except tornado.websocket.WebSocketClosedError:
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
async def on_join_room(self):
|
async def _on_joined_room(self):
|
||||||
if self.application.settings['debug']:
|
if self.application.settings['debug']:
|
||||||
await self.send_test_message()
|
await self._send_test_message()
|
||||||
|
|
||||||
# 不允许自动翻译的提示
|
# 不允许自动翻译的提示
|
||||||
if self.auto_translate:
|
if self.auto_translate:
|
||||||
cfg = config.get_config()
|
cfg = config.get_config()
|
||||||
if cfg.allow_translate_rooms and self.room_id not in cfg.allow_translate_rooms:
|
if cfg.allow_translate_rooms and self.room_id not in cfg.allow_translate_rooms:
|
||||||
self.send_message(Command.ADD_TEXT, make_text_message(
|
self.send_cmd_data(Command.ADD_TEXT, make_text_message_data(
|
||||||
avatar_url=services.avatar.DEFAULT_AVATAR_URL,
|
avatar_url=services.avatar.DEFAULT_AVATAR_URL,
|
||||||
timestamp=int(time.time()),
|
timestamp=int(time.time()),
|
||||||
author_name='blivechat',
|
author_name='blivechat',
|
||||||
@ -211,13 +228,13 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
|||||||
))
|
))
|
||||||
|
|
||||||
# 测试用
|
# 测试用
|
||||||
async def send_test_message(self):
|
async def _send_test_message(self):
|
||||||
base_data = {
|
base_data = {
|
||||||
'avatarUrl': await services.avatar.get_avatar_url(300474),
|
'avatarUrl': await services.avatar.get_avatar_url(300474),
|
||||||
'timestamp': int(time.time()),
|
'timestamp': int(time.time()),
|
||||||
'authorName': 'xfgryujk',
|
'authorName': 'xfgryujk',
|
||||||
}
|
}
|
||||||
text_data = make_text_message(
|
text_data = make_text_message_data(
|
||||||
avatar_url=base_data['avatarUrl'],
|
avatar_url=base_data['avatarUrl'],
|
||||||
timestamp=base_data['timestamp'],
|
timestamp=base_data['timestamp'],
|
||||||
author_name=base_data['authorName'],
|
author_name=base_data['authorName'],
|
||||||
@ -253,32 +270,30 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
|
|||||||
'content': 'The quick brown fox jumps over the lazy dog',
|
'content': 'The quick brown fox jumps over the lazy dog',
|
||||||
'translation': ''
|
'translation': ''
|
||||||
}
|
}
|
||||||
self.send_message(Command.ADD_TEXT, text_data)
|
self.send_cmd_data(Command.ADD_TEXT, text_data)
|
||||||
text_data[2] = '主播'
|
text_data[2] = '主播'
|
||||||
text_data[3] = 3
|
text_data[3] = 3
|
||||||
text_data[4] = "I can eat glass, it doesn't hurt me."
|
text_data[4] = "I can eat glass, it doesn't hurt me."
|
||||||
text_data[11] = uuid.uuid4().hex
|
text_data[11] = uuid.uuid4().hex
|
||||||
self.send_message(Command.ADD_TEXT, text_data)
|
self.send_cmd_data(Command.ADD_TEXT, text_data)
|
||||||
self.send_message(Command.ADD_MEMBER, member_data)
|
self.send_cmd_data(Command.ADD_MEMBER, member_data)
|
||||||
self.send_message(Command.ADD_SUPER_CHAT, sc_data)
|
self.send_cmd_data(Command.ADD_SUPER_CHAT, sc_data)
|
||||||
sc_data['id'] = str(random.randint(1, 65535))
|
sc_data['id'] = str(random.randint(1, 65535))
|
||||||
sc_data['price'] = 100
|
sc_data['price'] = 100
|
||||||
sc_data['content'] = '敏捷的棕色狐狸跳过了懒狗'
|
sc_data['content'] = '敏捷的棕色狐狸跳过了懒狗'
|
||||||
self.send_message(Command.ADD_SUPER_CHAT, sc_data)
|
self.send_cmd_data(Command.ADD_SUPER_CHAT, sc_data)
|
||||||
# self.send_message(Command.DEL_SUPER_CHAT, {'ids': [sc_data['id']]})
|
# self.send_message(Command.DEL_SUPER_CHAT, {'ids': [sc_data['id']]})
|
||||||
self.send_message(Command.ADD_GIFT, gift_data)
|
self.send_cmd_data(Command.ADD_GIFT, gift_data)
|
||||||
gift_data['id'] = uuid.uuid4().hex
|
gift_data['id'] = uuid.uuid4().hex
|
||||||
gift_data['totalCoin'] = 1245000
|
gift_data['totalCoin'] = 1245000
|
||||||
gift_data['giftName'] = '小电视飞船'
|
gift_data['giftName'] = '小电视飞船'
|
||||||
self.send_message(Command.ADD_GIFT, gift_data)
|
self.send_cmd_data(Command.ADD_GIFT, gift_data)
|
||||||
|
|
||||||
|
|
||||||
class RoomInfoHandler(api.base.ApiHandler): # noqa
|
class RoomInfoHandler(api.base.ApiHandler): # noqa
|
||||||
_host_server_list_cache = blivedm_client.DEFAULT_DANMAKU_SERVER_LIST
|
|
||||||
|
|
||||||
async def get(self):
|
async def get(self):
|
||||||
room_id = int(self.get_query_argument('roomId'))
|
room_id = int(self.get_query_argument('roomId'))
|
||||||
logger.info('Client %s is getting room info %d', self.request.remote_ip, room_id)
|
logger.info('client=%s getting room info, room=%d', self.request.remote_ip, room_id)
|
||||||
room_id, owner_uid = await self._get_room_info(room_id)
|
room_id, owner_uid = await self._get_room_info(room_id)
|
||||||
host_server_list = await self._get_server_host_list(room_id)
|
host_server_list = await self._get_server_host_list(room_id)
|
||||||
if owner_uid == 0:
|
if owner_uid == 0:
|
||||||
@ -300,49 +315,25 @@ class RoomInfoHandler(api.base.ApiHandler): # noqa
|
|||||||
blivedm_client.ROOM_INIT_URL, params={'room_id': room_id}
|
blivedm_client.ROOM_INIT_URL, params={'room_id': room_id}
|
||||||
) as res:
|
) as res:
|
||||||
if res.status != 200:
|
if res.status != 200:
|
||||||
logger.warning('room %d _get_room_info failed: %d %s', room_id,
|
logger.warning('room=%d _get_room_info failed: %d %s', room_id,
|
||||||
res.status, res.reason)
|
res.status, res.reason)
|
||||||
return room_id, 0
|
return room_id, 0
|
||||||
data = await res.json()
|
data = await res.json()
|
||||||
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
||||||
logger.exception('room %d _get_room_info failed', room_id)
|
logger.exception('room=%d _get_room_info failed', room_id)
|
||||||
return room_id, 0
|
return room_id, 0
|
||||||
|
|
||||||
if data['code'] != 0:
|
if data['code'] != 0:
|
||||||
logger.warning('room %d _get_room_info failed: %s', room_id, data['message'])
|
logger.warning('room=%d _get_room_info failed: %s', room_id, data['message'])
|
||||||
return room_id, 0
|
return room_id, 0
|
||||||
|
|
||||||
room_info = data['data']['room_info']
|
room_info = data['data']['room_info']
|
||||||
return room_info['room_id'], room_info['uid']
|
return room_info['room_id'], room_info['uid']
|
||||||
|
|
||||||
@classmethod
|
@staticmethod
|
||||||
async def _get_server_host_list(cls, _room_id):
|
async def _get_server_host_list(_room_id):
|
||||||
return cls._host_server_list_cache
|
|
||||||
|
|
||||||
# 连接其他host必须要key
|
# 连接其他host必须要key
|
||||||
# try:
|
return blivedm_client.DEFAULT_DANMAKU_SERVER_LIST
|
||||||
# async with _http_session.get(blivedm.DANMAKU_SERVER_CONF_URL, params={'id': room_id, 'type': 0}
|
|
||||||
# ) as res:
|
|
||||||
# if res.status != 200:
|
|
||||||
# logger.warning('room %d _get_server_host_list failed: %d %s', room_id,
|
|
||||||
# res.status, res.reason)
|
|
||||||
# return cls._host_server_list_cache
|
|
||||||
# data = await res.json()
|
|
||||||
# except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
|
|
||||||
# logger.exception('room %d _get_server_host_list failed', room_id)
|
|
||||||
# return cls._host_server_list_cache
|
|
||||||
#
|
|
||||||
# if data['code'] != 0:
|
|
||||||
# logger.warning('room %d _get_server_host_list failed: %s', room_id, data['message'])
|
|
||||||
# return cls._host_server_list_cache
|
|
||||||
#
|
|
||||||
# host_server_list = data['data']['host_list']
|
|
||||||
# if not host_server_list:
|
|
||||||
# logger.warning('room %d _get_server_host_list failed: host_server_list is empty')
|
|
||||||
# return cls._host_server_list_cache
|
|
||||||
#
|
|
||||||
# cls._host_server_list_cache = host_server_list
|
|
||||||
# return host_server_list
|
|
||||||
|
|
||||||
|
|
||||||
class AvatarHandler(api.base.ApiHandler): # noqa
|
class AvatarHandler(api.base.ApiHandler): # noqa
|
||||||
|
371
services/chat.py
371
services/chat.py
@ -1,12 +1,9 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
import uuid
|
import uuid
|
||||||
from typing import *
|
from typing import *
|
||||||
|
|
||||||
import tornado.websocket
|
|
||||||
|
|
||||||
import api.chat
|
import api.chat
|
||||||
import blivedm.blivedm as blivedm
|
import blivedm.blivedm as blivedm
|
||||||
import config
|
import config
|
||||||
@ -16,19 +13,169 @@ import utils.request
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
room_manager: Optional['RoomManager'] = None
|
# 到B站的连接管理
|
||||||
|
_live_client_manager: Optional['LiveClientManager'] = None
|
||||||
|
# 到客户端的连接管理
|
||||||
|
client_room_manager: Optional['ClientRoomManager'] = None
|
||||||
|
# 直播消息处理器
|
||||||
|
_live_msg_handler: Optional['LiveMsgHandler'] = None
|
||||||
|
|
||||||
|
|
||||||
def init():
|
def init():
|
||||||
global room_manager
|
global _live_client_manager, client_room_manager, _live_msg_handler
|
||||||
room_manager = RoomManager()
|
_live_client_manager = LiveClientManager()
|
||||||
|
client_room_manager = ClientRoomManager()
|
||||||
|
_live_msg_handler = LiveMsgHandler()
|
||||||
|
|
||||||
|
|
||||||
class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
class LiveClientManager:
|
||||||
HEARTBEAT_INTERVAL = 10
|
"""管理到B站的连接"""
|
||||||
|
def __init__(self):
|
||||||
|
self._live_clients: Dict[int, LiveClient] = {}
|
||||||
|
|
||||||
|
def add_live_client(self, room_id):
|
||||||
|
if room_id in self._live_clients:
|
||||||
|
return
|
||||||
|
logger.info('room=%d creating live client', room_id)
|
||||||
|
self._live_clients[room_id] = live_client = LiveClient(room_id)
|
||||||
|
live_client.add_handler(_live_msg_handler)
|
||||||
|
asyncio.ensure_future(self._init_live_client(live_client))
|
||||||
|
logger.info('room=%d live client created, %d live clients', room_id, len(self._live_clients))
|
||||||
|
|
||||||
|
async def _init_live_client(self, live_client: 'LiveClient'):
|
||||||
|
if not await live_client.init_room():
|
||||||
|
logger.warning('room=%d live client init failed', live_client.tmp_room_id)
|
||||||
|
self.del_live_client(live_client.tmp_room_id)
|
||||||
|
return
|
||||||
|
logger.info('room=%d (%d) live client init succeeded', live_client.tmp_room_id, live_client.room_id)
|
||||||
|
live_client.start()
|
||||||
|
|
||||||
|
def del_live_client(self, room_id):
|
||||||
|
live_client = self._live_clients.pop(room_id, None)
|
||||||
|
if live_client is None:
|
||||||
|
return
|
||||||
|
logger.info('room=%d removing live client', room_id)
|
||||||
|
live_client.remove_handler(_live_msg_handler)
|
||||||
|
asyncio.ensure_future(live_client.stop_and_close())
|
||||||
|
logger.info('room=%d live client removed, %d live clients', room_id, len(self._live_clients))
|
||||||
|
|
||||||
|
client_room_manager.del_room(room_id)
|
||||||
|
|
||||||
|
|
||||||
|
class LiveClient(blivedm.BLiveClient):
|
||||||
|
def __init__(self, room_id):
|
||||||
|
super().__init__(room_id, session=utils.request.http_session, heartbeat_interval=10)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def tmp_room_id(self):
|
||||||
|
"""初始化参数传入的房间ID,room_id可能改变,这个不会变"""
|
||||||
|
return self._tmp_room_id
|
||||||
|
|
||||||
|
async def init_room(self):
|
||||||
|
await super().init_room()
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class ClientRoomManager:
|
||||||
|
"""管理到客户端的连接"""
|
||||||
|
def __init__(self):
|
||||||
|
self._rooms: Dict[int, ClientRoom] = {}
|
||||||
|
|
||||||
|
def add_client(self, room_id, client: 'api.chat.ChatHandler'):
|
||||||
|
room = self.get_or_add_room(room_id)
|
||||||
|
room.add_client(client)
|
||||||
|
|
||||||
|
def del_client(self, room_id, client: 'api.chat.ChatHandler'):
|
||||||
|
room = self.get_room(room_id)
|
||||||
|
if room is None:
|
||||||
|
return
|
||||||
|
room.del_client(client)
|
||||||
|
|
||||||
|
if room.client_count == 0:
|
||||||
|
self.del_room(room_id)
|
||||||
|
|
||||||
|
def get_room(self, room_id):
|
||||||
|
return self._rooms.get(room_id, None)
|
||||||
|
|
||||||
|
def get_or_add_room(self, room_id):
|
||||||
|
room = self._rooms.get(room_id, None)
|
||||||
|
if room is None:
|
||||||
|
logger.info('room=%d creating client room', room_id)
|
||||||
|
self._rooms[room_id] = room = ClientRoom(room_id)
|
||||||
|
logger.info('room=%d client room created, %d client rooms', room_id, len(self._rooms))
|
||||||
|
|
||||||
|
_live_client_manager.add_live_client(room_id)
|
||||||
|
return room
|
||||||
|
|
||||||
|
def del_room(self, room_id):
|
||||||
|
room = self._rooms.pop(room_id, None)
|
||||||
|
if room is None:
|
||||||
|
return
|
||||||
|
logger.info('room=%d removing client room', room_id)
|
||||||
|
room.clear_clients()
|
||||||
|
logger.info('room=%d client room removed, %d client rooms', room_id, len(self._rooms))
|
||||||
|
|
||||||
|
_live_client_manager.del_live_client(room_id)
|
||||||
|
|
||||||
|
|
||||||
|
class ClientRoom:
|
||||||
|
def __init__(self, room_id):
|
||||||
|
self._room_id = room_id
|
||||||
|
self._clients: List[api.chat.ChatHandler] = []
|
||||||
|
self._auto_translate_count = 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def room_id(self):
|
||||||
|
return self._room_id
|
||||||
|
|
||||||
|
@property
|
||||||
|
def client_count(self):
|
||||||
|
return len(self._clients)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def need_translate(self):
|
||||||
|
return self._auto_translate_count > 0
|
||||||
|
|
||||||
|
def add_client(self, client: 'api.chat.ChatHandler'):
|
||||||
|
logger.info('room=%d addding client %s', self._room_id, client.request.remote_ip)
|
||||||
|
self._clients.append(client)
|
||||||
|
if client.auto_translate:
|
||||||
|
self._auto_translate_count += 1
|
||||||
|
logger.info('room=%d added client %s, %d clients', self._room_id, client.request.remote_ip,
|
||||||
|
self.client_count)
|
||||||
|
|
||||||
|
def del_client(self, client: 'api.chat.ChatHandler'):
|
||||||
|
client.close()
|
||||||
|
try:
|
||||||
|
self._clients.remove(client)
|
||||||
|
except ValueError:
|
||||||
|
return
|
||||||
|
if client.auto_translate:
|
||||||
|
self._auto_translate_count -= 1
|
||||||
|
logger.info('room=%d removed client %s, %d clients', self._room_id, client.request.remote_ip,
|
||||||
|
self.client_count)
|
||||||
|
|
||||||
|
def clear_clients(self):
|
||||||
|
logger.info('room=%d clearing %d clients', self._room_id, self.client_count)
|
||||||
|
for client in self._clients:
|
||||||
|
client.close()
|
||||||
|
self._clients.clear()
|
||||||
|
self._auto_translate_count = 0
|
||||||
|
|
||||||
|
def send_cmd_data(self, cmd, data):
|
||||||
|
body = api.chat.make_message_body(cmd, data)
|
||||||
|
for client in self._clients:
|
||||||
|
client.send_body_no_raise(body)
|
||||||
|
|
||||||
|
def send_cmd_data_if(self, filterer: Callable[['api.chat.ChatHandler'], bool], cmd, data):
|
||||||
|
body = api.chat.make_message_body(cmd, data)
|
||||||
|
for client in filter(filterer, self._clients):
|
||||||
|
client.send_body_no_raise(body)
|
||||||
|
|
||||||
|
|
||||||
|
class LiveMsgHandler(blivedm.BaseHandler):
|
||||||
# 重新定义XXX_callback是为了减少对字段名的依赖,防止B站改字段名
|
# 重新定义XXX_callback是为了减少对字段名的依赖,防止B站改字段名
|
||||||
def __danmu_msg_callback(self, client: blivedm.BLiveClient, command: dict):
|
def __danmu_msg_callback(self, client: LiveClient, command: dict):
|
||||||
info = command['info']
|
info = command['info']
|
||||||
if len(info[3]) != 0:
|
if len(info[3]) != 0:
|
||||||
medal_level = info[3][0]
|
medal_level = info[3][0]
|
||||||
@ -60,7 +207,7 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
)
|
)
|
||||||
return self._on_danmaku(client, message)
|
return self._on_danmaku(client, message)
|
||||||
|
|
||||||
def __send_gift_callback(self, client: blivedm.BLiveClient, command: dict):
|
def __send_gift_callback(self, client: LiveClient, command: dict):
|
||||||
data = command['data']
|
data = command['data']
|
||||||
message = blivedm.GiftMessage(
|
message = blivedm.GiftMessage(
|
||||||
gift_name=data['giftName'],
|
gift_name=data['giftName'],
|
||||||
@ -74,7 +221,7 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
)
|
)
|
||||||
return self._on_gift(client, message)
|
return self._on_gift(client, message)
|
||||||
|
|
||||||
def __guard_buy_callback(self, client: blivedm.BLiveClient, command: dict):
|
def __guard_buy_callback(self, client: LiveClient, command: dict):
|
||||||
data = command['data']
|
data = command['data']
|
||||||
message = blivedm.GuardBuyMessage(
|
message = blivedm.GuardBuyMessage(
|
||||||
uid=data['uid'],
|
uid=data['uid'],
|
||||||
@ -84,7 +231,7 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
)
|
)
|
||||||
return self._on_buy_guard(client, message)
|
return self._on_buy_guard(client, message)
|
||||||
|
|
||||||
def __super_chat_message_callback(self, client: blivedm.BLiveClient, command: dict):
|
def __super_chat_message_callback(self, client: LiveClient, command: dict):
|
||||||
data = command['data']
|
data = command['data']
|
||||||
message = blivedm.SuperChatMessage(
|
message = blivedm.SuperChatMessage(
|
||||||
price=data['price'],
|
price=data['price'],
|
||||||
@ -105,37 +252,18 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
'SUPER_CHAT_MESSAGE': __super_chat_message_callback
|
'SUPER_CHAT_MESSAGE': __super_chat_message_callback
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, room_id):
|
async def _on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage):
|
||||||
super().__init__(room_id, session=utils.request.http_session, heartbeat_interval=self.HEARTBEAT_INTERVAL)
|
asyncio.ensure_future(self.__on_danmaku(client, message))
|
||||||
self.add_handler(self)
|
|
||||||
self.clients: List[api.chat.ChatHandler] = []
|
|
||||||
self.auto_translate_count = 0
|
|
||||||
|
|
||||||
async def init_room(self):
|
async def __on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage):
|
||||||
await super().init_room()
|
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
|
||||||
return True
|
avatar_url = await services.avatar.get_avatar_url(message.uid)
|
||||||
|
|
||||||
def send_message(self, cmd, data):
|
room = client_room_manager.get_room(client.tmp_room_id)
|
||||||
body = json.dumps({'cmd': cmd, 'data': data})
|
if room is None:
|
||||||
for client in self.clients:
|
return
|
||||||
try:
|
|
||||||
client.write_message(body)
|
|
||||||
except tornado.websocket.WebSocketClosedError:
|
|
||||||
room_manager.del_client(self.room_id, client)
|
|
||||||
|
|
||||||
def send_message_if(self, can_send_func: Callable[['api.chat.ChatHandler'], bool], cmd, data):
|
if message.uid == client.room_owner_uid:
|
||||||
body = json.dumps({'cmd': cmd, 'data': data})
|
|
||||||
for client in filter(can_send_func, self.clients):
|
|
||||||
try:
|
|
||||||
client.write_message(body)
|
|
||||||
except tornado.websocket.WebSocketClosedError:
|
|
||||||
room_manager.del_client(self.room_id, client)
|
|
||||||
|
|
||||||
async def _on_danmaku(self, client: blivedm.BLiveClient, message: blivedm.DanmakuMessage):
|
|
||||||
asyncio.ensure_future(self.__on_danmaku(message))
|
|
||||||
|
|
||||||
async def __on_danmaku(self, message: blivedm.DanmakuMessage):
|
|
||||||
if message.uid == self.room_owner_uid:
|
|
||||||
author_type = 3 # 主播
|
author_type = 3 # 主播
|
||||||
elif message.admin:
|
elif message.admin:
|
||||||
author_type = 2 # 房管
|
author_type = 2 # 房管
|
||||||
@ -153,7 +281,7 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
content_type = api.chat.ContentType.TEXT
|
content_type = api.chat.ContentType.TEXT
|
||||||
content_type_params = None
|
content_type_params = None
|
||||||
|
|
||||||
need_translate = self._need_translate(message.msg)
|
need_translate = self._need_translate(message.msg, room)
|
||||||
if need_translate:
|
if need_translate:
|
||||||
translation = services.translate.get_translation_from_cache(message.msg)
|
translation = services.translate.get_translation_from_cache(message.msg)
|
||||||
if translation is None:
|
if translation is None:
|
||||||
@ -164,10 +292,10 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
else:
|
else:
|
||||||
translation = ''
|
translation = ''
|
||||||
|
|
||||||
id_ = uuid.uuid4().hex
|
msg_id = uuid.uuid4().hex
|
||||||
# 为了节省带宽用list而不是dict
|
# 为了节省带宽用list而不是dict
|
||||||
self.send_message(api.chat.Command.ADD_TEXT, api.chat.make_text_message(
|
room.send_cmd_data(api.chat.Command.ADD_TEXT, api.chat.make_text_message_data(
|
||||||
avatar_url=await services.avatar.get_avatar_url(message.uid),
|
avatar_url=avatar_url,
|
||||||
timestamp=int(message.timestamp / 1000),
|
timestamp=int(message.timestamp / 1000),
|
||||||
author_name=message.uname,
|
author_name=message.uname,
|
||||||
author_type=author_type,
|
author_type=author_type,
|
||||||
@ -177,24 +305,31 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
author_level=message.user_level,
|
author_level=message.user_level,
|
||||||
is_newbie=message.urank < 10000,
|
is_newbie=message.urank < 10000,
|
||||||
is_mobile_verified=message.mobile_verify,
|
is_mobile_verified=message.mobile_verify,
|
||||||
medal_level=0 if message.medal_room_id != self.room_id else message.medal_level,
|
medal_level=0 if message.medal_room_id != client.room_id else message.medal_level,
|
||||||
id_=id_,
|
id_=msg_id,
|
||||||
translation=translation,
|
translation=translation,
|
||||||
content_type=content_type,
|
content_type=content_type,
|
||||||
content_type_params=content_type_params,
|
content_type_params=content_type_params,
|
||||||
))
|
))
|
||||||
|
|
||||||
if need_translate:
|
if need_translate:
|
||||||
await self._translate_and_response(message.msg, id_)
|
await self._translate_and_response(message.msg, room.room_id, msg_id)
|
||||||
|
|
||||||
async def _on_gift(self, client: blivedm.BLiveClient, message: blivedm.GiftMessage):
|
async def _on_gift(self, client: LiveClient, message: blivedm.GiftMessage):
|
||||||
avatar_url = services.avatar.process_avatar_url(message.face)
|
avatar_url = services.avatar.process_avatar_url(message.face)
|
||||||
|
# 服务器白给的头像URL,直接缓存
|
||||||
services.avatar.update_avatar_cache(message.uid, avatar_url)
|
services.avatar.update_avatar_cache(message.uid, avatar_url)
|
||||||
if message.coin_type != 'gold': # 丢人
|
|
||||||
|
# 丢人
|
||||||
|
if message.coin_type != 'gold':
|
||||||
return
|
return
|
||||||
id_ = uuid.uuid4().hex
|
|
||||||
self.send_message(api.chat.Command.ADD_GIFT, {
|
room = client_room_manager.get_room(client.tmp_room_id)
|
||||||
'id': id_,
|
if room is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
room.send_cmd_data(api.chat.Command.ADD_GIFT, {
|
||||||
|
'id': uuid.uuid4().hex,
|
||||||
'avatarUrl': avatar_url,
|
'avatarUrl': avatar_url,
|
||||||
'timestamp': message.timestamp,
|
'timestamp': message.timestamp,
|
||||||
'authorName': message.uname,
|
'authorName': message.uname,
|
||||||
@ -203,24 +338,36 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
'num': message.num
|
'num': message.num
|
||||||
})
|
})
|
||||||
|
|
||||||
async def _on_buy_guard(self, client: blivedm.BLiveClient, message: blivedm.GuardBuyMessage):
|
async def _on_buy_guard(self, client: LiveClient, message: blivedm.GuardBuyMessage):
|
||||||
asyncio.ensure_future(self.__on_buy_guard(message))
|
asyncio.ensure_future(self.__on_buy_guard(client, message))
|
||||||
|
|
||||||
async def __on_buy_guard(self, message: blivedm.GuardBuyMessage):
|
@staticmethod
|
||||||
id_ = uuid.uuid4().hex
|
async def __on_buy_guard(client: LiveClient, message: blivedm.GuardBuyMessage):
|
||||||
self.send_message(api.chat.Command.ADD_MEMBER, {
|
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
|
||||||
'id': id_,
|
avatar_url = await services.avatar.get_avatar_url(message.uid)
|
||||||
'avatarUrl': await services.avatar.get_avatar_url(message.uid),
|
|
||||||
|
room = client_room_manager.get_room(client.tmp_room_id)
|
||||||
|
if room is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
room.send_cmd_data(api.chat.Command.ADD_MEMBER, {
|
||||||
|
'id': uuid.uuid4().hex,
|
||||||
|
'avatarUrl': avatar_url,
|
||||||
'timestamp': message.start_time,
|
'timestamp': message.start_time,
|
||||||
'authorName': message.username,
|
'authorName': message.username,
|
||||||
'privilegeType': message.guard_level
|
'privilegeType': message.guard_level
|
||||||
})
|
})
|
||||||
|
|
||||||
async def _on_super_chat(self, client: blivedm.BLiveClient, message: blivedm.SuperChatMessage):
|
async def _on_super_chat(self, client: LiveClient, message: blivedm.SuperChatMessage):
|
||||||
avatar_url = services.avatar.process_avatar_url(message.face)
|
avatar_url = services.avatar.process_avatar_url(message.face)
|
||||||
|
# 服务器白给的头像URL,直接缓存
|
||||||
services.avatar.update_avatar_cache(message.uid, avatar_url)
|
services.avatar.update_avatar_cache(message.uid, avatar_url)
|
||||||
|
|
||||||
need_translate = self._need_translate(message.message)
|
room = client_room_manager.get_room(client.tmp_room_id)
|
||||||
|
if room is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
need_translate = self._need_translate(message.message, room)
|
||||||
if need_translate:
|
if need_translate:
|
||||||
translation = services.translate.get_translation_from_cache(message.message)
|
translation = services.translate.get_translation_from_cache(message.message)
|
||||||
if translation is None:
|
if translation is None:
|
||||||
@ -231,9 +378,9 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
else:
|
else:
|
||||||
translation = ''
|
translation = ''
|
||||||
|
|
||||||
id_ = str(message.id)
|
msg_id = str(message.id)
|
||||||
self.send_message(api.chat.Command.ADD_SUPER_CHAT, {
|
room.send_cmd_data(api.chat.Command.ADD_SUPER_CHAT, {
|
||||||
'id': id_,
|
'id': msg_id,
|
||||||
'avatarUrl': avatar_url,
|
'avatarUrl': avatar_url,
|
||||||
'timestamp': message.start_time,
|
'timestamp': message.start_time,
|
||||||
'authorName': message.uname,
|
'authorName': message.uname,
|
||||||
@ -243,94 +390,42 @@ class Room(blivedm.BLiveClient, blivedm.BaseHandler):
|
|||||||
})
|
})
|
||||||
|
|
||||||
if need_translate:
|
if need_translate:
|
||||||
asyncio.ensure_future(self._translate_and_response(message.message, id_))
|
asyncio.ensure_future(self._translate_and_response(message.message, room.room_id, msg_id))
|
||||||
|
|
||||||
async def _on_super_chat_delete(self, client: blivedm.BLiveClient, message: blivedm.SuperChatDeleteMessage):
|
async def _on_super_chat_delete(self, client: LiveClient, message: blivedm.SuperChatDeleteMessage):
|
||||||
self.send_message(api.chat.Command.ADD_SUPER_CHAT, {
|
room = client_room_manager.get_room(client.tmp_room_id)
|
||||||
|
if room is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
room.send_cmd_data(api.chat.Command.ADD_SUPER_CHAT, {
|
||||||
'ids': list(map(str, message.ids))
|
'ids': list(map(str, message.ids))
|
||||||
})
|
})
|
||||||
|
|
||||||
def _need_translate(self, text):
|
@staticmethod
|
||||||
|
def _need_translate(text, room: ClientRoom):
|
||||||
cfg = config.get_config()
|
cfg = config.get_config()
|
||||||
return (
|
return (
|
||||||
cfg.enable_translate
|
cfg.enable_translate
|
||||||
and (not cfg.allow_translate_rooms or self.room_id in cfg.allow_translate_rooms)
|
and room.need_translate
|
||||||
and self.auto_translate_count > 0
|
and (not cfg.allow_translate_rooms or room.room_id in cfg.allow_translate_rooms)
|
||||||
and services.translate.need_translate(text)
|
and services.translate.need_translate(text)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _translate_and_response(self, text, msg_id):
|
@staticmethod
|
||||||
|
async def _translate_and_response(text, room_id, msg_id):
|
||||||
translation = await services.translate.translate(text)
|
translation = await services.translate.translate(text)
|
||||||
if translation is None:
|
if translation is None:
|
||||||
return
|
return
|
||||||
self.send_message_if(
|
|
||||||
|
room = client_room_manager.get_room(room_id)
|
||||||
|
if room is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
room.send_cmd_data_if(
|
||||||
lambda client: client.auto_translate,
|
lambda client: client.auto_translate,
|
||||||
api.chat.Command.UPDATE_TRANSLATION,
|
api.chat.Command.UPDATE_TRANSLATION,
|
||||||
api.chat.make_translation_message(
|
api.chat.make_translation_message_data(
|
||||||
msg_id,
|
msg_id,
|
||||||
translation
|
translation
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class RoomManager:
|
|
||||||
def __init__(self):
|
|
||||||
self._rooms: Dict[int, Room] = {}
|
|
||||||
|
|
||||||
async def add_client(self, room_id, client: 'api.chat.ChatHandler'):
|
|
||||||
if room_id not in self._rooms:
|
|
||||||
if not await self._add_room(room_id):
|
|
||||||
client.close()
|
|
||||||
return
|
|
||||||
room = self._rooms.get(room_id, None)
|
|
||||||
if room is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
room.clients.append(client)
|
|
||||||
logger.info('%d clients in room %s', len(room.clients), room_id)
|
|
||||||
if client.auto_translate:
|
|
||||||
room.auto_translate_count += 1
|
|
||||||
|
|
||||||
await client.on_join_room()
|
|
||||||
|
|
||||||
def del_client(self, room_id, client: 'api.chat.ChatHandler'):
|
|
||||||
room = self._rooms.get(room_id, None)
|
|
||||||
if room is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
room.clients.remove(client)
|
|
||||||
except ValueError:
|
|
||||||
# _add_room未完成,没有执行到room.clients.append
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
logger.info('%d clients in room %s', len(room.clients), room_id)
|
|
||||||
if client.auto_translate:
|
|
||||||
room.auto_translate_count = max(0, room.auto_translate_count - 1)
|
|
||||||
|
|
||||||
if not room.clients:
|
|
||||||
self._del_room(room_id)
|
|
||||||
|
|
||||||
async def _add_room(self, room_id):
|
|
||||||
if room_id in self._rooms:
|
|
||||||
return True
|
|
||||||
logger.info('Creating room %d', room_id)
|
|
||||||
self._rooms[room_id] = room = Room(room_id)
|
|
||||||
if await room.init_room():
|
|
||||||
room.start()
|
|
||||||
logger.info('%d rooms', len(self._rooms))
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
self._del_room(room_id)
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _del_room(self, room_id):
|
|
||||||
room = self._rooms.get(room_id, None)
|
|
||||||
if room is None:
|
|
||||||
return
|
|
||||||
logger.info('Removing room %d', room_id)
|
|
||||||
for client in room.clients:
|
|
||||||
client.close()
|
|
||||||
asyncio.ensure_future(room.stop_and_close())
|
|
||||||
self._rooms.pop(room_id, None)
|
|
||||||
logger.info('%d rooms', len(self._rooms))
|
|
||||||
|
Loading…
Reference in New Issue
Block a user