refactor: refactor segemnt_remuxer

This commit is contained in:
acgnhik 2022-06-16 23:06:49 +08:00
parent 4c73f22526
commit 54cf481931
5 changed files with 33 additions and 14 deletions

View File

@ -138,3 +138,6 @@ class RemuxedStream(io.RawIOBase):
def tell(self) -> int: def tell(self) -> int:
return self._offset return self._offset
def close(self) -> None:
self._stream_remuxer.stop()

View File

@ -20,14 +20,23 @@ logger = logging.getLogger(__name__)
class StreamParser: class StreamParser:
def __init__( def __init__(
self, stream_param_holder: StreamParamHolder, *, ignore_eof: bool = False self,
stream_param_holder: StreamParamHolder,
*,
ignore_eof: bool = False,
ignore_value_error: bool = False,
) -> None: ) -> None:
self._stream_param_holder = stream_param_holder self._stream_param_holder = stream_param_holder
self._ignore_eof = ignore_eof self._ignore_eof = ignore_eof
self._ignore_value_error = ignore_value_error
def __call__(self, source: Observable[io.RawIOBase]) -> FLVStream: def __call__(self, source: Observable[io.RawIOBase]) -> FLVStream:
return source.pipe( # type: ignore return source.pipe( # type: ignore
flv_ops.parse(ignore_eof=self._ignore_eof, backup_timestamp=True), flv_ops.parse(
ignore_eof=self._ignore_eof,
ignore_value_error=self._ignore_value_error,
backup_timestamp=True,
),
ops.do_action(on_error=self._before_retry), ops.do_action(on_error=self._before_retry),
utils_ops.retry(should_retry=self._should_retry), utils_ops.retry(should_retry=self._should_retry),
) )

View File

@ -73,7 +73,9 @@ class StreamRecorderImpl(
live, self._session, read_timeout=read_timeout live, self._session, read_timeout=read_timeout
) )
self._stream_parser = core_ops.StreamParser( self._stream_parser = core_ops.StreamParser(
self._stream_param_holder, ignore_eof=stream_format != 'flv' self._stream_param_holder,
ignore_eof=stream_format != 'flv',
ignore_value_error=stream_format != 'flv',
) )
self._progress_bar = core_ops.ProgressBar(live) self._progress_bar = core_ops.ProgressBar(live)
self._analyser = flv_ops.Analyser() self._analyser = flv_ops.Analyser()

View File

@ -1,13 +1,12 @@
from io import SEEK_CUR
import logging import logging
from io import SEEK_CUR
from typing import Iterable, Iterator from typing import Iterable, Iterator
from .exceptions import FlvDataError
from .format import FlvParser, FlvDumper from .format import FlvDumper, FlvParser
from .models import FlvHeader, FlvTag, BACK_POINTER_SIZE
from .utils import OffsetRepositor, AutoRollbacker
from .io_protocols import RandomIO from .io_protocols import RandomIO
from .models import BACK_POINTER_SIZE, FlvHeader, FlvTag
from .utils import AutoRollbacker, OffsetRepositor
__all__ = 'FlvReader', 'FlvWriter' __all__ = 'FlvReader', 'FlvWriter'
@ -35,13 +34,15 @@ class FlvReader:
def read_header(self) -> FlvHeader: def read_header(self) -> FlvHeader:
header = self._parser.parse_header() header = self._parser.parse_header()
previous_tag_size = self._parser.parse_previous_tag_size() previous_tag_size = self._parser.parse_previous_tag_size()
assert previous_tag_size == 0 if previous_tag_size != 0:
raise FlvDataError(f'First back-pointer must be 0: {previous_tag_size}')
return header return header
def read_tag(self, *, no_body: bool = False) -> FlvTag: def read_tag(self, *, no_body: bool = False) -> FlvTag:
tag = self._parser.parse_tag(no_body=no_body) tag = self._parser.parse_tag(no_body=no_body)
previous_tag_size = self._parser.parse_previous_tag_size() previous_tag_size = self._parser.parse_previous_tag_size()
assert previous_tag_size == tag.tag_size if previous_tag_size != tag.tag_size:
raise FlvDataError(f'Wrong back-pointer: {previous_tag_size}')
return tag return tag
def read_tags(self, *, no_body: bool = False) -> Iterator[FlvTag]: def read_tags(self, *, no_body: bool = False) -> Iterator[FlvTag]:
@ -56,9 +57,7 @@ class FlvReader:
with OffsetRepositor(self._stream): with OffsetRepositor(self._stream):
return self.read_tag(no_body=no_body) return self.read_tag(no_body=no_body)
def rread_tags( def rread_tags(self, *, no_body: bool = False) -> Iterator[FlvTag]:
self, *, no_body: bool = False
) -> Iterator[FlvTag]:
while True: while True:
try: try:
yield self.rread_tag(no_body=no_body) yield self.rread_tag(no_body=no_body)

View File

@ -17,6 +17,7 @@ def parse(
*, *,
ignore_eof: bool = False, ignore_eof: bool = False,
complete_on_eof: bool = False, complete_on_eof: bool = False,
ignore_value_error: bool = False,
backup_timestamp: bool = False, backup_timestamp: bool = False,
restore_timestamp: bool = False, restore_timestamp: bool = False,
) -> Callable[[Observable[io.RawIOBase]], FLVStream]: ) -> Callable[[Observable[io.RawIOBase]], FLVStream]:
@ -48,6 +49,11 @@ def parse(
else: else:
if not ignore_eof: if not ignore_eof:
observer.on_error(e) observer.on_error(e)
except ValueError as e:
if ignore_value_error:
logger.debug(f'Error occurred while parsing stream: {repr(e)}')
else:
observer.on_error(e)
except Exception as e: except Exception as e:
observer.on_error(e) observer.on_error(e)
else: else: