From f6d1777662e627706953be6431ac664ab66ff931 Mon Sep 17 00:00:00 2001 From: acgnhik Date: Thu, 9 Jun 2022 13:12:46 +0800 Subject: [PATCH] refactor: refactor on_completed --- src/blrec/core/operators/sized_statistics.py | 6 +++++- src/blrec/core/operators/stream_statistics.py | 6 +++++- src/blrec/core/stream_recorder_impl.py | 14 +++++++++++--- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/blrec/core/operators/sized_statistics.py b/src/blrec/core/operators/sized_statistics.py index 61857ff..d53eadd 100644 --- a/src/blrec/core/operators/sized_statistics.py +++ b/src/blrec/core/operators/sized_statistics.py @@ -44,8 +44,12 @@ class SizedStatistics: self._statistics.submit(len(item)) observer.on_next(item) + def on_completed() -> None: + self._statistics.freeze() + observer.on_completed() + return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=scheduler + on_next, observer.on_error, on_completed, scheduler=scheduler ) return Observable(subscribe) diff --git a/src/blrec/core/operators/stream_statistics.py b/src/blrec/core/operators/stream_statistics.py index 91b3b85..0ba7cd3 100644 --- a/src/blrec/core/operators/stream_statistics.py +++ b/src/blrec/core/operators/stream_statistics.py @@ -46,8 +46,12 @@ class StreamStatistics: calculable_stream.size_updates.subscribe(self._statistics.submit) observer.on_next(calculable_stream) + def on_completed() -> None: + self._statistics.freeze() + observer.on_completed() + return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=scheduler + on_next, observer.on_error, on_completed, scheduler=scheduler ) return Observable(subscribe) diff --git a/src/blrec/core/stream_recorder_impl.py b/src/blrec/core/stream_recorder_impl.py index dddca03..5e17e9b 100644 --- a/src/blrec/core/stream_recorder_impl.py +++ b/src/blrec/core/stream_recorder_impl.py @@ -106,6 +106,7 @@ class StreamRecorderImpl( self._metadata_dumper.enable() self._subscription: abc.DisposableBase + self._completed: bool = False self._threads: List[Thread] = [] self._files: List[str] = [] @@ -291,6 +292,7 @@ class StreamRecorderImpl( self._files.clear() self._stream_profile = {} self._record_start_time = None + self._completed = False async def _do_start(self) -> None: logger.debug('Starting stream recorder...') @@ -301,9 +303,7 @@ class StreamRecorderImpl( async def _do_stop(self) -> None: logger.debug('Stopping stream recorder...') self._stream_param_holder.cancel() - thread = self._thread_factory('StreamRecorderDisposer')( - self._subscription.dispose - ) + thread = self._thread_factory('StreamRecorderDisposer')(self._dispose) thread.start() for thread in self._threads: await self._loop.run_in_executor(None, thread.join, 30) @@ -324,7 +324,15 @@ class StreamRecorderImpl( return factory + def _dispose(self) -> None: + self._subscription.dispose() + self._on_completed() + def _on_completed(self) -> None: + if self._completed: + return + self._completed = True + self._dl_statistics.freeze() self._rec_statistics.freeze() self._emit_event('stream_recording_completed')