From 3b523ca11a321837cd33636224628c203aedb142 Mon Sep 17 00:00:00 2001 From: acgnhik Date: Mon, 11 Jul 2022 13:05:26 +0800 Subject: [PATCH] refactor: add sort operator --- src/blrec/core/flv_stream_recorder_impl.py | 6 +- src/blrec/flv/common.py | 40 +++++++++- src/blrec/flv/format.py | 6 +- src/blrec/flv/models.py | 2 +- src/blrec/flv/operators/__init__.py | 2 + src/blrec/flv/operators/analyse.py | 2 +- src/blrec/flv/operators/concat.py | 4 +- src/blrec/flv/operators/fix.py | 6 +- src/blrec/flv/operators/parse.py | 8 ++ src/blrec/flv/operators/process.py | 26 +++++- src/blrec/flv/operators/sort.py | 93 ++++++++++++++++++++++ 11 files changed, 178 insertions(+), 17 deletions(-) create mode 100644 src/blrec/flv/operators/sort.py diff --git a/src/blrec/core/flv_stream_recorder_impl.py b/src/blrec/core/flv_stream_recorder_impl.py index 42a8363..306dc83 100644 --- a/src/blrec/core/flv_stream_recorder_impl.py +++ b/src/blrec/core/flv_stream_recorder_impl.py @@ -6,6 +6,7 @@ from reactivex.scheduler import NewThreadScheduler from ..bili.live import Live from ..bili.typing import QualityNumber from ..flv import operators as flv_ops +from ..utils.mixins import SupportDebugMixin from .stream_recorder_impl import StreamRecorderImpl __all__ = ('FLVStreamRecorderImpl',) @@ -14,7 +15,7 @@ __all__ = ('FLVStreamRecorderImpl',) logger = logging.getLogger(__name__) -class FLVStreamRecorderImpl(StreamRecorderImpl): +class FLVStreamRecorderImpl(StreamRecorderImpl, SupportDebugMixin): def __init__( self, live: Live, @@ -40,6 +41,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl): filesize_limit=filesize_limit, duration_limit=duration_limit, ) + self._init_for_debug(live.room_id) def _run(self) -> None: self._subscription = ( @@ -52,7 +54,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl): self._stream_parser, self._connection_error_handler, self._request_exception_handler, - flv_ops.process(), + flv_ops.process(sort_tags=True, trace=self._debug), self._cutter, self._limiter, self._join_point_extractor, diff --git a/src/blrec/flv/common.py b/src/blrec/flv/common.py index f35ebff..dc8b723 100644 --- a/src/blrec/flv/common.py +++ b/src/blrec/flv/common.py @@ -9,7 +9,16 @@ from . import scriptdata from .avc import extract_resolution from .io import FlvReader from .io_protocols import RandomIO -from .models import AudioTag, AVCPacketType, FlvTag, ScriptTag, TagType, VideoTag +from .models import ( + AudioTag, + AVCPacketType, + CodecID, + FlvTag, + FrameType, + ScriptTag, + TagType, + VideoTag, +) from .utils import OffsetRepositor @@ -155,8 +164,31 @@ def is_video_nalu_keyframe(tag: FlvTag) -> TypeGuard[VideoTag]: return is_video_tag(tag) and tag.is_keyframe() and tag.is_avc_nalu() +def is_avc_end_sequence(tag: FlvTag) -> TypeGuard[VideoTag]: + return is_video_tag(tag) and tag.is_avc_end() + + +def is_avc_end_sequence_tag(value: Any) -> TypeGuard[VideoTag]: + return isinstance(value, FlvTag) and is_avc_end_sequence(value) + + +def create_avc_end_sequence_tag(offset: int = 0, timestamp: int = 0) -> VideoTag: + return VideoTag( + offset=offset, + filtered=False, + tag_type=TagType.VIDEO, + data_size=5, + timestamp=timestamp, + stream_id=timestamp, + frame_type=FrameType.KEY_FRAME, + codec_id=CodecID.AVC, + avc_packet_type=AVCPacketType.AVC_END_OF_SEQENCE, + composition_time=0, + ) + + def parse_scriptdata(script_tag: ScriptTag) -> scriptdata.ScriptData: - assert script_tag.body is not None + assert script_tag.body return scriptdata.load(script_tag.body) @@ -253,8 +285,8 @@ class Resolution: ) @classmethod - def from_aac_sequence_header(cls, tag: VideoTag) -> Resolution: + def from_avc_sequence_header(cls, tag: VideoTag) -> Resolution: assert tag.avc_packet_type == AVCPacketType.AVC_SEQUENCE_HEADER - assert tag.body is not None + assert tag.body width, height = extract_resolution(tag.body) return cls(width, height) diff --git a/src/blrec/flv/format.py b/src/blrec/flv/format.py index 37abc59..256ec8b 100644 --- a/src/blrec/flv/format.py +++ b/src/blrec/flv/format.py @@ -213,10 +213,10 @@ class FlvDumper: else: raise FlvDataError(f'Unsupported tag type: {tag.tag_type}') - if tag.body is None: - self._stream.seek(tag.tag_end_offset) - else: + if tag.body: self._writer.write(tag.body) + else: + self._stream.seek(tag.tag_end_offset) def dump_flv_tag_header(self, tag: FlvTag) -> None: self._writer.write_ui8((int(tag.filtered) << 5) | tag.tag_type.value) diff --git a/src/blrec/flv/models.py b/src/blrec/flv/models.py index daa9df2..d09d908 100644 --- a/src/blrec/flv/models.py +++ b/src/blrec/flv/models.py @@ -152,7 +152,7 @@ _T = TypeVar('_T', bound='FlvTag') @attr.s(auto_attribs=True, slots=True, frozen=True, kw_only=True) class FlvTag(ABC, FlvTagHeader): offset: int = attr.ib(validator=[non_negative_integer_validator]) - body: Optional[bytes] = attr.ib(default=None, repr=cksum) + body: bytes = attr.ib(default=b'', repr=cksum) def __len__(self) -> int: return self.tag_size diff --git a/src/blrec/flv/operators/__init__.py b/src/blrec/flv/operators/__init__.py index 8da49f2..8d212d3 100644 --- a/src/blrec/flv/operators/__init__.py +++ b/src/blrec/flv/operators/__init__.py @@ -11,6 +11,7 @@ from .parse import parse from .probe import Prober, StreamProfile from .process import process from .progress import ProgressBar +from .sort import sort from .split import split __all__ = ( @@ -33,6 +34,7 @@ __all__ = ( 'Prober', 'process', 'ProgressBar', + 'sort', 'split', 'StreamProfile', ) diff --git a/src/blrec/flv/operators/analyse.py b/src/blrec/flv/operators/analyse.py index 32ebd05..dc8d1aa 100644 --- a/src/blrec/flv/operators/analyse.py +++ b/src/blrec/flv/operators/analyse.py @@ -328,7 +328,7 @@ class Analyser: self._keyframe_timestamps.append(tag.timestamp) self._keyframe_filepositions.append(self.calc_file_size()) if tag.is_avc_header(): - self._resolution = Resolution.from_aac_sequence_header(tag) + self._resolution = Resolution.from_avc_sequence_header(tag) logger.debug(f'Resolution: {self._resolution}') else: pass diff --git a/src/blrec/flv/operators/concat.py b/src/blrec/flv/operators/concat.py index 5069672..177aa79 100644 --- a/src/blrec/flv/operators/concat.py +++ b/src/blrec/flv/operators/concat.py @@ -201,7 +201,7 @@ def concat( return tag.evolve(timestamp=tag.timestamp + delta) def make_join_point_tag(next_tag: FlvTag, seamless: bool) -> ScriptTag: - assert next_tag.body is not None + assert next_tag.body join_point = JoinPoint( seamless=seamless, timestamp=float(next_tag.timestamp), @@ -439,7 +439,7 @@ class JoinPointExtractor: ) -> JoinPoint: script_data = parse_scriptdata(join_point_tag) join_point_data = cast(JoinPointData, script_data['value']) - assert next_tag.body is not None, next_tag + assert next_tag.body, next_tag join_point = JoinPoint( seamless=join_point_data['seamless'], timestamp=next_tag.timestamp, diff --git a/src/blrec/flv/operators/fix.py b/src/blrec/flv/operators/fix.py index 57df3ca..807d246 100644 --- a/src/blrec/flv/operators/fix.py +++ b/src/blrec/flv/operators/fix.py @@ -107,10 +107,12 @@ def fix() -> Callable[[FLVStream], FLVStream]: return False def is_ts_incontinuous(tag: FlvTag) -> bool: + tolerance = 1 if last_tag is None: return False - return tag.timestamp - last_tag.timestamp > max( - sound_sample_interval, video_frame_interval + return ( + tag.timestamp - last_tag.timestamp + > max(sound_sample_interval, video_frame_interval) + tolerance ) def on_next(item: FLVStreamItem) -> None: diff --git a/src/blrec/flv/operators/parse.py b/src/blrec/flv/operators/parse.py index b6426ff..ed52690 100644 --- a/src/blrec/flv/operators/parse.py +++ b/src/blrec/flv/operators/parse.py @@ -5,7 +5,9 @@ from typing import Callable, Optional from reactivex import Observable, abc from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable +from ..common import create_avc_end_sequence_tag, is_avc_end_sequence from ..io import FlvReader +from ..models import FlvTag from .typing import FLVStream, FLVStreamItem __all__ = ('parse',) @@ -30,6 +32,7 @@ def parse( subscription = SerialDisposable() def on_next(stream: io.RawIOBase) -> None: + tag: Optional[FlvTag] = None try: try: reader = FlvReader( @@ -42,6 +45,11 @@ def parse( tag = reader.read_tag() observer.on_next(tag) finally: + if tag is not None and not is_avc_end_sequence(tag): + tag = create_avc_end_sequence_tag( + offset=tag.next_tag_offset, timestamp=tag.timestamp + ) + observer.on_next(tag) stream.close() except EOFError as e: if complete_on_eof: diff --git a/src/blrec/flv/operators/process.py b/src/blrec/flv/operators/process.py index d69dfab..e878256 100644 --- a/src/blrec/flv/operators/process.py +++ b/src/blrec/flv/operators/process.py @@ -1,9 +1,13 @@ import logging from typing import Callable +from reactivex import operators as ops + +from ..common import is_avc_end_sequence_tag from .concat import concat from .defragment import defragment from .fix import fix +from .sort import sort from .split import split from .typing import FLVStream @@ -12,8 +16,26 @@ __all__ = ('process',) logger = logging.getLogger(__name__) -def process() -> Callable[[FLVStream], FLVStream]: +def process( + sort_tags: bool = False, trace: bool = False +) -> Callable[[FLVStream], FLVStream]: def _process(source: FLVStream) -> FLVStream: - return source.pipe(defragment(), split(), fix(), concat()) + if sort_tags: + return source.pipe( + defragment(), + sort(trace=trace), + ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore + split(), + fix(), + concat(), + ) + else: + return source.pipe( + defragment(), + ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore + split(), + fix(), + concat(), + ) return _process diff --git a/src/blrec/flv/operators/sort.py b/src/blrec/flv/operators/sort.py new file mode 100644 index 0000000..547c431 --- /dev/null +++ b/src/blrec/flv/operators/sort.py @@ -0,0 +1,93 @@ +import logging +from typing import Callable, List, Optional + +from reactivex import Observable, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable + +from ..common import is_avc_end_sequence, is_video_nalu_keyframe +from ..models import FlvHeader, FlvTag +from .typing import FLVStream, FLVStreamItem + +__all__ = ('sort',) + +logger = logging.getLogger(__name__) + + +def sort(trace: bool = False) -> Callable[[FLVStream], FLVStream]: + "Sort tags in GOP by timestamp to ensure subsequent operators work as expected." + + def _sort(source: FLVStream) -> FLVStream: + def subscribe( + observer: abc.ObserverBase[FLVStreamItem], + scheduler: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + + gop_tags: List[FlvTag] = [] + + def reset() -> None: + nonlocal gop_tags + gop_tags = [] + + def push_gop_tags() -> None: + if not gop_tags: + return + + gop_tags.sort(key=lambda tag: tag.timestamp) + if trace: + logger.debug( + 'Tags in GOP:\n' + f'Number of tags: {len(gop_tags)}\n' + f'Total size of tags: {sum(map(len, gop_tags))}\n' + f'The first tag is {gop_tags[0]}\n' + f'The last tag is {gop_tags[-1]}' + ) + + for tag in gop_tags: + observer.on_next(tag) + + gop_tags.clear() + + def on_next(item: FLVStreamItem) -> None: + if isinstance(item, FlvHeader) or is_avc_end_sequence(item): + push_gop_tags() + observer.on_next(item) + return + + if is_video_nalu_keyframe(item): + push_gop_tags() + gop_tags.append(item) + else: + gop_tags.append(item) + + def on_completed() -> None: + push_gop_tags() + observer.on_completed() + + def on_error(exc: Exception) -> None: + push_gop_tags() + observer.on_error(exc) + + def dispose() -> None: + nonlocal disposed + disposed = True + if gop_tags: + logger.debug( + 'Remaining tags:\n' + f'Number of tags: {len(gop_tags)}\n' + f'Total size of tags: {sum(map(len, gop_tags))}\n' + f'The first tag is {gop_tags[0]}\n' + f'The last tag is {gop_tags[-1]}' + ) + reset() + + subscription.disposable = source.subscribe( + on_next, on_error, on_completed, scheduler=scheduler + ) + + return CompositeDisposable(subscription, Disposable(dispose)) + + return Observable(subscribe) + + return _sort