From e546b47e293d4963a996cab40365071100047810 Mon Sep 17 00:00:00 2001 From: acgnhik Date: Sat, 2 Jul 2022 16:46:44 +0800 Subject: [PATCH] feat: adjust stime of Danmaku when stream recording interrupted --- src/blrec/core/danmaku_dumper.py | 29 +++++++- src/blrec/core/flv_stream_recorder_impl.py | 1 + src/blrec/core/hls_stream_recorder_impl.py | 1 + src/blrec/core/operators/__init__.py | 2 + src/blrec/core/operators/recording_monitor.py | 69 +++++++++++++++++++ src/blrec/core/stream_recorder.py | 6 ++ src/blrec/core/stream_recorder_impl.py | 22 ++++++ src/blrec/flv/operators/analyse.py | 6 +- 8 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 src/blrec/core/operators/recording_monitor.py diff --git a/src/blrec/core/danmaku_dumper.py b/src/blrec/core/danmaku_dumper.py index 964608e..ed13501 100644 --- a/src/blrec/core/danmaku_dumper.py +++ b/src/blrec/core/danmaku_dumper.py @@ -116,8 +116,11 @@ class DanmakuDumper( self, video_path: str, record_start_time: int ) -> None: with self._lock: + self._delta: float = 0 + self._record_start_time: int = record_start_time + self._timebase: int = self._record_start_time * 1000 + self._stream_recording_interrupted: bool = False self._path = danmaku_path(video_path) - self._record_start_time = record_start_time self._files.append(self._path) self._start_dumping() @@ -126,6 +129,20 @@ class DanmakuDumper( await self._stop_dumping() self._path = None + async def on_stream_recording_interrupted(self, duration: float) -> None: + logger.debug(f'Stream recording interrupted, {duration}') + self._duration = duration + self._stream_recording_recovered = asyncio.Condition() + self._stream_recording_interrupted = True + + async def on_stream_recording_recovered(self, timestamp: int) -> None: + logger.debug(f'Stream recording recovered, {timestamp}') + self._timebase = timestamp * 1000 + self._delta = self._duration * 1000 + self._stream_recording_interrupted = False + async with self._stream_recording_recovered: + self._stream_recording_recovered.notify_all() + def _start_dumping(self) -> None: self._create_dump_task() @@ -172,6 +189,7 @@ class DanmakuDumper( async def _dumping_loop(self, writer: DanmakuWriter) -> None: while True: msg = await self._receiver.get_message() + if isinstance(msg, DanmuMsg): await writer.write_danmu(self._make_danmu(msg)) self._statistics.submit(1) @@ -193,6 +211,13 @@ class DanmakuDumper( else: logger.warning('Unsupported message type:', repr(msg)) + if self._stream_recording_interrupted: + logger.debug( + f'Last message before stream recording interrupted: {repr(msg)}' + ) + async with self._stream_recording_recovered: + await self._stream_recording_recovered.wait() + def _make_metadata(self) -> Metadata: return Metadata( user_name=self._live.user_info.name, @@ -259,4 +284,4 @@ class DanmakuDumper( ) def _calc_stime(self, timestamp: int) -> float: - return max((timestamp - self._record_start_time * 1000), 0) / 1000 + return (max(timestamp - self._timebase, 0) + self._delta) / 1000 diff --git a/src/blrec/core/flv_stream_recorder_impl.py b/src/blrec/core/flv_stream_recorder_impl.py index 04025fa..42a8363 100644 --- a/src/blrec/core/flv_stream_recorder_impl.py +++ b/src/blrec/core/flv_stream_recorder_impl.py @@ -47,6 +47,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl): .pipe( self._stream_url_resolver, self._stream_fetcher, + self._recording_monitor, self._dl_statistics, self._stream_parser, self._connection_error_handler, diff --git a/src/blrec/core/hls_stream_recorder_impl.py b/src/blrec/core/hls_stream_recorder_impl.py index e8483d9..e5f9d2c 100644 --- a/src/blrec/core/hls_stream_recorder_impl.py +++ b/src/blrec/core/hls_stream_recorder_impl.py @@ -57,6 +57,7 @@ class HLSStreamRecorderImpl(StreamRecorderImpl): NewThreadScheduler(self._thread_factory('PlaylistFetcher')) ), self._playlist_fetcher, + self._recording_monitor, self._connection_error_handler, self._request_exception_handler, self._playlist_resolver, diff --git a/src/blrec/core/operators/__init__.py b/src/blrec/core/operators/__init__.py index 67f3fae..88e810e 100644 --- a/src/blrec/core/operators/__init__.py +++ b/src/blrec/core/operators/__init__.py @@ -4,6 +4,7 @@ from .hls_prober import HLSProber, StreamProfile from .playlist_fetcher import PlaylistFetcher from .playlist_resolver import PlaylistResolver from .progress_bar import ProgressBar +from .recording_monitor import RecordingMonitor from .request_exception_handler import RequestExceptionHandler from .segment_fetcher import InitSectionData, SegmentData, SegmentFetcher from .segment_remuxer import SegmentRemuxer @@ -21,6 +22,7 @@ __all__ = ( 'PlaylistFetcher', 'PlaylistResolver', 'ProgressBar', + 'RecordingMonitor', 'RequestExceptionHandler', 'SegmentData', 'SegmentFetcher', diff --git a/src/blrec/core/operators/recording_monitor.py b/src/blrec/core/operators/recording_monitor.py new file mode 100644 index 0000000..2945c72 --- /dev/null +++ b/src/blrec/core/operators/recording_monitor.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import logging +from typing import Final, Optional, TypeVar + +from reactivex import Observable, Subject, abc + +from ...bili.live import Live +from ...flv import operators as flv_ops +from ...utils.mixins import AsyncCooperationMixin + +__all__ = ('RecordingMonitor',) + + +logger = logging.getLogger(__name__) + +_T = TypeVar('_T') + + +class RecordingMonitor(AsyncCooperationMixin): + def __init__(self, live: Live, analyser: flv_ops.Analyser) -> None: + super().__init__() + self._live = live + self._analyser = analyser + self._interrupted: Subject[float] = Subject() + self._recovered: Subject[int] = Subject() + + @property + def interrupted(self) -> Observable[float]: + return self._interrupted + + @property + def recovered(self) -> Observable[int]: + return self._recovered + + def __call__(self, source: Observable[_T]) -> Observable[_T]: + return self._monitor(source) + + def _monitor(self, source: Observable[_T]) -> Observable[_T]: + CRITERIA: Final[int] = 1 + recording: bool = False + failed_count: int = 0 + + def subscribe( + observer: abc.ObserverBase[_T], + scheduler: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + def on_next(item: _T) -> None: + nonlocal recording, failed_count + recording = True + if failed_count >= CRITERIA: + ts = self._run_coroutine(self._live.get_timestamp()) + self._recovered.on_next(ts) + failed_count = 0 + observer.on_next(item) + + def on_error(exc: Exception) -> None: + nonlocal failed_count + if recording: + failed_count += 1 + if failed_count == CRITERIA: + self._interrupted.on_next(self._analyser.duration) + observer.on_error(exc) + + return source.subscribe( + on_next, on_error, observer.on_completed, scheduler=scheduler + ) + + return Observable(subscribe) diff --git a/src/blrec/core/stream_recorder.py b/src/blrec/core/stream_recorder.py index 1674238..4132848 100644 --- a/src/blrec/core/stream_recorder.py +++ b/src/blrec/core/stream_recorder.py @@ -254,6 +254,12 @@ class StreamRecorder( async def on_video_file_completed(self, path: str) -> None: await self._emit('video_file_completed', path) + async def on_stream_recording_interrupted(self, timestamp: int) -> None: + await self._emit('stream_recording_interrupted', timestamp) + + async def on_stream_recording_recovered(self, timestamp: int) -> None: + await self._emit('stream_recording_recovered', timestamp) + async def on_stream_recording_completed(self) -> None: await self._emit('stream_recording_completed') diff --git a/src/blrec/core/stream_recorder_impl.py b/src/blrec/core/stream_recorder_impl.py index 57a679c..34e500d 100644 --- a/src/blrec/core/stream_recorder_impl.py +++ b/src/blrec/core/stream_recorder_impl.py @@ -1,5 +1,6 @@ import logging from abc import ABC, abstractmethod +from datetime import datetime from threading import Thread from typing import Any, Iterator, List, Optional, Tuple, Union @@ -14,6 +15,7 @@ from ..event.event_emitter import EventEmitter, EventListener from ..flv import operators as flv_ops from ..flv.metadata_dumper import MetadataDumper from ..flv.operators import StreamProfile +from ..flv.utils import format_timestamp from ..logging.room_id import aio_task_with_room_id from ..utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin from . import operators as core_ops @@ -35,6 +37,12 @@ class StreamRecorderEventListener(EventListener): async def on_video_file_completed(self, path: str) -> None: ... + async def on_stream_recording_interrupted(self, duratin: float) -> None: + ... + + async def on_stream_recording_recovered(self, timestamp: int) -> None: + ... + async def on_stream_recording_completed(self) -> None: ... @@ -87,6 +95,7 @@ class StreamRecorderImpl( self._path_provider = PathProvider(live, out_dir, path_template) self._dumper = flv_ops.Dumper(self._path_provider, buffer_size) self._rec_statistics = core_ops.SizedStatistics() + self._recording_monitor = core_ops.RecordingMonitor(live, self._analyser) self._prober: Union[flv_ops.Prober, core_ops.HLSProber] self._dl_statistics: Union[core_ops.StreamStatistics, core_ops.SizedStatistics] @@ -135,6 +144,19 @@ class StreamRecorderImpl( self._dumper.file_opened.subscribe(on_file_opened) self._dumper.file_closed.subscribe(on_file_closed) + def on_recording_interrupted(duration: float) -> None: + duration_string = format_timestamp(int(duration * 1000)) + logger.info(f'Recording interrupted, current duration: {duration_string}') + self._emit_event('stream_recording_interrupted', duration) + + def on_recording_recovered(timestamp: int) -> None: + datetime_string = datetime.fromtimestamp(timestamp).isoformat() + logger.info(f'Recording recovered, current date time {(datetime_string)}') + self._emit_event('stream_recording_recovered', timestamp) + + self._recording_monitor.interrupted.subscribe(on_recording_interrupted) + self._recording_monitor.recovered.subscribe(on_recording_recovered) + @property def stream_url(self) -> str: return self._stream_url_resolver.stream_url diff --git a/src/blrec/flv/operators/analyse.py b/src/blrec/flv/operators/analyse.py index 21d00b1..32ebd05 100644 --- a/src/blrec/flv/operators/analyse.py +++ b/src/blrec/flv/operators/analyse.py @@ -128,7 +128,11 @@ class Analyser: self._video_analysed = False @property - def metadatas(self) -> Observable[MetaData]: + def duration(self) -> float: + return self._last_timestamp / 1000 + + @property + def metadatas(self) -> Observable[Optional[MetaData]]: return self._metadatas def __call__(self, source: FLVStream) -> FLVStream: