perf: postprocessing one video only at the same time

This commit is contained in:
acgnhik 2022-06-19 13:32:56 +08:00
parent b74c25ebb6
commit 765df3ead9

View File

@ -4,7 +4,7 @@ import asyncio
import logging import logging
from contextlib import suppress from contextlib import suppress
from pathlib import PurePath from pathlib import PurePath
from typing import Any, Awaitable, Dict, Iterator, List, Optional, Union from typing import Any, Awaitable, Dict, Final, Iterator, List, Optional, Union
from reactivex.scheduler import ThreadPoolScheduler from reactivex.scheduler import ThreadPoolScheduler
@ -48,6 +48,8 @@ class Postprocessor(
AsyncCooperationMixin, AsyncCooperationMixin,
SupportDebugMixin, SupportDebugMixin,
): ):
_worker_semaphore: Final = asyncio.Semaphore(value=1)
def __init__( def __init__(
self, self,
live: Live, live: Live,
@ -127,38 +129,43 @@ class Postprocessor(
@aio_task_with_room_id @aio_task_with_room_id
async def _worker(self) -> None: async def _worker(self) -> None:
while True: while True:
self._status = PostprocessorStatus.WAITING self._status = PostprocessorStatus.WAITING
self._postprocessing_path = None self._postprocessing_path = None
self._postprocessing_progress = None self._postprocessing_progress = None
video_path = await self._queue.get() video_path = await self._queue.get()
logger.debug(f'Postprocessing... {video_path}')
if not await self._is_vaild_flv_file(video_path): async with self._worker_semaphore:
logger.warning(f'Invalid flv file: {video_path}') logger.debug(f'Postprocessing... {video_path}')
self._queue.task_done()
continue
try: if not await self._is_vaild_flv_file(video_path):
if self.remux_to_mp4: logger.warning(f'Invalid flv file: {video_path}')
self._status = PostprocessorStatus.REMUXING self._queue.task_done()
result_path = await self._remux_flv_to_mp4(video_path) continue
elif self.inject_extra_metadata:
self._status = PostprocessorStatus.INJECTING
result_path = await self._inject_extra_metadata(video_path)
else:
result_path = video_path
if not self._debug: try:
await discard_file(extra_metadata_path(video_path), 'DEBUG') if self.remux_to_mp4:
self._status = PostprocessorStatus.REMUXING
result_path = await self._remux_flv_to_mp4(video_path)
elif self.inject_extra_metadata:
self._status = PostprocessorStatus.INJECTING
result_path = await self._inject_extra_metadata(video_path)
else:
result_path = video_path
self._completed_files.append(result_path) if not self._debug:
await self._emit('video_postprocessing_completed', self, result_path) await discard_file(extra_metadata_path(video_path), 'DEBUG')
except Exception as exc:
submit_exception(exc) self._completed_files.append(result_path)
finally: await self._emit(
self._queue.task_done() 'video_postprocessing_completed', self, result_path
)
except Exception as exc:
submit_exception(exc)
finally:
self._queue.task_done()
async def _inject_extra_metadata(self, path: str) -> str: async def _inject_extra_metadata(self, path: str) -> str:
try: try: