From 72841ab46e3b3ab30541fc177e811cf9c060c194 Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 6 Sep 2023 21:34:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E9=9B=85=E5=9C=B0=E5=81=9C=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blivedm | 2 +- main.py | 77 ++++++++++++++++++++++++++++++++++++++----- services/avatar.py | 2 +- services/chat.py | 62 ++++++++++++++++++++++++---------- services/translate.py | 25 ++++++-------- update.py | 2 +- utils/request.py | 11 ++++--- 7 files changed, 132 insertions(+), 49 deletions(-) diff --git a/blivedm b/blivedm index 300840f..356837c 160000 --- a/blivedm +++ b/blivedm @@ -1 +1 @@ -Subproject commit 300840fd65d90ba9954e0c13894c6f037476a6f4 +Subproject commit 356837c136589b92fb2d9f2f26ad3ae1a338f39d diff --git a/main.py b/main.py index bdf4d9b..60a3ffb 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,12 @@ # -*- coding: utf-8 -*- import argparse +import asyncio import logging import logging.handlers import os +import signal import webbrowser +from typing import * import tornado.ioloop import tornado.web @@ -20,7 +23,7 @@ import utils.request logger = logging.getLogger(__name__) -routes = [ +ROUTES = [ (r'/api/server_info', api.main.ServerInfoHandler), (r'/api/emoticon', api.main.UploadEmoticonHandler), @@ -29,14 +32,31 @@ routes = [ (r'/api/avatar_url', api.chat.AvatarHandler), (rf'{api.main.EMOTICON_BASE_URL}/(.*)', tornado.web.StaticFileHandler, {'path': api.main.EMOTICON_UPLOAD_PATH}), - (r'/(.*)', api.main.MainHandler, {'path': config.WEB_ROOT}) + (r'/(.*)', api.main.MainHandler, {'path': config.WEB_ROOT}), ] +server: Optional[tornado.httpserver.HTTPServer] = None + +shut_down_event: Optional[asyncio.Event] = None + + +async def main(): + if not init(): + return 1 + try: + await run() + finally: + await shut_down() + return 0 + + +def init(): + init_signal_handlers() -def main(): args = parse_args() init_logging(args.debug) + logger.info('App started, initializing') config.init() utils.request.init() @@ -48,7 +68,27 @@ def main(): update.check_update() - run_server(args.host, args.port, args.debug) + init_server(args.host, args.port, args.debug) + return server is not None + + +def init_signal_handlers(): + global shut_down_event + shut_down_event = asyncio.Event() + + signums = (signal.SIGINT, signal.SIGTERM) + try: + loop = asyncio.get_running_loop() + for signum in signums: + loop.add_signal_handler(signum, on_shut_down_signal) + except NotImplementedError: + # 不太安全,但Windows只能用这个 + for signum in signums: + signal.signal(signum, on_shut_down_signal) + + +def on_shut_down_signal(*_args): + shut_down_event.set() def parse_args(): @@ -76,7 +116,7 @@ def init_logging(debug): logging.getLogger('tornado.access').setLevel(logging.WARNING) -def run_server(host, port, debug): +def init_server(host, port, debug): cfg = config.get_config() if host is None: host = cfg.host @@ -84,13 +124,14 @@ def run_server(host, port, debug): port = cfg.port app = tornado.web.Application( - routes, + ROUTES, websocket_ping_interval=10, debug=debug, autoreload=False ) try: - app.listen( + global server + server = app.listen( port, host, xheaders=cfg.tornado_xheaders, @@ -105,8 +146,26 @@ def run_server(host, port, debug): url = 'http://localhost/' if port == 80 else f'http://localhost:{port}/' webbrowser.open(url) logger.info('Server started: %s:%d', host, port) - tornado.ioloop.IOLoop.current().start() + + +async def run(): + logger.info('Running event loop') + await shut_down_event.wait() + logger.info('Received shutdown signal') + + +async def shut_down(): + logger.info('Closing server') + server.stop() + await server.close_all_connections() + + logger.info('Closing websocket connections') + await services.chat.shut_down() + + await utils.request.shut_down() + + logger.info('App shut down') if __name__ == '__main__': - main() + exit(asyncio.run(main())) diff --git a/services/avatar.py b/services/avatar.py index cf22da4..c30a1ca 100644 --- a/services/avatar.py +++ b/services/avatar.py @@ -42,7 +42,7 @@ def init(): global _avatar_url_cache, _task_queue _avatar_url_cache = cachetools.TTLCache(cfg.avatar_cache_size, 10 * 60) _task_queue = asyncio.Queue(cfg.fetch_avatar_max_queue_size) - asyncio.get_event_loop().create_task(_do_init()) + asyncio.get_running_loop().create_task(_do_init()) async def _do_init(): diff --git a/services/chat.py b/services/chat.py index 4bfb855..20cbfc7 100644 --- a/services/chat.py +++ b/services/chat.py @@ -33,18 +33,33 @@ def init(): _live_msg_handler = LiveMsgHandler() +async def shut_down(): + if client_room_manager is not None: + client_room_manager.shut_down() + if _live_client_manager is not None: + await _live_client_manager.shut_down() + + class LiveClientManager: """管理到B站的连接""" def __init__(self): - self._live_clients: Dict[int, LiveClient] = {} + self._live_clients: Dict[int, WebLiveClient] = {} + self._close_client_futures: Set[asyncio.Future] = set() + + async def shut_down(self): + while len(self._live_clients) != 0: + room_id = next(iter(self._live_clients)) + self.del_live_client(room_id) + + await asyncio.gather(*self._close_client_futures, return_exceptions=True) def add_live_client(self, room_id): if room_id in self._live_clients: return logger.info('room=%d creating live client', room_id) - self._live_clients[room_id] = live_client = LiveClient(room_id) + self._live_clients[room_id] = live_client = WebLiveClient(room_id) live_client.set_handler(_live_msg_handler) - # 直接启动吧,这里不用管init_room失败的情况,万一失败了会在on_stopped_by_exception里删除掉这个客户端 + # 直接启动吧,这里不用管init_room失败的情况,万一失败了会在on_client_stopped里删除掉这个客户端 live_client.start() logger.info('room=%d live client created, %d live clients', room_id, len(self._live_clients)) @@ -54,13 +69,17 @@ class LiveClientManager: return logger.info('room=%d removing live client', room_id) live_client.set_handler(None) - asyncio.create_task(live_client.stop_and_close()) + + future = asyncio.create_task(live_client.stop_and_close()) + self._close_client_futures.add(future) + future.add_done_callback(lambda _future: self._close_client_futures.discard(future)) + logger.info('room=%d live client removed, %d live clients', room_id, len(self._live_clients)) client_room_manager.del_room(room_id) -class LiveClient(blivedm.BLiveClient): +class WebLiveClient(blivedm.BLiveClient): HEARTBEAT_INTERVAL = 10 def __init__(self, room_id): @@ -81,6 +100,15 @@ class ClientRoomManager: # room_id -> timer_handle self._delay_del_timer_handles: Dict[int, asyncio.TimerHandle] = {} + def shut_down(self): + while len(self._rooms) != 0: + room_id = next(iter(self._rooms)) + self.del_room(room_id) + + for timer_handle in self._delay_del_timer_handles.values(): + timer_handle.cancel() + self._delay_del_timer_handles.clear() + def add_client(self, room_id, client: 'api.chat.ChatHandler'): room = self._get_or_add_room(room_id) room.add_client(client) @@ -194,7 +222,7 @@ class ClientRoom: class LiveMsgHandler(blivedm.BaseHandler): # 重新定义XXX_callback是为了减少对字段名的依赖,防止B站改字段名 - def __danmu_msg_callback(self, client: LiveClient, command: dict): + def __danmu_msg_callback(self, client: WebLiveClient, command: dict): info = command['info'] dm_v2 = command.get('dm_v2', '') @@ -241,7 +269,7 @@ class LiveMsgHandler(blivedm.BaseHandler): ) return self._on_danmaku(client, message) - def __send_gift_callback(self, client: LiveClient, command: dict): + def __send_gift_callback(self, client: WebLiveClient, command: dict): data = command['data'] message = dm_web_models.GiftMessage( gift_name=data['giftName'], @@ -255,7 +283,7 @@ class LiveMsgHandler(blivedm.BaseHandler): ) return self._on_gift(client, message) - def __guard_buy_callback(self, client: LiveClient, command: dict): + def __guard_buy_callback(self, client: WebLiveClient, command: dict): data = command['data'] message = dm_web_models.GuardBuyMessage( uid=data['uid'], @@ -265,7 +293,7 @@ class LiveMsgHandler(blivedm.BaseHandler): ) return self._on_buy_guard(client, message) - def __super_chat_message_callback(self, client: LiveClient, command: dict): + def __super_chat_message_callback(self, client: WebLiveClient, command: dict): data = command['data'] message = dm_web_models.SuperChatMessage( price=data['price'], @@ -286,13 +314,13 @@ class LiveMsgHandler(blivedm.BaseHandler): 'SUPER_CHAT_MESSAGE': __super_chat_message_callback } - def on_stopped_by_exception(self, client: LiveClient, exception: Exception): + def on_client_stopped(self, client: WebLiveClient, exception: Optional[Exception]): _live_client_manager.del_live_client(client.tmp_room_id) - def _on_danmaku(self, client: LiveClient, message: dm_web_models.DanmakuMessage): + def _on_danmaku(self, client: WebLiveClient, message: dm_web_models.DanmakuMessage): asyncio.create_task(self.__on_danmaku(client, message)) - async def __on_danmaku(self, client: LiveClient, message: dm_web_models.DanmakuMessage): + async def __on_danmaku(self, client: WebLiveClient, message: dm_web_models.DanmakuMessage): avatar_url = message.face if avatar_url != '': services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url) @@ -375,7 +403,7 @@ class LiveMsgHandler(blivedm.BaseHandler): except (json.JSONDecodeError, TypeError, KeyError): return [] - def _on_gift(self, client: LiveClient, message: dm_web_models.GiftMessage): + def _on_gift(self, client: WebLiveClient, message: dm_web_models.GiftMessage): avatar_url = services.avatar.process_avatar_url(message.face) services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url) @@ -397,11 +425,11 @@ class LiveMsgHandler(blivedm.BaseHandler): 'num': message.num }) - def _on_buy_guard(self, client: LiveClient, message: dm_web_models.GuardBuyMessage): + def _on_buy_guard(self, client: WebLiveClient, message: dm_web_models.GuardBuyMessage): asyncio.create_task(self.__on_buy_guard(client, message)) @staticmethod - async def __on_buy_guard(client: LiveClient, message: dm_web_models.GuardBuyMessage): + async def __on_buy_guard(client: WebLiveClient, message: dm_web_models.GuardBuyMessage): # 先异步调用再获取房间,因为返回时房间可能已经不存在了 avatar_url = await services.avatar.get_avatar_url(message.uid) @@ -417,7 +445,7 @@ class LiveMsgHandler(blivedm.BaseHandler): 'privilegeType': message.guard_level }) - def _on_super_chat(self, client: LiveClient, message: dm_web_models.SuperChatMessage): + def _on_super_chat(self, client: WebLiveClient, message: dm_web_models.SuperChatMessage): avatar_url = services.avatar.process_avatar_url(message.face) services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url) @@ -452,7 +480,7 @@ class LiveMsgHandler(blivedm.BaseHandler): message.message, room.room_id, msg_id, services.translate.Priority.HIGH )) - def _on_super_chat_delete(self, client: LiveClient, message: dm_web_models.SuperChatDeleteMessage): + def _on_super_chat_delete(self, client: WebLiveClient, message: dm_web_models.SuperChatDeleteMessage): room = client_room_manager.get_room(client.tmp_room_id) if room is None: return diff --git a/services/translate.py b/services/translate.py index 93269a9..7ccf3e2 100644 --- a/services/translate.py +++ b/services/translate.py @@ -56,7 +56,7 @@ def init(): _translate_cache = cachetools.LRUCache(cfg.translation_cache_size) # 总队列长度会超过translate_max_queue_size,不用这么严格 _task_queues = [asyncio.Queue(cfg.translate_max_queue_size) for _ in range(len(Priority))] - asyncio.get_event_loop().create_task(_do_init()) + asyncio.get_running_loop().create_task(_do_init()) async def _do_init(): @@ -386,21 +386,16 @@ class TencentTranslateFree(TranslateProvider): return True async def _reinit_coroutine(self): - try: - while True: - logger.debug('TencentTranslateFree reinit') - start_time = datetime.datetime.now() - try: - await self._do_init() - except asyncio.CancelledError: - raise - except BaseException: # noqa - pass - cost_time = (datetime.datetime.now() - start_time).total_seconds() + while True: + logger.debug('TencentTranslateFree reinit') + start_time = datetime.datetime.now() + try: + await self._do_init() + except Exception: # noqa + pass + cost_time = (datetime.datetime.now() - start_time).total_seconds() - await asyncio.sleep(30 - cost_time) - except asyncio.CancelledError: - pass + await asyncio.sleep(30 - cost_time) @property def is_available(self): diff --git a/update.py b/update.py index e55c8d8..db5e61a 100644 --- a/update.py +++ b/update.py @@ -9,7 +9,7 @@ VERSION = 'v1.7.0' def check_update(): - asyncio.get_event_loop().create_task(_do_check_update()) + asyncio.get_running_loop().create_task(_do_check_update()) async def _do_check_update(): diff --git a/utils/request.py b/utils/request.py index 46d76dd..f5179bb 100644 --- a/utils/request.py +++ b/utils/request.py @@ -16,9 +16,10 @@ http_session: Optional[aiohttp.ClientSession] = None def init(): - # ClientSession要在异步函数中创建 - async def do_init(): - global http_session - http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) + global http_session + http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) - asyncio.get_event_loop().run_until_complete(do_init()) + +async def shut_down(): + if http_session is not None: + await http_session.close()