From 35ef14b31cbd870c5a9b7ac0c10b7ed565b8fdfd Mon Sep 17 00:00:00 2001
From: acgnhik <acgnhik@outlook.com>
Date: Sat, 27 Aug 2022 13:04:30 +0800
Subject: [PATCH] refactor: refactor segment remuxer...

---
 src/blrec/flv/operators/parse.py           |  8 ++--
 src/blrec/hls/operators/segment_remuxer.py | 56 ++++++++++++++++------
 src/blrec/hls/stream_remuxer.py            |  4 ++
 3 files changed, 50 insertions(+), 18 deletions(-)

diff --git a/src/blrec/flv/operators/parse.py b/src/blrec/flv/operators/parse.py
index ed52690..994ba57 100644
--- a/src/blrec/flv/operators/parse.py
+++ b/src/blrec/flv/operators/parse.py
@@ -52,15 +52,17 @@ def parse(
                             observer.on_next(tag)
                         stream.close()
                 except EOFError as e:
+                    logger.debug(f'Error occurred while parsing stream: {repr(e)}')
                     if complete_on_eof:
                         observer.on_completed()
                     else:
                         if not ignore_eof:
                             observer.on_error(e)
                 except ValueError as e:
-                    if ignore_value_error:
-                        logger.debug(f'Error occurred while parsing stream: {repr(e)}')
-                    else:
+                    logger.debug(
+                        f'Error occurred while parsing stream: {repr(e)}', exc_info=e
+                    )
+                    if not ignore_value_error:
                         observer.on_error(e)
                 except Exception as e:
                     observer.on_error(e)
diff --git a/src/blrec/hls/operators/segment_remuxer.py b/src/blrec/hls/operators/segment_remuxer.py
index 0df8302..ce1c26c 100644
--- a/src/blrec/hls/operators/segment_remuxer.py
+++ b/src/blrec/hls/operators/segment_remuxer.py
@@ -7,6 +7,8 @@ from typing import Final, List, Optional, Union
 import urllib3
 from reactivex import Observable, abc
 from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
+from tenacity import Retrying, stop_after_delay, wait_fixed
+from tenacity.retry import retry_if_not_exception_type
 
 from blrec.bili.live import Live
 from blrec.utils.io import wait_for
@@ -69,12 +71,20 @@ class SegmentRemuxer:
                 if isinstance(data, InitSectionData):
                     init_section_data = data.payload
                     segment_data_cache.clear()
+                    logger.debug('Stop stream remuxer for init section')
+                    self._stream_remuxer.stop()
+
+                if self._stream_remuxer.exception and not self._stream_remuxer.stopped:
+                    logger.debug(
+                        'Stop stream remuxer due to '
+                        + repr(self._stream_remuxer.exception)
+                    )
                     self._stream_remuxer.stop()
 
                 try:
                     if self._stream_remuxer.stopped:
+                        self._stream_remuxer.start()
                         while True:
-                            self._stream_remuxer.start()
                             ready = self._stream_remuxer.wait(timeout=1)
                             if disposed:
                                 return
@@ -94,6 +104,7 @@ class SegmentRemuxer:
                     write(data.payload)
                 except Exception as e:
                     logger.warning(f'Failed to write data to stream remuxer: {repr(e)}')
+                    logger.debug(f'Stop stream remuxer due to {repr(e)}')
                     self._stream_remuxer.stop()
                     if len(segment_data_cache) >= self._MAX_SEGMENT_DATA_CACHE:
                         segment_data_cache = segment_data_cache[
@@ -121,6 +132,10 @@ class SegmentRemuxer:
         return Observable(subscribe)
 
 
+class CloseRemuxedStream(Exception):
+    pass
+
+
 class RemuxedStream(io.RawIOBase):
     def __init__(
         self, stream_remuxer: StreamRemuxer, *, read_timeout: float = 10
@@ -133,21 +148,27 @@ class RemuxedStream(io.RawIOBase):
         if self._stream_remuxer.stopped:
             ready = self._stream_remuxer.wait(timeout=self._read_timeout)
             if not ready:
-                logger.debug(
-                    f'Stream remuxer not ready in {self._read_timeout} seconds'
-                )
-                raise EOFError
+                msg = f'Stream remuxer not ready in {self._read_timeout} seconds'
+                logger.debug(msg)
+                raise EOFError(msg)
 
         try:
-            data = wait_for(
-                self._stream_remuxer.output.read,
-                args=(size,),
-                timeout=self._read_timeout,
-            )
-        except Exception as e:
-            logger.warning(f'Failed to read data from stream remuxer: {repr(e)}')
-            self._stream_remuxer.stop()
-            raise EOFError
+            for attempt in Retrying(
+                reraise=True,
+                retry=retry_if_not_exception_type(TimeoutError),
+                wait=wait_fixed(1),
+                stop=stop_after_delay(self._read_timeout),
+            ):
+                with attempt:
+                    data = wait_for(
+                        self._stream_remuxer.output.read,
+                        args=(size,),
+                        timeout=self._read_timeout,
+                    )
+        except Exception as exc:
+            logger.warning(f'Failed to read data from stream remuxer: {repr(exc)}')
+            self._stream_remuxer.exception = exc
+            raise EOFError(exc)
         else:
             assert data is not None
             self._offset += len(data)
@@ -157,4 +178,9 @@ class RemuxedStream(io.RawIOBase):
         return self._offset
 
     def close(self) -> None:
-        self._stream_remuxer.stop()
+        if self._stream_remuxer.stopped:
+            return
+        if self._stream_remuxer.exception:
+            return
+        logger.debug('Close remuxed stream')
+        self._stream_remuxer.exception = CloseRemuxedStream()
diff --git a/src/blrec/hls/stream_remuxer.py b/src/blrec/hls/stream_remuxer.py
index a0f0035..31ad7be 100644
--- a/src/blrec/hls/stream_remuxer.py
+++ b/src/blrec/hls/stream_remuxer.py
@@ -55,6 +55,10 @@ class StreamRemuxer(StoppableMixin, SupportDebugMixin):
     def exception(self) -> Optional[Exception]:
         return self._exception
 
+    @exception.setter
+    def exception(self, exc: Exception) -> None:
+        self._exception = exc
+
     def __enter__(self):  # type: ignore
         self.start()
         self.wait()