feat: adjust stime of Danmaku when stream recording interrupted

This commit is contained in:
acgnhik 2022-07-02 16:46:44 +08:00
parent 261a2993be
commit e546b47e29
8 changed files with 133 additions and 3 deletions

View File

@ -116,8 +116,11 @@ class DanmakuDumper(
self, video_path: str, record_start_time: int
) -> None:
with self._lock:
self._delta: float = 0
self._record_start_time: int = record_start_time
self._timebase: int = self._record_start_time * 1000
self._stream_recording_interrupted: bool = False
self._path = danmaku_path(video_path)
self._record_start_time = record_start_time
self._files.append(self._path)
self._start_dumping()
@ -126,6 +129,20 @@ class DanmakuDumper(
await self._stop_dumping()
self._path = None
async def on_stream_recording_interrupted(self, duration: float) -> None:
logger.debug(f'Stream recording interrupted, {duration}')
self._duration = duration
self._stream_recording_recovered = asyncio.Condition()
self._stream_recording_interrupted = True
async def on_stream_recording_recovered(self, timestamp: int) -> None:
logger.debug(f'Stream recording recovered, {timestamp}')
self._timebase = timestamp * 1000
self._delta = self._duration * 1000
self._stream_recording_interrupted = False
async with self._stream_recording_recovered:
self._stream_recording_recovered.notify_all()
def _start_dumping(self) -> None:
self._create_dump_task()
@ -172,6 +189,7 @@ class DanmakuDumper(
async def _dumping_loop(self, writer: DanmakuWriter) -> None:
while True:
msg = await self._receiver.get_message()
if isinstance(msg, DanmuMsg):
await writer.write_danmu(self._make_danmu(msg))
self._statistics.submit(1)
@ -193,6 +211,13 @@ class DanmakuDumper(
else:
logger.warning('Unsupported message type:', repr(msg))
if self._stream_recording_interrupted:
logger.debug(
f'Last message before stream recording interrupted: {repr(msg)}'
)
async with self._stream_recording_recovered:
await self._stream_recording_recovered.wait()
def _make_metadata(self) -> Metadata:
return Metadata(
user_name=self._live.user_info.name,
@ -259,4 +284,4 @@ class DanmakuDumper(
)
def _calc_stime(self, timestamp: int) -> float:
return max((timestamp - self._record_start_time * 1000), 0) / 1000
return (max(timestamp - self._timebase, 0) + self._delta) / 1000

View File

@ -47,6 +47,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl):
.pipe(
self._stream_url_resolver,
self._stream_fetcher,
self._recording_monitor,
self._dl_statistics,
self._stream_parser,
self._connection_error_handler,

View File

@ -57,6 +57,7 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
NewThreadScheduler(self._thread_factory('PlaylistFetcher'))
),
self._playlist_fetcher,
self._recording_monitor,
self._connection_error_handler,
self._request_exception_handler,
self._playlist_resolver,

View File

@ -4,6 +4,7 @@ from .hls_prober import HLSProber, StreamProfile
from .playlist_fetcher import PlaylistFetcher
from .playlist_resolver import PlaylistResolver
from .progress_bar import ProgressBar
from .recording_monitor import RecordingMonitor
from .request_exception_handler import RequestExceptionHandler
from .segment_fetcher import InitSectionData, SegmentData, SegmentFetcher
from .segment_remuxer import SegmentRemuxer
@ -21,6 +22,7 @@ __all__ = (
'PlaylistFetcher',
'PlaylistResolver',
'ProgressBar',
'RecordingMonitor',
'RequestExceptionHandler',
'SegmentData',
'SegmentFetcher',

View File

@ -0,0 +1,69 @@
from __future__ import annotations
import logging
from typing import Final, Optional, TypeVar
from reactivex import Observable, Subject, abc
from ...bili.live import Live
from ...flv import operators as flv_ops
from ...utils.mixins import AsyncCooperationMixin
__all__ = ('RecordingMonitor',)
logger = logging.getLogger(__name__)
_T = TypeVar('_T')
class RecordingMonitor(AsyncCooperationMixin):
def __init__(self, live: Live, analyser: flv_ops.Analyser) -> None:
super().__init__()
self._live = live
self._analyser = analyser
self._interrupted: Subject[float] = Subject()
self._recovered: Subject[int] = Subject()
@property
def interrupted(self) -> Observable[float]:
return self._interrupted
@property
def recovered(self) -> Observable[int]:
return self._recovered
def __call__(self, source: Observable[_T]) -> Observable[_T]:
return self._monitor(source)
def _monitor(self, source: Observable[_T]) -> Observable[_T]:
CRITERIA: Final[int] = 1
recording: bool = False
failed_count: int = 0
def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
def on_next(item: _T) -> None:
nonlocal recording, failed_count
recording = True
if failed_count >= CRITERIA:
ts = self._run_coroutine(self._live.get_timestamp())
self._recovered.on_next(ts)
failed_count = 0
observer.on_next(item)
def on_error(exc: Exception) -> None:
nonlocal failed_count
if recording:
failed_count += 1
if failed_count == CRITERIA:
self._interrupted.on_next(self._analyser.duration)
observer.on_error(exc)
return source.subscribe(
on_next, on_error, observer.on_completed, scheduler=scheduler
)
return Observable(subscribe)

View File

@ -254,6 +254,12 @@ class StreamRecorder(
async def on_video_file_completed(self, path: str) -> None:
await self._emit('video_file_completed', path)
async def on_stream_recording_interrupted(self, timestamp: int) -> None:
await self._emit('stream_recording_interrupted', timestamp)
async def on_stream_recording_recovered(self, timestamp: int) -> None:
await self._emit('stream_recording_recovered', timestamp)
async def on_stream_recording_completed(self) -> None:
await self._emit('stream_recording_completed')

View File

@ -1,5 +1,6 @@
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from threading import Thread
from typing import Any, Iterator, List, Optional, Tuple, Union
@ -14,6 +15,7 @@ from ..event.event_emitter import EventEmitter, EventListener
from ..flv import operators as flv_ops
from ..flv.metadata_dumper import MetadataDumper
from ..flv.operators import StreamProfile
from ..flv.utils import format_timestamp
from ..logging.room_id import aio_task_with_room_id
from ..utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin
from . import operators as core_ops
@ -35,6 +37,12 @@ class StreamRecorderEventListener(EventListener):
async def on_video_file_completed(self, path: str) -> None:
...
async def on_stream_recording_interrupted(self, duratin: float) -> None:
...
async def on_stream_recording_recovered(self, timestamp: int) -> None:
...
async def on_stream_recording_completed(self) -> None:
...
@ -87,6 +95,7 @@ class StreamRecorderImpl(
self._path_provider = PathProvider(live, out_dir, path_template)
self._dumper = flv_ops.Dumper(self._path_provider, buffer_size)
self._rec_statistics = core_ops.SizedStatistics()
self._recording_monitor = core_ops.RecordingMonitor(live, self._analyser)
self._prober: Union[flv_ops.Prober, core_ops.HLSProber]
self._dl_statistics: Union[core_ops.StreamStatistics, core_ops.SizedStatistics]
@ -135,6 +144,19 @@ class StreamRecorderImpl(
self._dumper.file_opened.subscribe(on_file_opened)
self._dumper.file_closed.subscribe(on_file_closed)
def on_recording_interrupted(duration: float) -> None:
duration_string = format_timestamp(int(duration * 1000))
logger.info(f'Recording interrupted, current duration: {duration_string}')
self._emit_event('stream_recording_interrupted', duration)
def on_recording_recovered(timestamp: int) -> None:
datetime_string = datetime.fromtimestamp(timestamp).isoformat()
logger.info(f'Recording recovered, current date time {(datetime_string)}')
self._emit_event('stream_recording_recovered', timestamp)
self._recording_monitor.interrupted.subscribe(on_recording_interrupted)
self._recording_monitor.recovered.subscribe(on_recording_recovered)
@property
def stream_url(self) -> str:
return self._stream_url_resolver.stream_url

View File

@ -128,7 +128,11 @@ class Analyser:
self._video_analysed = False
@property
def metadatas(self) -> Observable[MetaData]:
def duration(self) -> float:
return self._last_timestamp / 1000
@property
def metadatas(self) -> Observable[Optional[MetaData]]:
return self._metadatas
def __call__(self, source: FLVStream) -> FLVStream: