feat: improve SpaceReclaimer
use mtime and atime instead of ctime add REC_TTL env variable
This commit is contained in:
parent
8bfb537616
commit
eaa60a0817
4
FAQ.md
4
FAQ.md
@ -61,11 +61,11 @@ cpu 使用率过高、网络带宽不足或不稳定、硬盘读写慢都会导
|
||||
|
||||
## 怎样才算是旧录播文件?
|
||||
|
||||
创建时间超过 24 小时才会被当成旧录播文件在空间不足时被删除。
|
||||
修改时间和访问时间都超过 24 小时才会被当成旧录播文件在空间不足时被删除。
|
||||
|
||||
## 空间不足时是怎样删除旧录播文件的?
|
||||
|
||||
删除文件是按创建时间的先后进行的,最早创建的最先被删除,直到可用空间不少于所设置的阈值为止。
|
||||
删除文件是按修改时间和访问时间的先后进行的,修改时间或访问时间较早的先被删除,直到可用空间不少于所设置的阈值为止。
|
||||
|
||||
## 支持录制付费直播吗?
|
||||
|
||||
|
@ -1,16 +1,14 @@
|
||||
import logging
|
||||
import asyncio
|
||||
import logging
|
||||
import shutil
|
||||
from contextlib import suppress
|
||||
|
||||
|
||||
from .models import DiskUsage
|
||||
from .helpers import is_space_enough
|
||||
from ..event.event_emitter import EventListener, EventEmitter
|
||||
from ..event.event_emitter import EventEmitter, EventListener
|
||||
from ..exception import exception_callback
|
||||
from ..utils.mixins import SwitchableMixin
|
||||
from ..logging.room_id import aio_task_with_room_id
|
||||
|
||||
from ..utils.mixins import AsyncStoppableMixin, SwitchableMixin
|
||||
from .helpers import is_space_enough
|
||||
from .models import DiskUsage
|
||||
|
||||
__all__ = 'SpaceMonitor', 'SpaceEventListener'
|
||||
|
||||
@ -25,7 +23,9 @@ class SpaceEventListener(EventListener):
|
||||
...
|
||||
|
||||
|
||||
class SpaceMonitor(EventEmitter[SpaceEventListener], SwitchableMixin):
|
||||
class SpaceMonitor(
|
||||
EventEmitter[SpaceEventListener], SwitchableMixin, AsyncStoppableMixin
|
||||
):
|
||||
def __init__(
|
||||
self,
|
||||
path: str,
|
||||
@ -40,24 +40,18 @@ class SpaceMonitor(EventEmitter[SpaceEventListener], SwitchableMixin):
|
||||
self._monitoring: bool = False
|
||||
|
||||
def _do_enable(self) -> None:
|
||||
asyncio.create_task(self._start())
|
||||
asyncio.create_task(self.start())
|
||||
logger.debug('Enabled space monitor')
|
||||
|
||||
def _do_disable(self) -> None:
|
||||
asyncio.create_task(self._stop())
|
||||
asyncio.create_task(self.stop())
|
||||
logger.debug('Disabled space monitor')
|
||||
|
||||
async def _start(self) -> None:
|
||||
if self._monitoring:
|
||||
return
|
||||
async def _do_start(self) -> None:
|
||||
self._create_polling_task()
|
||||
self._monitoring = True
|
||||
|
||||
async def _stop(self) -> None:
|
||||
if not self._monitoring:
|
||||
return
|
||||
async def _do_stop(self) -> None:
|
||||
await self._cancel_polling_task()
|
||||
self._monitoring = False
|
||||
|
||||
def _create_polling_task(self) -> None:
|
||||
self._polling_task = asyncio.create_task(self._polling_loop())
|
||||
@ -78,6 +72,4 @@ class SpaceMonitor(EventEmitter[SpaceEventListener], SwitchableMixin):
|
||||
|
||||
async def _emit_space_no_enough(self) -> None:
|
||||
usage = DiskUsage(*shutil.disk_usage(self.path))
|
||||
await self._emit(
|
||||
'space_no_enough', self.path, self.space_threshold, usage
|
||||
)
|
||||
await self._emit('space_no_enough', self.path, self.space_threshold, usage)
|
||||
|
@ -7,19 +7,14 @@ import asyncio
|
||||
from functools import partial
|
||||
from typing import Iterable, List
|
||||
|
||||
from tenacity import (
|
||||
retry,
|
||||
wait_none,
|
||||
stop_after_attempt,
|
||||
retry_if_exception_type,
|
||||
)
|
||||
from tenacity import retry, wait_none, stop_after_attempt, retry_if_exception_type
|
||||
|
||||
from .helpers import delete_file, is_space_enough
|
||||
from .space_monitor import SpaceMonitor, DiskUsage, SpaceEventListener
|
||||
from ..utils.mixins import SwitchableMixin
|
||||
|
||||
|
||||
__all__ = 'SpaceReclaimer',
|
||||
__all__ = ('SpaceReclaimer',)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -33,11 +28,18 @@ class SpaceReclaimer(SpaceEventListener, SwitchableMixin):
|
||||
space_monitor: SpaceMonitor,
|
||||
path: str,
|
||||
*,
|
||||
rec_ttl: int = 60 * 60 * 24,
|
||||
recycle_records: bool = False,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self._space_monitor = space_monitor
|
||||
self.path = path
|
||||
if value := os.environ.get('REC_TTL'):
|
||||
try:
|
||||
rec_ttl = int(value)
|
||||
except Exception as e:
|
||||
logger.warning(repr(e))
|
||||
self.rec_ttl = rec_ttl
|
||||
self.recycle_records = recycle_records
|
||||
|
||||
async def on_space_no_enough(
|
||||
@ -63,9 +65,8 @@ class SpaceReclaimer(SpaceEventListener, SwitchableMixin):
|
||||
|
||||
async def _free_space_from_records(self, size: int) -> bool:
|
||||
logger.info('Free space from records ...')
|
||||
# only delete files created 24 hours ago
|
||||
max_ctime = datetime.now().timestamp() - 60 * 60 * 24
|
||||
for path in await self._get_record_file_paths(max_ctime):
|
||||
ts = datetime.now().timestamp() - self.rec_ttl
|
||||
for path in await self._get_record_file_paths(ts):
|
||||
await delete_file(path)
|
||||
if is_space_enough(self.path, size):
|
||||
return True
|
||||
@ -76,13 +77,15 @@ class SpaceReclaimer(SpaceEventListener, SwitchableMixin):
|
||||
wait=wait_none(),
|
||||
stop=stop_after_attempt(3),
|
||||
)
|
||||
async def _get_record_file_paths(self, max_ctime: float) -> List[str]:
|
||||
async def _get_record_file_paths(self, ts: float) -> List[str]:
|
||||
glob_path = os.path.join(self.path, '*/**/*.*')
|
||||
paths: Iterable[Path]
|
||||
paths = map(lambda p: Path(p), glob.iglob(glob_path, recursive=True))
|
||||
paths = filter(lambda p: p.suffix in self._SUFFIX_SET, paths)
|
||||
paths = filter(lambda p: p.stat().st_ctime <= max_ctime, paths)
|
||||
func = partial(sorted, paths, key=lambda p: p.stat().st_ctime)
|
||||
paths = filter(lambda p: p.stat().st_mtime < ts > p.stat().st_atime, paths)
|
||||
func = partial(
|
||||
sorted, paths, key=lambda p: (p.stat().st_mtime, p.stat().st_atime)
|
||||
)
|
||||
loop = asyncio.get_running_loop()
|
||||
path_list = await loop.run_in_executor(None, func)
|
||||
return list(map(str, path_list))
|
||||
|
@ -81,6 +81,7 @@ class AsyncStoppableMixin(ABC):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._stopped = True
|
||||
self._stopped_lock = asyncio.Lock()
|
||||
|
||||
@property
|
||||
def stopped(self) -> bool:
|
||||
@ -88,17 +89,19 @@ class AsyncStoppableMixin(ABC):
|
||||
|
||||
@final
|
||||
async def start(self) -> None:
|
||||
if not self._stopped:
|
||||
return
|
||||
self._stopped = False
|
||||
await self._do_start()
|
||||
async with self._stopped_lock:
|
||||
if not self._stopped:
|
||||
return
|
||||
self._stopped = False
|
||||
await self._do_start()
|
||||
|
||||
@final
|
||||
async def stop(self) -> None:
|
||||
if self._stopped:
|
||||
return
|
||||
self._stopped = True
|
||||
await self._do_stop()
|
||||
async with self._stopped_lock:
|
||||
if self._stopped:
|
||||
return
|
||||
self._stopped = True
|
||||
await self._do_stop()
|
||||
|
||||
@abstractmethod
|
||||
async def _do_start(self) -> None:
|
||||
|
Loading…
Reference in New Issue
Block a user