mirror of
https://github.com/xfgryujk/blivechat.git
synced 2024-12-26 21:00:15 +08:00
196 lines
7.4 KiB
Python
196 lines
7.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
import configparser
|
|
import logging
|
|
import os
|
|
from typing import *
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
BASE_PATH = os.path.dirname(os.path.realpath(__file__))
|
|
WEB_ROOT = os.path.join(BASE_PATH, 'frontend', 'dist')
|
|
DATA_PATH = os.path.join(BASE_PATH, 'data')
|
|
|
|
CONFIG_PATH_LIST = [
|
|
os.path.join(DATA_PATH, 'config.ini'),
|
|
os.path.join(DATA_PATH, 'config.example.ini')
|
|
]
|
|
|
|
_config: Optional['AppConfig'] = None
|
|
|
|
|
|
def init(cmd_args):
|
|
if reload(cmd_args):
|
|
return
|
|
logger.warning('Using default config')
|
|
|
|
config = AppConfig()
|
|
config.load_cmd_args(cmd_args)
|
|
|
|
global _config
|
|
_config = config
|
|
|
|
|
|
def reload(cmd_args):
|
|
config_path = ''
|
|
for path in CONFIG_PATH_LIST:
|
|
if os.path.exists(path):
|
|
config_path = path
|
|
break
|
|
if config_path == '':
|
|
return False
|
|
|
|
config = AppConfig()
|
|
if not config.load(config_path):
|
|
return False
|
|
config.load_cmd_args(cmd_args)
|
|
|
|
global _config
|
|
_config = config
|
|
return True
|
|
|
|
|
|
def get_config():
|
|
return _config
|
|
|
|
|
|
class AppConfig:
|
|
def __init__(self):
|
|
self.debug = False
|
|
self.host = '127.0.0.1'
|
|
self.port = 12450
|
|
self.database_url = 'sqlite:///data/database.db'
|
|
self.tornado_xheaders = False
|
|
self.loader_url = ''
|
|
self.open_browser_at_startup = True
|
|
self.enable_upload_file = True
|
|
|
|
self.fetch_avatar_max_queue_size = 4
|
|
self.avatar_cache_size = 10000
|
|
|
|
self.open_live_access_key_id = ''
|
|
self.open_live_access_key_secret = ''
|
|
self.open_live_app_id = 0
|
|
|
|
self.enable_translate = True
|
|
self.allow_translate_rooms = set()
|
|
self.translate_max_queue_size = 10
|
|
self.translation_cache_size = 50000
|
|
self.translator_configs = []
|
|
|
|
self.text_emoticons = []
|
|
|
|
@property
|
|
def is_open_live_configured(self):
|
|
return (
|
|
self.open_live_access_key_id != '' and self.open_live_access_key_secret != '' and self.open_live_app_id != 0
|
|
)
|
|
|
|
def load_cmd_args(self, args):
|
|
if args.host is not None:
|
|
self.host = args.host
|
|
if args.port is not None:
|
|
self.port = args.port
|
|
self.debug = args.debug
|
|
|
|
def load(self, path):
|
|
try:
|
|
config = configparser.ConfigParser()
|
|
config.read(path, 'utf-8-sig')
|
|
|
|
self._load_app_config(config)
|
|
self._load_translator_configs(config)
|
|
self._load_text_emoticons(config)
|
|
except Exception: # noqa
|
|
logger.exception('Failed to load config:')
|
|
return False
|
|
return True
|
|
|
|
def _load_app_config(self, config: configparser.ConfigParser):
|
|
app_section = config['app']
|
|
self.host = app_section.get('host', self.host)
|
|
self.port = app_section.getint('port', self.port)
|
|
self.database_url = app_section.get('database_url', self.database_url)
|
|
self.tornado_xheaders = app_section.getboolean('tornado_xheaders', self.tornado_xheaders)
|
|
self.loader_url = app_section.get('loader_url', self.loader_url)
|
|
self.open_browser_at_startup = app_section.getboolean('open_browser_at_startup', self.open_browser_at_startup)
|
|
self.enable_upload_file = app_section.getboolean('enable_upload_file', self.enable_upload_file)
|
|
|
|
self.fetch_avatar_max_queue_size = app_section.getint(
|
|
'fetch_avatar_max_queue_size', self.fetch_avatar_max_queue_size
|
|
)
|
|
self.avatar_cache_size = app_section.getint('avatar_cache_size', self.avatar_cache_size)
|
|
|
|
self.open_live_access_key_id = app_section.get('open_live_access_key_id', self.open_live_access_key_id)
|
|
self.open_live_access_key_secret = app_section.get(
|
|
'open_live_access_key_secret', self.open_live_access_key_secret
|
|
)
|
|
self.open_live_app_id = app_section.getint('open_live_app_id', self.open_live_app_id)
|
|
|
|
self.enable_translate = app_section.getboolean('enable_translate', self.enable_translate)
|
|
self.allow_translate_rooms = _str_to_list(app_section.get('allow_translate_rooms', ''), int, set)
|
|
self.translate_max_queue_size = app_section.getint('translate_max_queue_size', self.translate_max_queue_size)
|
|
self.translation_cache_size = app_section.getint('translation_cache_size', self.translation_cache_size)
|
|
|
|
def _load_translator_configs(self, config: configparser.ConfigParser):
|
|
app_section = config['app']
|
|
section_names = _str_to_list(app_section.get('translator_configs', ''))
|
|
translator_configs = []
|
|
for section_name in section_names:
|
|
try:
|
|
section = config[section_name]
|
|
type_ = section['type']
|
|
|
|
translator_config = {
|
|
'type': type_,
|
|
'query_interval': section.getfloat('query_interval'),
|
|
}
|
|
if type_ == 'TencentTranslateFree':
|
|
translator_config['source_language'] = section['source_language']
|
|
translator_config['target_language'] = section['target_language']
|
|
elif type_ == 'BilibiliTranslateFree':
|
|
pass
|
|
elif type_ == 'TencentTranslate':
|
|
translator_config['source_language'] = section['source_language']
|
|
translator_config['target_language'] = section['target_language']
|
|
translator_config['secret_id'] = section['secret_id']
|
|
translator_config['secret_key'] = section['secret_key']
|
|
translator_config['region'] = section['region']
|
|
elif type_ == 'BaiduTranslate':
|
|
translator_config['source_language'] = section['source_language']
|
|
translator_config['target_language'] = section['target_language']
|
|
translator_config['app_id'] = section['app_id']
|
|
translator_config['secret'] = section['secret']
|
|
elif type_ == 'GeminiTranslate':
|
|
translator_config['proxy'] = section['proxy']
|
|
translator_config['api_key'] = section['api_key']
|
|
translator_config['model_code'] = section['model_code']
|
|
translator_config['prompt'] = section['prompt'].replace('\n', ' ').replace('\\n', '\n')
|
|
translator_config['temperature'] = section.getfloat('temperature')
|
|
else:
|
|
raise ValueError(f'Invalid translator type: {type_}')
|
|
except Exception: # noqa
|
|
logger.exception('Failed to load translator=%s config:', section_name)
|
|
continue
|
|
|
|
translator_configs.append(translator_config)
|
|
self.translator_configs = translator_configs
|
|
|
|
def _load_text_emoticons(self, config: configparser.ConfigParser):
|
|
mappings_section = config['text_emoticon_mappings']
|
|
text_emoticons = []
|
|
for value in mappings_section.values():
|
|
keyword, _, url = value.partition(',')
|
|
text_emoticons.append({'keyword': keyword, 'url': url})
|
|
self.text_emoticons = text_emoticons
|
|
|
|
|
|
def _str_to_list(value, item_type: Type = str, container_type: Type = list):
|
|
value = value.strip()
|
|
if value == '':
|
|
return container_type()
|
|
items = value.split(',')
|
|
items = map(lambda item: item.strip(), items)
|
|
if item_type is not str:
|
|
items = map(lambda item: item_type(item), items)
|
|
return container_type(items)
|