From 121b5f964867bc4e6686a0fcf1f0900ddc1da7a1 Mon Sep 17 00:00:00 2001 From: acgnhik Date: Sun, 23 Oct 2022 13:37:38 +0800 Subject: [PATCH] feat: use pyav --- setup.cfg | 1 + src/blrec/core/hls_stream_recorder_impl.py | 11 +- src/blrec/hls/operators/__init__.py | 2 + src/blrec/hls/operators/segment_parser.py | 104 +++++++++++++ src/blrec/hls/operators/segment_remuxer.py | 167 ++++++--------------- src/blrec/hls/stream_remuxer.py | 164 -------------------- 6 files changed, 156 insertions(+), 293 deletions(-) create mode 100644 src/blrec/hls/operators/segment_parser.py delete mode 100644 src/blrec/hls/stream_remuxer.py diff --git a/setup.cfg b/setup.cfg index 0831253..5391fe9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -53,6 +53,7 @@ install_requires = lxml >= 4.6.4, < 5.0.0 toml >= 0.10.2, < 0.11.0 m3u8 >= 1.0.0, < 2.0.0 + av >= 10.0.0, < 11.0.0 jsonpath == 0.82 psutil >= 5.8.0, < 6.0.0 reactivex >= 4.0.0, < 5.0.0 diff --git a/src/blrec/core/hls_stream_recorder_impl.py b/src/blrec/core/hls_stream_recorder_impl.py index 7294ee5..272e7e2 100644 --- a/src/blrec/core/hls_stream_recorder_impl.py +++ b/src/blrec/core/hls_stream_recorder_impl.py @@ -55,9 +55,7 @@ class HLSStreamRecorderImpl(StreamRecorderImpl): self._prober = hls_ops.Prober() self._dl_statistics = core_ops.SizedStatistics() - self._stream_parser = core_ops.StreamParser( - self._stream_param_holder, ignore_eof=True, ignore_value_error=True - ) + self._segment_parser = hls_ops.SegmentParser() self._analyser = flv_ops.Analyser() self._injector = flv_ops.Injector(self._metadata_provider) self._join_point_extractor = flv_ops.JoinPointExtractor() @@ -144,14 +142,11 @@ class HLSStreamRecorderImpl(StreamRecorderImpl): self._segment_fetcher, self._dl_statistics, self._prober, - ops.observe_on( - NewThreadScheduler(self._thread_factory('SegmentRemuxer')) - ), - self._segment_remuxer, ops.observe_on( NewThreadScheduler(self._thread_factory('StreamRecorder')) ), - self._stream_parser, + self._segment_remuxer, + self._segment_parser, flv_ops.process(), self._cutter, self._limiter, diff --git a/src/blrec/hls/operators/__init__.py b/src/blrec/hls/operators/__init__.py index 841bdd8..a21cd32 100644 --- a/src/blrec/hls/operators/__init__.py +++ b/src/blrec/hls/operators/__init__.py @@ -4,6 +4,7 @@ from .playlist_resolver import PlaylistResolver from .prober import Prober, StreamProfile from .segment_dumper import SegmentDumper from .segment_fetcher import InitSectionData, SegmentData, SegmentFetcher +from .segment_parser import SegmentParser from .segment_remuxer import SegmentRemuxer __all__ = ( @@ -15,6 +16,7 @@ __all__ = ( 'SegmentData', 'SegmentDumper', 'SegmentFetcher', + 'SegmentParser', 'SegmentRemuxer', 'StreamProfile', ) diff --git a/src/blrec/hls/operators/segment_parser.py b/src/blrec/hls/operators/segment_parser.py new file mode 100644 index 0000000..3c78824 --- /dev/null +++ b/src/blrec/hls/operators/segment_parser.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import io +import logging +from typing import Optional + +from reactivex import Observable, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable + +from blrec.flv.common import ( + is_audio_sequence_header, + is_metadata_tag, + is_video_sequence_header, +) +from blrec.flv.io import FlvReader +from blrec.flv.models import AudioTag, FlvHeader, ScriptTag, VideoTag +from blrec.flv.operators.typing import FLVStream, FLVStreamItem + +__all__ = ('SegmentParser',) + + +logger = logging.getLogger(__name__) + + +class SegmentParser: + def __init__(self) -> None: + self._backup_timestamp = True + + def __call__(self, source: Observable[bytes]) -> FLVStream: + return self._parse(source) + + def _parse(self, source: Observable[bytes]) -> FLVStream: + def subscribe( + observer: abc.ObserverBase[FLVStreamItem], + scheduler: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + + last_flv_header: Optional[FlvHeader] = None + last_metadata_tag: Optional[ScriptTag] = None + last_audio_sequence_header: Optional[AudioTag] = None + last_video_sequence_header: Optional[VideoTag] = None + + def reset() -> None: + nonlocal last_flv_header, last_metadata_tag + nonlocal last_audio_sequence_header, last_video_sequence_header + last_flv_header = None + last_metadata_tag = None + last_audio_sequence_header = None + last_video_sequence_header = None + + def on_next(data: bytes) -> None: + nonlocal last_flv_header, last_metadata_tag + nonlocal last_audio_sequence_header, last_video_sequence_header + + if b'' == data: + reset() + return + + try: + reader = FlvReader( + io.BytesIO(data), backup_timestamp=self._backup_timestamp + ) + + flv_header = reader.read_header() + if not last_flv_header: + observer.on_next(flv_header) + last_flv_header = flv_header + else: + assert last_flv_header == flv_header + + while not disposed: + tag = reader.read_tag() + if is_metadata_tag(tag): + if last_metadata_tag is not None: + continue + last_metadata_tag = tag + elif is_video_sequence_header(tag): + if tag == last_video_sequence_header: + continue + last_video_sequence_header = tag + elif is_audio_sequence_header(tag): + if tag == last_audio_sequence_header: + continue + last_audio_sequence_header = tag + observer.on_next(tag) + except EOFError: + pass + except Exception as e: + observer.on_error(e) + + def dispose() -> None: + nonlocal disposed + disposed = True + reset() + + subscription.disposable = source.subscribe( + on_next, observer.on_error, observer.on_completed, scheduler=scheduler + ) + + return CompositeDisposable(subscription, Disposable(dispose)) + + return Observable(subscribe) diff --git a/src/blrec/hls/operators/segment_remuxer.py b/src/blrec/hls/operators/segment_remuxer.py index ce1c26c..65eaef3 100644 --- a/src/blrec/hls/operators/segment_remuxer.py +++ b/src/blrec/hls/operators/segment_remuxer.py @@ -2,121 +2,72 @@ from __future__ import annotations import io import logging -from typing import Final, List, Optional, Union +import os +from typing import Optional, Union -import urllib3 +import av from reactivex import Observable, abc from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable -from tenacity import Retrying, stop_after_delay, wait_fixed -from tenacity.retry import retry_if_not_exception_type from blrec.bili.live import Live -from blrec.utils.io import wait_for -from ..stream_remuxer import StreamRemuxer from .segment_fetcher import InitSectionData, SegmentData __all__ = ('SegmentRemuxer',) logger = logging.getLogger(__name__) -logging.getLogger(urllib3.__name__).setLevel(logging.WARNING) + +TRACE_REMUX_SEGMENT = bool(os.environ.get('TRACE_REMUX_SEGMENT')) +TRACE_LIBAV = bool(os.environ.get('TRACE_LIBAV')) +if TRACE_LIBAV: + logging.getLogger('libav').setLevel(5) +else: + av.logging.set_level(av.logging.FATAL) class SegmentRemuxer: - _SEGMENT_DATA_CACHE: Final = 10 - _MAX_SEGMENT_DATA_CACHE: Final = 15 - def __init__(self, live: Live) -> None: self._live = live - self._timeout: float = 10 - self._stream_remuxer = StreamRemuxer(live.room_id, remove_filler_data=True) def __call__( self, source: Observable[Union[InitSectionData, SegmentData]] - ) -> Observable[io.RawIOBase]: + ) -> Observable[bytes]: return self._remux(source) def _remux( self, source: Observable[Union[InitSectionData, SegmentData]] - ) -> Observable[io.RawIOBase]: + ) -> Observable[bytes]: def subscribe( - observer: abc.ObserverBase[io.RawIOBase], + observer: abc.ObserverBase[bytes], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: disposed = False subscription = SerialDisposable() init_section_data: Optional[bytes] = None - segment_data_cache: List[bytes] = [] - self._stream_remuxer.stop() def reset() -> None: - nonlocal init_section_data, segment_data_cache + nonlocal init_section_data init_section_data = None - segment_data_cache = [] - self._stream_remuxer.stop() - - def write(data: bytes) -> int: - return wait_for( - self._stream_remuxer.input.write, - args=(data,), - timeout=self._timeout, - ) def on_next(data: Union[InitSectionData, SegmentData]) -> None: nonlocal init_section_data - nonlocal segment_data_cache if isinstance(data, InitSectionData): init_section_data = data.payload - segment_data_cache.clear() - logger.debug('Stop stream remuxer for init section') - self._stream_remuxer.stop() + observer.on_next(b'') + return - if self._stream_remuxer.exception and not self._stream_remuxer.stopped: - logger.debug( - 'Stop stream remuxer due to ' - + repr(self._stream_remuxer.exception) - ) - self._stream_remuxer.stop() + if init_section_data is None: + return try: - if self._stream_remuxer.stopped: - self._stream_remuxer.start() - while True: - ready = self._stream_remuxer.wait(timeout=1) - if disposed: - return - if ready: - break - - observer.on_next(RemuxedStream(self._stream_remuxer)) - - if init_section_data: - write(init_section_data) - if segment_data_cache: - for cached_data in segment_data_cache: - write(cached_data) - if isinstance(data, InitSectionData): - return - - write(data.payload) - except Exception as e: - logger.warning(f'Failed to write data to stream remuxer: {repr(e)}') - logger.debug(f'Stop stream remuxer due to {repr(e)}') - self._stream_remuxer.stop() - if len(segment_data_cache) >= self._MAX_SEGMENT_DATA_CACHE: - segment_data_cache = segment_data_cache[ - -self._MAX_SEGMENT_DATA_CACHE + 1 : - ] + remuxed_data = self._remux_segemnt(init_section_data + data.payload) + except av.FFmpegError as e: + logger.warning(f'Failed to remux segment: {repr(e)}', exc_info=e) else: - if len(segment_data_cache) >= self._SEGMENT_DATA_CACHE: - segment_data_cache = segment_data_cache[ - -self._SEGMENT_DATA_CACHE + 1 : - ] - - segment_data_cache.append(data.payload) + observer.on_next(remuxed_data) def dispose() -> None: nonlocal disposed @@ -131,56 +82,30 @@ class SegmentRemuxer: return Observable(subscribe) + def _remux_segemnt(self, data: bytes, format: str = 'flv') -> bytes: + in_file = io.BytesIO(data) + out_file = io.BytesIO() -class CloseRemuxedStream(Exception): - pass + with av.open(in_file) as in_container: + with av.open(out_file, mode='w', format=format) as out_container: + in_video_stream = in_container.streams.video[0] + in_audio_stream = in_container.streams.audio[0] + out_video_stream = out_container.add_stream(template=in_video_stream) + out_audio_stream = out_container.add_stream(template=in_audio_stream) + for packet in in_container.demux(): + if TRACE_REMUX_SEGMENT: + logger.debug(repr(packet)) + # We need to skip the "flushing" packets that `demux` generates. + if packet.dts is None: + continue + # We need to assign the packet to the new stream. + if packet.stream.type == 'video': + packet.stream = out_video_stream + elif packet.stream.type == 'audio': + packet.stream = out_audio_stream + else: + raise NotImplementedError(packet.stream.type) + out_container.mux(packet) -class RemuxedStream(io.RawIOBase): - def __init__( - self, stream_remuxer: StreamRemuxer, *, read_timeout: float = 10 - ) -> None: - self._stream_remuxer = stream_remuxer - self._read_timeout = read_timeout - self._offset: int = 0 - - def read(self, size: int = -1) -> bytes: - if self._stream_remuxer.stopped: - ready = self._stream_remuxer.wait(timeout=self._read_timeout) - if not ready: - msg = f'Stream remuxer not ready in {self._read_timeout} seconds' - logger.debug(msg) - raise EOFError(msg) - - try: - for attempt in Retrying( - reraise=True, - retry=retry_if_not_exception_type(TimeoutError), - wait=wait_fixed(1), - stop=stop_after_delay(self._read_timeout), - ): - with attempt: - data = wait_for( - self._stream_remuxer.output.read, - args=(size,), - timeout=self._read_timeout, - ) - except Exception as exc: - logger.warning(f'Failed to read data from stream remuxer: {repr(exc)}') - self._stream_remuxer.exception = exc - raise EOFError(exc) - else: - assert data is not None - self._offset += len(data) - return data - - def tell(self) -> int: - return self._offset - - def close(self) -> None: - if self._stream_remuxer.stopped: - return - if self._stream_remuxer.exception: - return - logger.debug('Close remuxed stream') - self._stream_remuxer.exception = CloseRemuxedStream() + return out_file.getvalue() diff --git a/src/blrec/hls/stream_remuxer.py b/src/blrec/hls/stream_remuxer.py deleted file mode 100644 index 31ad7be..0000000 --- a/src/blrec/hls/stream_remuxer.py +++ /dev/null @@ -1,164 +0,0 @@ -import errno -import io -import logging -import os -import re -import shlex -from contextlib import suppress -from subprocess import PIPE, CalledProcessError, Popen -from threading import Condition, Thread -from typing import Optional, cast - -from blrec.utils.io import wait_for -from blrec.utils.mixins import StoppableMixin, SupportDebugMixin - -logger = logging.getLogger(__name__) - - -__all__ = ('StreamRemuxer',) - - -class FFmpegError(Exception): - pass - - -class StreamRemuxer(StoppableMixin, SupportDebugMixin): - _ERROR_PATTERN = re.compile( - r'\b(error|failed|missing|invalid|corrupt)\b', re.IGNORECASE - ) - - def __init__(self, room_id: int, remove_filler_data: bool = False) -> None: - super().__init__() - self._room_id = room_id - self._remove_filler_data = remove_filler_data - self._exception: Optional[Exception] = None - self._ready = Condition() - self._env = None - - self._init_for_debug(room_id) - if self._debug: - self._env = os.environ.copy() - path = os.path.join(self._debug_dir, f'ffreport-{room_id}-%t.log') - self._env['FFREPORT'] = f'file={path}:level=48' - - @property - def input(self) -> io.BufferedWriter: - assert self._subprocess.stdin is not None - return cast(io.BufferedWriter, self._subprocess.stdin) - - @property - def output(self) -> io.BufferedReader: - assert self._subprocess.stdout is not None - return cast(io.BufferedReader, self._subprocess.stdout) - - @property - def exception(self) -> Optional[Exception]: - return self._exception - - @exception.setter - def exception(self, exc: Exception) -> None: - self._exception = exc - - def __enter__(self): # type: ignore - self.start() - self.wait() - return self - - def __exit__(self, exc_type, value, traceback): # type: ignore - self.stop() - self.raise_for_exception() - - def wait(self, timeout: Optional[float] = None) -> bool: - with self._ready: - return self._ready.wait(timeout=timeout) - - def restart(self) -> None: - logger.debug('Restarting stream remuxer...') - self.stop() - self.start() - logger.debug('Restarted stream remuxer') - - def raise_for_exception(self) -> None: - if not self.exception: - return - raise self.exception - - def _do_start(self) -> None: - logger.debug('Starting stream remuxer...') - self._thread = Thread( - target=self._run, name=f'StreamRemuxer::{self._room_id}', daemon=True - ) - self._thread.start() - - def _do_stop(self) -> None: - logger.debug('Stopping stream remuxer...') - if hasattr(self, '_subprocess'): - with suppress(ProcessLookupError): - self._subprocess.kill() - self._subprocess.wait(timeout=10) - if hasattr(self, '_thread'): - self._thread.join(timeout=10) - - def _run(self) -> None: - logger.debug('Started stream remuxer') - self._exception = None - try: - self._run_subprocess() - except BrokenPipeError as exc: - logger.debug(repr(exc)) - except FFmpegError as exc: - if not self._stopped: - logger.warning(repr(exc)) - else: - logger.debug(repr(exc)) - except TimeoutError as exc: - logger.debug(repr(exc)) - except Exception as exc: - # OSError: [Errno 22] Invalid argument - # https://stackoverflow.com/questions/23688492/oserror-errno-22-invalid-argument-in-subprocess - if isinstance(exc, OSError) and exc.errno == errno.EINVAL: - pass - else: - self._exception = exc - logger.exception(exc) - finally: - self._stopped = True - logger.debug('Stopped stream remuxer') - - def _run_subprocess(self) -> None: - cmd = 'ffmpeg -xerror -i pipe:0 -c copy -copyts' - if self._remove_filler_data: - cmd += ' -bsf:v filter_units=remove_types=12' - cmd += ' -f flv pipe:1' - args = shlex.split(cmd) - - with Popen( - args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=self._env - ) as self._subprocess: - with self._ready: - self._ready.notify_all() - - assert self._subprocess.stderr is not None - with io.TextIOWrapper( - self._subprocess.stderr, encoding='utf-8', errors='backslashreplace' - ) as stderr: - while not self._stopped: - line = wait_for(stderr.readline, timeout=10) - if not line: - if self._subprocess.poll() is not None: - break - else: - continue - if self._debug: - logger.debug('ffmpeg: %s', line) - self._check_error(line) - - if not self._stopped and self._subprocess.returncode not in (0, 255): - # 255: Exiting standardly, received signal 2. - raise CalledProcessError(self._subprocess.returncode, cmd=cmd) - - def _check_error(self, line: str) -> None: - match = self._ERROR_PATTERN.search(line) - if not match: - return - raise FFmpegError(line)