From 54cf48193193e470cc12291db8f749b6b221d719 Mon Sep 17 00:00:00 2001 From: acgnhik Date: Thu, 16 Jun 2022 23:06:49 +0800 Subject: [PATCH] refactor: refactor segemnt_remuxer --- src/blrec/core/operators/segment_remuxer.py | 3 +++ src/blrec/core/operators/stream_parser.py | 13 +++++++++++-- src/blrec/core/stream_recorder_impl.py | 4 +++- src/blrec/flv/io.py | 21 ++++++++++----------- src/blrec/flv/operators/parse.py | 6 ++++++ 5 files changed, 33 insertions(+), 14 deletions(-) diff --git a/src/blrec/core/operators/segment_remuxer.py b/src/blrec/core/operators/segment_remuxer.py index e9091fc..6e8416e 100644 --- a/src/blrec/core/operators/segment_remuxer.py +++ b/src/blrec/core/operators/segment_remuxer.py @@ -138,3 +138,6 @@ class RemuxedStream(io.RawIOBase): def tell(self) -> int: return self._offset + + def close(self) -> None: + self._stream_remuxer.stop() diff --git a/src/blrec/core/operators/stream_parser.py b/src/blrec/core/operators/stream_parser.py index 184eaca..f8ac3a9 100644 --- a/src/blrec/core/operators/stream_parser.py +++ b/src/blrec/core/operators/stream_parser.py @@ -20,14 +20,23 @@ logger = logging.getLogger(__name__) class StreamParser: def __init__( - self, stream_param_holder: StreamParamHolder, *, ignore_eof: bool = False + self, + stream_param_holder: StreamParamHolder, + *, + ignore_eof: bool = False, + ignore_value_error: bool = False, ) -> None: self._stream_param_holder = stream_param_holder self._ignore_eof = ignore_eof + self._ignore_value_error = ignore_value_error def __call__(self, source: Observable[io.RawIOBase]) -> FLVStream: return source.pipe( # type: ignore - flv_ops.parse(ignore_eof=self._ignore_eof, backup_timestamp=True), + flv_ops.parse( + ignore_eof=self._ignore_eof, + ignore_value_error=self._ignore_value_error, + backup_timestamp=True, + ), ops.do_action(on_error=self._before_retry), utils_ops.retry(should_retry=self._should_retry), ) diff --git a/src/blrec/core/stream_recorder_impl.py b/src/blrec/core/stream_recorder_impl.py index 5e17e9b..ddef4c9 100644 --- a/src/blrec/core/stream_recorder_impl.py +++ b/src/blrec/core/stream_recorder_impl.py @@ -73,7 +73,9 @@ class StreamRecorderImpl( live, self._session, read_timeout=read_timeout ) self._stream_parser = core_ops.StreamParser( - self._stream_param_holder, ignore_eof=stream_format != 'flv' + self._stream_param_holder, + ignore_eof=stream_format != 'flv', + ignore_value_error=stream_format != 'flv', ) self._progress_bar = core_ops.ProgressBar(live) self._analyser = flv_ops.Analyser() diff --git a/src/blrec/flv/io.py b/src/blrec/flv/io.py index 7da5d64..cc351b7 100644 --- a/src/blrec/flv/io.py +++ b/src/blrec/flv/io.py @@ -1,13 +1,12 @@ -from io import SEEK_CUR import logging +from io import SEEK_CUR from typing import Iterable, Iterator - -from .format import FlvParser, FlvDumper -from .models import FlvHeader, FlvTag, BACK_POINTER_SIZE -from .utils import OffsetRepositor, AutoRollbacker +from .exceptions import FlvDataError +from .format import FlvDumper, FlvParser from .io_protocols import RandomIO - +from .models import BACK_POINTER_SIZE, FlvHeader, FlvTag +from .utils import AutoRollbacker, OffsetRepositor __all__ = 'FlvReader', 'FlvWriter' @@ -35,13 +34,15 @@ class FlvReader: def read_header(self) -> FlvHeader: header = self._parser.parse_header() previous_tag_size = self._parser.parse_previous_tag_size() - assert previous_tag_size == 0 + if previous_tag_size != 0: + raise FlvDataError(f'First back-pointer must be 0: {previous_tag_size}') return header def read_tag(self, *, no_body: bool = False) -> FlvTag: tag = self._parser.parse_tag(no_body=no_body) previous_tag_size = self._parser.parse_previous_tag_size() - assert previous_tag_size == tag.tag_size + if previous_tag_size != tag.tag_size: + raise FlvDataError(f'Wrong back-pointer: {previous_tag_size}') return tag def read_tags(self, *, no_body: bool = False) -> Iterator[FlvTag]: @@ -56,9 +57,7 @@ class FlvReader: with OffsetRepositor(self._stream): return self.read_tag(no_body=no_body) - def rread_tags( - self, *, no_body: bool = False - ) -> Iterator[FlvTag]: + def rread_tags(self, *, no_body: bool = False) -> Iterator[FlvTag]: while True: try: yield self.rread_tag(no_body=no_body) diff --git a/src/blrec/flv/operators/parse.py b/src/blrec/flv/operators/parse.py index 84dc7cc..b6426ff 100644 --- a/src/blrec/flv/operators/parse.py +++ b/src/blrec/flv/operators/parse.py @@ -17,6 +17,7 @@ def parse( *, ignore_eof: bool = False, complete_on_eof: bool = False, + ignore_value_error: bool = False, backup_timestamp: bool = False, restore_timestamp: bool = False, ) -> Callable[[Observable[io.RawIOBase]], FLVStream]: @@ -48,6 +49,11 @@ def parse( else: if not ignore_eof: observer.on_error(e) + except ValueError as e: + if ignore_value_error: + logger.debug(f'Error occurred while parsing stream: {repr(e)}') + else: + observer.on_error(e) except Exception as e: observer.on_error(e) else: