feat: improve hls stream recorders

avoid excessive memory usage
acgnhik 2022-12-26 21:24:28 +08:00
parent 6469881220
commit 34d9aa63ef
4 changed files with 75 additions and 17 deletions
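
Note (not part of the commit itself): the changes suggest the memory pressure came from unbounded buffering between pipeline stages — `ops.observe_on(NewThreadScheduler(...))` queues every pending notification with effectively no limit when segment downloading outpaces remuxing and writing. The new `utils_ops.observe_on_new_thread` operator hands items to the downstream through a bounded `queue.Queue`, so a full queue blocks the producer instead of buffering more data. A minimal usage sketch under that assumption, with a fast synchronous producer and a deliberately slow consumer (the thread name and timings below are made up for illustration):

import threading
import time

import reactivex

from blrec.utils import operators as utils_ops

done = threading.Event()

# 200 items are pushed synchronously by the subscribing thread; with
# queue_size=60 at most 60 callbacks wait in the queue, and further on_next
# calls block in Queue.put() until the slow consumer catches up.
reactivex.range(0, 200).pipe(
    utils_ops.observe_on_new_thread(queue_size=60, thread_name='demo-consumer'),
).subscribe(
    on_next=lambda _: time.sleep(0.01),  # deliberately slow consumer
    on_completed=done.set,
)
done.wait()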

View File

@@ -1,13 +1,13 @@
 import logging
 from typing import Optional
 
-from reactivex import operators as ops
 from reactivex.scheduler import NewThreadScheduler
 
 from blrec.bili.live import Live
 from blrec.bili.typing import QualityNumber
 from blrec.hls import operators as hls_ops
 from blrec.hls.metadata_dumper import MetadataDumper
+from blrec.utils import operators as utils_ops
 
 from . import operators as core_ops
 from .stream_recorder_impl import StreamRecorderImpl
@@ -84,16 +84,14 @@ class HLSRawStreamRecorderImpl(StreamRecorderImpl):
             self._stream_param_holder.get_stream_params()  # type: ignore
             .pipe(
                 self._stream_url_resolver,
-                ops.subscribe_on(
-                    NewThreadScheduler(self._thread_factory('PlaylistDownloader'))
-                ),
                 self._playlist_fetcher,
                 self._recording_monitor,
                 self._connection_error_handler,
                 self._request_exception_handler,
                 self._playlist_dumper,
-                ops.observe_on(
-                    NewThreadScheduler(self._thread_factory('SegmentDownloader'))
+                utils_ops.observe_on_new_thread(
+                    queue_size=60,
+                    thread_name=f'SegmentDownloader::{self._live.room_id}',
                 ),
                 self._segment_fetcher,
                 self._dl_statistics,
@@ -103,5 +101,10 @@ class HLSRawStreamRecorderImpl(StreamRecorderImpl):
                 self._progress_bar,
                 self._exception_handler,
             )
-            .subscribe(on_completed=self._on_completed)
+            .subscribe(
+                on_completed=self._on_completed,
+                scheduler=NewThreadScheduler(
+                    self._thread_factory('HLSRawStreamRecorder')
+                ),
+            )
         )
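
A side note on the scheduler change above, based on my reading of reactivex rather than anything stated in the commit: dropping `ops.subscribe_on(NewThreadScheduler(...))` and instead passing `scheduler=` to `.subscribe()` relies on reactivex treating that scheduler as the default for the subscription, so the fetching loop still starts off the caller's thread. A rough sketch of that mechanism with a simple interval source (illustrative only):

import time

import reactivex
from reactivex.scheduler import NewThreadScheduler

# interval() is given no scheduler of its own here, so it falls back to the
# one passed to subscribe() and emits from that scheduler's thread rather
# than from the thread that called subscribe().
reactivex.interval(0.5).subscribe(
    on_next=lambda i: print(f'tick {i}'),
    scheduler=NewThreadScheduler(),
)

time.sleep(2)  # keep the main thread alive long enough to see a few ticks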

View File

@@ -1,7 +1,6 @@
 import logging
 from typing import Optional
 
-from reactivex import operators as ops
 from reactivex.scheduler import NewThreadScheduler
 
 from blrec.bili.live import Live
@@ -9,6 +8,7 @@ from blrec.bili.typing import QualityNumber
 from blrec.flv import operators as flv_ops
 from blrec.flv.metadata_dumper import MetadataDumper
 from blrec.hls import operators as hls_ops
+from blrec.utils import operators as utils_ops
 
 from . import operators as core_ops
 from .stream_recorder_impl import StreamRecorderImpl
@@ -130,22 +130,19 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
             self._stream_param_holder.get_stream_params()  # type: ignore
             .pipe(
                 self._stream_url_resolver,
-                ops.subscribe_on(
-                    NewThreadScheduler(self._thread_factory('PlaylistFetcher'))
-                ),
                 self._playlist_fetcher,
                 self._recording_monitor,
                 self._connection_error_handler,
                 self._request_exception_handler,
                 self._playlist_resolver,
-                ops.observe_on(
-                    NewThreadScheduler(self._thread_factory('SegmentFetcher'))
+                utils_ops.observe_on_new_thread(
+                    queue_size=60, thread_name=f'SegmentFetcher::{self._live.room_id}'
                 ),
                 self._segment_fetcher,
                 self._dl_statistics,
                 self._prober,
-                ops.observe_on(
-                    NewThreadScheduler(self._thread_factory('StreamRecorder'))
+                utils_ops.observe_on_new_thread(
+                    queue_size=10, thread_name=f'StreamRecorder::{self._live.room_id}'
                 ),
                 self._segment_remuxer,
                 self._segment_parser,
@@ -160,5 +157,8 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
                 self._progress_bar,
                 self._exception_handler,
             )
-            .subscribe(on_completed=self._on_completed)
+            .subscribe(
+                on_completed=self._on_completed,
+                scheduler=NewThreadScheduler(self._thread_factory('HLSStreamRecorder')),
+            )
         )

View File

@@ -1,4 +1,5 @@
 from .replace import replace
 from .retry import retry
+from .observe_on import observe_on_new_thread
 
-__all__ = ('replace', 'retry')
+__all__ = ('replace', 'retry', 'observe_on_new_thread')

View File

@@ -0,0 +1,54 @@
+from queue import Queue
+from threading import Thread
+from typing import Any, Callable, Optional, TypeVar
+
+from reactivex import Observable, abc
+from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
+
+_T = TypeVar('_T')
+
+
+def observe_on_new_thread(
+    queue_size: Optional[int] = None, thread_name: Optional[str] = None
+) -> Callable[[Observable[_T]], Observable[_T]]:
+    def observe_on(source: Observable[_T]) -> Observable[_T]:
+        def subscribe(
+            observer: abc.ObserverBase[_T],
+            scheduler: Optional[abc.SchedulerBase] = None,
+        ) -> abc.DisposableBase:
+            disposed = False
+            subscription = SerialDisposable()
+
+            queue: Queue[Callable[..., Any]] = Queue(maxsize=queue_size or 0)
+
+            def run() -> None:
+                while not disposed:
+                    queue.get()()
+
+            thread = Thread(target=run, name=thread_name, daemon=True)
+            thread.start()
+
+            def on_next(value: _T) -> None:
+                queue.put(lambda: observer.on_next(value))
+
+            def on_error(exc: Exception) -> None:
+                queue.put(lambda: observer.on_error(exc))
+
+            def on_completed() -> None:
+                queue.put(lambda: observer.on_completed())
+
+            def dispose() -> None:
+                nonlocal disposed
+                disposed = True
+                queue.put(lambda: None)
+                thread.join()
+
+            subscription.disposable = source.subscribe(
+                on_next, on_error, on_completed, scheduler=scheduler
+            )
+
+            return CompositeDisposable(subscription, Disposable(dispose))
+
+        return Observable(subscribe)
+
+    return observe_on
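
Design note on the new operator (my reading, not stated in the commit): the memory bound comes entirely from the stdlib queue — `Queue(maxsize=n)` makes `put()` block once n callbacks are pending, while `maxsize=0` (the `queue_size=None` default) keeps the old unbounded behaviour; the `queue.put(lambda: None)` sentinel in `dispose()` only exists to wake the worker thread so `thread.join()` can return. A tiny stdlib-only illustration of the blocking behaviour:

from queue import Queue

q = Queue(maxsize=2)
q.put('a')
q.put('b')
print(q.full())  # True: a third put() would block until a get() frees a slot
print(q.get())   # 'a'
q.put('c')       # succeeds immediately now that there is room again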