From 34d9aa63efcac956c6e481deef79e4c1101e4d20 Mon Sep 17 00:00:00 2001 From: acgnhik Date: Mon, 26 Dec 2022 21:24:28 +0800 Subject: [PATCH] feat: improve hls stream recorders avoid excessive memory being occupied --- .../core/hls_raw_stream_recorder_impl.py | 17 +++--- src/blrec/core/hls_stream_recorder_impl.py | 18 +++---- src/blrec/utils/operators/__init__.py | 3 +- src/blrec/utils/operators/observe_on.py | 54 +++++++++++++++++++ 4 files changed, 75 insertions(+), 17 deletions(-) create mode 100644 src/blrec/utils/operators/observe_on.py diff --git a/src/blrec/core/hls_raw_stream_recorder_impl.py b/src/blrec/core/hls_raw_stream_recorder_impl.py index f95bb86..e937d82 100644 --- a/src/blrec/core/hls_raw_stream_recorder_impl.py +++ b/src/blrec/core/hls_raw_stream_recorder_impl.py @@ -1,13 +1,13 @@ import logging from typing import Optional -from reactivex import operators as ops from reactivex.scheduler import NewThreadScheduler from blrec.bili.live import Live from blrec.bili.typing import QualityNumber from blrec.hls import operators as hls_ops from blrec.hls.metadata_dumper import MetadataDumper +from blrec.utils import operators as utils_ops from . 
import operators as core_ops from .stream_recorder_impl import StreamRecorderImpl @@ -84,16 +84,14 @@ class HLSRawStreamRecorderImpl(StreamRecorderImpl): self._stream_param_holder.get_stream_params() # type: ignore .pipe( self._stream_url_resolver, - ops.subscribe_on( - NewThreadScheduler(self._thread_factory('PlaylistDownloader')) - ), self._playlist_fetcher, self._recording_monitor, self._connection_error_handler, self._request_exception_handler, self._playlist_dumper, - ops.observe_on( - NewThreadScheduler(self._thread_factory('SegmentDownloader')) + utils_ops.observe_on_new_thread( + queue_size=60, + thread_name=f'SegmentDownloader::{self._live.room_id}', ), self._segment_fetcher, self._dl_statistics, @@ -103,5 +101,10 @@ class HLSRawStreamRecorderImpl(StreamRecorderImpl): self._progress_bar, self._exception_handler, ) - .subscribe(on_completed=self._on_completed) + .subscribe( + on_completed=self._on_completed, + scheduler=NewThreadScheduler( + self._thread_factory('HLSRawStreamRecorder') + ), + ) ) diff --git a/src/blrec/core/hls_stream_recorder_impl.py b/src/blrec/core/hls_stream_recorder_impl.py index a9d8b75..fba4812 100644 --- a/src/blrec/core/hls_stream_recorder_impl.py +++ b/src/blrec/core/hls_stream_recorder_impl.py @@ -1,7 +1,6 @@ import logging from typing import Optional -from reactivex import operators as ops from reactivex.scheduler import NewThreadScheduler from blrec.bili.live import Live @@ -9,6 +8,7 @@ from blrec.bili.typing import QualityNumber from blrec.flv import operators as flv_ops from blrec.flv.metadata_dumper import MetadataDumper from blrec.hls import operators as hls_ops +from blrec.utils import operators as utils_ops from . 
import operators as core_ops from .stream_recorder_impl import StreamRecorderImpl @@ -130,22 +130,19 @@ class HLSStreamRecorderImpl(StreamRecorderImpl): self._stream_param_holder.get_stream_params() # type: ignore .pipe( self._stream_url_resolver, - ops.subscribe_on( - NewThreadScheduler(self._thread_factory('PlaylistFetcher')) - ), self._playlist_fetcher, self._recording_monitor, self._connection_error_handler, self._request_exception_handler, self._playlist_resolver, - ops.observe_on( - NewThreadScheduler(self._thread_factory('SegmentFetcher')) + utils_ops.observe_on_new_thread( + queue_size=60, thread_name=f'SegmentFetcher::{self._live.room_id}' ), self._segment_fetcher, self._dl_statistics, self._prober, - ops.observe_on( - NewThreadScheduler(self._thread_factory('StreamRecorder')) + utils_ops.observe_on_new_thread( + queue_size=10, thread_name=f'StreamRecorder::{self._live.room_id}' ), self._segment_remuxer, self._segment_parser, @@ -160,5 +157,8 @@ class HLSStreamRecorderImpl(StreamRecorderImpl): self._progress_bar, self._exception_handler, ) - .subscribe(on_completed=self._on_completed) + .subscribe( + on_completed=self._on_completed, + scheduler=NewThreadScheduler(self._thread_factory('HLSStreamRecorder')), + ) ) diff --git a/src/blrec/utils/operators/__init__.py b/src/blrec/utils/operators/__init__.py index 1c9d7f8..ff6712f 100644 --- a/src/blrec/utils/operators/__init__.py +++ b/src/blrec/utils/operators/__init__.py @@ -1,4 +1,5 @@ from .replace import replace from .retry import retry +from .observe_on import observe_on_new_thread -__all__ = ('replace', 'retry') +__all__ = ('replace', 'retry', 'observe_on_new_thread') diff --git a/src/blrec/utils/operators/observe_on.py b/src/blrec/utils/operators/observe_on.py new file mode 100644 index 0000000..75e9f2f --- /dev/null +++ b/src/blrec/utils/operators/observe_on.py @@ -0,0 +1,54 @@ +from queue import Queue +from threading import Thread +from typing import Any, Callable, Optional, TypeVar + +from reactivex 
import Observable, abc
+from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
+
+
+_T = TypeVar('_T')
+
+
+def observe_on_new_thread(
+    queue_size: Optional[int] = None, thread_name: Optional[str] = None
+) -> Callable[[Observable[_T]], Observable[_T]]:
+    """Return an operator that delivers notifications on a dedicated thread.
+
+    Every subscription starts its own daemon worker thread fed by a FIFO
+    queue. Upstream notifications are enqueued by whichever thread emits
+    them and are forwarded to the downstream observer by the worker, in
+    the order they were enqueued.
+
+    Args:
+        queue_size: Maximum number of pending notifications. Once the queue
+            is full the emitting thread blocks until the worker has taken
+            an item, which bounds the memory held by pending items.
+            ``None`` or ``0`` makes the queue unbounded.
+        thread_name: Name assigned to the worker thread.
+
+    Returns:
+        A function mapping a source observable to an observable whose
+        notifications are delivered on the worker thread.
+    """
+    # Local import: the module-level import only brings in ``Thread``.
+    from threading import current_thread
+
+    def observe_on(source: Observable[_T]) -> Observable[_T]:
+        def subscribe(
+            observer: abc.ObserverBase[_T],
+            scheduler: Optional[abc.SchedulerBase] = None,
+        ) -> abc.DisposableBase:
+            disposed = False
+            subscription = SerialDisposable()
+            queue: Queue[Callable[..., Any]] = Queue(maxsize=queue_size or 0)
+
+            def run() -> None:
+                # Run queued notifications until disposed; the flag is
+                # re-checked after each callback returns.
+                while not disposed:
+                    queue.get()()
+
+            thread = Thread(target=run, name=thread_name, daemon=True)
+            thread.start()
+
+            def on_next(value: _T) -> None:
+                queue.put(lambda: observer.on_next(value))
+
+            def on_error(exc: Exception) -> None:
+                queue.put(lambda: observer.on_error(exc))
+
+            def on_completed() -> None:
+                # Must *call* on_completed; a bare attribute reference
+                # would never notify the downstream observer.
+                queue.put(lambda: observer.on_completed())
+
+            def dispose() -> None:
+                nonlocal disposed
+                disposed = True
+                if current_thread() is thread:
+                    # Disposal was triggered by a callback running on the
+                    # worker itself. The loop in run() stops once that
+                    # callback returns, so no wake-up is needed; joining
+                    # the current thread would raise RuntimeError and a
+                    # put() on a full queue could never be drained.
+                    return
+                # Wake the worker in case it is blocked on an empty queue,
+                # then wait for it to finish.
+                queue.put(lambda: None)
+                thread.join()
+
+            subscription.disposable = source.subscribe(
+                on_next, on_error, on_completed, scheduler=scheduler
+            )
+
+            return CompositeDisposable(subscription, Disposable(dispose))
+
+        return Observable(subscribe)
+
+    return observe_on