refactor: refactor code to compatible with Python 3.11

This commit is contained in:
acgnhik 2023-02-19 13:19:20 +08:00
parent b80019a258
commit 97b1b3cd02
2 changed files with 96 additions and 69 deletions

View File

@ -1,43 +1,36 @@
import os
import logging
import asyncio
import logging
import os
from typing import Iterator, List, Optional
import attr
import psutil
from . import __prog__, __version__
from .flv.operators import MetaData, StreamProfile
from .disk_space import SpaceMonitor, SpaceReclaimer
from .bili.helpers import ensure_room_id
from .disk_space import SpaceMonitor, SpaceReclaimer
from .event.event_submitters import SpaceEventSubmitter
from .exception import ExceptionHandler, ExistsError, exception_callback
from .flv.operators import MetaData, StreamProfile
from .notification import (
BarkNotifier,
EmailNotifier,
PushdeerNotifier,
PushplusNotifier,
ServerchanNotifier,
TelegramNotifier,
)
from .setting import Settings, SettingsIn, SettingsManager, SettingsOut, TaskOptions
from .setting.typing import KeySetOfSettings
from .task import (
DanmakuFileDetail,
RecordTaskManager,
TaskData,
TaskParam,
VideoFileDetail,
DanmakuFileDetail,
)
from .exception import ExistsError, ExceptionHandler, exception_callback
from .event.event_submitters import SpaceEventSubmitter
from .setting import (
SettingsManager,
Settings,
SettingsIn,
SettingsOut,
TaskOptions,
)
from .setting.typing import KeySetOfSettings
from .notification import (
EmailNotifier,
ServerchanNotifier,
PushdeerNotifier,
PushplusNotifier,
TelegramNotifier,
BarkNotifier,
)
from .webhook import WebHookEmitter
logger = logging.getLogger(__name__)
@ -105,6 +98,7 @@ class Application:
await self.exit()
async def launch(self) -> None:
logger.info('Launching Application...')
self._setup()
logger.debug(f'Default umask {os.umask(000)}')
logger.info(f'Launched Application v{__version__}')
@ -112,10 +106,12 @@ class Application:
task.add_done_callback(exception_callback)
async def exit(self) -> None:
logger.info('Exiting Application...')
await self._exit()
logger.info('Exited Application')
async def abort(self) -> None:
logger.info('Aborting Application...')
await self._exit(force=True)
logger.info('Aborted Application')
@ -128,6 +124,7 @@ class Application:
logger.info('Restarting Application...')
await self.exit()
await self.launch()
logger.info('Restarted Application')
def has_task(self, room_id: int) -> bool:
return self._task_manager.has_task(room_id)
@ -136,9 +133,7 @@ class Application:
room_id = await ensure_room_id(room_id)
if self._task_manager.has_task(room_id):
raise ExistsError(
f'a task for the room {room_id} is already existed'
)
raise ExistsError(f'a task for the room {room_id} is already existed')
settings = self._settings_manager.find_task_settings(room_id)
if not settings:
@ -214,9 +209,7 @@ class Application:
await self._settings_manager.mark_task_recorder_enabled(room_id)
logger.info(f'Successfully enabled recorder for task {room_id}')
async def disable_task_recorder(
self, room_id: int, force: bool = False
) -> None:
async def disable_task_recorder(self, room_id: int, force: bool = False) -> None:
logger.info(f'Disabling recorder for task {room_id}...')
await self._task_manager.disable_task_recorder(room_id, force)
await self._settings_manager.mark_task_recorder_disabled(room_id)
@ -249,9 +242,7 @@ class Application:
def get_task_stream_profile(self, room_id: int) -> StreamProfile:
return self._task_manager.get_task_stream_profile(room_id)
def get_task_video_file_details(
self, room_id: int
) -> Iterator[VideoFileDetail]:
def get_task_video_file_details(self, room_id: int) -> Iterator[VideoFileDetail]:
yield from self._task_manager.get_task_video_file_details(room_id)
def get_task_danmaku_file_details(
@ -291,9 +282,7 @@ class Application:
async def change_task_options(
self, room_id: int, options: TaskOptions
) -> TaskOptions:
return await self._settings_manager.change_task_options(
room_id, options
)
return await self._settings_manager.change_task_options(room_id, options)
def _setup(self) -> None:
self._setup_logger()
@ -320,9 +309,7 @@ class Application:
self._space_event_submitter = SpaceEventSubmitter(self._space_monitor)
def _setup_space_reclaimer(self) -> None:
self._space_reclaimer = SpaceReclaimer(
self._space_monitor, self._out_dir,
)
self._space_reclaimer = SpaceReclaimer(self._space_monitor, self._out_dir)
self._settings_manager.apply_space_reclaimer_settings()
self._space_reclaimer.enable()

View File

@ -54,13 +54,14 @@ class RecordTaskManager:
logger.info('Load all tasks complete')
async def destroy_all_tasks(self) -> None:
logger.info('Destroying all tasks...')
if not self._tasks:
return
await asyncio.wait([t.destroy() for t in self._tasks.values() if t.ready])
logger.debug('Destroying all tasks...')
for task in self._tasks.values():
if not task.ready:
continue
await task.destroy()
self._tasks.clear()
malloc_trim(0)
logger.info('Successfully destroyed all task')
logger.debug('Successfully destroyed all task')
def has_task(self, room_id: int) -> bool:
return room_id in self._tasks
@ -113,74 +114,110 @@ class RecordTaskManager:
logger.info(f'Successfully added task {settings.room_id}')
async def remove_task(self, room_id: int) -> None:
logger.debug(f'Removing task {room_id}...')
task = self._get_task(room_id, check_ready=True)
await task.disable_recorder(force=True)
await task.disable_monitor()
await task.destroy()
del self._tasks[room_id]
malloc_trim(0)
logger.debug(f'Removed task {room_id}')
async def remove_all_tasks(self) -> None:
coros = [self.remove_task(i) for i, t in self._tasks.items() if t.ready]
if coros:
await asyncio.wait(coros)
logger.debug('Removing all tasks...')
for room_id, task in self._tasks.items():
if not task.ready:
continue
await self.remove_task(room_id)
malloc_trim(0)
logger.debug('Removed all tasks')
async def start_task(self, room_id: int) -> None:
logger.debug(f'Starting task {room_id}...')
task = self._get_task(room_id, check_ready=True)
await task.update_info()
await task.enable_monitor()
await task.enable_recorder()
logger.debug(f'Started task {room_id}')
async def stop_task(self, room_id: int, force: bool = False) -> None:
logger.debug(f'Stopping task {room_id}...')
task = self._get_task(room_id, check_ready=True)
await task.disable_recorder(force)
await task.disable_monitor()
logger.debug(f'Stopped task {room_id}')
async def start_all_tasks(self) -> None:
await self.update_all_task_infos()
await self.enable_all_task_monitors()
await self.enable_all_task_recorders()
logger.debug('Starting all tasks...')
for room_id, task in self._tasks.items():
if not task.ready:
continue
await self.start_task(room_id)
logger.debug('Started all tasks')
async def stop_all_tasks(self, force: bool = False) -> None:
await self.disable_all_task_recorders(force)
await self.disable_all_task_monitors()
logger.debug('Stopping all tasks...')
for room_id, task in self._tasks.items():
if not task.ready:
continue
await self.stop_task(room_id, force=force)
logger.debug('Stopped all tasks')
async def enable_task_monitor(self, room_id: int) -> None:
logger.debug(f'Enabling live monitor for task {room_id}...')
task = self._get_task(room_id, check_ready=True)
await task.enable_monitor()
logger.debug(f'Enabled live monitor for task {room_id}')
async def disable_task_monitor(self, room_id: int) -> None:
logger.debug(f'Disabling live monitor for task {room_id}...')
task = self._get_task(room_id, check_ready=True)
await task.disable_monitor()
logger.debug(f'Disabled live monitor for task {room_id}')
async def enable_all_task_monitors(self) -> None:
coros = [t.enable_monitor() for t in self._tasks.values() if t.ready]
if coros:
await asyncio.wait(coros)
logger.debug('Enabling live monitor for all tasks...')
for room_id, task in self._tasks.items():
if not task.ready:
continue
await self.enable_task_monitor(room_id)
logger.debug('Enabled live monitor for all tasks')
async def disable_all_task_monitors(self) -> None:
coros = [t.disable_monitor() for t in self._tasks.values() if t.ready]
if coros:
await asyncio.wait(coros)
logger.debug('Disabling live monitor for all tasks...')
for room_id, task in self._tasks.items():
if not task.ready:
continue
await self.disable_task_monitor(room_id)
logger.debug('Disabled live monitor for all tasks')
async def enable_task_recorder(self, room_id: int) -> None:
logger.debug(f'Enabling recorder for task {room_id}...')
task = self._get_task(room_id, check_ready=True)
await task.enable_recorder()
logger.debug(f'Enabled recorder for task {room_id}')
async def disable_task_recorder(self, room_id: int, force: bool = False) -> None:
logger.debug(f'Disabling recorder for task {room_id}...')
task = self._get_task(room_id, check_ready=True)
await task.disable_recorder(force)
logger.debug(f'Disabled recorder for task {room_id}')
async def enable_all_task_recorders(self) -> None:
coros = [t.enable_recorder() for t in self._tasks.values() if t.ready]
if coros:
await asyncio.wait(coros)
logger.debug('Enabling recorder for all tasks...')
for room_id, task in self._tasks.items():
if not task.ready:
continue
await self.enable_task_recorder(room_id)
logger.debug('Enabled recorder for all tasks')
async def disable_all_task_recorders(self, force: bool = False) -> None:
coros = [t.disable_recorder(force) for t in self._tasks.values() if t.ready]
if coros:
await asyncio.wait(coros)
logger.debug('Disabling recorder for all tasks...')
for room_id, task in self._tasks.items():
if not task.ready:
continue
await self.disable_task_recorder(room_id, force=force)
logger.debug('Disabled recorder for all tasks')
def get_task_data(self, room_id: int) -> TaskData:
task = self._get_task(room_id, check_ready=True)
@ -221,15 +258,18 @@ class RecordTaskManager:
return task.cut_stream()
async def update_task_info(self, room_id: int) -> None:
logger.debug(f'Updating info for task {room_id}...')
task = self._get_task(room_id, check_ready=True)
await task.update_info(raise_exception=True)
logger.debug(f'Updated info for task {room_id}')
async def update_all_task_infos(self) -> None:
coros = [
t.update_info(raise_exception=True) for t in self._tasks.values() if t.ready
]
if coros:
await asyncio.wait(coros)
logger.debug('Updating info for all tasks...')
for room_id, task in self._tasks.items():
if not task.ready:
continue
await self.update_task_info(room_id)
logger.debug('Updated info for all tasks')
def apply_task_bili_api_settings(
self, room_id: int, settings: BiliApiSettings