fix: fix downloading fmp4 segments stopped due to the stream interrupted

This commit is contained in:
acgnhik 2022-11-01 13:33:20 +08:00
parent 121b5f9648
commit f625e85595
4 changed files with 49 additions and 13 deletions

View File

@ -48,7 +48,7 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
)
self._playlist_fetcher = hls_ops.PlaylistFetcher(self._live, self._session)
self._playlist_resolver = hls_ops.PlaylistResolver()
self._playlist_resolver = hls_ops.PlaylistResolver(self._stream_url_resolver)
self._segment_fetcher = hls_ops.SegmentFetcher(self._live, self._session)
self._segment_remuxer = hls_ops.SegmentRemuxer(live)

View File

@ -5,7 +5,6 @@ from typing import Optional
from urllib.parse import urlparse
import requests
import urllib3
from reactivex import Observable, abc
from reactivex import operators as ops
@ -29,7 +28,6 @@ __all__ = ('StreamURLResolver',)
logger = logging.getLogger(__name__)
logging.getLogger(urllib3.__name__).setLevel(logging.WARNING)
class StreamURLResolver(AsyncCooperationMixin):
@ -49,13 +47,13 @@ class StreamURLResolver(AsyncCooperationMixin):
def stream_host(self) -> str:
return self._stream_host
def _reset(self) -> None:
def reset(self) -> None:
self._stream_url = ''
self._stream_host = ''
self._stream_params = None
def __call__(self, source: Observable[StreamParams]) -> Observable[str]:
self._reset()
self.reset()
return self._solve(source).pipe(
ops.do_action(on_error=self._before_retry),
utils_ops.retry(delay=1, should_retry=self._should_retry),

View File

@ -1,2 +1,6 @@
class SegmentDataCorrupted(ValueError):
pass
class NoNewSegments(Exception):
pass

View File

@ -5,20 +5,30 @@ import os
from typing import Optional
import m3u8
import urllib3
from reactivex import Observable, abc
from reactivex import operators as ops
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from blrec.core import operators as core_ops
from blrec.utils import operators as utils_ops
from ..exceptions import NoNewSegments
__all__ = ('PlaylistResolver',)
logger = logging.getLogger(__name__)
logging.getLogger(urllib3.__name__).setLevel(logging.WARNING)
class PlaylistResolver:
def __init__(self, stream_url_resolver: core_ops.StreamURLResolver) -> None:
self._stream_url_resolver = stream_url_resolver
def __call__(self, source: Observable[m3u8.M3U8]) -> Observable[m3u8.Segment]:
return self._solve(source)
return self._solve(source).pipe(
ops.do_action(on_error=self._before_retry),
utils_ops.retry(should_retry=self._should_retry),
)
def _name_of(self, uri: str) -> str:
name, ext = os.path.splitext(uri)
@ -35,18 +45,18 @@ class PlaylistResolver:
disposed = False
subscription = SerialDisposable()
attempts: int = 0
last_sequence_number: Optional[int] = None
def on_next(playlist: m3u8.M3U8) -> None:
nonlocal last_sequence_number
nonlocal attempts, last_sequence_number
if playlist.is_endlist:
logger.debug('Playlist ended')
new_segments = []
for seg in playlist.segments:
uri = seg.uri
name = self._name_of(uri)
num = int(name)
num = self._sequence_number_of(seg.uri)
if last_sequence_number is not None:
if last_sequence_number >= num:
continue
@ -57,9 +67,21 @@ class PlaylistResolver:
f'current sequence number: {num}'
)
seg.discontinuity = True
observer.on_next(seg)
new_segments.append(seg)
last_sequence_number = num
if not new_segments:
attempts += 1
if attempts > 3:
attempts = 0
observer.on_error(NoNewSegments())
return
else:
attempts = 0
for seg in new_segments:
observer.on_next(seg)
def dispose() -> None:
nonlocal disposed
nonlocal last_sequence_number
@ -73,3 +95,15 @@ class PlaylistResolver:
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)
def _should_retry(self, exc: Exception) -> bool:
if isinstance(exc, NoNewSegments):
return True
else:
return False
def _before_retry(self, exc: Exception) -> None:
if not isinstance(exc, NoNewSegments):
return
logger.warning('No new segments received, trying to update the stream url.')
self._stream_url_resolver.reset()