refactor: refactor segemnt_remuxer

This commit is contained in:
acgnhik 2022-06-16 10:42:10 +08:00
parent 3519df4fe7
commit 4c73f22526

View File

@ -25,6 +25,7 @@ class SegmentRemuxer:
def __init__(self, live: Live) -> None: def __init__(self, live: Live) -> None:
self._live = live self._live = live
self._timeout: float = 10
self._stream_remuxer = StreamRemuxer(live.room_id, remove_filler_data=True) self._stream_remuxer = StreamRemuxer(live.room_id, remove_filler_data=True)
def __call__( def __call__(
@ -44,14 +45,22 @@ class SegmentRemuxer:
init_section_data: Optional[bytes] = None init_section_data: Optional[bytes] = None
segment_data_cache: List[bytes] = [] segment_data_cache: List[bytes] = []
self._stream_remuxer.stop() self._stream_remuxer.stop()
def write(data: bytes) -> int:
return wait_for(
self._stream_remuxer.input.write,
args=(data,),
timeout=self._timeout,
)
def on_next(data: Union[InitSectionData, SegmentData]) -> None: def on_next(data: Union[InitSectionData, SegmentData]) -> None:
nonlocal init_section_data nonlocal init_section_data
if isinstance(data, InitSectionData): if isinstance(data, InitSectionData):
init_section_data = data.payload init_section_data = data.payload
segment_data_cache.clear()
self._stream_remuxer.stop()
try: try:
if self._stream_remuxer.stopped: if self._stream_remuxer.stopped:
@ -65,15 +74,15 @@ class SegmentRemuxer:
observer.on_next(RemuxedStream(self._stream_remuxer)) observer.on_next(RemuxedStream(self._stream_remuxer))
if segment_data_cache:
if init_section_data: if init_section_data:
self._stream_remuxer.input.write(init_section_data) write(init_section_data)
if segment_data_cache:
for cached_data in segment_data_cache: for cached_data in segment_data_cache:
if cached_data == init_section_data: write(cached_data)
continue if isinstance(data, InitSectionData):
self._stream_remuxer.input.write(cached_data) return
self._stream_remuxer.input.write(data.payload) write(data.payload)
except Exception as e: except Exception as e:
logger.warning(f'Failed to write data to stream remuxer: {repr(e)}') logger.warning(f'Failed to write data to stream remuxer: {repr(e)}')
self._stream_remuxer.stop() self._stream_remuxer.stop()
@ -100,15 +109,15 @@ class RemuxedStream(io.RawIOBase):
self, stream_remuxer: StreamRemuxer, *, read_timeout: float = 10 self, stream_remuxer: StreamRemuxer, *, read_timeout: float = 10
) -> None: ) -> None:
self._stream_remuxer = stream_remuxer self._stream_remuxer = stream_remuxer
self._read_timmeout = read_timeout self._read_timeout = read_timeout
self._offset: int = 0 self._offset: int = 0
def read(self, size: int = -1) -> bytes: def read(self, size: int = -1) -> bytes:
if self._stream_remuxer.stopped: if self._stream_remuxer.stopped:
ready = self._stream_remuxer.wait(timeout=self._read_timmeout) ready = self._stream_remuxer.wait(timeout=self._read_timeout)
if not ready: if not ready:
logger.debug( logger.debug(
f'Stream remuxer not ready in {self._read_timmeout} seconds' f'Stream remuxer not ready in {self._read_timeout} seconds'
) )
raise EOFError raise EOFError
@ -116,7 +125,7 @@ class RemuxedStream(io.RawIOBase):
data = wait_for( data = wait_for(
self._stream_remuxer.output.read, self._stream_remuxer.output.read,
args=(size,), args=(size,),
timeout=self._read_timmeout, timeout=self._read_timeout,
) )
except Exception as e: except Exception as e:
logger.warning(f'Failed to read data from stream remuxer: {repr(e)}') logger.warning(f'Failed to read data from stream remuxer: {repr(e)}')