refactor: refactor SegmentFetcher

This commit is contained in:
acgnhik 2022-08-14 19:29:06 +08:00
parent 86a4a4e2ac
commit 305ec40a46

View File

@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import time
from typing import Optional, Union from typing import Optional, Union
import attr import attr
@ -77,6 +78,24 @@ class SegmentFetcher:
): ):
url = seg.init_section.absolute_uri url = seg.init_section.absolute_uri
data = self._fetch_segment(url) data = self._fetch_segment(url)
while True:
time.sleep(1)
if (_data := self._fetch_segment(url)) == data:
logger.debug(
'Init section checked: '
f'crc32 of previous data: {cksum(data)}, '
f'crc32 of current data: {cksum(_data)}, '
f'init section url: {url}'
)
break
else:
logger.debug(
'Init section corrupted: '
f'crc32 of previous data: {cksum(data)}, '
f'crc32 of current data: {cksum(_data)}, '
f'init section url: {url}'
)
data = _data
observer.on_next( observer.on_next(
InitSectionData(init_section=seg.init_section, payload=data) InitSectionData(init_section=seg.init_section, payload=data)
) )
@ -97,8 +116,8 @@ class SegmentFetcher:
) )
else: else:
raise SegmentDataCorrupted(crc32, crc32_of_data) raise SegmentDataCorrupted(crc32, crc32_of_data)
except Exception as e: except Exception as exc:
logger.warning(f'Failed to fetch segment {url}: {repr(e)}') logger.warning(f'Failed to fetch segment {url}', exc_info=exc)
else: else:
observer.on_next(SegmentData(segment=seg, payload=data)) observer.on_next(SegmentData(segment=seg, payload=data))
@ -121,7 +140,7 @@ class SegmentFetcher:
retry=retry_if_exception_type( retry=retry_if_exception_type(
(requests.exceptions.RequestException, urllib3.exceptions.HTTPError) (requests.exceptions.RequestException, urllib3.exceptions.HTTPError)
), ),
wait=wait_exponential(multiplier=0.1, max=5), wait=wait_exponential(max=10),
stop=stop_after_delay(60), stop=stop_after_delay(60),
) )
def _fetch_segment(self, url: str) -> bytes: def _fetch_segment(self, url: str) -> bytes: