refactor: refactor segment remuxer...

This commit is contained in:
acgnhik 2022-08-27 13:04:30 +08:00
parent ebf75ab487
commit 35ef14b31c
3 changed files with 50 additions and 18 deletions

View File

@ -52,15 +52,17 @@ def parse(
observer.on_next(tag)
stream.close()
except EOFError as e:
logger.debug(f'Error occurred while parsing stream: {repr(e)}')
if complete_on_eof:
observer.on_completed()
else:
if not ignore_eof:
observer.on_error(e)
except ValueError as e:
if ignore_value_error:
logger.debug(f'Error occurred while parsing stream: {repr(e)}')
else:
logger.debug(
f'Error occurred while parsing stream: {repr(e)}', exc_info=e
)
if not ignore_value_error:
observer.on_error(e)
except Exception as e:
observer.on_error(e)

View File

@ -7,6 +7,8 @@ from typing import Final, List, Optional, Union
import urllib3
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from tenacity import Retrying, stop_after_delay, wait_fixed
from tenacity.retry import retry_if_not_exception_type
from blrec.bili.live import Live
from blrec.utils.io import wait_for
@ -69,12 +71,20 @@ class SegmentRemuxer:
if isinstance(data, InitSectionData):
init_section_data = data.payload
segment_data_cache.clear()
logger.debug('Stop stream remuxer for init section')
self._stream_remuxer.stop()
if self._stream_remuxer.exception and not self._stream_remuxer.stopped:
logger.debug(
'Stop stream remuxer due to '
+ repr(self._stream_remuxer.exception)
)
self._stream_remuxer.stop()
try:
if self._stream_remuxer.stopped:
self._stream_remuxer.start()
while True:
self._stream_remuxer.start()
ready = self._stream_remuxer.wait(timeout=1)
if disposed:
return
@ -94,6 +104,7 @@ class SegmentRemuxer:
write(data.payload)
except Exception as e:
logger.warning(f'Failed to write data to stream remuxer: {repr(e)}')
logger.debug(f'Stop stream remuxer due to {repr(e)}')
self._stream_remuxer.stop()
if len(segment_data_cache) >= self._MAX_SEGMENT_DATA_CACHE:
segment_data_cache = segment_data_cache[
@ -121,6 +132,10 @@ class SegmentRemuxer:
return Observable(subscribe)
class CloseRemuxedStream(Exception):
pass
class RemuxedStream(io.RawIOBase):
def __init__(
self, stream_remuxer: StreamRemuxer, *, read_timeout: float = 10
@ -133,21 +148,27 @@ class RemuxedStream(io.RawIOBase):
if self._stream_remuxer.stopped:
ready = self._stream_remuxer.wait(timeout=self._read_timeout)
if not ready:
logger.debug(
f'Stream remuxer not ready in {self._read_timeout} seconds'
)
raise EOFError
msg = f'Stream remuxer not ready in {self._read_timeout} seconds'
logger.debug(msg)
raise EOFError(msg)
try:
data = wait_for(
self._stream_remuxer.output.read,
args=(size,),
timeout=self._read_timeout,
)
except Exception as e:
logger.warning(f'Failed to read data from stream remuxer: {repr(e)}')
self._stream_remuxer.stop()
raise EOFError
for attempt in Retrying(
reraise=True,
retry=retry_if_not_exception_type(TimeoutError),
wait=wait_fixed(1),
stop=stop_after_delay(self._read_timeout),
):
with attempt:
data = wait_for(
self._stream_remuxer.output.read,
args=(size,),
timeout=self._read_timeout,
)
except Exception as exc:
logger.warning(f'Failed to read data from stream remuxer: {repr(exc)}')
self._stream_remuxer.exception = exc
raise EOFError(exc)
else:
assert data is not None
self._offset += len(data)
@ -157,4 +178,9 @@ class RemuxedStream(io.RawIOBase):
return self._offset
def close(self) -> None:
self._stream_remuxer.stop()
if self._stream_remuxer.stopped:
return
if self._stream_remuxer.exception:
return
logger.debug('Close remuxed stream')
self._stream_remuxer.exception = CloseRemuxedStream()

View File

@ -55,6 +55,10 @@ class StreamRemuxer(StoppableMixin, SupportDebugMixin):
def exception(self) -> Optional[Exception]:
return self._exception
@exception.setter
def exception(self, exc: Exception) -> None:
self._exception = exc
def __enter__(self): # type: ignore
self.start()
self.wait()