From 261a2993be6088eae461045d285cfcee87ffce67 Mon Sep 17 00:00:00 2001 From: acgnhik <acgnhik@outlook.com> Date: Sun, 26 Jun 2022 11:00:15 +0800 Subject: [PATCH] refactor: refactor RequestExceptionHandler --- src/blrec/core/flv_stream_recorder_impl.py | 2 +- src/blrec/core/hls_stream_recorder_impl.py | 2 +- .../operators/request_exception_handler.py | 31 ++++++++++--------- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/blrec/core/flv_stream_recorder_impl.py b/src/blrec/core/flv_stream_recorder_impl.py index e3ca13f..04025fa 100644 --- a/src/blrec/core/flv_stream_recorder_impl.py +++ b/src/blrec/core/flv_stream_recorder_impl.py @@ -49,8 +49,8 @@ class FLVStreamRecorderImpl(StreamRecorderImpl): self._stream_fetcher, self._dl_statistics, self._stream_parser, - self._request_exception_handler, self._connection_error_handler, + self._request_exception_handler, flv_ops.process(), self._cutter, self._limiter, diff --git a/src/blrec/core/hls_stream_recorder_impl.py b/src/blrec/core/hls_stream_recorder_impl.py index 2a03d9a..e8483d9 100644 --- a/src/blrec/core/hls_stream_recorder_impl.py +++ b/src/blrec/core/hls_stream_recorder_impl.py @@ -57,8 +57,8 @@ class HLSStreamRecorderImpl(StreamRecorderImpl): NewThreadScheduler(self._thread_factory('PlaylistFetcher')) ), self._playlist_fetcher, - self._request_exception_handler, self._connection_error_handler, + self._request_exception_handler, self._playlist_resolver, ops.observe_on( NewThreadScheduler(self._thread_factory('SegmentFetcher')) diff --git a/src/blrec/core/operators/request_exception_handler.py b/src/blrec/core/operators/request_exception_handler.py index e2632f6..2f65aab 100644 --- a/src/blrec/core/operators/request_exception_handler.py +++ b/src/blrec/core/operators/request_exception_handler.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio import logging +import time from typing import Optional, TypeVar import requests @@ -19,9 +20,12 @@ _T = TypeVar('_T') class RequestExceptionHandler: + def __init__(self) -> None: + self._last_retry_time = time.monotonic() + def __call__(self, source: Observable[_T]) -> Observable[_T]: return self._handle(source).pipe( - utils_ops.retry(delay=1, should_retry=self._should_retry) + utils_ops.retry(should_retry=self._should_retry) ) def _handle(self, source: Observable[_T]) -> Observable[_T]: @@ -32,19 +36,20 @@ class RequestExceptionHandler: def on_error(exc: Exception) -> None: try: raise exc + except requests.exceptions.RequestException: # XXX: ConnectionError + logger.warning(repr(exc)) + except urllib3.exceptions.HTTPError: + logger.warning(repr(exc)) except asyncio.exceptions.TimeoutError: logger.warning(repr(exc)) - except requests.exceptions.Timeout: - logger.warning(repr(exc)) - except requests.exceptions.HTTPError: - logger.warning(repr(exc)) - except urllib3.exceptions.TimeoutError: - logger.warning(repr(exc)) - except urllib3.exceptions.ProtocolError: - # ProtocolError('Connection broken: IncompleteRead( - logger.warning(repr(exc)) except Exception: pass + + if self._should_retry(exc): + if time.monotonic() - self._last_retry_time < 1: + time.sleep(1) + self._last_retry_time = time.monotonic() + observer.on_error(exc) return source.subscribe( @@ -57,11 +62,9 @@ class RequestExceptionHandler: if isinstance( exc, ( + requests.exceptions.RequestException, # XXX: ConnectionError + urllib3.exceptions.HTTPError, asyncio.exceptions.TimeoutError, - requests.exceptions.Timeout, - requests.exceptions.HTTPError, - urllib3.exceptions.TimeoutError, - urllib3.exceptions.ProtocolError, ), ): return True