diff --git a/src/blrec/flv/metadata_analysis.py b/src/blrec/flv/metadata_analysis.py new file mode 100644 index 0000000..c91f15e --- /dev/null +++ b/src/blrec/flv/metadata_analysis.py @@ -0,0 +1,56 @@ +import json +import logging +import os + +import attr +from reactivex import Observable +from reactivex import operators as ops + +from ..path import extra_metadata_path +from . import operators as flv_ops +from .operators.helpers import from_file + +__all__ = 'AnalysingProgress', 'analyse_metadata' + + +logger = logging.getLogger(__name__) + + +@attr.s(auto_attribs=True, slots=True, frozen=True) +class AnalysingProgress: + count: int + total: int + + +def analyse_metadata( + path: str, *, show_progress: bool = False +) -> Observable[AnalysingProgress]: + filesize = os.path.getsize(path) + filename = os.path.basename(path) + + def dump_metadata() -> None: + try: + metadata = analyser.make_metadata() + data = attr.asdict(metadata, filter=lambda a, v: v is not None) + file_path = extra_metadata_path(path) + with open(file_path, 'wt', encoding='utf8') as file: + json.dump(data, file) + except Exception as e: + logger.error(f'Failed to dump metadata: {e}') + else: + logger.debug(f"Successfully dumped metadata to file: '{path}'") + + analyser = flv_ops.Analyser() + return from_file(path).pipe( + analyser, + flv_ops.ProgressBar( + desc='Analysing', + postfix=filename, + total=filesize, + disable=not show_progress, + ), + ops.map(lambda i: len(i)), # type: ignore + ops.scan(lambda acc, x: acc + x, 0), # type: ignore + ops.map(lambda s: AnalysingProgress(s, filesize)), # type: ignore + ops.do_action(on_completed=dump_metadata), + ) diff --git a/src/blrec/flv/metadata_dumper.py b/src/blrec/flv/metadata_dumper.py index 4c8cc77..65b68c6 100644 --- a/src/blrec/flv/metadata_dumper.py +++ b/src/blrec/flv/metadata_dumper.py @@ -64,13 +64,19 @@ class MetadataDumper(SwitchableMixin): path = extra_metadata_path(video_path) logger.debug(f"Dumping metadata to file: '{path}'") - assert self._last_metadata is not None - assert self._last_join_points is not None + if self._last_metadata is not None: + data = attr.asdict(self._last_metadata, filter=lambda a, v: v is not None) + else: + data = {} + logger.warning('The metadata may be lost duo to something went wrong') - data = attr.asdict(self._last_metadata, filter=lambda a, v: v is not None) - data['joinpoints'] = list( - map(lambda p: p.to_metadata_value(), self._last_join_points) - ) + if self._last_join_points is not None: + data['joinpoints'] = list( + map(lambda p: p.to_metadata_value(), self._last_join_points) + ) + else: + data['joinpoints'] = [] + logger.warning('The joinpoints may be lost duo to something went wrong') try: with open(path, 'wt', encoding='utf8') as file: diff --git a/src/blrec/postprocess/ffmpeg_metadata.py b/src/blrec/postprocess/ffmpeg_metadata.py index d3d08d0..7524056 100644 --- a/src/blrec/postprocess/ffmpeg_metadata.py +++ b/src/blrec/postprocess/ffmpeg_metadata.py @@ -21,7 +21,11 @@ async def make_metadata_file(flv_path: str) -> str: async def _make_metadata_content(flv_path: str) -> str: metadata = await get_metadata(flv_path) - extra_metadata = await get_extra_metadata(flv_path) + try: + extra_metadata = await get_extra_metadata(flv_path) + except Exception as e: + logger.warning(f'Failed to get extra metadata: {repr(e)}') + extra_metadata = {} comment = cast(str, metadata.get('Comment', '')) chapters = '' @@ -29,8 +33,11 @@ async def _make_metadata_content(flv_path: str) -> str: if join_points := extra_metadata.get('joinpoints'): join_points = list(map(JoinPoint.from_metadata_value, join_points)) comment += '\n\n' + make_comment_for_joinpoints(join_points) - duration = int(cast(float, metadata['duration']) * 1000) - chapters = _make_chapters(join_points, duration) + last_timestamp = int( + cast(float, extra_metadata.get('duration') or metadata.get('duration')) + * 1000 + ) + chapters = _make_chapters(join_points, last_timestamp) comment = '\\\n'.join(comment.splitlines()) @@ -48,13 +55,13 @@ Comment={comment} """ -def _make_chapters(join_points: Iterable[JoinPoint], duration: int) -> str: +def _make_chapters(join_points: Iterable[JoinPoint], last_timestamp: int) -> str: join_points = filter(lambda p: not p.seamless, join_points) timestamps = list(map(lambda p: p.timestamp, join_points)) if not timestamps: return '' timestamps.insert(0, 0) - timestamps.append(duration) + timestamps.append(last_timestamp) result = '' for i in range(1, len(timestamps)): diff --git a/src/blrec/postprocess/postprocessor.py b/src/blrec/postprocess/postprocessor.py index 3bda49f..9fd5623 100644 --- a/src/blrec/postprocess/postprocessor.py +++ b/src/blrec/postprocess/postprocessor.py @@ -13,6 +13,7 @@ from ..core import Recorder, RecorderEventListener from ..event.event_emitter import EventEmitter, EventListener from ..exception import exception_callback, submit_exception from ..flv.helpers import is_valid_flv_file +from ..flv.metadata_analysis import analyse_metadata from ..flv.metadata_injection import InjectingProgress, inject_metadata from ..logging.room_id import aio_task_with_room_id from ..path import extra_metadata_path @@ -141,9 +142,7 @@ class Postprocessor( logger.debug(f'Postprocessing... {video_path}') if not await self._is_vaild_flv_file(video_path): - logger.warning(f'Invalid flv file: {video_path}') - self._queue.task_done() - continue + logger.warning(f'The flv file may be invalid: {video_path}') try: if self.remux_to_mp4: @@ -168,9 +167,22 @@ class Postprocessor( self._queue.task_done() async def _inject_extra_metadata(self, path: str) -> str: + logger.info(f"Injecting metadata for '{path}' ...") try: - metadata = await get_extra_metadata(path) - logger.info(f"Injecting metadata for '{path}' ...") + try: + metadata = await get_extra_metadata(path) + except Exception as e: + logger.warning(f'Failed to get extra metadata: {repr(e)}') + logger.info(f"Analysing metadata for '{path}' ...") + await self._analyse_metadata(path) + metadata = await get_extra_metadata(path) + else: + if 'keyframes' not in metadata: + logger.warning('The keyframes metadata lost') + logger.info(f"Analysing metadata for '{path}' ...") + await self._analyse_metadata(path) + new_metadata = await get_extra_metadata(path) + metadata.update(new_metadata) await self._inject_metadata(path, metadata) except Exception as e: logger.error(f"Failed to inject metadata for '{path}': {repr(e)}") @@ -207,6 +219,19 @@ class Postprocessor( return result_path + def _analyse_metadata(self, path: str) -> Awaitable[None]: + future: asyncio.Future[None] = asyncio.Future() + self._postprocessing_path = path + + subscription = analyse_metadata(path, show_progress=True).subscribe( + on_error=lambda e: future.set_exception(e), + on_completed=lambda: future.set_result(None), + scheduler=self._scheduler, + ) + future.add_done_callback(lambda f: subscription.dispose()) + + return future + def _inject_metadata(self, path: str, metadata: Dict[str, Any]) -> Awaitable[None]: future: asyncio.Future[None] = asyncio.Future() self._postprocessing_path = path