feat: use pyav

This commit is contained in:
acgnhik 2022-10-23 13:37:38 +08:00
parent a64d2d7153
commit 121b5f9648
6 changed files with 156 additions and 293 deletions

View File

@ -53,6 +53,7 @@ install_requires =
lxml >= 4.6.4, < 5.0.0
toml >= 0.10.2, < 0.11.0
m3u8 >= 1.0.0, < 2.0.0
av >= 10.0.0, < 11.0.0
jsonpath == 0.82
psutil >= 5.8.0, < 6.0.0
reactivex >= 4.0.0, < 5.0.0

View File

@ -55,9 +55,7 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
self._prober = hls_ops.Prober()
self._dl_statistics = core_ops.SizedStatistics()
self._stream_parser = core_ops.StreamParser(
self._stream_param_holder, ignore_eof=True, ignore_value_error=True
)
self._segment_parser = hls_ops.SegmentParser()
self._analyser = flv_ops.Analyser()
self._injector = flv_ops.Injector(self._metadata_provider)
self._join_point_extractor = flv_ops.JoinPointExtractor()
@ -144,14 +142,11 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
self._segment_fetcher,
self._dl_statistics,
self._prober,
ops.observe_on(
NewThreadScheduler(self._thread_factory('SegmentRemuxer'))
),
self._segment_remuxer,
ops.observe_on(
NewThreadScheduler(self._thread_factory('StreamRecorder'))
),
self._stream_parser,
self._segment_remuxer,
self._segment_parser,
flv_ops.process(),
self._cutter,
self._limiter,

View File

@ -4,6 +4,7 @@ from .playlist_resolver import PlaylistResolver
from .prober import Prober, StreamProfile
from .segment_dumper import SegmentDumper
from .segment_fetcher import InitSectionData, SegmentData, SegmentFetcher
from .segment_parser import SegmentParser
from .segment_remuxer import SegmentRemuxer
__all__ = (
@ -15,6 +16,7 @@ __all__ = (
'SegmentData',
'SegmentDumper',
'SegmentFetcher',
'SegmentParser',
'SegmentRemuxer',
'StreamProfile',
)

View File

@ -0,0 +1,104 @@
from __future__ import annotations
import io
import logging
from typing import Optional
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from blrec.flv.common import (
is_audio_sequence_header,
is_metadata_tag,
is_video_sequence_header,
)
from blrec.flv.io import FlvReader
from blrec.flv.models import AudioTag, FlvHeader, ScriptTag, VideoTag
from blrec.flv.operators.typing import FLVStream, FLVStreamItem
__all__ = ('SegmentParser',)
logger = logging.getLogger(__name__)
class SegmentParser:
def __init__(self) -> None:
self._backup_timestamp = True
def __call__(self, source: Observable[bytes]) -> FLVStream:
return self._parse(source)
def _parse(self, source: Observable[bytes]) -> FLVStream:
def subscribe(
observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
last_flv_header: Optional[FlvHeader] = None
last_metadata_tag: Optional[ScriptTag] = None
last_audio_sequence_header: Optional[AudioTag] = None
last_video_sequence_header: Optional[VideoTag] = None
def reset() -> None:
nonlocal last_flv_header, last_metadata_tag
nonlocal last_audio_sequence_header, last_video_sequence_header
last_flv_header = None
last_metadata_tag = None
last_audio_sequence_header = None
last_video_sequence_header = None
def on_next(data: bytes) -> None:
nonlocal last_flv_header, last_metadata_tag
nonlocal last_audio_sequence_header, last_video_sequence_header
if b'' == data:
reset()
return
try:
reader = FlvReader(
io.BytesIO(data), backup_timestamp=self._backup_timestamp
)
flv_header = reader.read_header()
if not last_flv_header:
observer.on_next(flv_header)
last_flv_header = flv_header
else:
assert last_flv_header == flv_header
while not disposed:
tag = reader.read_tag()
if is_metadata_tag(tag):
if last_metadata_tag is not None:
continue
last_metadata_tag = tag
elif is_video_sequence_header(tag):
if tag == last_video_sequence_header:
continue
last_video_sequence_header = tag
elif is_audio_sequence_header(tag):
if tag == last_audio_sequence_header:
continue
last_audio_sequence_header = tag
observer.on_next(tag)
except EOFError:
pass
except Exception as e:
observer.on_error(e)
def dispose() -> None:
nonlocal disposed
disposed = True
reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)

View File

@ -2,121 +2,72 @@ from __future__ import annotations
import io
import logging
from typing import Final, List, Optional, Union
import os
from typing import Optional, Union
import urllib3
import av
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from tenacity import Retrying, stop_after_delay, wait_fixed
from tenacity.retry import retry_if_not_exception_type
from blrec.bili.live import Live
from blrec.utils.io import wait_for
from ..stream_remuxer import StreamRemuxer
from .segment_fetcher import InitSectionData, SegmentData
__all__ = ('SegmentRemuxer',)
logger = logging.getLogger(__name__)
logging.getLogger(urllib3.__name__).setLevel(logging.WARNING)
TRACE_REMUX_SEGMENT = bool(os.environ.get('TRACE_REMUX_SEGMENT'))
TRACE_LIBAV = bool(os.environ.get('TRACE_LIBAV'))
if TRACE_LIBAV:
logging.getLogger('libav').setLevel(5)
else:
av.logging.set_level(av.logging.FATAL)
class SegmentRemuxer:
_SEGMENT_DATA_CACHE: Final = 10
_MAX_SEGMENT_DATA_CACHE: Final = 15
def __init__(self, live: Live) -> None:
self._live = live
self._timeout: float = 10
self._stream_remuxer = StreamRemuxer(live.room_id, remove_filler_data=True)
def __call__(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[io.RawIOBase]:
) -> Observable[bytes]:
return self._remux(source)
def _remux(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[io.RawIOBase]:
) -> Observable[bytes]:
def subscribe(
observer: abc.ObserverBase[io.RawIOBase],
observer: abc.ObserverBase[bytes],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
init_section_data: Optional[bytes] = None
segment_data_cache: List[bytes] = []
self._stream_remuxer.stop()
def reset() -> None:
nonlocal init_section_data, segment_data_cache
nonlocal init_section_data
init_section_data = None
segment_data_cache = []
self._stream_remuxer.stop()
def write(data: bytes) -> int:
return wait_for(
self._stream_remuxer.input.write,
args=(data,),
timeout=self._timeout,
)
def on_next(data: Union[InitSectionData, SegmentData]) -> None:
nonlocal init_section_data
nonlocal segment_data_cache
if isinstance(data, InitSectionData):
init_section_data = data.payload
segment_data_cache.clear()
logger.debug('Stop stream remuxer for init section')
self._stream_remuxer.stop()
observer.on_next(b'')
return
if self._stream_remuxer.exception and not self._stream_remuxer.stopped:
logger.debug(
'Stop stream remuxer due to '
+ repr(self._stream_remuxer.exception)
)
self._stream_remuxer.stop()
if init_section_data is None:
return
try:
if self._stream_remuxer.stopped:
self._stream_remuxer.start()
while True:
ready = self._stream_remuxer.wait(timeout=1)
if disposed:
return
if ready:
break
observer.on_next(RemuxedStream(self._stream_remuxer))
if init_section_data:
write(init_section_data)
if segment_data_cache:
for cached_data in segment_data_cache:
write(cached_data)
if isinstance(data, InitSectionData):
return
write(data.payload)
except Exception as e:
logger.warning(f'Failed to write data to stream remuxer: {repr(e)}')
logger.debug(f'Stop stream remuxer due to {repr(e)}')
self._stream_remuxer.stop()
if len(segment_data_cache) >= self._MAX_SEGMENT_DATA_CACHE:
segment_data_cache = segment_data_cache[
-self._MAX_SEGMENT_DATA_CACHE + 1 :
]
remuxed_data = self._remux_segemnt(init_section_data + data.payload)
except av.FFmpegError as e:
logger.warning(f'Failed to remux segment: {repr(e)}', exc_info=e)
else:
if len(segment_data_cache) >= self._SEGMENT_DATA_CACHE:
segment_data_cache = segment_data_cache[
-self._SEGMENT_DATA_CACHE + 1 :
]
segment_data_cache.append(data.payload)
observer.on_next(remuxed_data)
def dispose() -> None:
nonlocal disposed
@ -131,56 +82,30 @@ class SegmentRemuxer:
return Observable(subscribe)
def _remux_segemnt(self, data: bytes, format: str = 'flv') -> bytes:
in_file = io.BytesIO(data)
out_file = io.BytesIO()
class CloseRemuxedStream(Exception):
pass
with av.open(in_file) as in_container:
with av.open(out_file, mode='w', format=format) as out_container:
in_video_stream = in_container.streams.video[0]
in_audio_stream = in_container.streams.audio[0]
out_video_stream = out_container.add_stream(template=in_video_stream)
out_audio_stream = out_container.add_stream(template=in_audio_stream)
for packet in in_container.demux():
if TRACE_REMUX_SEGMENT:
logger.debug(repr(packet))
# We need to skip the "flushing" packets that `demux` generates.
if packet.dts is None:
continue
# We need to assign the packet to the new stream.
if packet.stream.type == 'video':
packet.stream = out_video_stream
elif packet.stream.type == 'audio':
packet.stream = out_audio_stream
else:
raise NotImplementedError(packet.stream.type)
out_container.mux(packet)
class RemuxedStream(io.RawIOBase):
def __init__(
self, stream_remuxer: StreamRemuxer, *, read_timeout: float = 10
) -> None:
self._stream_remuxer = stream_remuxer
self._read_timeout = read_timeout
self._offset: int = 0
def read(self, size: int = -1) -> bytes:
if self._stream_remuxer.stopped:
ready = self._stream_remuxer.wait(timeout=self._read_timeout)
if not ready:
msg = f'Stream remuxer not ready in {self._read_timeout} seconds'
logger.debug(msg)
raise EOFError(msg)
try:
for attempt in Retrying(
reraise=True,
retry=retry_if_not_exception_type(TimeoutError),
wait=wait_fixed(1),
stop=stop_after_delay(self._read_timeout),
):
with attempt:
data = wait_for(
self._stream_remuxer.output.read,
args=(size,),
timeout=self._read_timeout,
)
except Exception as exc:
logger.warning(f'Failed to read data from stream remuxer: {repr(exc)}')
self._stream_remuxer.exception = exc
raise EOFError(exc)
else:
assert data is not None
self._offset += len(data)
return data
def tell(self) -> int:
return self._offset
def close(self) -> None:
if self._stream_remuxer.stopped:
return
if self._stream_remuxer.exception:
return
logger.debug('Close remuxed stream')
self._stream_remuxer.exception = CloseRemuxedStream()
return out_file.getvalue()

View File

@ -1,164 +0,0 @@
import errno
import io
import logging
import os
import re
import shlex
from contextlib import suppress
from subprocess import PIPE, CalledProcessError, Popen
from threading import Condition, Thread
from typing import Optional, cast
from blrec.utils.io import wait_for
from blrec.utils.mixins import StoppableMixin, SupportDebugMixin
logger = logging.getLogger(__name__)
__all__ = ('StreamRemuxer',)
class FFmpegError(Exception):
pass
class StreamRemuxer(StoppableMixin, SupportDebugMixin):
_ERROR_PATTERN = re.compile(
r'\b(error|failed|missing|invalid|corrupt)\b', re.IGNORECASE
)
def __init__(self, room_id: int, remove_filler_data: bool = False) -> None:
super().__init__()
self._room_id = room_id
self._remove_filler_data = remove_filler_data
self._exception: Optional[Exception] = None
self._ready = Condition()
self._env = None
self._init_for_debug(room_id)
if self._debug:
self._env = os.environ.copy()
path = os.path.join(self._debug_dir, f'ffreport-{room_id}-%t.log')
self._env['FFREPORT'] = f'file={path}:level=48'
@property
def input(self) -> io.BufferedWriter:
assert self._subprocess.stdin is not None
return cast(io.BufferedWriter, self._subprocess.stdin)
@property
def output(self) -> io.BufferedReader:
assert self._subprocess.stdout is not None
return cast(io.BufferedReader, self._subprocess.stdout)
@property
def exception(self) -> Optional[Exception]:
return self._exception
@exception.setter
def exception(self, exc: Exception) -> None:
self._exception = exc
def __enter__(self): # type: ignore
self.start()
self.wait()
return self
def __exit__(self, exc_type, value, traceback): # type: ignore
self.stop()
self.raise_for_exception()
def wait(self, timeout: Optional[float] = None) -> bool:
with self._ready:
return self._ready.wait(timeout=timeout)
def restart(self) -> None:
logger.debug('Restarting stream remuxer...')
self.stop()
self.start()
logger.debug('Restarted stream remuxer')
def raise_for_exception(self) -> None:
if not self.exception:
return
raise self.exception
def _do_start(self) -> None:
logger.debug('Starting stream remuxer...')
self._thread = Thread(
target=self._run, name=f'StreamRemuxer::{self._room_id}', daemon=True
)
self._thread.start()
def _do_stop(self) -> None:
logger.debug('Stopping stream remuxer...')
if hasattr(self, '_subprocess'):
with suppress(ProcessLookupError):
self._subprocess.kill()
self._subprocess.wait(timeout=10)
if hasattr(self, '_thread'):
self._thread.join(timeout=10)
def _run(self) -> None:
logger.debug('Started stream remuxer')
self._exception = None
try:
self._run_subprocess()
except BrokenPipeError as exc:
logger.debug(repr(exc))
except FFmpegError as exc:
if not self._stopped:
logger.warning(repr(exc))
else:
logger.debug(repr(exc))
except TimeoutError as exc:
logger.debug(repr(exc))
except Exception as exc:
# OSError: [Errno 22] Invalid argument
# https://stackoverflow.com/questions/23688492/oserror-errno-22-invalid-argument-in-subprocess
if isinstance(exc, OSError) and exc.errno == errno.EINVAL:
pass
else:
self._exception = exc
logger.exception(exc)
finally:
self._stopped = True
logger.debug('Stopped stream remuxer')
def _run_subprocess(self) -> None:
cmd = 'ffmpeg -xerror -i pipe:0 -c copy -copyts'
if self._remove_filler_data:
cmd += ' -bsf:v filter_units=remove_types=12'
cmd += ' -f flv pipe:1'
args = shlex.split(cmd)
with Popen(
args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=self._env
) as self._subprocess:
with self._ready:
self._ready.notify_all()
assert self._subprocess.stderr is not None
with io.TextIOWrapper(
self._subprocess.stderr, encoding='utf-8', errors='backslashreplace'
) as stderr:
while not self._stopped:
line = wait_for(stderr.readline, timeout=10)
if not line:
if self._subprocess.poll() is not None:
break
else:
continue
if self._debug:
logger.debug('ffmpeg: %s', line)
self._check_error(line)
if not self._stopped and self._subprocess.returncode not in (0, 255):
# 255: Exiting standardly, received signal 2.
raise CalledProcessError(self._subprocess.returncode, cmd=cmd)
def _check_error(self, line: str) -> None:
match = self._ERROR_PATTERN.search(line)
if not match:
return
raise FFmpegError(line)