refactor: refactor on_completed

This commit is contained in:
acgnhik 2022-06-09 13:12:46 +08:00
parent e7a73dbe66
commit f6d1777662
3 changed files with 21 additions and 5 deletions

View File

@ -44,8 +44,12 @@ class SizedStatistics:
self._statistics.submit(len(item)) self._statistics.submit(len(item))
observer.on_next(item) observer.on_next(item)
def on_completed() -> None:
self._statistics.freeze()
observer.on_completed()
return source.subscribe( return source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, on_completed, scheduler=scheduler
) )
return Observable(subscribe) return Observable(subscribe)

View File

@ -46,8 +46,12 @@ class StreamStatistics:
calculable_stream.size_updates.subscribe(self._statistics.submit) calculable_stream.size_updates.subscribe(self._statistics.submit)
observer.on_next(calculable_stream) observer.on_next(calculable_stream)
def on_completed() -> None:
self._statistics.freeze()
observer.on_completed()
return source.subscribe( return source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, on_completed, scheduler=scheduler
) )
return Observable(subscribe) return Observable(subscribe)

View File

@ -106,6 +106,7 @@ class StreamRecorderImpl(
self._metadata_dumper.enable() self._metadata_dumper.enable()
self._subscription: abc.DisposableBase self._subscription: abc.DisposableBase
self._completed: bool = False
self._threads: List[Thread] = [] self._threads: List[Thread] = []
self._files: List[str] = [] self._files: List[str] = []
@ -291,6 +292,7 @@ class StreamRecorderImpl(
self._files.clear() self._files.clear()
self._stream_profile = {} self._stream_profile = {}
self._record_start_time = None self._record_start_time = None
self._completed = False
async def _do_start(self) -> None: async def _do_start(self) -> None:
logger.debug('Starting stream recorder...') logger.debug('Starting stream recorder...')
@ -301,9 +303,7 @@ class StreamRecorderImpl(
async def _do_stop(self) -> None: async def _do_stop(self) -> None:
logger.debug('Stopping stream recorder...') logger.debug('Stopping stream recorder...')
self._stream_param_holder.cancel() self._stream_param_holder.cancel()
thread = self._thread_factory('StreamRecorderDisposer')( thread = self._thread_factory('StreamRecorderDisposer')(self._dispose)
self._subscription.dispose
)
thread.start() thread.start()
for thread in self._threads: for thread in self._threads:
await self._loop.run_in_executor(None, thread.join, 30) await self._loop.run_in_executor(None, thread.join, 30)
@ -324,7 +324,15 @@ class StreamRecorderImpl(
return factory return factory
def _dispose(self) -> None:
self._subscription.dispose()
self._on_completed()
def _on_completed(self) -> None: def _on_completed(self) -> None:
if self._completed:
return
self._completed = True
self._dl_statistics.freeze() self._dl_statistics.freeze()
self._rec_statistics.freeze() self._rec_statistics.freeze()
self._emit_event('stream_recording_completed') self._emit_event('stream_recording_completed')