diff --git a/src/blrec/application.py b/src/blrec/application.py index a13a73c..ff3333c 100644 --- a/src/blrec/application.py +++ b/src/blrec/application.py @@ -1,43 +1,36 @@ -import os -import logging import asyncio +import logging +import os from typing import Iterator, List, Optional import attr import psutil from . import __prog__, __version__ -from .flv.operators import MetaData, StreamProfile -from .disk_space import SpaceMonitor, SpaceReclaimer from .bili.helpers import ensure_room_id +from .disk_space import SpaceMonitor, SpaceReclaimer +from .event.event_submitters import SpaceEventSubmitter +from .exception import ExceptionHandler, ExistsError, exception_callback +from .flv.operators import MetaData, StreamProfile +from .notification import ( + BarkNotifier, + EmailNotifier, + PushdeerNotifier, + PushplusNotifier, + ServerchanNotifier, + TelegramNotifier, +) +from .setting import Settings, SettingsIn, SettingsManager, SettingsOut, TaskOptions +from .setting.typing import KeySetOfSettings from .task import ( + DanmakuFileDetail, RecordTaskManager, TaskData, TaskParam, VideoFileDetail, - DanmakuFileDetail, -) -from .exception import ExistsError, ExceptionHandler, exception_callback -from .event.event_submitters import SpaceEventSubmitter -from .setting import ( - SettingsManager, - Settings, - SettingsIn, - SettingsOut, - TaskOptions, -) -from .setting.typing import KeySetOfSettings -from .notification import ( - EmailNotifier, - ServerchanNotifier, - PushdeerNotifier, - PushplusNotifier, - TelegramNotifier, - BarkNotifier, ) from .webhook import WebHookEmitter - logger = logging.getLogger(__name__) @@ -105,6 +98,7 @@ class Application: await self.exit() async def launch(self) -> None: + logger.info('Launching Application...') self._setup() logger.debug(f'Default umask {os.umask(000)}') logger.info(f'Launched Application v{__version__}') @@ -112,10 +106,12 @@ class Application: task.add_done_callback(exception_callback) async def exit(self) -> None: + logger.info('Exiting Application...') await self._exit() logger.info('Exited Application') async def abort(self) -> None: + logger.info('Aborting Application...') await self._exit(force=True) logger.info('Aborted Application') @@ -128,6 +124,7 @@ class Application: logger.info('Restarting Application...') await self.exit() await self.launch() + logger.info('Restarted Application') def has_task(self, room_id: int) -> bool: return self._task_manager.has_task(room_id) @@ -136,9 +133,7 @@ class Application: room_id = await ensure_room_id(room_id) if self._task_manager.has_task(room_id): - raise ExistsError( - f'a task for the room {room_id} is already existed' - ) + raise ExistsError(f'a task for the room {room_id} is already existed') settings = self._settings_manager.find_task_settings(room_id) if not settings: @@ -214,9 +209,7 @@ class Application: await self._settings_manager.mark_task_recorder_enabled(room_id) logger.info(f'Successfully enabled recorder for task {room_id}') - async def disable_task_recorder( - self, room_id: int, force: bool = False - ) -> None: + async def disable_task_recorder(self, room_id: int, force: bool = False) -> None: logger.info(f'Disabling recorder for task {room_id}...') await self._task_manager.disable_task_recorder(room_id, force) await self._settings_manager.mark_task_recorder_disabled(room_id) @@ -249,9 +242,7 @@ class Application: def get_task_stream_profile(self, room_id: int) -> StreamProfile: return self._task_manager.get_task_stream_profile(room_id) - def get_task_video_file_details( - self, room_id: int - ) -> Iterator[VideoFileDetail]: + def get_task_video_file_details(self, room_id: int) -> Iterator[VideoFileDetail]: yield from self._task_manager.get_task_video_file_details(room_id) def get_task_danmaku_file_details( @@ -291,9 +282,7 @@ class Application: async def change_task_options( self, room_id: int, options: TaskOptions ) -> TaskOptions: - return await self._settings_manager.change_task_options( - room_id, options - ) + return await self._settings_manager.change_task_options(room_id, options) def _setup(self) -> None: self._setup_logger() @@ -320,9 +309,7 @@ class Application: self._space_event_submitter = SpaceEventSubmitter(self._space_monitor) def _setup_space_reclaimer(self) -> None: - self._space_reclaimer = SpaceReclaimer( - self._space_monitor, self._out_dir, - ) + self._space_reclaimer = SpaceReclaimer(self._space_monitor, self._out_dir) self._settings_manager.apply_space_reclaimer_settings() self._space_reclaimer.enable() diff --git a/src/blrec/task/task_manager.py b/src/blrec/task/task_manager.py index e129fc8..5c5e3b0 100644 --- a/src/blrec/task/task_manager.py +++ b/src/blrec/task/task_manager.py @@ -54,13 +54,14 @@ class RecordTaskManager: logger.info('Load all tasks complete') async def destroy_all_tasks(self) -> None: - logger.info('Destroying all tasks...') - if not self._tasks: - return - await asyncio.wait([t.destroy() for t in self._tasks.values() if t.ready]) + logger.debug('Destroying all tasks...') + for task in self._tasks.values(): + if not task.ready: + continue + await task.destroy() self._tasks.clear() malloc_trim(0) - logger.info('Successfully destroyed all task') + logger.debug('Successfully destroyed all task') def has_task(self, room_id: int) -> bool: return room_id in self._tasks @@ -113,74 +114,110 @@ class RecordTaskManager: logger.info(f'Successfully added task {settings.room_id}') async def remove_task(self, room_id: int) -> None: + logger.debug(f'Removing task {room_id}...') task = self._get_task(room_id, check_ready=True) await task.disable_recorder(force=True) await task.disable_monitor() await task.destroy() del self._tasks[room_id] malloc_trim(0) + logger.debug(f'Removed task {room_id}') async def remove_all_tasks(self) -> None: - coros = [self.remove_task(i) for i, t in self._tasks.items() if t.ready] - if coros: - await asyncio.wait(coros) + logger.debug('Removing all tasks...') + for room_id, task in self._tasks.items(): + if not task.ready: + continue + await self.remove_task(room_id) malloc_trim(0) + logger.debug('Removed all tasks') async def start_task(self, room_id: int) -> None: + logger.debug(f'Starting task {room_id}...') task = self._get_task(room_id, check_ready=True) await task.update_info() await task.enable_monitor() await task.enable_recorder() + logger.debug(f'Started task {room_id}') async def stop_task(self, room_id: int, force: bool = False) -> None: + logger.debug(f'Stopping task {room_id}...') task = self._get_task(room_id, check_ready=True) await task.disable_recorder(force) await task.disable_monitor() + logger.debug(f'Stopped task {room_id}') async def start_all_tasks(self) -> None: - await self.update_all_task_infos() - await self.enable_all_task_monitors() - await self.enable_all_task_recorders() + logger.debug('Starting all tasks...') + for room_id, task in self._tasks.items(): + if not task.ready: + continue + await self.start_task(room_id) + logger.debug('Started all tasks') async def stop_all_tasks(self, force: bool = False) -> None: - await self.disable_all_task_recorders(force) - await self.disable_all_task_monitors() + logger.debug('Stopping all tasks...') + for room_id, task in self._tasks.items(): + if not task.ready: + continue + await self.stop_task(room_id, force=force) + logger.debug('Stopped all tasks') async def enable_task_monitor(self, room_id: int) -> None: + logger.debug(f'Enabling live monitor for task {room_id}...') task = self._get_task(room_id, check_ready=True) await task.enable_monitor() + logger.debug(f'Enabled live monitor for task {room_id}') async def disable_task_monitor(self, room_id: int) -> None: + logger.debug(f'Disabling live monitor for task {room_id}...') task = self._get_task(room_id, check_ready=True) await task.disable_monitor() + logger.debug(f'Disabled live monitor for task {room_id}') async def enable_all_task_monitors(self) -> None: - coros = [t.enable_monitor() for t in self._tasks.values() if t.ready] - if coros: - await asyncio.wait(coros) + logger.debug('Enabling live monitor for all tasks...') + for room_id, task in self._tasks.items(): + if not task.ready: + continue + await self.enable_task_monitor(room_id) + logger.debug('Enabled live monitor for all tasks') async def disable_all_task_monitors(self) -> None: - coros = [t.disable_monitor() for t in self._tasks.values() if t.ready] - if coros: - await asyncio.wait(coros) + logger.debug('Disabling live monitor for all tasks...') + for room_id, task in self._tasks.items(): + if not task.ready: + continue + await self.disable_task_monitor(room_id) + logger.debug('Disabled live monitor for all tasks') async def enable_task_recorder(self, room_id: int) -> None: + logger.debug(f'Enabling recorder for task {room_id}...') task = self._get_task(room_id, check_ready=True) await task.enable_recorder() + logger.debug(f'Enabled recorder for task {room_id}') async def disable_task_recorder(self, room_id: int, force: bool = False) -> None: + logger.debug(f'Disabling recorder for task {room_id}...') task = self._get_task(room_id, check_ready=True) await task.disable_recorder(force) + logger.debug(f'Disabled recorder for task {room_id}') async def enable_all_task_recorders(self) -> None: - coros = [t.enable_recorder() for t in self._tasks.values() if t.ready] - if coros: - await asyncio.wait(coros) + logger.debug('Enabling recorder for all tasks...') + for room_id, task in self._tasks.items(): + if not task.ready: + continue + await self.enable_task_recorder(room_id) + logger.debug('Enabled recorder for all tasks') async def disable_all_task_recorders(self, force: bool = False) -> None: - coros = [t.disable_recorder(force) for t in self._tasks.values() if t.ready] - if coros: - await asyncio.wait(coros) + logger.debug('Disabling recorder for all tasks...') + for room_id, task in self._tasks.items(): + if not task.ready: + continue + await self.disable_task_recorder(room_id, force=force) + logger.debug('Disabled recorder for all tasks') def get_task_data(self, room_id: int) -> TaskData: task = self._get_task(room_id, check_ready=True) @@ -221,15 +258,18 @@ class RecordTaskManager: return task.cut_stream() async def update_task_info(self, room_id: int) -> None: + logger.debug(f'Updating info for task {room_id}...') task = self._get_task(room_id, check_ready=True) await task.update_info(raise_exception=True) + logger.debug(f'Updated info for task {room_id}') async def update_all_task_infos(self) -> None: - coros = [ - t.update_info(raise_exception=True) for t in self._tasks.values() if t.ready - ] - if coros: - await asyncio.wait(coros) + logger.debug('Updating info for all tasks...') + for room_id, task in self._tasks.items(): + if not task.ready: + continue + await self.update_task_info(room_id) + logger.debug('Updated info for all tasks') def apply_task_bili_api_settings( self, room_id: int, settings: BiliApiSettings