fix: fix fetching segments failed continuously

This commit is contained in:
acgnhik 2022-11-06 11:04:31 +08:00
parent 2ad6847f42
commit 26b389633e
4 changed files with 47 additions and 5 deletions

View File

@ -48,7 +48,9 @@ class HLSRawStreamRecorderImpl(StreamRecorderImpl):
self._playlist_fetcher = hls_ops.PlaylistFetcher(self._live, self._session) self._playlist_fetcher = hls_ops.PlaylistFetcher(self._live, self._session)
self._playlist_dumper = hls_ops.PlaylistDumper(self._path_provider) self._playlist_dumper = hls_ops.PlaylistDumper(self._path_provider)
self._segment_fetcher = hls_ops.SegmentFetcher(self._live, self._session) self._segment_fetcher = hls_ops.SegmentFetcher(
self._live, self._session, self._stream_url_resolver
)
self._segment_dumper = hls_ops.SegmentDumper(self._playlist_dumper) self._segment_dumper = hls_ops.SegmentDumper(self._playlist_dumper)
self._ff_metadata_dumper = MetadataDumper( self._ff_metadata_dumper = MetadataDumper(
self._playlist_dumper, self._metadata_provider self._playlist_dumper, self._metadata_provider

View File

@ -49,7 +49,9 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
self._playlist_fetcher = hls_ops.PlaylistFetcher(self._live, self._session) self._playlist_fetcher = hls_ops.PlaylistFetcher(self._live, self._session)
self._playlist_resolver = hls_ops.PlaylistResolver(self._stream_url_resolver) self._playlist_resolver = hls_ops.PlaylistResolver(self._stream_url_resolver)
self._segment_fetcher = hls_ops.SegmentFetcher(self._live, self._session) self._segment_fetcher = hls_ops.SegmentFetcher(
self._live, self._session, self._stream_url_resolver
)
self._segment_remuxer = hls_ops.SegmentRemuxer(live) self._segment_remuxer = hls_ops.SegmentRemuxer(live)
self._prober = hls_ops.Prober() self._prober = hls_ops.Prober()

View File

@ -4,3 +4,7 @@ class SegmentDataCorrupted(ValueError):
class NoNewSegments(Exception): class NoNewSegments(Exception):
pass pass
class FetchSegmentError(Exception):
pass

View File

@ -10,6 +10,7 @@ import requests
import urllib3 import urllib3
from m3u8.model import InitializationSection from m3u8.model import InitializationSection
from reactivex import Observable, abc from reactivex import Observable, abc
from reactivex import operators as ops
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from tenacity import ( from tenacity import (
retry, retry,
@ -21,8 +22,12 @@ from tenacity import (
) )
from blrec.bili.live import Live from blrec.bili.live import Live
from blrec.core import operators as core_ops
from blrec.utils import operators as utils_ops
from blrec.utils.hash import cksum from blrec.utils.hash import cksum
from ..exceptions import FetchSegmentError
__all__ = ('SegmentFetcher', 'InitSectionData', 'SegmentData') __all__ = ('SegmentFetcher', 'InitSectionData', 'SegmentData')
@ -48,14 +53,23 @@ class SegmentData:
class SegmentFetcher: class SegmentFetcher:
def __init__(self, live: Live, session: requests.Session) -> None: def __init__(
self,
live: Live,
session: requests.Session,
stream_url_resolver: core_ops.StreamURLResolver,
) -> None:
self._live = live self._live = live
self._session = session self._session = session
self._stream_url_resolver = stream_url_resolver
def __call__( def __call__(
self, source: Observable[m3u8.Segment] self, source: Observable[m3u8.Segment]
) -> Observable[Union[InitSectionData, SegmentData]]: ) -> Observable[Union[InitSectionData, SegmentData]]:
return self._fetch(source) return self._fetch(source).pipe( # type: ignore
ops.do_action(on_error=self._before_retry),
utils_ops.retry(should_retry=self._should_retry),
)
def _fetch( def _fetch(
self, source: Observable[m3u8.Segment] self, source: Observable[m3u8.Segment]
@ -67,10 +81,11 @@ class SegmentFetcher:
disposed = False disposed = False
subscription = SerialDisposable() subscription = SerialDisposable()
attempts: int = 0
last_segment: Optional[m3u8.Segment] = None last_segment: Optional[m3u8.Segment] = None
def on_next(seg: m3u8.Segment) -> None: def on_next(seg: m3u8.Segment) -> None:
nonlocal last_segment nonlocal attempts, last_segment
url: str = '' url: str = ''
try: try:
@ -123,8 +138,13 @@ class SegmentFetcher:
logger.warning(f'Segment data corrupted: {url}') logger.warning(f'Segment data corrupted: {url}')
except Exception as exc: except Exception as exc:
logger.warning(f'Failed to fetch segment {url}', exc_info=exc) logger.warning(f'Failed to fetch segment {url}', exc_info=exc)
attempts += 1
if attempts > 3:
attempts = 0
observer.on_error(FetchSegmentError(exc))
else: else:
observer.on_next(SegmentData(segment=seg, payload=data)) observer.on_next(SegmentData(segment=seg, payload=data))
attempts = 0
def dispose() -> None: def dispose() -> None:
nonlocal disposed nonlocal disposed
@ -155,3 +175,17 @@ class SegmentFetcher:
with self._session.get(url, headers=self._live.headers, timeout=10) as response: with self._session.get(url, headers=self._live.headers, timeout=10) as response:
response.raise_for_status() response.raise_for_status()
return response.content return response.content
def _should_retry(self, exc: Exception) -> bool:
if isinstance(exc, FetchSegmentError):
return True
else:
return False
def _before_retry(self, exc: Exception) -> None:
if not isinstance(exc, FetchSegmentError):
return
logger.warning(
'Fetch segments failed continuously, trying to update the stream url.'
)
self._stream_url_resolver.reset()