refactor: refactor RequestExceptionHandler

This commit is contained in:
acgnhik 2022-06-26 11:00:15 +08:00
parent 44b031ef70
commit 261a2993be
3 changed files with 19 additions and 16 deletions

View File

@ -49,8 +49,8 @@ class FLVStreamRecorderImpl(StreamRecorderImpl):
self._stream_fetcher,
self._dl_statistics,
self._stream_parser,
self._request_exception_handler,
self._connection_error_handler,
self._request_exception_handler,
flv_ops.process(),
self._cutter,
self._limiter,

View File

@ -57,8 +57,8 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
NewThreadScheduler(self._thread_factory('PlaylistFetcher'))
),
self._playlist_fetcher,
self._request_exception_handler,
self._connection_error_handler,
self._request_exception_handler,
self._playlist_resolver,
ops.observe_on(
NewThreadScheduler(self._thread_factory('SegmentFetcher'))

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
import logging
import time
from typing import Optional, TypeVar
import requests
@ -19,9 +20,12 @@ _T = TypeVar('_T')
class RequestExceptionHandler:
def __init__(self) -> None:
self._last_retry_time = time.monotonic()
def __call__(self, source: Observable[_T]) -> Observable[_T]:
return self._handle(source).pipe(
utils_ops.retry(delay=1, should_retry=self._should_retry)
utils_ops.retry(should_retry=self._should_retry)
)
def _handle(self, source: Observable[_T]) -> Observable[_T]:
@ -32,19 +36,20 @@ class RequestExceptionHandler:
def on_error(exc: Exception) -> None:
try:
raise exc
except requests.exceptions.RequestException: # XXX: ConnectionError
logger.warning(repr(exc))
except urllib3.exceptions.HTTPError:
logger.warning(repr(exc))
except asyncio.exceptions.TimeoutError:
logger.warning(repr(exc))
except requests.exceptions.Timeout:
logger.warning(repr(exc))
except requests.exceptions.HTTPError:
logger.warning(repr(exc))
except urllib3.exceptions.TimeoutError:
logger.warning(repr(exc))
except urllib3.exceptions.ProtocolError:
# ProtocolError('Connection broken: IncompleteRead(
logger.warning(repr(exc))
except Exception:
pass
if self._should_retry(exc):
if time.monotonic() - self._last_retry_time < 1:
time.sleep(1)
self._last_retry_time = time.monotonic()
observer.on_error(exc)
return source.subscribe(
@ -57,11 +62,9 @@ class RequestExceptionHandler:
if isinstance(
exc,
(
requests.exceptions.RequestException, # XXX: ConnectionError
urllib3.exceptions.HTTPError,
asyncio.exceptions.TimeoutError,
requests.exceptions.Timeout,
requests.exceptions.HTTPError,
urllib3.exceptions.TimeoutError,
urllib3.exceptions.ProtocolError,
),
):
return True