diff --git a/src/blrec/postprocess/postprocessor.py b/src/blrec/postprocess/postprocessor.py index c8160fa..3bda49f 100644 --- a/src/blrec/postprocess/postprocessor.py +++ b/src/blrec/postprocess/postprocessor.py @@ -4,7 +4,7 @@ import asyncio import logging from contextlib import suppress from pathlib import PurePath -from typing import Any, Awaitable, Dict, Iterator, List, Optional, Union +from typing import Any, Awaitable, Dict, Final, Iterator, List, Optional, Union from reactivex.scheduler import ThreadPoolScheduler @@ -48,6 +48,8 @@ class Postprocessor( AsyncCooperationMixin, SupportDebugMixin, ): + _worker_semaphore: Final = asyncio.Semaphore(value=1) + def __init__( self, live: Live, @@ -127,38 +129,43 @@ class Postprocessor( @aio_task_with_room_id async def _worker(self) -> None: + while True: self._status = PostprocessorStatus.WAITING self._postprocessing_path = None self._postprocessing_progress = None video_path = await self._queue.get() - logger.debug(f'Postprocessing... {video_path}') - if not await self._is_vaild_flv_file(video_path): - logger.warning(f'Invalid flv file: {video_path}') - self._queue.task_done() - continue + async with self._worker_semaphore: + logger.debug(f'Postprocessing... {video_path}') - try: - if self.remux_to_mp4: - self._status = PostprocessorStatus.REMUXING - result_path = await self._remux_flv_to_mp4(video_path) - elif self.inject_extra_metadata: - self._status = PostprocessorStatus.INJECTING - result_path = await self._inject_extra_metadata(video_path) - else: - result_path = video_path + if not await self._is_vaild_flv_file(video_path): + logger.warning(f'Invalid flv file: {video_path}') + self._queue.task_done() + continue - if not self._debug: - await discard_file(extra_metadata_path(video_path), 'DEBUG') + try: + if self.remux_to_mp4: + self._status = PostprocessorStatus.REMUXING + result_path = await self._remux_flv_to_mp4(video_path) + elif self.inject_extra_metadata: + self._status = PostprocessorStatus.INJECTING + result_path = await self._inject_extra_metadata(video_path) + else: + result_path = video_path - self._completed_files.append(result_path) - await self._emit('video_postprocessing_completed', self, result_path) - except Exception as exc: - submit_exception(exc) - finally: - self._queue.task_done() + if not self._debug: + await discard_file(extra_metadata_path(video_path), 'DEBUG') + + self._completed_files.append(result_path) + await self._emit( + 'video_postprocessing_completed', self, result_path + ) + except Exception as exc: + submit_exception(exc) + finally: + self._queue.task_done() async def _inject_extra_metadata(self, path: str) -> str: try: