diff --git a/src/blrec/core/operators/segment_remuxer.py b/src/blrec/core/operators/segment_remuxer.py index e4ec8dc..e9091fc 100644 --- a/src/blrec/core/operators/segment_remuxer.py +++ b/src/blrec/core/operators/segment_remuxer.py @@ -25,6 +25,7 @@ class SegmentRemuxer: def __init__(self, live: Live) -> None: self._live = live + self._timeout: float = 10 self._stream_remuxer = StreamRemuxer(live.room_id, remove_filler_data=True) def __call__( @@ -44,14 +45,22 @@ class SegmentRemuxer: init_section_data: Optional[bytes] = None segment_data_cache: List[bytes] = [] - self._stream_remuxer.stop() + def write(data: bytes) -> int: + return wait_for( + self._stream_remuxer.input.write, + args=(data,), + timeout=self._timeout, + ) + def on_next(data: Union[InitSectionData, SegmentData]) -> None: nonlocal init_section_data if isinstance(data, InitSectionData): init_section_data = data.payload + segment_data_cache.clear() + self._stream_remuxer.stop() try: if self._stream_remuxer.stopped: @@ -65,15 +74,15 @@ class SegmentRemuxer: observer.on_next(RemuxedStream(self._stream_remuxer)) + if init_section_data: + write(init_section_data) if segment_data_cache: - if init_section_data: - self._stream_remuxer.input.write(init_section_data) for cached_data in segment_data_cache: - if cached_data == init_section_data: - continue - self._stream_remuxer.input.write(cached_data) + write(cached_data) + if isinstance(data, InitSectionData): + return - self._stream_remuxer.input.write(data.payload) + write(data.payload) except Exception as e: logger.warning(f'Failed to write data to stream remuxer: {repr(e)}') self._stream_remuxer.stop() @@ -100,15 +109,15 @@ class RemuxedStream(io.RawIOBase): self, stream_remuxer: StreamRemuxer, *, read_timeout: float = 10 ) -> None: self._stream_remuxer = stream_remuxer - self._read_timmeout = read_timeout + self._read_timeout = read_timeout self._offset: int = 0 def read(self, size: int = -1) -> bytes: if self._stream_remuxer.stopped: - ready = self._stream_remuxer.wait(timeout=self._read_timmeout) + ready = self._stream_remuxer.wait(timeout=self._read_timeout) if not ready: logger.debug( - f'Stream remuxer not ready in {self._read_timmeout} seconds' + f'Stream remuxer not ready in {self._read_timeout} seconds' ) raise EOFError @@ -116,7 +125,7 @@ class RemuxedStream(io.RawIOBase): data = wait_for( self._stream_remuxer.output.read, args=(size,), - timeout=self._read_timmeout, + timeout=self._read_timeout, ) except Exception as e: logger.warning(f'Failed to read data from stream remuxer: {repr(e)}')