From eaa60a081793e0bb5bc82171430da19a07d0f508 Mon Sep 17 00:00:00 2001 From: acgnhik <acgnhik@outlook.com> Date: Sun, 15 May 2022 14:20:33 +0800 Subject: [PATCH] feat: improve SpaceReclaimer use mtime and atime instead of ctime add REC_TTL env variable --- FAQ.md | 4 +-- src/blrec/disk_space/space_monitor.py | 34 ++++++++++--------------- src/blrec/disk_space/space_reclaimer.py | 29 +++++++++++---------- src/blrec/utils/mixins.py | 19 ++++++++------ 4 files changed, 42 insertions(+), 44 deletions(-) diff --git a/FAQ.md b/FAQ.md index cfb2994..85a7795 100644 --- a/FAQ.md +++ b/FAQ.md @@ -61,11 +61,11 @@ cpu 使用率过高、网络带宽不足或不稳定、硬盘读写慢都会导 ## 怎样才算是旧录播文件? -创建时间超过 24 小时才会被当成旧录播文件在空间不足时被删除。 +修改时间和访问时间都超过 24 小时才会被当成旧录播文件在空间不足时被删除。 ## 空间不足时是怎样删除旧录播文件的? -删除文件是按创建时间的先后进行的,最早创建的最先被删除,直到可用空间不少于所设置的阈值为止。 +删除文件是按修改时间和访问时间的先后进行的,修改时间或访问时间较早的先被删除,直到可用空间不少于所设置的阈值为止。 ## 支持录制付费直播吗? diff --git a/src/blrec/disk_space/space_monitor.py b/src/blrec/disk_space/space_monitor.py index 8271152..8614374 100644 --- a/src/blrec/disk_space/space_monitor.py +++ b/src/blrec/disk_space/space_monitor.py @@ -1,16 +1,14 @@ -import logging import asyncio +import logging import shutil from contextlib import suppress - -from .models import DiskUsage -from .helpers import is_space_enough -from ..event.event_emitter import EventListener, EventEmitter +from ..event.event_emitter import EventEmitter, EventListener from ..exception import exception_callback -from ..utils.mixins import SwitchableMixin from ..logging.room_id import aio_task_with_room_id - +from ..utils.mixins import AsyncStoppableMixin, SwitchableMixin +from .helpers import is_space_enough +from .models import DiskUsage __all__ = 'SpaceMonitor', 'SpaceEventListener' @@ -25,7 +23,9 @@ class SpaceEventListener(EventListener): ... -class SpaceMonitor(EventEmitter[SpaceEventListener], SwitchableMixin): +class SpaceMonitor( + EventEmitter[SpaceEventListener], SwitchableMixin, AsyncStoppableMixin +): def __init__( self, path: str, @@ -40,24 +40,18 @@ class SpaceMonitor(EventEmitter[SpaceEventListener], SwitchableMixin): self._monitoring: bool = False def _do_enable(self) -> None: - asyncio.create_task(self._start()) + asyncio.create_task(self.start()) logger.debug('Enabled space monitor') def _do_disable(self) -> None: - asyncio.create_task(self._stop()) + asyncio.create_task(self.stop()) logger.debug('Disabled space monitor') - async def _start(self) -> None: - if self._monitoring: - return + async def _do_start(self) -> None: self._create_polling_task() - self._monitoring = True - async def _stop(self) -> None: - if not self._monitoring: - return + async def _do_stop(self) -> None: await self._cancel_polling_task() - self._monitoring = False def _create_polling_task(self) -> None: self._polling_task = asyncio.create_task(self._polling_loop()) @@ -78,6 +72,4 @@ class SpaceMonitor(EventEmitter[SpaceEventListener], SwitchableMixin): async def _emit_space_no_enough(self) -> None: usage = DiskUsage(*shutil.disk_usage(self.path)) - await self._emit( - 'space_no_enough', self.path, self.space_threshold, usage - ) + await self._emit('space_no_enough', self.path, self.space_threshold, usage) diff --git a/src/blrec/disk_space/space_reclaimer.py b/src/blrec/disk_space/space_reclaimer.py index 94c82f1..60ecfa6 100644 --- a/src/blrec/disk_space/space_reclaimer.py +++ b/src/blrec/disk_space/space_reclaimer.py @@ -7,19 +7,14 @@ import asyncio from functools import partial from typing import Iterable, List -from tenacity import ( - retry, - wait_none, - stop_after_attempt, - retry_if_exception_type, -) +from tenacity import retry, wait_none, stop_after_attempt, retry_if_exception_type from .helpers import delete_file, is_space_enough from .space_monitor import SpaceMonitor, DiskUsage, SpaceEventListener from ..utils.mixins import SwitchableMixin -__all__ = 'SpaceReclaimer', +__all__ = ('SpaceReclaimer',) logger = logging.getLogger(__name__) @@ -33,11 +28,18 @@ class SpaceReclaimer(SpaceEventListener, SwitchableMixin): space_monitor: SpaceMonitor, path: str, *, + rec_ttl: int = 60 * 60 * 24, recycle_records: bool = False, ) -> None: super().__init__() self._space_monitor = space_monitor self.path = path + if value := os.environ.get('REC_TTL'): + try: + rec_ttl = int(value) + except Exception as e: + logger.warning(repr(e)) + self.rec_ttl = rec_ttl self.recycle_records = recycle_records async def on_space_no_enough( @@ -63,9 +65,8 @@ class SpaceReclaimer(SpaceEventListener, SwitchableMixin): async def _free_space_from_records(self, size: int) -> bool: logger.info('Free space from records ...') - # only delete files created 24 hours ago - max_ctime = datetime.now().timestamp() - 60 * 60 * 24 - for path in await self._get_record_file_paths(max_ctime): + ts = datetime.now().timestamp() - self.rec_ttl + for path in await self._get_record_file_paths(ts): await delete_file(path) if is_space_enough(self.path, size): return True @@ -76,13 +77,15 @@ class SpaceReclaimer(SpaceEventListener, SwitchableMixin): wait=wait_none(), stop=stop_after_attempt(3), ) - async def _get_record_file_paths(self, max_ctime: float) -> List[str]: + async def _get_record_file_paths(self, ts: float) -> List[str]: glob_path = os.path.join(self.path, '*/**/*.*') paths: Iterable[Path] paths = map(lambda p: Path(p), glob.iglob(glob_path, recursive=True)) paths = filter(lambda p: p.suffix in self._SUFFIX_SET, paths) - paths = filter(lambda p: p.stat().st_ctime <= max_ctime, paths) - func = partial(sorted, paths, key=lambda p: p.stat().st_ctime) + paths = filter(lambda p: p.stat().st_mtime < ts > p.stat().st_atime, paths) + func = partial( + sorted, paths, key=lambda p: (p.stat().st_mtime, p.stat().st_atime) + ) loop = asyncio.get_running_loop() path_list = await loop.run_in_executor(None, func) return list(map(str, path_list)) diff --git a/src/blrec/utils/mixins.py b/src/blrec/utils/mixins.py index f6e0260..ce9972a 100644 --- a/src/blrec/utils/mixins.py +++ b/src/blrec/utils/mixins.py @@ -81,6 +81,7 @@ class AsyncStoppableMixin(ABC): def __init__(self) -> None: super().__init__() self._stopped = True + self._stopped_lock = asyncio.Lock() @property def stopped(self) -> bool: @@ -88,17 +89,19 @@ class AsyncStoppableMixin(ABC): @final async def start(self) -> None: - if not self._stopped: - return - self._stopped = False - await self._do_start() + async with self._stopped_lock: + if not self._stopped: + return + self._stopped = False + await self._do_start() @final async def stop(self) -> None: - if self._stopped: - return - self._stopped = True - await self._do_stop() + async with self._stopped_lock: + if self._stopped: + return + self._stopped = True + await self._do_stop() @abstractmethod async def _do_start(self) -> None: