diff --git a/src/blrec/core/operators/hls_prober.py b/src/blrec/core/operators/hls_prober.py index ca91245..ccb5907 100644 --- a/src/blrec/core/operators/hls_prober.py +++ b/src/blrec/core/operators/hls_prober.py @@ -5,6 +5,7 @@ import logging from typing import List, Optional, Union from reactivex import Observable, Subject, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from ...utils.ffprobe import StreamProfile, ffprobe from .segment_fetcher import InitSectionData, SegmentData @@ -39,6 +40,9 @@ class HLSProber: observer: abc.ObserverBase[Union[InitSectionData, SegmentData]], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + self._reset() def on_next(item: Union[InitSectionData, SegmentData]) -> None: @@ -59,10 +63,17 @@ class HLSProber: observer.on_next(item) - return source.subscribe( + def dispose() -> None: + nonlocal disposed + disposed = True + self._reset() + + subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler ) + return CompositeDisposable(subscription, Disposable(dispose)) + return Observable(subscribe) def _do_probe(self) -> None: diff --git a/src/blrec/core/operators/playlist_resolver.py b/src/blrec/core/operators/playlist_resolver.py index 1301016..5831197 100644 --- a/src/blrec/core/operators/playlist_resolver.py +++ b/src/blrec/core/operators/playlist_resolver.py @@ -29,6 +29,7 @@ class PlaylistResolver: ) -> abc.DisposableBase: disposed = False subscription = SerialDisposable() + last_seg_uris: OrderedSet[str] = OrderedSet() def on_next(playlist: m3u8.M3U8) -> None: diff --git a/src/blrec/core/operators/segment_fetcher.py b/src/blrec/core/operators/segment_fetcher.py index 4543fd4..d850703 100644 --- a/src/blrec/core/operators/segment_fetcher.py +++ b/src/blrec/core/operators/segment_fetcher.py @@ -55,6 +55,7 @@ class SegmentFetcher: ) -> abc.DisposableBase: disposed = False subscription = SerialDisposable() + init_section: Optional[InitializationSection] = None def on_next(seg: m3u8.Segment) -> None: @@ -76,7 +77,9 @@ class SegmentFetcher: def dispose() -> None: nonlocal disposed + nonlocal init_section disposed = True + init_section = None subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler diff --git a/src/blrec/core/operators/segment_remuxer.py b/src/blrec/core/operators/segment_remuxer.py index 6e8416e..e1ed825 100644 --- a/src/blrec/core/operators/segment_remuxer.py +++ b/src/blrec/core/operators/segment_remuxer.py @@ -21,7 +21,8 @@ logging.getLogger(urllib3.__name__).setLevel(logging.WARNING) class SegmentRemuxer: - _MAX_SEGMENT_DATA_CACHE: Final = 3 + _SEGMENT_DATA_CACHE: Final = 10 + _MAX_SEGMENT_DATA_CACHE: Final = 15 def __init__(self, live: Live) -> None: self._live = live @@ -47,6 +48,12 @@ class SegmentRemuxer: segment_data_cache: List[bytes] = [] self._stream_remuxer.stop() + def reset() -> None: + nonlocal init_section_data, segment_data_cache + init_section_data = None + segment_data_cache = [] + self._stream_remuxer.stop() + def write(data: bytes) -> int: return wait_for( self._stream_remuxer.input.write, @@ -56,6 +63,7 @@ class SegmentRemuxer: def on_next(data: Union[InitSectionData, SegmentData]) -> None: nonlocal init_section_data + nonlocal segment_data_cache if isinstance(data, InitSectionData): init_section_data = data.payload @@ -86,14 +94,22 @@ class SegmentRemuxer: except Exception as e: logger.warning(f'Failed to write data to stream remuxer: {repr(e)}') self._stream_remuxer.stop() + if len(segment_data_cache) >= self._MAX_SEGMENT_DATA_CACHE: + segment_data_cache = segment_data_cache[ + -self._MAX_SEGMENT_DATA_CACHE + 1 : + ] + else: + if len(segment_data_cache) >= self._SEGMENT_DATA_CACHE: + segment_data_cache = segment_data_cache[ + -self._SEGMENT_DATA_CACHE + 1 : + ] segment_data_cache.append(data.payload) - if len(segment_data_cache) > self._MAX_SEGMENT_DATA_CACHE: - segment_data_cache.pop(0) def dispose() -> None: nonlocal disposed disposed = True + reset() subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler diff --git a/src/blrec/core/recorder.py b/src/blrec/core/recorder.py index aa8f95a..dfa0387 100644 --- a/src/blrec/core/recorder.py +++ b/src/blrec/core/recorder.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import time import logging from datetime import datetime from typing import Iterator, Optional diff --git a/src/blrec/core/stream_recorder_impl.py b/src/blrec/core/stream_recorder_impl.py index ddef4c9..57a679c 100644 --- a/src/blrec/core/stream_recorder_impl.py +++ b/src/blrec/core/stream_recorder_impl.py @@ -328,6 +328,7 @@ class StreamRecorderImpl( def _dispose(self) -> None: self._subscription.dispose() + del self._subscription self._on_completed() def _on_completed(self) -> None: diff --git a/src/blrec/flv/metadata_dumper.py b/src/blrec/flv/metadata_dumper.py index f4e1e66..4c8cc77 100644 --- a/src/blrec/flv/metadata_dumper.py +++ b/src/blrec/flv/metadata_dumper.py @@ -22,11 +22,12 @@ class MetadataDumper(SwitchableMixin): joinpoint_extractor: flv_ops.JoinPointExtractor, ) -> None: super().__init__() - self._dumper = dumper self._analyser = analyser self._joinpoint_extractor = joinpoint_extractor + self._reset() + def _reset(self) -> None: self._last_metadata: Optional[flv_ops.MetaData] = None self._last_join_points: Optional[List[flv_ops.JoinPoint]] = None @@ -40,6 +41,7 @@ class MetadataDumper(SwitchableMixin): self._file_closed_subscription = self._dumper.file_closed.subscribe( self._dump_metadata ) + self._reset() logger.debug('Enabled metadata dumper') def _do_disable(self) -> None: @@ -49,6 +51,7 @@ class MetadataDumper(SwitchableMixin): self._join_points_subscription.dispose() with suppress(Exception): self._file_closed_subscription.dispose() + self._reset() logger.debug('Disabled metadata dumper') def _update_metadata(self, metadata: Optional[flv_ops.MetaData]) -> None: diff --git a/src/blrec/flv/operators/analyse.py b/src/blrec/flv/operators/analyse.py index 26f9094..21d00b1 100644 --- a/src/blrec/flv/operators/analyse.py +++ b/src/blrec/flv/operators/analyse.py @@ -237,9 +237,12 @@ class Analyser: observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: - stream_index: int = -1 + disposed = False subscription = SerialDisposable() + self._reset() + stream_index: int = -1 + def push_metadata() -> None: try: metadata = self.make_metadata() @@ -270,7 +273,10 @@ class Analyser: observer.on_error(e) def dispose() -> None: + nonlocal disposed + disposed = True push_metadata() + self._reset() subscription.disposable = source.subscribe( on_next, on_error, on_completed, scheduler=scheduler diff --git a/src/blrec/flv/operators/concat.py b/src/blrec/flv/operators/concat.py index a856dd3..5069672 100644 --- a/src/blrec/flv/operators/concat.py +++ b/src/blrec/flv/operators/concat.py @@ -85,6 +85,9 @@ def concat( observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + delta: int = 0 action: ACTION = ACTION.NOOP last_tags: List[FlvTag] = [] @@ -93,6 +96,22 @@ def concat( last_audio_sequence_header: Optional[AudioTag] = None last_video_sequence_header: Optional[VideoTag] = None + def reset() -> None: + nonlocal delta + nonlocal action + nonlocal last_tags + nonlocal gathered_tags + nonlocal last_flv_header + nonlocal last_audio_sequence_header + nonlocal last_video_sequence_header + delta = 0 + action = ACTION.NOOP + last_tags = [] + gathered_tags = [] + last_flv_header = None + last_audio_sequence_header = None + last_video_sequence_header = None + def update_last_tags(tag: FlvTag) -> None: nonlocal last_audio_sequence_header, last_video_sequence_header last_tags.append(tag) @@ -315,10 +334,17 @@ def concat( do_concat() observer.on_error(e) - return source.subscribe( + def dispose() -> None: + nonlocal disposed + disposed = True + reset() + + subscription.disposable = source.subscribe( on_next, on_error, on_completed, scheduler=scheduler ) + return CompositeDisposable(subscription, Disposable(dispose)) + return Observable(subscribe) return _concat @@ -340,11 +366,19 @@ class JoinPointExtractor: observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: - stream_index: int = -1 + disposed = False subscription = SerialDisposable() + + stream_index: int = -1 join_points: List[JoinPoint] = [] join_point_tag: Optional[ScriptTag] = None + def reset() -> None: + nonlocal stream_index, join_points, join_point_tag + stream_index = -1 + join_points = [] + join_point_tag = None + def push_join_points() -> None: self._join_points.on_next(join_points.copy()) @@ -381,7 +415,10 @@ class JoinPointExtractor: observer.on_error(e) def dispose() -> None: + nonlocal disposed + disposed = True push_join_points() + reset() subscription.disposable = source.subscribe( on_next, on_error, on_completed, scheduler=scheduler @@ -409,4 +446,11 @@ class JoinPointExtractor: crc32=join_point_data['crc32'], ) logger.debug(f'Extracted join point: {join_point}; next tag: {next_tag}') + if cksum(next_tag.body) != join_point_data['crc32']: + logger.warning( + f'Timestamp of extracted join point may be incorrect\n' + f'join point data: {join_point_data}\n' + f'join point tag: {join_point_tag}\n' + f'next tag: {next_tag}\n' + ) return join_point diff --git a/src/blrec/flv/operators/correct.py b/src/blrec/flv/operators/correct.py index 0aa3012..3a75e0a 100644 --- a/src/blrec/flv/operators/correct.py +++ b/src/blrec/flv/operators/correct.py @@ -2,6 +2,7 @@ import logging from typing import Callable, Optional from reactivex import Observable, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from ..common import is_script_tag, is_sequence_header from ..models import FlvHeader, FlvTag @@ -20,9 +21,17 @@ def correct() -> Callable[[FLVStream], FLVStream]: observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + delta: Optional[int] = None first_data_tag: Optional[FlvTag] = None + def reset() -> None: + nonlocal delta, first_data_tag + delta = None + first_data_tag = None + def correct_ts(tag: FlvTag, delta: int) -> FlvTag: if delta == 0: return tag @@ -72,10 +81,17 @@ def correct() -> Callable[[FLVStream], FLVStream]: tag = correct_ts(tag, delta) observer.on_next(tag) - return source.subscribe( + def dispose() -> None: + nonlocal disposed + disposed = True + reset() + + subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler ) + return CompositeDisposable(subscription, Disposable(dispose)) + return Observable(subscribe) return _correct diff --git a/src/blrec/flv/operators/cut.py b/src/blrec/flv/operators/cut.py index 2cddd13..4383dd7 100644 --- a/src/blrec/flv/operators/cut.py +++ b/src/blrec/flv/operators/cut.py @@ -4,6 +4,7 @@ import logging from typing import Optional from reactivex import Observable, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from ..common import ( is_audio_sequence_header, @@ -58,6 +59,9 @@ class Cutter: observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + def on_next(item: FLVStreamItem) -> None: if isinstance(item, FlvHeader): self._reset() @@ -71,10 +75,17 @@ class Cutter: self._triggered = False observer.on_next(item) - return source.subscribe( + def dispose() -> None: + nonlocal disposed + disposed = True + self._reset() + + subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler ) + return CompositeDisposable(subscription, Disposable(dispose)) + return Observable(subscribe) def _update_flv_header(self, header: FlvHeader) -> None: diff --git a/src/blrec/flv/operators/defragment.py b/src/blrec/flv/operators/defragment.py index 5482b6b..0b96ab2 100644 --- a/src/blrec/flv/operators/defragment.py +++ b/src/blrec/flv/operators/defragment.py @@ -2,6 +2,7 @@ import logging from typing import Callable, List, Optional from reactivex import Observable, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from ..models import FlvHeader from .typing import FLVStream, FLVStreamItem @@ -19,9 +20,17 @@ def defragment(min_tags: int = 10) -> Callable[[FLVStream], FLVStream]: observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + gathering: bool = False gathered_items: List[FLVStreamItem] = [] + def reset() -> None: + nonlocal gathering, gathered_items + gathering = False + gathered_items = [] + def on_next(item: FLVStreamItem) -> None: nonlocal gathering @@ -47,10 +56,17 @@ def defragment(min_tags: int = 10) -> Callable[[FLVStream], FLVStream]: else: observer.on_next(item) - return source.subscribe( + def dispose() -> None: + nonlocal disposed + disposed = True + reset() + + subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler ) + return CompositeDisposable(subscription, Disposable(dispose)) + return Observable(subscribe) return _defragment diff --git a/src/blrec/flv/operators/fix.py b/src/blrec/flv/operators/fix.py index c844071..57df3ca 100644 --- a/src/blrec/flv/operators/fix.py +++ b/src/blrec/flv/operators/fix.py @@ -3,6 +3,7 @@ import math from typing import Callable, Optional from reactivex import Observable, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from ..common import ( is_audio_tag, @@ -27,6 +28,9 @@ def fix() -> Callable[[FLVStream], FLVStream]: observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + delta: int = 0 last_tag: Optional[FlvTag] = None last_audio_tag: Optional[AudioTag] = None @@ -143,10 +147,17 @@ def fix() -> Callable[[FLVStream], FLVStream]: tag = correct_ts(tag) observer.on_next(tag) - return source.subscribe( + def dispose() -> None: + nonlocal disposed + disposed = True + reset() + + subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler ) + return CompositeDisposable(subscription, Disposable(dispose)) + return Observable(subscribe) return _fix diff --git a/src/blrec/flv/operators/limit.py b/src/blrec/flv/operators/limit.py index 7b78077..e30ae79 100644 --- a/src/blrec/flv/operators/limit.py +++ b/src/blrec/flv/operators/limit.py @@ -4,6 +4,7 @@ import logging from typing import Optional from reactivex import Observable, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from ..common import ( is_audio_sequence_header, @@ -51,6 +52,9 @@ class Limiter: observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + def on_next(item: FLVStreamItem) -> None: if isinstance(item, FlvHeader): self._reset() @@ -62,10 +66,17 @@ class Limiter: self._insert_header_and_tags(observer) observer.on_next(item) - return source.subscribe( + def dispose() -> None: + nonlocal disposed + disposed = True + self._reset() + + subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler ) + return CompositeDisposable(subscription, Disposable(dispose)) + return Observable(subscribe) def _insert_header_and_tags( diff --git a/src/blrec/flv/operators/probe.py b/src/blrec/flv/operators/probe.py index e50d67e..009a447 100644 --- a/src/blrec/flv/operators/probe.py +++ b/src/blrec/flv/operators/probe.py @@ -5,6 +5,7 @@ import logging from typing import List, Optional, cast from reactivex import Observable, Subject, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from ...utils.ffprobe import StreamProfile, ffprobe from ..common import find_aac_header_tag, find_avc_header_tag @@ -38,6 +39,9 @@ class Prober: observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + self._reset() def on_next(item: FLVStreamItem) -> None: @@ -58,10 +62,17 @@ class Prober: observer.on_next(item) - return source.subscribe( + def dispose() -> None: + nonlocal disposed + disposed = True + self._reset() + + subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler ) + return CompositeDisposable(subscription, Disposable(dispose)) + return Observable(subscribe) def _do_probe(self) -> None: diff --git a/src/blrec/flv/operators/split.py b/src/blrec/flv/operators/split.py index 61f9fa7..4fa89ce 100644 --- a/src/blrec/flv/operators/split.py +++ b/src/blrec/flv/operators/split.py @@ -2,6 +2,7 @@ import logging from typing import Callable, Optional from reactivex import Observable, abc +from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from ..common import is_audio_sequence_header, is_metadata_tag, is_video_sequence_header from ..models import AudioTag, FlvHeader, ScriptTag, VideoTag @@ -21,6 +22,9 @@ def split() -> Callable[[FLVStream], FLVStream]: observer: abc.ObserverBase[FLVStreamItem], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: + disposed = False + subscription = SerialDisposable() + changed: bool = False last_flv_header: Optional[FlvHeader] = None last_metadata_tag: Optional[ScriptTag] = None @@ -91,10 +95,17 @@ def split() -> Callable[[FLVStream], FLVStream]: observer.on_next(tag) - return source.subscribe( + def dispose() -> None: + nonlocal disposed + disposed = True + reset() + + subscription.disposable = source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler ) + return CompositeDisposable(subscription, Disposable(dispose)) + return Observable(subscribe).pipe(correct()) return _split