feat: deprecated old implementations and adopted a new one for HLS recording

This commit is contained in:
acgnhik 2023-10-22 12:58:15 +08:00
parent c35271f0e0
commit b1296d964d
64 changed files with 1209 additions and 1034 deletions

14
FAQ.md
View File

@ -1,19 +1,5 @@
# 常见问题
## HLS 标准录制模式和原始录制模式有什么区别?
| | 标准录制模式 | 原始录制模式 |
| --- | --- | --- |
| ffmpeg | 需要 | 不需要 |
| 资源占用 | 较多 | 较少 |
| 稳定性 | 比较差 | 比较好 |
| 录播文件 | 一个 flv 文件 | 很多片段文件 |
| 录播信息 | 包含在 flv 文件里 | 单独保存为一个文件 (index.meta.json) |
| 播放器支持 | 几乎全部播放器都支持 flv | 支持 m3u8 的播放器很少 (VLC、dandanplay) |
| 自动分割文件 | 支持 | 不支持 |
| 手动分割文件 | 支持 | 不支持 |
| 自动转 mp4 | 支持 | 支持 |
## 如何终止程序?
`ctrl + c`

View File

@ -53,7 +53,6 @@ install_requires =
lxml >= 4.6.4, < 5.0.0
toml >= 0.10.2, < 0.11.0
m3u8 >= 3.3.0, < 4.0.0
av >= 10.0.0, < 11.0.0
jsonpath == 0.82
psutil >= 5.8.0, < 6.0.0
reactivex >= 4.0.0, < 5.0.0

View File

@ -9,10 +9,11 @@ import psutil
from . import __prog__, __version__
from .bili.helpers import ensure_room_id
from .core.typing import MetaData
from .disk_space import SpaceMonitor, SpaceReclaimer
from .event.event_submitters import SpaceEventSubmitter
from .exception import ExceptionHandler, ExistsError, exception_callback
from .flv.operators import MetaData, StreamProfile
from .flv.operators import StreamProfile
from .notification import (
BarkNotifier,
EmailNotifier,

View File

@ -39,6 +39,7 @@ class Live:
self._room_id = room_id
self._user_agent = user_agent
self._cookie = cookie
self._update_headers()
self._html_page_url = f'https://live.bilibili.com/{room_id}'
self._session = aiohttp.ClientSession(
@ -87,6 +88,7 @@ class Live:
@user_agent.setter
def user_agent(self, value: str) -> None:
self._user_agent = value
self._update_headers()
self._webapi.headers = self.headers
self._appapi.headers = self.headers
@ -97,12 +99,16 @@ class Live:
@cookie.setter
def cookie(self, value: str) -> None:
self._cookie = value
self._update_headers()
self._webapi.headers = self.headers
self._appapi.headers = self.headers
@property
def headers(self) -> Dict[str, str]:
return {
return self._headers
def _update_headers(self) -> None:
self._headers = {
**BASE_HEADERS,
'Referer': f'https://live.bilibili.com/{self._room_id}',
'User-Agent': self._user_agent,

View File

@ -167,6 +167,22 @@ class LiveMonitor(EventEmitter[LiveEventListener], DanmakuListener, SwitchableMi
self._previous_status = current_status
@aio_task_with_room_id
async def check_live_status(self) -> None:
logger.debug('Checking live status...')
try:
await self._check_live_status()
except Exception as e:
logger.warning(f'Failed to check live status: {repr(e)}')
logger.debug('Done checking live status')
@aio_task_with_room_id
async def _check_live_status(self) -> None:
await self._live.update_room_info()
current_status = self._live.room_info.live_status
if current_status != self._previous_status:
await self._handle_status_change(current_status)
@aio_task_with_room_id
async def _poll_live_status(self) -> None:
logger.debug('Started polling live status')
@ -174,10 +190,7 @@ class LiveMonitor(EventEmitter[LiveEventListener], DanmakuListener, SwitchableMi
while True:
try:
await asyncio.sleep(600 + random.randrange(-60, 60))
await self._live.update_room_info()
current_status = self._live.room_info.live_status
if current_status != self._previous_status:
await self._handle_status_change(current_status)
await self._check_live_status()
except asyncio.CancelledError:
logger.debug('Cancelled polling live status')
break

View File

@ -2,6 +2,7 @@ import asyncio
import html
import logging
from contextlib import suppress
from decimal import Decimal
from threading import Lock
from typing import Iterator, List, Optional
@ -123,7 +124,6 @@ class DanmakuDumper(
with self._lock:
self._delta: float = 0
self._record_start_time: int = record_start_time
self._timebase: int = self._record_start_time * 1000
self._stream_recording_interrupted: bool = False
self._path = danmaku_path(video_path)
self._files.append(self._path)
@ -135,19 +135,32 @@ class DanmakuDumper(
await self._stop_dumping()
self._path = None
async def on_stream_recording_interrupted(self, duration: float) -> None:
logger.debug(f'Stream recording interrupted, {duration}')
async def on_stream_recording_interrupted(
self, timestamp: float, duration: float
) -> None:
self._interrupted_timestamp = timestamp
self._duration = duration
self._stream_recording_recovered = asyncio.Condition()
self._stream_recording_interrupted = True
logger.debug(
'Stream recording interrupted, '
f'timestamp: {timestamp}, duration: {duration}'
)
async def on_stream_recording_recovered(self, timestamp: int) -> None:
logger.debug(f'Stream recording recovered, {timestamp}')
self._timebase = timestamp * 1000
self._delta = self._duration * 1000
async def on_stream_recording_recovered(self, timestamp: float) -> None:
self._recovered_timestamp = timestamp
self._delta += -float(
Decimal(str(self._recovered_timestamp))
- Decimal(str(self._interrupted_timestamp))
)
self._stream_recording_interrupted = False
async with self._stream_recording_recovered:
self._stream_recording_recovered.notify_all()
logger.debug(
'Stream recording recovered, '
f'timestamp: {timestamp}, delta: {self._delta}'
)
async def on_duration_lost(self, duration: float) -> None:
logger.debug(f'Total duration lost: {(duration)}')
self._delta = -duration
def _start_dumping(self) -> None:
self._create_dump_task()
@ -217,14 +230,7 @@ class DanmakuDumper(
continue
await writer.write_super_chat_record(self._make_super_chat_record(msg))
else:
logger.warning('Unsupported message type:', repr(msg))
if self._stream_recording_interrupted:
logger.debug(
f'Last message before stream recording interrupted: {repr(msg)}'
)
async with self._stream_recording_recovered:
await self._stream_recording_recovered.wait()
logger.warning(f'Unsupported message type: {repr(msg)}')
def _make_metadata(self) -> Metadata:
return Metadata(
@ -246,7 +252,7 @@ class DanmakuDumper(
text = html.escape(text)
return Danmu(
stime=self._calc_stime(msg.date),
stime=self._calc_stime(msg.date / 1000),
mode=msg.mode,
size=msg.size,
color=msg.color,
@ -261,7 +267,7 @@ class DanmakuDumper(
def _make_gift_send_record(self, msg: GiftSendMsg) -> GiftSendRecord:
return GiftSendRecord(
ts=self._calc_stime(msg.timestamp * 1000),
ts=self._calc_stime(msg.timestamp),
uid=msg.uid,
user=msg.uname,
giftname=msg.gift_name,
@ -272,7 +278,7 @@ class DanmakuDumper(
def _make_guard_buy_record(self, msg: GuardBuyMsg) -> GuardBuyRecord:
return GuardBuyRecord(
ts=self._calc_stime(msg.timestamp * 1000),
ts=self._calc_stime(msg.timestamp),
uid=msg.uid,
user=msg.uname,
giftname=msg.gift_name,
@ -283,7 +289,7 @@ class DanmakuDumper(
def _make_super_chat_record(self, msg: SuperChatMsg) -> SuperChatRecord:
return SuperChatRecord(
ts=self._calc_stime(msg.timestamp * 1000),
ts=self._calc_stime(msg.timestamp),
uid=msg.uid,
user=msg.uname,
price=msg.price * msg.rate,
@ -293,7 +299,7 @@ class DanmakuDumper(
def _make_user_toast(self, msg: UserToastMsg) -> UserToast:
return UserToast(
ts=self._calc_stime(msg.start_time * 1000),
ts=self._calc_stime(msg.start_time),
uid=msg.uid,
user=msg.username,
unit=msg.unit,
@ -304,5 +310,15 @@ class DanmakuDumper(
msg=msg.toast_msg,
)
def _calc_stime(self, timestamp: int) -> float:
return (max(timestamp - self._timebase, 0) + self._delta) / 1000
def _calc_stime(self, timestamp: float) -> float:
if self._stream_recording_interrupted:
return self._duration
else:
return (
max(
timestamp * 1000
- self._record_start_time * 1000
+ self._delta * 1000,
0,
)
) / 1000

View File

@ -4,6 +4,7 @@ from typing import Optional
from reactivex.scheduler import NewThreadScheduler
from blrec.bili.live import Live
from blrec.bili.live_monitor import LiveMonitor
from blrec.bili.typing import QualityNumber
from blrec.flv import operators as flv_ops
from blrec.flv.metadata_dumper import MetadataDumper
@ -22,6 +23,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl, SupportDebugMixin):
def __init__(
self,
live: Live,
live_monitor: LiveMonitor,
out_dir: str,
path_template: str,
*,
@ -34,6 +36,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl, SupportDebugMixin):
) -> None:
super().__init__(
live=live,
live_monitor=live_monitor,
out_dir=out_dir,
path_template=path_template,
stream_format='flv',
@ -66,7 +69,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl, SupportDebugMixin):
)
self._recording_monitor = core_ops.RecordingMonitor(
live, lambda: self._analyser.duration
live, lambda: self._analyser.duration, self._analyser.duration_updated
)
self._prober.profiles.subscribe(self._on_profile_updated)

View File

@ -1,110 +0,0 @@
import logging
from typing import Optional
from reactivex.scheduler import NewThreadScheduler
from blrec.bili.live import Live
from blrec.bili.typing import QualityNumber
from blrec.hls import operators as hls_ops
from blrec.hls.metadata_dumper import MetadataDumper
from blrec.utils import operators as utils_ops
from . import operators as core_ops
from .stream_recorder_impl import StreamRecorderImpl
__all__ = ('HLSRawStreamRecorderImpl',)
logger = logging.getLogger(__name__)
class HLSRawStreamRecorderImpl(StreamRecorderImpl):
def __init__(
self,
live: Live,
out_dir: str,
path_template: str,
*,
quality_number: QualityNumber = 10000,
buffer_size: Optional[int] = None,
read_timeout: Optional[int] = None,
disconnection_timeout: Optional[int] = None,
filesize_limit: int = 0,
duration_limit: int = 0,
) -> None:
super().__init__(
live=live,
out_dir=out_dir,
path_template=path_template,
stream_format='fmp4',
recording_mode='raw',
quality_number=quality_number,
buffer_size=buffer_size,
read_timeout=read_timeout,
disconnection_timeout=disconnection_timeout,
filesize_limit=filesize_limit,
duration_limit=duration_limit,
)
self._playlist_fetcher = hls_ops.PlaylistFetcher(self._live, self._session)
self._playlist_dumper = hls_ops.PlaylistDumper(self._path_provider)
self._segment_fetcher = hls_ops.SegmentFetcher(
self._live, self._session, self._stream_url_resolver
)
self._segment_dumper = hls_ops.SegmentDumper(self._playlist_dumper)
self._ff_metadata_dumper = MetadataDumper(
self._playlist_dumper, self._metadata_provider
)
self._prober = hls_ops.Prober()
self._dl_statistics = core_ops.SizedStatistics()
self._recording_monitor = core_ops.RecordingMonitor(
live, lambda: self._playlist_dumper.duration
)
self._prober.profiles.subscribe(self._on_profile_updated)
self._playlist_dumper.file_opened.subscribe(self._on_video_file_opened)
self._playlist_dumper.file_closed.subscribe(self._on_video_file_closed)
self._recording_monitor.interrupted.subscribe(self._on_recording_interrupted)
self._recording_monitor.recovered.subscribe(self._on_recording_recovered)
@property
def recording_path(self) -> Optional[str]:
return self._playlist_dumper.path
def _on_start(self) -> None:
self._ff_metadata_dumper.enable()
def _on_stop(self) -> None:
self._ff_metadata_dumper.disable()
def _run(self) -> None:
self._subscription = (
self._stream_param_holder.get_stream_params() # type: ignore
.pipe(
self._stream_url_resolver,
self._playlist_fetcher,
self._recording_monitor,
self._connection_error_handler,
self._request_exception_handler,
self._playlist_dumper,
utils_ops.observe_on_new_thread(
queue_size=60,
thread_name=f'SegmentDownloader::{self._live.room_id}',
),
self._segment_fetcher,
self._dl_statistics,
self._prober,
self._segment_dumper,
self._rec_statistics,
self._progress_bar,
self._exception_handler,
)
.subscribe(
on_completed=self._on_completed,
scheduler=NewThreadScheduler(
self._thread_factory('HLSRawStreamRecorder')
),
)
)

View File

@ -4,10 +4,10 @@ from typing import Optional
from reactivex.scheduler import NewThreadScheduler
from blrec.bili.live import Live
from blrec.bili.live_monitor import LiveMonitor
from blrec.bili.typing import QualityNumber
from blrec.flv import operators as flv_ops
from blrec.flv.metadata_dumper import MetadataDumper
from blrec.hls import operators as hls_ops
from blrec.hls.metadata_dumper import MetadataDumper
from blrec.utils import operators as utils_ops
from . import operators as core_ops
@ -23,6 +23,7 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
def __init__(
self,
live: Live,
live_monitor: LiveMonitor,
out_dir: str,
path_template: str,
*,
@ -35,6 +36,7 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
) -> None:
super().__init__(
live=live,
live_monitor=live_monitor,
out_dir=out_dir,
path_template=path_template,
stream_format='fmp4',
@ -52,43 +54,41 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
self._segment_fetcher = hls_ops.SegmentFetcher(
self._live, self._session, self._stream_url_resolver
)
self._segment_remuxer = hls_ops.SegmentRemuxer(live)
self._prober = hls_ops.Prober()
self._dl_statistics = core_ops.SizedStatistics()
self._segment_parser = hls_ops.SegmentParser()
self._analyser = flv_ops.Analyser()
self._injector = flv_ops.Injector(self._metadata_provider)
self._join_point_extractor = flv_ops.JoinPointExtractor()
self._limiter = flv_ops.Limiter(filesize_limit, duration_limit)
self._cutter = flv_ops.Cutter()
self._dumper = flv_ops.Dumper(self._path_provider, buffer_size)
self._metadata_dumper = MetadataDumper(
self._dumper, self._analyser, self._join_point_extractor
self._segment_dumper = hls_ops.SegmentDumper(self._path_provider)
self._playlist_dumper = hls_ops.PlaylistDumper(self._segment_dumper)
self._ff_metadata_dumper = MetadataDumper(
self._segment_dumper, self._metadata_provider
)
self._cutter = hls_ops.Cutter(self._playlist_dumper)
self._limiter = hls_ops.Limiter(
self._playlist_dumper,
self._segment_dumper,
filesize_limit=filesize_limit,
duration_limit=duration_limit,
)
self._prober = hls_ops.Prober()
self._analyser = hls_ops.Analyser(
self._playlist_dumper, self._segment_dumper, self._prober
)
self._dl_statistics = core_ops.SizedStatistics()
self._recording_monitor = core_ops.RecordingMonitor(
live, lambda: self._analyser.duration
live,
lambda: self._playlist_dumper.duration,
self._playlist_dumper.duration_updated,
)
self._prober.profiles.subscribe(self._on_profile_updated)
self._dumper.file_opened.subscribe(self._on_video_file_opened)
self._dumper.file_closed.subscribe(self._on_video_file_closed)
self._segment_dumper.file_opened.subscribe(self._on_video_file_opened)
self._segment_dumper.file_closed.subscribe(self._on_video_file_closed)
self._playlist_dumper.segments_lost.subscribe(self._on_duration_lost)
self._recording_monitor.interrupted.subscribe(self._on_recording_interrupted)
self._recording_monitor.recovered.subscribe(self._on_recording_recovered)
@property
def buffer_size(self) -> int:
return self._dumper.buffer_size
@buffer_size.setter
def buffer_size(self, value: int) -> None:
self._dumper.buffer_size = value
@property
def recording_path(self) -> Optional[str]:
return self._dumper.path
return self._segment_dumper.path
@property
def filesize_limit(self) -> int:
@ -107,11 +107,8 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
self._limiter.duration_limit = value
@property
def metadata(self) -> Optional[flv_ops.MetaData]:
try:
def metadata(self) -> Optional[hls_ops.MetaData]:
return self._analyser.make_metadata()
except Exception:
return None
def can_cut_stream(self) -> bool:
return self._cutter.can_cut_stream()
@ -120,10 +117,10 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
return self._cutter.cut_stream()
def _on_start(self) -> None:
self._metadata_dumper.enable()
self._ff_metadata_dumper.enable()
def _on_stop(self) -> None:
self._metadata_dumper.disable()
self._ff_metadata_dumper.disable()
def _run(self) -> None:
self._subscription = (
@ -141,20 +138,13 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
self._segment_fetcher,
self._dl_statistics,
self._prober,
utils_ops.observe_on_new_thread(
queue_size=10, thread_name=f'StreamRecorder::{self._live.room_id}'
),
self._segment_remuxer,
self._segment_parser,
flv_ops.process(),
self._analyser,
self._cutter,
self._limiter,
self._join_point_extractor,
self._injector,
self._analyser,
self._dumper,
self._segment_dumper,
self._rec_statistics,
self._progress_bar,
self._playlist_dumper,
self._exception_handler,
)
.subscribe(

View File

@ -54,13 +54,6 @@ class MetadataProvider:
', bluray' if '_bluray' in self._stream_recorder.stream_url else '',
)
if self._stream_recorder.recording_mode == 'standard':
recording_mode_desc = '标准'
elif self._stream_recorder.recording_mode == 'raw':
recording_mode_desc = '原始'
else:
recording_mode_desc = ''
return {
'Title': self._live.room_info.title,
'Artist': self._live.user_info.name,
@ -74,11 +67,10 @@ B站直播录像
开播时间{live_start_time}
开始推流时间: {stream_available_time}
HLS流可用时间: {hls_stream_available_time}
开始录制时间: {record_start_time}
录播起始时间: {record_start_time}
流主机: {self._stream_recorder.stream_host}
流格式{self._stream_recorder.stream_format}
流画质{stream_quality}
录制模式: {recording_mode_desc}
录制程序{__prog__} v{__version__} {__github__}''',
'description': OrderedDict(
{
@ -95,7 +87,6 @@ HLS流可用时间: {hls_stream_available_time}
'StreamHost': self._stream_recorder.stream_host,
'StreamFormat': self._stream_recorder.stream_format,
'StreamQuality': stream_quality,
'RecordingMode': self._stream_recorder.recording_mode,
'Recorder': f'{__prog__} v{__version__} {__github__}',
}
),

View File

@ -77,7 +77,7 @@ class ConnectionErrorHandler(AsyncCooperationMixin):
timeout = self.disconnection_timeout
logger.info(f'Waiting {timeout} seconds for connection recovery... ')
timebase = time.monotonic()
while not self._run_coroutine(self._live.check_connectivity()):
while not self._call_coroutine(self._live.check_connectivity()):
if timeout is not None and time.monotonic() - timebase > timeout:
logger.error(f'Connection not recovered in {timeout} seconds')
return False

View File

@ -1,7 +1,8 @@
from __future__ import annotations
import logging
from typing import Callable, Final, Optional, TypeVar
import time
from typing import Callable, Final, Optional, Tuple, TypeVar
from reactivex import Observable, Subject, abc
@ -17,21 +18,35 @@ _T = TypeVar('_T')
class RecordingMonitor(AsyncCooperationMixin):
def __init__(self, live: Live, duration_provider: Callable[..., float]) -> None:
def __init__(
self,
live: Live,
duration_provider: Callable[..., float],
duration_updated: Observable[float],
) -> None:
super().__init__()
self._live = live
self._duration_provider = duration_provider
self._interrupted: Subject[float] = Subject()
self._recovered: Subject[int] = Subject()
self._duration_updated = duration_updated
self._duration_subscription: Optional[abc.DisposableBase] = None
self._interrupted: Subject[Tuple[float, float]] = Subject()
self._recovered: Subject[float] = Subject()
@property
def interrupted(self) -> Observable[float]:
def interrupted(self) -> Observable[Tuple[float, float]]:
return self._interrupted
@property
def recovered(self) -> Observable[int]:
def recovered(self) -> Observable[float]:
return self._recovered
def _on_duration_updated(self, duration: float) -> None:
ts = time.time()
self._recovered.on_next(ts)
assert self._duration_subscription is not None
self._duration_subscription.dispose()
self._duration_subscription = None
def __call__(self, source: Observable[_T]) -> Observable[_T]:
return self._monitor(source)
@ -48,8 +63,11 @@ class RecordingMonitor(AsyncCooperationMixin):
nonlocal recording, failed_count
recording = True
if failed_count >= CRITERIA:
ts = self._run_coroutine(self._live.get_timestamp())
self._recovered.on_next(ts)
if self._duration_subscription is not None:
self._duration_subscription.dispose()
self._duration_subscription = self._duration_updated.subscribe(
self._on_duration_updated
)
failed_count = 0
observer.on_next(item)
@ -58,8 +76,9 @@ class RecordingMonitor(AsyncCooperationMixin):
if recording:
failed_count += 1
if failed_count == CRITERIA:
ts = time.time()
duration = self._duration_provider()
self._interrupted.on_next(duration)
self._interrupted.on_next((ts, duration))
observer.on_error(exc)
return source.subscribe(

View File

@ -9,7 +9,9 @@ import aiohttp
import requests
import urllib3
from reactivex import Observable, abc
from reactivex import operators as ops
from blrec.core import operators as core_ops
from blrec.utils import operators as utils_ops
__all__ = ('RequestExceptionHandler',)
@ -21,12 +23,14 @@ _T = TypeVar('_T')
class RequestExceptionHandler:
def __init__(self) -> None:
def __init__(self, stream_url_resolver: core_ops.StreamURLResolver) -> None:
self._stream_url_resolver = stream_url_resolver
self._last_retry_time = time.monotonic()
def __call__(self, source: Observable[_T]) -> Observable[_T]:
return self._handle(source).pipe(
utils_ops.retry(should_retry=self._should_retry)
ops.do_action(on_error=self._before_retry),
utils_ops.retry(should_retry=self._should_retry),
)
def _handle(self, source: Observable[_T]) -> Observable[_T]:
@ -74,3 +78,10 @@ class RequestExceptionHandler:
return True
else:
return False
def _before_retry(self, exc: Exception) -> None:
if isinstance(
exc, requests.exceptions.HTTPError
) and exc.response.status_code in (403, 404):
self._stream_url_resolver.reset()
self._stream_url_resolver.rotate_routes()

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import logging
from typing import Optional
from typing import Final, Optional
from urllib.parse import urlparse
import requests
@ -19,6 +19,7 @@ from blrec.bili.exceptions import (
NoStreamQualityAvailable,
)
from blrec.bili.live import Live
from blrec.bili.live_monitor import LiveMonitor
from blrec.utils import operators as utils_ops
from blrec.utils.mixins import AsyncCooperationMixin
@ -31,13 +32,24 @@ logger = logging.getLogger(__name__)
class StreamURLResolver(AsyncCooperationMixin):
def __init__(self, live: Live, stream_param_holder: StreamParamHolder) -> None:
_MAX_ATTEMPTS_FOR_NO_STREAM: Final[int] = 10
def __init__(
self,
live: Live,
session: requests.Session,
live_monitor: LiveMonitor,
stream_param_holder: StreamParamHolder,
) -> None:
super().__init__()
self._live = live
self._session = session
self._live_monitor = live_monitor
self._stream_param_holder = stream_param_holder
self._stream_url: str = ''
self._stream_host: str = ''
self._stream_params: Optional[StreamParams] = None
self._attempts_for_no_stream: int = 0
@property
def stream_url(self) -> str:
@ -47,10 +59,22 @@ class StreamURLResolver(AsyncCooperationMixin):
def stream_host(self) -> str:
return self._stream_host
@property
def use_alternative_stream(self) -> bool:
return self._stream_param_holder.use_alternative_stream
@use_alternative_stream.setter
def use_alternative_stream(self, value: bool) -> None:
self._stream_param_holder.use_alternative_stream = value
def reset(self) -> None:
self._stream_url = ''
self._stream_host = ''
self._stream_params = None
self._attempts_for_no_stream = 0
def rotate_routes(self) -> None:
self.use_alternative_stream = not self.use_alternative_stream
def __call__(self, source: Observable[StreamParams]) -> Observable[str]:
self.reset()
@ -77,7 +101,7 @@ class StreamURLResolver(AsyncCooperationMixin):
f'api platform: {params.api_platform}, '
f'use alternative stream: {params.use_alternative_stream}'
)
url = self._run_coroutine(
url = self._call_coroutine(
self._live.get_live_stream_url(
params.quality_number,
api_platform=params.api_platform,
@ -93,6 +117,7 @@ class StreamURLResolver(AsyncCooperationMixin):
self._stream_url = url
self._stream_host = urlparse(url).hostname or ''
self._stream_params = params
self._attempts_for_no_stream = 0
observer.on_next(url)
return source.subscribe(
@ -104,8 +129,8 @@ class StreamURLResolver(AsyncCooperationMixin):
def _can_resue_url(self, params: StreamParams) -> bool:
if params == self._stream_params and self._stream_url:
try:
response = requests.get(
self._stream_url, stream=True, headers=self._live.headers
response = self._session.get(
self._stream_url, stream=True, headers=self._live.headers, timeout=3
)
response.raise_for_status()
except Exception:
@ -134,9 +159,15 @@ class StreamURLResolver(AsyncCooperationMixin):
try:
raise exc
except (NoStreamAvailable, NoStreamCodecAvailable, NoStreamFormatAvailable):
pass
self._attempts_for_no_stream += 1
if self._attempts_for_no_stream > self._MAX_ATTEMPTS_FOR_NO_STREAM:
self._run_coroutine(self._live_monitor.check_live_status())
self._attempts_for_no_stream = 0
except NoStreamQualityAvailable:
qn = self._stream_param_holder.quality_number
if qn == 10000:
logger.warning('The original stream quality (10000) is not available')
else:
logger.info(
f'The specified stream quality ({qn}) is not available, '
'will using the original stream quality (10000) instead.'

View File

@ -20,10 +20,11 @@ class PathProvider(AsyncCooperationMixin):
self.out_dir = out_dir
self.path_template = path_template
def __call__(self) -> Tuple[str, int]:
ts = self._run_coroutine(self._live.get_timestamp())
path = self._make_path(ts)
return path, ts
def __call__(self, timestamp: int = None) -> Tuple[str, int]:
if timestamp is None:
timestamp = self._call_coroutine(self._live.get_timestamp())
path = self._make_path(timestamp)
return path, timestamp
def _make_path(self, timestamp: int) -> str:
date_time = datetime.fromtimestamp(timestamp)

View File

@ -12,8 +12,9 @@ from blrec.bili.live import Live
from blrec.bili.live_monitor import LiveEventListener, LiveMonitor
from blrec.bili.models import RoomInfo
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.core.typing import MetaData
from blrec.event.event_emitter import EventEmitter, EventListener
from blrec.flv.operators import MetaData, StreamProfile
from blrec.flv.operators import StreamProfile
from blrec.setting.typing import RecordingMode
from blrec.utils.mixins import AsyncStoppableMixin
@ -114,7 +115,8 @@ class Recorder(
self._stream_available: bool = False
self._stream_recorder = StreamRecorder(
self._live,
live,
live_monitor,
out_dir=out_dir,
path_template=path_template,
stream_format=stream_format,

View File

@ -4,17 +4,18 @@ import time
from typing import Iterator, Optional
from blrec.bili.live import Live
from blrec.bili.live_monitor import LiveMonitor
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.event.event_emitter import EventEmitter
from blrec.flv.operators import MetaData, StreamProfile
from blrec.flv.operators import StreamProfile
from blrec.setting.typing import RecordingMode
from blrec.utils.libc import malloc_trim
from blrec.utils.mixins import AsyncStoppableMixin
from .flv_stream_recorder_impl import FLVStreamRecorderImpl
from .hls_raw_stream_recorder_impl import HLSRawStreamRecorderImpl
from .hls_stream_recorder_impl import HLSStreamRecorderImpl
from .stream_recorder_impl import StreamRecorderEventListener
from .typing import MetaData
__all__ = 'StreamRecorder', 'StreamRecorderEventListener'
@ -30,6 +31,7 @@ class StreamRecorder(
def __init__(
self,
live: Live,
live_monitor: LiveMonitor,
out_dir: str,
path_template: str,
*,
@ -46,6 +48,7 @@ class StreamRecorder(
super().__init__()
self._live = live
self._live_monitor = live_monitor
self.stream_format = stream_format
self.recording_mode = recording_mode
self.fmp4_stream_timeout = fmp4_stream_timeout
@ -53,10 +56,7 @@ class StreamRecorder(
if stream_format == 'flv':
cls = FLVStreamRecorderImpl
elif stream_format == 'fmp4':
if recording_mode == 'standard':
cls = HLSStreamRecorderImpl # type: ignore
else:
cls = HLSRawStreamRecorderImpl # type: ignore
else:
logger.warning(
f'The specified stream format ({stream_format}) is '
@ -67,6 +67,7 @@ class StreamRecorder(
self._impl = cls(
live=live,
live_monitor=live_monitor,
out_dir=out_dir,
path_template=path_template,
quality_number=quality_number,
@ -277,10 +278,12 @@ class StreamRecorder(
async def on_video_file_completed(self, path: str) -> None:
await self._emit('video_file_completed', path)
async def on_stream_recording_interrupted(self, duration: float) -> None:
await self._emit('stream_recording_interrupted', duration)
async def on_stream_recording_interrupted(
self, timestamp: float, duration: float
) -> None:
await self._emit('stream_recording_interrupted', timestamp, duration)
async def on_stream_recording_recovered(self, timestamp: int) -> None:
async def on_stream_recording_recovered(self, timestamp: float) -> None:
await self._emit('stream_recording_recovered', timestamp)
async def on_stream_recording_completed(self) -> None:
@ -307,10 +310,7 @@ class StreamRecorder(
if stream_format == 'flv':
cls = FLVStreamRecorderImpl
elif stream_format == 'fmp4':
if self.recording_mode == 'standard':
cls = HLSStreamRecorderImpl # type: ignore
else:
cls = HLSRawStreamRecorderImpl # type: ignore
else:
logger.warning(
f'The specified stream format ({stream_format}) is '
@ -327,6 +327,7 @@ class StreamRecorder(
self._impl = cls(
live=self._impl._live,
live_monitor=self._impl._live_monitor,
out_dir=self._impl.out_dir,
path_template=self._impl.path_template,
quality_number=self._impl.quality_number,

View File

@ -11,12 +11,13 @@ from reactivex import abc
from reactivex.typing import StartableFactory, StartableTarget
from blrec.bili.live import Live
from blrec.bili.live_monitor import LiveMonitor
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.event.event_emitter import EventEmitter, EventListener
from blrec.flv import operators as flv_ops
from blrec.flv.operators import StreamProfile
from blrec.flv.utils import format_timestamp
from blrec.logging.room_id import aio_task_with_room_id
from blrec.hls import operators as hls_ops
from blrec.setting.typing import RecordingMode
from blrec.utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin
@ -39,10 +40,15 @@ class StreamRecorderEventListener(EventListener):
async def on_video_file_completed(self, path: str) -> None:
...
async def on_stream_recording_interrupted(self, duratin: float) -> None:
async def on_stream_recording_interrupted(
self, timestamp: float, duration: float
) -> None:
...
async def on_stream_recording_recovered(self, timestamp: int) -> None:
async def on_stream_recording_recovered(self, timestamp: float) -> None:
...
async def on_duration_lost(self, duration: float) -> None:
...
async def on_stream_recording_completed(self) -> None:
@ -58,6 +64,7 @@ class StreamRecorderImpl(
def __init__(
self,
live: Live,
live_monitor: LiveMonitor,
out_dir: str,
path_template: str,
*,
@ -73,6 +80,7 @@ class StreamRecorderImpl(
super().__init__()
self._live = live
self._live_monitor = live_monitor
self._session = requests.Session()
self._recording_mode = recording_mode
@ -85,7 +93,7 @@ class StreamRecorderImpl(
stream_format=stream_format, quality_number=quality_number
)
self._stream_url_resolver = core_ops.StreamURLResolver(
live, self._stream_param_holder
live, self._session, live_monitor, self._stream_param_holder
)
self._progress_bar = core_ops.ProgressBar(live)
self._metadata_provider = MetadataProvider(live, self)
@ -93,7 +101,9 @@ class StreamRecorderImpl(
self._rec_statistics = core_ops.SizedStatistics()
self._dl_statistics: Union[core_ops.StreamStatistics, core_ops.SizedStatistics]
self._request_exception_handler = core_ops.RequestExceptionHandler()
self._request_exception_handler = core_ops.RequestExceptionHandler(
self._stream_url_resolver
)
self._connection_error_handler = core_ops.ConnectionErrorHandler(
live, disconnection_timeout=disconnection_timeout
)
@ -240,7 +250,7 @@ class StreamRecorderImpl(
return ''
@property
def metadata(self) -> Optional[flv_ops.MetaData]:
def metadata(self) -> Optional[Union[flv_ops.MetaData, hls_ops.MetaData]]:
return None
@property
@ -337,19 +347,24 @@ class StreamRecorderImpl(
logger.info(f"Video file completed: '{path}'")
self._emit_event('video_file_completed', path)
def _on_recording_interrupted(self, duration: float) -> None:
duration_string = format_timestamp(int(duration * 1000))
logger.info(f'Recording interrupted, current duration: {duration_string}')
self._emit_event('stream_recording_interrupted', duration)
def _on_recording_recovered(self, timestamp: int) -> None:
def _on_recording_interrupted(self, args: Tuple[float, float]) -> None:
timestamp, duration = args[0], args[1]
datetime_string = datetime.fromtimestamp(timestamp).isoformat()
logger.info(f'Recording recovered, current date time {(datetime_string)}')
duration_string = format_timestamp(int(duration * 1000))
logger.info(
f'Recording interrupted, datetime: {datetime_string}, '
f'duration: {duration_string}'
)
self._emit_event('stream_recording_interrupted', timestamp, duration)
def _on_recording_recovered(self, timestamp: float) -> None:
datetime_string = datetime.fromtimestamp(timestamp).isoformat()
logger.info(f'Recording recovered, datetime: {(datetime_string)}')
self._emit_event('stream_recording_recovered', timestamp)
def _emit_event(self, name: str, *args: Any, **kwds: Any) -> None:
self._run_coroutine(self._emit(name, *args, **kwds))
def _on_duration_lost(self, duration: float) -> None:
logger.info(f'Total duration lost: {(duration)}')
self._emit_event('duration_lost', duration)
@aio_task_with_room_id
async def _emit(self, *args: Any, **kwds: Any) -> None: # type: ignore
await super()._emit(*args, **kwds)
def _emit_event(self, name: str, *args: Any, **kwds: Any) -> None:
self._call_coroutine(self._emit(name, *args, **kwds))

View File

@ -1,5 +1,9 @@
from typing import Union
from blrec.flv.operators import MetaData as FLVMetaData
from blrec.hls.operators import MetaData as HLSMetaData
from .models import DanmuMsg, GiftSendMsg, GuardBuyMsg, SuperChatMsg, UserToastMsg
DanmakuMsg = Union[DanmuMsg, GiftSendMsg, GuardBuyMsg, SuperChatMsg, UserToastMsg]
MetaData = Union[FLVMetaData, HLSMetaData]

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -10,6 +10,6 @@
<body>
<app-root></app-root>
<noscript>Please enable JavaScript to continue using this application.</noscript>
<script src="runtime.bad8d115055bada4.js" type="module"></script><script src="polyfills.4e5433063877ea34.js" type="module"></script><script src="main.f21b7d831ad9cafb.js" type="module"></script>
<script src="runtime.8688afa20dbe5cc7.js" type="module"></script><script src="polyfills.4e5433063877ea34.js" type="module"></script><script src="main.f21b7d831ad9cafb.js" type="module"></script>
</body></html>

View File

@ -1,6 +1,6 @@
{
"configVersion": 1,
"timestamp": 1697282115210,
"timestamp": 1697949101780,
"index": "/index.html",
"assetGroups": [
{
@ -12,17 +12,17 @@
},
"urls": [
"/103.4a2aea63cc3bf42b.js",
"/287.5c768f00dcd24631.js",
"/287.63ace7ac80c3d9f2.js",
"/386.2404f3bc252e1df3.js",
"/503.6553f508f4a9247d.js",
"/548.73ee5c2419f2617e.js",
"/548.f8a3199ca2412e0d.js",
"/688.7032fddba7983cf6.js",
"/common.1fc175bce139f4df.js",
"/index.html",
"/main.f21b7d831ad9cafb.js",
"/manifest.webmanifest",
"/polyfills.4e5433063877ea34.js",
"/runtime.bad8d115055bada4.js",
"/runtime.8688afa20dbe5cc7.js",
"/styles.ae81e04dfa5b2860.css"
],
"patterns": []
@ -1635,10 +1635,10 @@
"dataGroups": [],
"hashTable": {
"/103.4a2aea63cc3bf42b.js": "2711817f2977bfdc18c34fee4fe9385fe012bb22",
"/287.5c768f00dcd24631.js": "4cd0f85040b1a482bf9796575738afdd2dcda00e",
"/287.63ace7ac80c3d9f2.js": "7a52c7715de66142dae39668a3a0fb0f9ee4bb50",
"/386.2404f3bc252e1df3.js": "f937945645579b9651be2666f70cec2c5de4e367",
"/503.6553f508f4a9247d.js": "0878ea0e91bfd5458dd55875561e91060ecb0837",
"/548.73ee5c2419f2617e.js": "93153313f106aed86859050288e59e6c19e3d4cf",
"/548.f8a3199ca2412e0d.js": "58ae6ac139c0b62ed266313e7a75a8266770387f",
"/688.7032fddba7983cf6.js": "eae55044529782a51b7e534365255bbfa5522b05",
"/assets/animal/panda.js": "fec2868bb3053dd2da45f96bbcb86d5116ed72b1",
"/assets/animal/panda.svg": "bebd302cdc601e0ead3a6d2710acf8753f3d83b1",
@ -3234,11 +3234,11 @@
"/assets/twotone/warning.js": "fb2d7ea232f3a99bf8f080dbc94c65699232ac01",
"/assets/twotone/warning.svg": "8c7a2d3e765a2e7dd58ac674870c6655cecb0068",
"/common.1fc175bce139f4df.js": "af1775164711ec49e5c3a91ee45bd77509c17c54",
"/index.html": "8dc97bacb089295cf14011c3c3e21b14ba7108a1",
"/index.html": "28dc5eb629ca29943d45677dac7fded24c0362c3",
"/main.f21b7d831ad9cafb.js": "fc51efa446c2ac21ee17e165217dd3faeacc5290",
"/manifest.webmanifest": "62c1cb8c5ad2af551a956b97013ab55ce77dd586",
"/polyfills.4e5433063877ea34.js": "68159ab99e0608976404a17132f60b5ceb6f12d2",
"/runtime.bad8d115055bada4.js": "a679fb0193729e75a72e77d0551dc85e080d9b41",
"/runtime.8688afa20dbe5cc7.js": "602d7051e97524a7becae76c8e76e7db29370b2b",
"/styles.ae81e04dfa5b2860.css": "5933b4f1c4d8fcc1891b68940ee78af4091472b7"
},
"navigationUrls": [

View File

@ -1 +1 @@
(()=>{"use strict";var e,v={},m={};function r(e){var n=m[e];if(void 0!==n)return n.exports;var t=m[e]={exports:{}};return v[e](t,t.exports,r),t.exports}r.m=v,e=[],r.O=(n,t,f,o)=>{if(!t){var a=1/0;for(i=0;i<e.length;i++){for(var[t,f,o]=e[i],c=!0,u=0;u<t.length;u++)(!1&o||a>=o)&&Object.keys(r.O).every(b=>r.O[b](t[u]))?t.splice(u--,1):(c=!1,o<a&&(a=o));if(c){e.splice(i--,1);var d=f();void 0!==d&&(n=d)}}return n}o=o||0;for(var i=e.length;i>0&&e[i-1][2]>o;i--)e[i]=e[i-1];e[i]=[t,f,o]},r.n=e=>{var n=e&&e.__esModule?()=>e.default:()=>e;return r.d(n,{a:n}),n},r.d=(e,n)=>{for(var t in n)r.o(n,t)&&!r.o(e,t)&&Object.defineProperty(e,t,{enumerable:!0,get:n[t]})},r.f={},r.e=e=>Promise.all(Object.keys(r.f).reduce((n,t)=>(r.f[t](e,n),n),[])),r.u=e=>(592===e?"common":e)+"."+{103:"4a2aea63cc3bf42b",287:"5c768f00dcd24631",386:"2404f3bc252e1df3",503:"6553f508f4a9247d",548:"73ee5c2419f2617e",592:"1fc175bce139f4df",688:"7032fddba7983cf6"}[e]+".js",r.miniCssF=e=>{},r.o=(e,n)=>Object.prototype.hasOwnProperty.call(e,n),(()=>{var e={},n="blrec:";r.l=(t,f,o,i)=>{if(e[t])e[t].push(f);else{var a,c;if(void 0!==o)for(var u=document.getElementsByTagName("script"),d=0;d<u.length;d++){var l=u[d];if(l.getAttribute("src")==t||l.getAttribute("data-webpack")==n+o){a=l;break}}a||(c=!0,(a=document.createElement("script")).type="module",a.charset="utf-8",a.timeout=120,r.nc&&a.setAttribute("nonce",r.nc),a.setAttribute("data-webpack",n+o),a.src=r.tu(t)),e[t]=[f];var s=(g,b)=>{a.onerror=a.onload=null,clearTimeout(p);var _=e[t];if(delete e[t],a.parentNode&&a.parentNode.removeChild(a),_&&_.forEach(h=>h(b)),g)return g(b)},p=setTimeout(s.bind(null,void 0,{type:"timeout",target:a}),12e4);a.onerror=s.bind(null,a.onerror),a.onload=s.bind(null,a.onload),c&&document.head.appendChild(a)}}})(),r.r=e=>{typeof Symbol<"u"&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},(()=>{var e;r.tt=()=>(void 0===e&&(e={createScriptURL:n=>n},typeof trustedTypes<"u"&&trustedTypes.createPolicy&&(e=trustedTypes.createPolicy("angular#bundler",e))),e)})(),r.tu=e=>r.tt().createScriptURL(e),r.p="",(()=>{var e={666:0};r.f.j=(f,o)=>{var i=r.o(e,f)?e[f]:void 0;if(0!==i)if(i)o.push(i[2]);else if(666!=f){var a=new Promise((l,s)=>i=e[f]=[l,s]);o.push(i[2]=a);var c=r.p+r.u(f),u=new Error;r.l(c,l=>{if(r.o(e,f)&&(0!==(i=e[f])&&(e[f]=void 0),i)){var s=l&&("load"===l.type?"missing":l.type),p=l&&l.target&&l.target.src;u.message="Loading chunk "+f+" failed.\n("+s+": "+p+")",u.name="ChunkLoadError",u.type=s,u.request=p,i[1](u)}},"chunk-"+f,f)}else e[f]=0},r.O.j=f=>0===e[f];var n=(f,o)=>{var u,d,[i,a,c]=o,l=0;if(i.some(p=>0!==e[p])){for(u in a)r.o(a,u)&&(r.m[u]=a[u]);if(c)var s=c(r)}for(f&&f(o);l<i.length;l++)r.o(e,d=i[l])&&e[d]&&e[d][0](),e[d]=0;return r.O(s)},t=self.webpackChunkblrec=self.webpackChunkblrec||[];t.forEach(n.bind(null,0)),t.push=n.bind(null,t.push.bind(t))})()})();
(()=>{"use strict";var e,v={},m={};function r(e){var n=m[e];if(void 0!==n)return n.exports;var t=m[e]={exports:{}};return v[e](t,t.exports,r),t.exports}r.m=v,e=[],r.O=(n,t,i,o)=>{if(!t){var a=1/0;for(f=0;f<e.length;f++){for(var[t,i,o]=e[f],c=!0,u=0;u<t.length;u++)(!1&o||a>=o)&&Object.keys(r.O).every(b=>r.O[b](t[u]))?t.splice(u--,1):(c=!1,o<a&&(a=o));if(c){e.splice(f--,1);var d=i();void 0!==d&&(n=d)}}return n}o=o||0;for(var f=e.length;f>0&&e[f-1][2]>o;f--)e[f]=e[f-1];e[f]=[t,i,o]},r.n=e=>{var n=e&&e.__esModule?()=>e.default:()=>e;return r.d(n,{a:n}),n},r.d=(e,n)=>{for(var t in n)r.o(n,t)&&!r.o(e,t)&&Object.defineProperty(e,t,{enumerable:!0,get:n[t]})},r.f={},r.e=e=>Promise.all(Object.keys(r.f).reduce((n,t)=>(r.f[t](e,n),n),[])),r.u=e=>(592===e?"common":e)+"."+{103:"4a2aea63cc3bf42b",287:"63ace7ac80c3d9f2",386:"2404f3bc252e1df3",503:"6553f508f4a9247d",548:"f8a3199ca2412e0d",592:"1fc175bce139f4df",688:"7032fddba7983cf6"}[e]+".js",r.miniCssF=e=>{},r.o=(e,n)=>Object.prototype.hasOwnProperty.call(e,n),(()=>{var e={},n="blrec:";r.l=(t,i,o,f)=>{if(e[t])e[t].push(i);else{var a,c;if(void 0!==o)for(var u=document.getElementsByTagName("script"),d=0;d<u.length;d++){var l=u[d];if(l.getAttribute("src")==t||l.getAttribute("data-webpack")==n+o){a=l;break}}a||(c=!0,(a=document.createElement("script")).type="module",a.charset="utf-8",a.timeout=120,r.nc&&a.setAttribute("nonce",r.nc),a.setAttribute("data-webpack",n+o),a.src=r.tu(t)),e[t]=[i];var s=(g,b)=>{a.onerror=a.onload=null,clearTimeout(p);var _=e[t];if(delete e[t],a.parentNode&&a.parentNode.removeChild(a),_&&_.forEach(h=>h(b)),g)return g(b)},p=setTimeout(s.bind(null,void 0,{type:"timeout",target:a}),12e4);a.onerror=s.bind(null,a.onerror),a.onload=s.bind(null,a.onload),c&&document.head.appendChild(a)}}})(),r.r=e=>{typeof Symbol<"u"&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},(()=>{var e;r.tt=()=>(void 0===e&&(e={createScriptURL:n=>n},typeof trustedTypes<"u"&&trustedTypes.createPolicy&&(e=trustedTypes.createPolicy("angular#bundler",e))),e)})(),r.tu=e=>r.tt().createScriptURL(e),r.p="",(()=>{var e={666:0};r.f.j=(i,o)=>{var f=r.o(e,i)?e[i]:void 0;if(0!==f)if(f)o.push(f[2]);else if(666!=i){var a=new Promise((l,s)=>f=e[i]=[l,s]);o.push(f[2]=a);var c=r.p+r.u(i),u=new Error;r.l(c,l=>{if(r.o(e,i)&&(0!==(f=e[i])&&(e[i]=void 0),f)){var s=l&&("load"===l.type?"missing":l.type),p=l&&l.target&&l.target.src;u.message="Loading chunk "+i+" failed.\n("+s+": "+p+")",u.name="ChunkLoadError",u.type=s,u.request=p,f[1](u)}},"chunk-"+i,i)}else e[i]=0},r.O.j=i=>0===e[i];var n=(i,o)=>{var u,d,[f,a,c]=o,l=0;if(f.some(p=>0!==e[p])){for(u in a)r.o(a,u)&&(r.m[u]=a[u]);if(c)var s=c(r)}for(i&&i(o);l<f.length;l++)r.o(e,d=f[l])&&e[d]&&e[d][0](),e[d]=0;return r.O(s)},t=self.webpackChunkblrec=self.webpackChunkblrec||[];t.forEach(n.bind(null,0)),t.push=n.bind(null,t.push.bind(t))})()})();

View File

@ -22,7 +22,19 @@ logger = logging.getLogger(__name__)
class SpaceReclaimer(SpaceEventListener, SwitchableMixin):
_SUFFIX_SET = frozenset(
('.flv', '.mp4', '.ts', '.m4s', '.m3u8' '.xml', '.jsonl', '.jpg')
(
'.flv',
'.mp4',
'.ts',
'.m4s',
'.m3u8',
'.xml',
'.json',
'.meta',
'.jsonl',
'.jpg',
'.png',
)
)
def __init__(

View File

@ -103,6 +103,7 @@ class MetaDataDict:
class Analyser:
def __init__(self) -> None:
self._metadatas: Subject[Optional[MetaData]] = Subject()
self._duration_updated: Subject[float] = Subject()
self._reset()
def _reset(self) -> None:
@ -135,6 +136,10 @@ class Analyser:
def metadatas(self) -> Observable[Optional[MetaData]]:
return self._metadatas
@property
def duration_updated(self) -> Observable[float]:
return self._duration_updated
def __call__(self, source: FLVStream) -> FLVStream:
return self._analyse(source)
@ -307,6 +312,7 @@ class Analyser:
self._size_of_tags += tag.tag_size
self._size_of_data += tag.data_size
self._last_timestamp = tag.timestamp
self._duration_updated.on_next(self._last_timestamp / 1000)
def _analyse_audio_tag(self, tag: AudioTag) -> None:
if not self._audio_analysed:

View File

@ -31,7 +31,7 @@ def from_file(
path: str, *, backup_timestamp: bool = False, restore_timestamp: bool = False
) -> FLVStream:
return from_stream(
open(path, 'rb'),
open(path, 'rb'), # type: ignore
complete_on_eof=True,
backup_timestamp=backup_timestamp,
restore_timestamp=restore_timestamp,

View File

@ -36,7 +36,7 @@ def parse(
try:
try:
reader = FlvReader(
stream,
stream, # type: ignore
backup_timestamp=backup_timestamp,
restore_timestamp=restore_timestamp,
)

View File

@ -7,7 +7,7 @@ from typing import List, Optional, cast
from reactivex import Observable, Subject, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ...utils.ffprobe import StreamProfile, ffprobe
from ...utils.ffprobe import StreamProfile, ffprobe_on
from ..common import find_aac_header_tag, find_avc_header_tag
from ..io import FlvWriter
from ..models import FlvHeader, FlvTag
@ -99,4 +99,4 @@ class Prober:
def on_error(e: Exception) -> None:
logger.warning(f'Failed to probe stream by ffprobe: {repr(e)}')
ffprobe(bytes_io.getvalue()).subscribe(on_next, on_error)
ffprobe_on(bytes_io.getvalue()).subscribe(on_next, on_error)

12
src/blrec/hls/helpler.py Normal file
View File

@ -0,0 +1,12 @@
import os
__all__ = ('name_of', 'sequence_number_of')
def name_of(uri: str) -> str:
name, _ext = os.path.splitext(uri)
return name
def sequence_number_of(uri: str) -> int:
return int(name_of(uri))

View File

@ -18,17 +18,17 @@ logger = logging.getLogger(__name__)
class MetadataDumper(SwitchableMixin):
def __init__(
self,
playlist_dumper: hls_ops.PlaylistDumper,
segment_dumper: hls_ops.SegmentDumper,
metadata_provider: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> None:
super().__init__()
self._playlist_dumper = playlist_dumper
self._segment_dumper = segment_dumper
self._metadata_provider = metadata_provider
self._metadata: Dict[str, Any] = {}
def _do_enable(self) -> None:
self._file_opened_subscription = self._playlist_dumper.file_opened.subscribe(
self._on_playlist_file_opened
self._file_opened_subscription = self._segment_dumper.file_opened.subscribe(
self._on_video_file_opened
)
logger.debug('Enabled metadata dumper')
@ -39,13 +39,13 @@ class MetadataDumper(SwitchableMixin):
self._metadata.clear()
logger.debug('Disabled metadata dumper')
def _on_playlist_file_opened(self, args: Tuple[str, int]) -> None:
playlist_path, _ = args
def _on_video_file_opened(self, args: Tuple[str, int]) -> None:
video_path, _timestamp = args
metadata = self._metadata_provider({})
self._dump_metadata(playlist_path, metadata)
self._dump_metadata(video_path, metadata)
def _dump_metadata(self, playlist_path: str, metadata: Dict[str, Any]) -> None:
path = record_metadata_path(playlist_path)
def _dump_metadata(self, video_path: str, metadata: Dict[str, Any]) -> None:
path = record_metadata_path(video_path)
logger.debug(f"Dumping metadata to file: '{path}'")
with open(path, 'wt', encoding='utf8') as file:

View File

@ -1,14 +1,19 @@
from .analyser import Analyser, MetaData
from .cutter import Cutter
from .limiter import Limiter
from .playlist_dumper import PlaylistDumper
from .playlist_fetcher import PlaylistFetcher
from .playlist_resolver import PlaylistResolver
from .prober import Prober, StreamProfile
from .segment_dumper import SegmentDumper
from .segment_fetcher import InitSectionData, SegmentData, SegmentFetcher
from .segment_parser import SegmentParser
from .segment_remuxer import SegmentRemuxer
__all__ = (
'Analyser',
'Cutter',
'InitSectionData',
'Limiter',
'MetaData',
'PlaylistDumper',
'PlaylistFetcher',
'PlaylistResolver',
@ -16,7 +21,5 @@ __all__ = (
'SegmentData',
'SegmentDumper',
'SegmentFetcher',
'SegmentParser',
'SegmentRemuxer',
'StreamProfile',
)

View File

@ -0,0 +1,91 @@
from __future__ import annotations
import logging
from typing import Optional, Union
import attr
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from .prober import Prober, StreamProfile
from .segment_fetcher import InitSectionData, SegmentData
from .playlist_dumper import PlaylistDumper
from .segment_dumper import SegmentDumper
__all__ = ('Analyser', 'MetaData')
logger = logging.getLogger(__name__)
@attr.s(auto_attribs=True, slots=True, frozen=True, kw_only=True)
class MetaData:
duration: float
filesize: int
width: int
height: int
class Analyser:
def __init__(
self,
playlist_dumper: PlaylistDumper,
segment_dumper: SegmentDumper,
prober: Prober,
) -> None:
self._playlist_dumper = playlist_dumper
self._segment_dumper = segment_dumper
self._prober = prober
self._reset()
self._prober.profiles.subscribe(self._on_profile_updated)
def _reset(self) -> None:
self._video_width: int = 0
self._video_height: int = 0
def _on_profile_updated(self, profile: StreamProfile) -> None:
video_profile = profile['streams'][0]
assert video_profile['codec_type'] == 'video'
self._video_width = video_profile['width']
self._video_height = video_profile['height']
def make_metadata(self) -> MetaData:
return MetaData(
duration=self._playlist_dumper.duration,
filesize=self._segment_dumper.filesize,
width=self._video_width,
height=self._video_height,
)
def __call__(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
return self._analyse(source)
def _analyse(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
def subscribe(
observer: abc.ObserverBase[Union[InitSectionData, SegmentData]],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
self._reset()
def on_next(item: Union[InitSectionData, SegmentData]) -> None:
observer.on_next(item)
def dispose() -> None:
nonlocal disposed
disposed = True
self._reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)

View File

@ -0,0 +1,90 @@
from __future__ import annotations
import logging
from typing import Optional, Tuple, Union
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from .segment_fetcher import InitSectionData, SegmentData
from .playlist_dumper import PlaylistDumper
__all__ = ('Cutter',)
logger = logging.getLogger(__name__)
class Cutter:
def __init__(
self, playlist_dumper: PlaylistDumper, min_duration: float = 5.0
) -> None:
self._playlist_dumper = playlist_dumper
self._min_duration = min_duration # seconds
self._cutting: bool
self._triggered: bool
self._reset()
def on_open(args: Tuple[str, int]) -> None:
self._cutting = False
self._playlist_dumper.file_opened.subscribe(on_open)
def _reset(self) -> None:
self._cutting = False
self._triggered = False
def is_cutting(self) -> bool:
return self._cutting
def can_cut_stream(self) -> bool:
if self._triggered or self._cutting:
return False
return self._playlist_dumper.duration >= self._min_duration
def cut_stream(self) -> bool:
if self.can_cut_stream():
self._triggered = True
return True
return False
def _add_flag(self, item: Union[InitSectionData, SegmentData]) -> None:
item.segment.custom_parser_values['split'] = True
def __call__(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
return self._cut(source)
def _cut(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
def subscribe(
observer: abc.ObserverBase[Union[InitSectionData, SegmentData]],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
self._reset()
def on_next(item: Union[InitSectionData, SegmentData]) -> None:
if self._triggered:
self._add_flag(item)
self._cutting = True
self._triggered = False
observer.on_next(item)
def dispose() -> None:
nonlocal disposed
disposed = True
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)

View File

@ -0,0 +1,93 @@
from __future__ import annotations
import logging
from typing import Optional, Union
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from .segment_fetcher import InitSectionData, SegmentData
from .playlist_dumper import PlaylistDumper
from .segment_dumper import SegmentDumper
__all__ = ('Limiter',)
logger = logging.getLogger(__name__)
class Limiter:
def __init__(
self,
playlist_dumper: PlaylistDumper,
segment_dumper: SegmentDumper,
*,
filesize_limit: int = 0, # file size in bytes, no limit by default.
duration_limit: int = 0, # duration in seconds, no limit by default.
) -> None:
self._playlist_dumper = playlist_dumper
self._segment_dumper = segment_dumper
self.filesize_limit = filesize_limit
self.duration_limit = duration_limit
def _will_over_limits(self, item: Union[InitSectionData, SegmentData]) -> bool:
if (
self.filesize_limit > 0
and self._segment_dumper.filesize + len(item) >= self.filesize_limit
):
logger.debug(
'File size will be over the limit: {} + {}'.format(
self._segment_dumper.filesize, len(item)
)
)
return True
if (
self.duration_limit > 0
and self._playlist_dumper.duration + float(item.segment.duration)
>= self.duration_limit
):
logger.debug(
'Duration will be over the limit: {} + {}'.format(
self._playlist_dumper.duration, item.segment.duration
)
)
return True
return False
def _add_flag(self, item: Union[InitSectionData, SegmentData]) -> None:
item.segment.custom_parser_values['split'] = True
def __call__(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
return self._limit(source)
def _limit(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
def subscribe(
observer: abc.ObserverBase[Union[InitSectionData, SegmentData]],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
def on_next(item: Union[InitSectionData, SegmentData]) -> None:
if self._will_over_limits(item):
self._add_flag(item)
observer.on_next(item)
def dispose() -> None:
nonlocal disposed
disposed = True
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)

View File

@ -2,31 +2,52 @@ from __future__ import annotations
import io
import logging
import os
from copy import deepcopy
from decimal import Decimal
from typing import Callable, Optional, Tuple
from pathlib import PurePath
from typing import Optional, Tuple, Union
import m3u8
from reactivex import Observable, Subject, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..helpler import sequence_number_of
from .segment_fetcher import InitSectionData, SegmentData
from .segment_dumper import SegmentDumper
__all__ = ('PlaylistDumper',)
logger = logging.getLogger(__name__)
class PlaylistDumper:
def __init__(self, path_provider: Callable[..., Tuple[str, int]]) -> None:
self._path_provider = path_provider
def __init__(self, segment_dumper: SegmentDumper) -> None:
self._segment_dumper = segment_dumper
def on_open(args: Tuple[str, int]) -> None:
self._open_file(*args)
def on_close(path: str) -> None:
self._close_file()
self._reset()
self._segment_dumper.file_opened.subscribe(on_open)
self._segment_dumper.file_closed.subscribe(on_close)
self._file_opened: Subject[Tuple[str, int]] = Subject()
self._file_closed: Subject[str] = Subject()
self._duration_updated: Subject[float] = Subject()
self._segments_lost: Subject[int] = Subject()
self._reset()
def _reset(self) -> None:
self._path: str = ''
self._file: Optional[io.TextIOWrapper] = None
self._duration: Decimal = Decimal()
self._last_segment: Optional[m3u8.Segment] = None
self._last_seq_num: Optional[int] = None
self._num_of_segments_lost: int = 0
@property
def path(self) -> str:
@ -36,6 +57,10 @@ class PlaylistDumper:
def duration(self) -> float:
return float(self._duration)
@property
def num_of_segments_lost(self) -> int:
return self._num_of_segments_lost
@property
def file_opened(self) -> Observable[Tuple[str, int]]:
return self._file_opened
@ -44,17 +69,27 @@ class PlaylistDumper:
def file_closed(self) -> Observable[str]:
return self._file_closed
def __call__(self, source: Observable[m3u8.M3U8]) -> Observable[m3u8.Segment]:
@property
def duration_updated(self) -> Observable[float]:
return self._duration_updated
@property
def segments_lost(self) -> Observable[int]:
return self._segments_lost
def __call__(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
return self._dump(source)
def _open_file(self) -> None:
path, timestamp = self._path_provider()
root, ext = os.path.splitext(path)
os.makedirs(root, exist_ok=True)
self._path = os.path.join(root, 'index.m3u8')
def _open_file(self, video_path: str, timestamp: int) -> None:
path = PurePath(video_path)
self._video_file_name = path.name
self._path = str(path.with_suffix('.m3u8'))
self._file = open(self._path, 'wt', encoding='utf8') # type: ignore
logger.debug(f'Opened file: {self._path}')
self._file_opened.on_next((self._path, timestamp))
self._header_dumped = False
def _close_file(self) -> None:
if self._file is not None and not self._file.closed:
@ -63,97 +98,87 @@ class PlaylistDumper:
logger.debug(f'Closed file: {self._path}')
self._file_closed.on_next(self._path)
def _name_of(self, uri: str) -> str:
name, ext = os.path.splitext(uri)
return name
def _dump_header(self, item: InitSectionData) -> None:
if self._header_dumped:
return
playlist: m3u8.M3U8 = deepcopy(item.segment.custom_parser_values['playlist'])
playlist.segments.clear()
playlist.is_endlist = False
assert self._file is not None
self._file.write(playlist.dumps())
self._file.flush()
self._header_dumped = True
def _sequence_number_of(self, uri: str) -> int:
return int(self._name_of(uri))
def _replace_uri(self, segment: m3u8.Segment) -> m3u8.Segment:
copied_seg = deepcopy(segment)
if init_section := getattr(copied_seg, 'init_section', None):
init_section.uri = f'segments/{init_section.uri}'
uri = segment.uri
name = self._name_of(uri)
copied_seg.uri = 'segments/%s/%s' % (name[:-3], uri)
return copied_seg
def _replace_all_uri(self, playlist: m3u8.M3U8) -> m3u8.M3U8:
copied_playlist = deepcopy(playlist)
copied_playlist.segments = m3u8.SegmentList(
self._replace_uri(s) for s in copied_playlist.segments
def _dump_segment(self, init_item: InitSectionData, item: SegmentData) -> None:
seg = self._make_segment(
item.segment,
self._video_file_name,
init_section_byterange=f'{len(init_item)}@{init_item.offset}',
segment_byterange=f'{len(item)}@{item.offset}',
)
return copied_playlist
curr_seq_num = sequence_number_of(item.segment.uri)
if self._last_seq_num is not None:
if self._last_seq_num + 1 != curr_seq_num:
seg.discontinuity = True
if self._last_seq_num + 1 < curr_seq_num:
self._num_of_segments_lost += curr_seq_num - self._last_seq_num - 1
self._segments_lost.on_next(self._num_of_segments_lost)
assert self._file is not None
self._file.write(seg.dumps(self._last_segment) + '\n')
self._file.flush()
self._last_segment = seg
self._last_seq_num = curr_seq_num
def _make_segment(
self,
segment: m3u8.Segment,
uri: str,
init_section_byterange: str,
segment_byterange: str,
) -> m3u8.Segment:
seg = deepcopy(segment)
if init_section := getattr(seg, 'init_section', None):
init_section.uri = uri
init_section.byterange = init_section_byterange
seg.uri = uri
seg.byterange = segment_byterange
seg.title += '|' + segment.uri
return seg
def _update_duration(self, segment: m3u8.Segment) -> None:
self._duration += Decimal(str(segment.duration))
self._duration_updated.on_next(float(self._duration))
def _dump(self, source: Observable[m3u8.M3U8]) -> Observable[m3u8.Segment]:
def _dump(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
def subscribe(
observer: abc.ObserverBase[m3u8.Segment],
observer: abc.ObserverBase[Union[InitSectionData, SegmentData]],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
last_init_item: Optional[InitSectionData] = None
last_segment: Optional[m3u8.Segment] = None
last_sequence_number: Optional[int] = None
first_playlist_dumped: bool = False
self._close_file()
self._reset()
def on_next(playlist: m3u8.M3U8) -> None:
nonlocal last_sequence_number, last_segment, first_playlist_dumped
if playlist.is_endlist:
logger.debug('Playlist ended')
def on_next(item: Union[InitSectionData, SegmentData]) -> None:
nonlocal last_init_item
try:
if not first_playlist_dumped:
self._close_file()
self._reset()
self._open_file()
assert self._file is not None
playlist.is_endlist = False
self._file.write(self._replace_all_uri(playlist).dumps())
self._file.flush()
for seg in playlist.segments:
observer.on_next(seg)
self._update_duration(seg)
last_segment = seg
last_sequence_number = self._sequence_number_of(seg.uri)
first_playlist_dumped = True
logger.debug('The first playlist has been dumped')
return
assert self._file is not None
for seg in playlist.segments:
num = self._sequence_number_of(seg.uri)
discontinuity = False
if last_sequence_number is not None:
if last_sequence_number >= num:
continue
if last_sequence_number + 1 != num:
logger.warning(
'Segments discontinuous: '
f'last sequence number: {last_sequence_number}, '
f'current sequence number: {num}'
)
discontinuity = True
new_seg = self._replace_uri(seg)
new_seg.discontinuity = discontinuity
new_last_seg = self._replace_uri(last_segment)
self._file.write(new_seg.dumps(new_last_seg) + '\n')
observer.on_next(seg)
self._update_duration(seg)
last_segment = seg
last_sequence_number = num
if isinstance(item, InitSectionData):
self._dump_header(item)
last_init_item = item
else:
assert last_init_item is not None
self._dump_segment(last_init_item, item)
self._update_duration(item.segment)
except Exception as e:
self._close_file()
self._reset()
observer.on_error(e)
else:
observer.on_next(item)
def on_completed() -> None:
self._close_file()
@ -167,10 +192,9 @@ class PlaylistDumper:
def dispose() -> None:
nonlocal disposed
nonlocal last_segment, last_sequence_number
nonlocal last_init_item
disposed = True
last_segment = None
last_sequence_number = None
last_init_item = None
self._close_file()
self._reset()

View File

@ -49,9 +49,14 @@ class PlaylistFetcher(SupportDebugMixin):
def on_next(url: str) -> None:
logger.info(f'Fetching playlist... {url}')
while not disposed:
try:
content = self._fetch_playlist(url)
except Exception as e:
logger.warning(f'Failed to fetch playlist: {repr(e)}')
observer.on_error(e)
else:
if self._debug:
playlist_debug_file.write(content + '\n')
playlist = m3u8.loads(content, uri=url)
@ -59,9 +64,6 @@ class PlaylistFetcher(SupportDebugMixin):
url = self._get_best_quality_url(playlist)
logger.debug('Playlist changed to variant playlist')
on_next(url)
except Exception as e:
logger.warning(f'Failed to fetch playlist: {repr(e)}')
observer.on_error(e)
else:
observer.on_next(playlist)
time.sleep(1)
@ -96,10 +98,15 @@ class PlaylistFetcher(SupportDebugMixin):
)
),
wait=wait_exponential(multiplier=0.1, max=1),
stop=stop_after_delay(10),
stop=stop_after_delay(8),
)
def _fetch_playlist(self, url: str) -> str:
try:
response = self._session.get(url, headers=self._live.headers, timeout=3)
response.raise_for_status()
except Exception as e:
logger.debug(f'Failed to fetch playlist: {repr(e)}')
raise
else:
response.encoding = 'utf-8'
return response.text

View File

@ -1,7 +1,6 @@
from __future__ import annotations
import logging
import os
from typing import Optional
import m3u8
@ -13,6 +12,7 @@ from blrec.core import operators as core_ops
from blrec.utils import operators as utils_ops
from ..exceptions import NoNewSegments
from ..helpler import sequence_number_of
__all__ = ('PlaylistResolver',)
@ -23,6 +23,8 @@ logger = logging.getLogger(__name__)
class PlaylistResolver:
def __init__(self, stream_url_resolver: core_ops.StreamURLResolver) -> None:
self._stream_url_resolver = stream_url_resolver
self._last_media_sequence: int = 0
self._last_sequence_number: Optional[int] = None
def __call__(self, source: Observable[m3u8.M3U8]) -> Observable[m3u8.Segment]:
return self._solve(source).pipe(
@ -30,14 +32,10 @@ class PlaylistResolver:
utils_ops.retry(should_retry=self._should_retry),
)
def _name_of(self, uri: str) -> str:
name, ext = os.path.splitext(uri)
return name
def _sequence_number_of(self, uri: str) -> int:
return int(self._name_of(uri))
def _solve(self, source: Observable[m3u8.M3U8]) -> Observable[m3u8.Segment]:
self._last_media_sequence = 0
self._last_sequence_number = None
def subscribe(
observer: abc.ObserverBase[m3u8.Segment],
scheduler: Optional[abc.SchedulerBase] = None,
@ -46,29 +44,43 @@ class PlaylistResolver:
subscription = SerialDisposable()
attempts: int = 0
last_sequence_number: Optional[int] = None
def on_next(playlist: m3u8.M3U8) -> None:
nonlocal attempts, last_sequence_number
nonlocal attempts
discontinuity = False
if playlist.is_endlist:
logger.debug('Playlist ended')
new_segments = []
for seg in playlist.segments:
num = self._sequence_number_of(seg.uri)
if last_sequence_number is not None:
if last_sequence_number >= num:
continue
if last_sequence_number + 1 != num:
if playlist.media_sequence < self._last_media_sequence:
logger.warning(
'Segments discontinuous: '
f'last sequence number: {last_sequence_number}, '
f'last media sequence: {self._last_media_sequence}, '
f'current media sequence: {playlist.media_sequence}'
)
discontinuity = True
self._last_sequence_number = None
self._last_media_sequence = playlist.media_sequence
new_segments = []
for seg in playlist.segments:
num = sequence_number_of(seg.uri)
if self._last_sequence_number is not None:
if num <= self._last_sequence_number:
continue
if num == self._last_sequence_number + 1:
discontinuity = False
else:
logger.warning(
'Segments discontinuous: '
f'last sequence number: {self._last_sequence_number}, '
f'current sequence number: {num}'
)
seg.discontinuity = True
discontinuity = True
seg.discontinuity = discontinuity
seg.custom_parser_values['playlist'] = playlist
new_segments.append(seg)
last_sequence_number = num
self._last_sequence_number = num
if not new_segments:
attempts += 1
@ -84,9 +96,7 @@ class PlaylistResolver:
def dispose() -> None:
nonlocal disposed
nonlocal last_sequence_number
disposed = True
last_sequence_number = None
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler

View File

@ -7,7 +7,7 @@ from typing import List, Optional, Union
from reactivex import Observable, Subject, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from blrec.utils.ffprobe import StreamProfile, ffprobe
from blrec.utils.ffprobe import StreamProfile, ffprobe_on
from .segment_fetcher import InitSectionData, SegmentData
@ -88,4 +88,4 @@ class Prober:
def on_error(e: Exception) -> None:
logger.warning(f'Failed to probe stream by ffprobe: {repr(e)}')
ffprobe(bytes_io.getvalue()).subscribe(on_next, on_error)
ffprobe_on(bytes_io.getvalue()).subscribe(on_next, on_error)

View File

@ -1,13 +1,17 @@
import io
import logging
import os
from typing import Optional, Tuple, Union
from datetime import datetime, timedelta, timezone
from pathlib import PurePath
from typing import Callable, Optional, Tuple, Union
import attr
from reactivex import Observable, Subject, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from blrec.hls.operators.segment_fetcher import InitSectionData, SegmentData
from blrec.utils.ffprobe import ffprobe
from .playlist_dumper import PlaylistDumper
from ..helpler import sequence_number_of
from .segment_fetcher import InitSectionData, SegmentData
__all__ = ('SegmentDumper',)
@ -15,17 +19,31 @@ logger = logging.getLogger(__name__)
class SegmentDumper:
def __init__(self, playlist_dumper: PlaylistDumper) -> None:
self._playlist_dumper = playlist_dumper
self._out_dir: str = ''
def on_next(args: Tuple[str, int]) -> None:
path, timestamp = args
self._out_dir = os.path.dirname(path)
self._playlist_dumper.file_opened.subscribe(on_next)
def __init__(
self, path_provider: Callable[[Optional[int]], Tuple[str, int]]
) -> None:
self._path_provider = path_provider
self._file_opened: Subject[Tuple[str, int]] = Subject()
self._file_closed: Subject[str] = Subject()
self._reset()
def _reset(self) -> None:
self._path: str = ''
self._file: Optional[io.BufferedWriter] = None
self._filesize: int = 0
self._record_start_time: Optional[int] = None
@property
def path(self) -> str:
return self._path
@property
def filesize(self) -> int:
return self._filesize
@property
def record_start_time(self) -> Optional[int]:
return self._record_start_time
@property
def file_opened(self) -> Observable[Tuple[str, int]]:
@ -40,6 +58,84 @@ class SegmentDumper:
) -> Observable[Union[InitSectionData, SegmentData]]:
return self._dump(source)
def _open_file(self) -> None:
assert self._record_start_time is not None
path, timestamp = self._path_provider(self._record_start_time)
self._path = str(PurePath(path).with_suffix('.m4s'))
self._file = open(self._path, 'wb') # type: ignore
logger.debug(f'Opened file: {self._path}')
self._file_opened.on_next((self._path, timestamp))
def _close_file(self) -> None:
if self._file is not None and not self._file.closed:
self._file.close()
logger.debug(f'Closed file: {self._path}')
self._file_closed.on_next(self._path)
def _write_data(self, item: Union[InitSectionData, SegmentData]) -> Tuple[int, int]:
assert self._file is not None
offset = self._file.tell()
size = self._file.write(item.payload)
assert size == len(item)
return offset, size
def _update_filesize(self, size: int) -> None:
self._filesize += size
def _set_record_start_time(self, item: Union[InitSectionData, SegmentData]) -> None:
seq = sequence_number_of(item.segment.uri)
dt = datetime.utcfromtimestamp(seq)
tz = timezone(timedelta(hours=8))
ts = dt.replace(year=datetime.today().year, tzinfo=tz).timestamp()
self._record_start_time = int(ts)
def _must_split_file(
self, prev_init_item: Optional[InitSectionData], curr_init_item: InitSectionData
) -> bool:
if prev_init_item is None:
curr_profile = ffprobe(curr_init_item.payload)
logger.debug(f'current init section profile: {curr_profile}')
return True
prev_profile = ffprobe(prev_init_item.payload)
logger.debug(f'previous init section profile: {prev_profile}')
curr_profile = ffprobe(curr_init_item.payload)
logger.debug(f'current init section profile: {curr_profile}')
prev_video_profile = prev_profile['streams'][0]
prev_audio_profile = prev_profile['streams'][1]
assert prev_video_profile['codec_type'] == 'video'
assert prev_audio_profile['codec_type'] == 'audio'
curr_video_profile = curr_profile['streams'][0]
curr_audio_profile = curr_profile['streams'][1]
assert curr_video_profile['codec_type'] == 'video'
assert curr_audio_profile['codec_type'] == 'audio'
if (
prev_video_profile['codec_name'] != curr_video_profile['codec_name']
or prev_video_profile['width'] != curr_video_profile['width']
or prev_video_profile['height'] != curr_video_profile['height']
or prev_video_profile['coded_width'] != curr_video_profile['coded_width']
or prev_video_profile['coded_height'] != curr_video_profile['coded_height']
):
logger.warning('Video parameters changed')
return True
if (
prev_audio_profile['codec_name'] != curr_audio_profile['codec_name']
or prev_audio_profile['channels'] != curr_audio_profile['channels']
or prev_audio_profile['sample_rate'] != curr_audio_profile['sample_rate']
or prev_audio_profile['bit_rate'] != curr_audio_profile['bit_rate']
):
logger.warning('Audio parameters changed')
return True
return False
def _need_split_file(self, item: Union[InitSectionData, SegmentData]) -> bool:
return item.segment.custom_parser_values.get('split', False)
def _dump(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
@ -47,31 +143,63 @@ class SegmentDumper:
observer: abc.ObserverBase[Union[InitSectionData, SegmentData]],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
last_init_item: Optional[InitSectionData] = None
def on_next(item: Union[InitSectionData, SegmentData]) -> None:
nonlocal last_init_item
split_file = False
if isinstance(item, InitSectionData):
uri = item.init_section.uri
path = os.path.join(self._out_dir, 'segments', uri)
else:
uri = item.segment.uri
name, ext = os.path.splitext(uri)
path = os.path.join(self._out_dir, 'segments', name[:-3], uri)
os.makedirs(os.path.dirname(path), exist_ok=True)
split_file = self._must_split_file(last_init_item, item)
last_init_item = item
if not split_file:
split_file = self._need_split_file(item)
if split_file:
self._close_file()
self._reset()
self._set_record_start_time(item)
self._open_file()
try:
with open(path, 'wb') as file:
file.write(item.payload)
if split_file and not isinstance(item, InitSectionData):
assert last_init_item is not None
offset, size = self._write_data(last_init_item)
self._update_filesize(size)
observer.on_next(attr.evolve(last_init_item, offset=offset))
offset, size = self._write_data(item)
self._update_filesize(size)
observer.on_next(attr.evolve(item, offset=offset))
except Exception as e:
logger.error(f'Failed to dump segmemt: {repr(e)}')
logger.error(f'Failed to write data: {repr(e)}')
self._close_file()
self._reset()
observer.on_error(e)
def on_completed() -> None:
self._close_file()
self._reset()
observer.on_completed()
def on_error(e: Exception) -> None:
self._close_file()
self._reset()
observer.on_error(e)
else:
observer.on_next(item)
def dispose() -> None:
pass
nonlocal disposed
nonlocal last_init_item
disposed = True
last_init_item = None
self._close_file()
self._reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
on_next, on_error, on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))

View File

@ -8,7 +8,6 @@ import attr
import m3u8
import requests
import urllib3
from m3u8.model import InitializationSection
from reactivex import Observable, abc
from reactivex import operators as ops
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
@ -36,8 +35,9 @@ logger = logging.getLogger(__name__)
@attr.s(auto_attribs=True, slots=True, frozen=True)
class InitSectionData:
init_section: InitializationSection
segment: m3u8.Segment
payload: bytes
offset: int = 0
def __len__(self) -> int:
return len(self.payload)
@ -47,6 +47,7 @@ class InitSectionData:
class SegmentData:
segment: m3u8.Segment
payload: bytes
offset: int = 0
def __len__(self) -> int:
return len(self.payload)
@ -116,9 +117,7 @@ class SegmentFetcher:
f'init section url: {url}'
)
data = _data
observer.on_next(
InitSectionData(init_section=seg.init_section, payload=data)
)
observer.on_next(InitSectionData(segment=seg, payload=data))
last_segment = seg
url = seg.absolute_uri
@ -172,8 +171,13 @@ class SegmentFetcher:
stop=stop_after_delay(60),
)
def _fetch_segment(self, url: str) -> bytes:
with self._session.get(url, headers=self._live.headers, timeout=10) as response:
try:
response = self._session.get(url, headers=self._live.headers, timeout=5)
response.raise_for_status()
except Exception as e:
logger.debug(f'Failed to fetch segment {url}: {repr(e)}')
raise
else:
return response.content
def _should_retry(self, exc: Exception) -> bool:
@ -189,3 +193,4 @@ class SegmentFetcher:
'Fetch segments failed continuously, trying to update the stream url.'
)
self._stream_url_resolver.reset()
self._stream_url_resolver.rotate_routes()

View File

@ -1,104 +0,0 @@
from __future__ import annotations
import io
import logging
from typing import Optional
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from blrec.flv.common import (
is_audio_sequence_header,
is_metadata_tag,
is_video_sequence_header,
)
from blrec.flv.io import FlvReader
from blrec.flv.models import AudioTag, FlvHeader, ScriptTag, VideoTag
from blrec.flv.operators.typing import FLVStream, FLVStreamItem
__all__ = ('SegmentParser',)
logger = logging.getLogger(__name__)
class SegmentParser:
def __init__(self) -> None:
self._backup_timestamp = True
def __call__(self, source: Observable[bytes]) -> FLVStream:
return self._parse(source)
def _parse(self, source: Observable[bytes]) -> FLVStream:
def subscribe(
observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
last_flv_header: Optional[FlvHeader] = None
last_metadata_tag: Optional[ScriptTag] = None
last_audio_sequence_header: Optional[AudioTag] = None
last_video_sequence_header: Optional[VideoTag] = None
def reset() -> None:
nonlocal last_flv_header, last_metadata_tag
nonlocal last_audio_sequence_header, last_video_sequence_header
last_flv_header = None
last_metadata_tag = None
last_audio_sequence_header = None
last_video_sequence_header = None
def on_next(data: bytes) -> None:
nonlocal last_flv_header, last_metadata_tag
nonlocal last_audio_sequence_header, last_video_sequence_header
if b'' == data:
reset()
return
try:
reader = FlvReader(
io.BytesIO(data), backup_timestamp=self._backup_timestamp
)
flv_header = reader.read_header()
if not last_flv_header:
observer.on_next(flv_header)
last_flv_header = flv_header
else:
assert last_flv_header == flv_header
while not disposed:
tag = reader.read_tag()
if is_metadata_tag(tag):
if last_metadata_tag is not None:
continue
last_metadata_tag = tag
elif is_video_sequence_header(tag):
if tag == last_video_sequence_header:
continue
last_video_sequence_header = tag
elif is_audio_sequence_header(tag):
if tag == last_audio_sequence_header:
continue
last_audio_sequence_header = tag
observer.on_next(tag)
except EOFError:
pass
except Exception as e:
observer.on_error(e)
def dispose() -> None:
nonlocal disposed
disposed = True
reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)

View File

@ -1,111 +0,0 @@
from __future__ import annotations
import io
import logging
import os
from typing import Optional, Union
import av
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from blrec.bili.live import Live
from .segment_fetcher import InitSectionData, SegmentData
__all__ = ('SegmentRemuxer',)
logger = logging.getLogger(__name__)
TRACE_REMUX_SEGMENT = bool(os.environ.get('BLREC_TRACE_REMUX_SEGMENT'))
TRACE_LIBAV = bool(os.environ.get('BLREC_TRACE_LIBAV'))
if TRACE_LIBAV:
logging.getLogger('libav').setLevel(5)
else:
av.logging.set_level(av.logging.FATAL)
class SegmentRemuxer:
def __init__(self, live: Live) -> None:
self._live = live
def __call__(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[bytes]:
return self._remux(source)
def _remux(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[bytes]:
def subscribe(
observer: abc.ObserverBase[bytes],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
init_section_data: Optional[bytes] = None
def reset() -> None:
nonlocal init_section_data
init_section_data = None
def on_next(data: Union[InitSectionData, SegmentData]) -> None:
nonlocal init_section_data
if isinstance(data, InitSectionData):
init_section_data = data.payload
observer.on_next(b'')
return
if init_section_data is None:
return
try:
remuxed_data = self._remux_segemnt(init_section_data + data.payload)
except av.FFmpegError as e:
logger.warning(f'Failed to remux segment: {repr(e)}', exc_info=e)
else:
observer.on_next(remuxed_data)
def dispose() -> None:
nonlocal disposed
disposed = True
reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)
def _remux_segemnt(self, data: bytes, format: str = 'flv') -> bytes:
in_file = io.BytesIO(data)
out_file = io.BytesIO()
with av.open(in_file) as in_container:
with av.open(out_file, mode='w', format=format) as out_container:
in_video_stream = in_container.streams.video[0]
in_audio_stream = in_container.streams.audio[0]
out_video_stream = out_container.add_stream(template=in_video_stream)
out_audio_stream = out_container.add_stream(template=in_audio_stream)
for packet in in_container.demux():
if TRACE_REMUX_SEGMENT:
logger.debug(repr(packet))
# We need to skip the "flushing" packets that `demux` generates.
if packet.dts is None:
continue
# We need to assign the packet to the new stream.
if packet.stream.type == 'video':
packet.stream = out_video_stream
elif packet.stream.type == 'audio':
packet.stream = out_audio_stream
else:
raise NotImplementedError(packet.stream.type)
out_container.mux(packet)
return out_file.getvalue()

View File

@ -47,7 +47,7 @@ __all__ = (
'PushdeerNotifier',
'PushplusNotifier',
'TelegramNotifier',
'BarkNotifer',
'BarkNotifier',
)

View File

@ -296,7 +296,7 @@ class Bark(MessagingProvider):
"body": content,
"device_key": self.pushkey,
"badge": 1,
"icon": "https://raw.githubusercontent.com/acgnhiki/blrec/master/webapp/src/assets/icons/icon-72x72.png",
"icon": "https://raw.githubusercontent.com/acgnhiki/blrec/master/webapp/src/assets/icons/icon-72x72.png", # noqa
"group": "blrec",
}
async with aiohttp.ClientSession(raise_for_status=True) as session:

View File

@ -4,9 +4,12 @@ from .helpers import (
danmaku_path,
escape_path,
extra_metadata_path,
ffmpeg_metadata_path,
file_exists,
playlist_path,
raw_danmaku_path,
record_metadata_path,
video_path,
)
__all__ = (
@ -15,7 +18,10 @@ __all__ = (
'danmaku_path',
'escape_path',
'extra_metadata_path',
'ffmpeg_metadata_path',
'file_exists',
'playlist_path',
'raw_danmaku_path',
'record_metadata_path',
'video_path',
)

View File

@ -6,6 +6,7 @@ __all__ = (
'cover_path',
'create_file',
'danmaku_path',
'playlist_path',
'escape_path',
'extra_metadata_path',
'ffmpeg_metadata_path',
@ -29,6 +30,14 @@ def danmaku_path(video_path: str) -> str:
return str(PurePath(video_path).with_suffix('.xml'))
def playlist_path(video_path: str) -> str:
return str(PurePath(video_path).with_suffix('.m3u8'))
def video_path(playlist_path: str) -> str:
return str(PurePath(playlist_path).with_suffix('.m4s'))
def cover_path(video_path: str, ext: str = 'jpg') -> str:
return str(PurePath(video_path).with_suffix('.' + ext))

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
import json
import logging
import os
from decimal import Decimal
from typing import Iterable, List, Tuple, cast
@ -12,7 +13,7 @@ import m3u8
from blrec.flv.helpers import make_comment_for_joinpoints
from blrec.flv.operators import JoinPoint
from blrec.flv.utils import format_timestamp
from blrec.path.helpers import ffmpeg_metadata_path
from blrec.path.helpers import ffmpeg_metadata_path, playlist_path
from .helpers import get_extra_metadata, get_metadata, get_record_metadata
@ -21,11 +22,13 @@ logger = logging.getLogger(__name__)
async def make_metadata_file(video_path: str) -> str:
path = ffmpeg_metadata_path(video_path)
async with aiofiles.open(path, 'wb') as file:
if video_path.endswith('.flv'):
_, ext = os.path.splitext(video_path)
if ext == '.flv':
content = await _make_metadata_content_for_flv(video_path)
elif video_path.endswith('.m3u8'):
content = await _make_metadata_content_for_m3u8(video_path)
elif ext == '.m4s':
content = await _make_metadata_content_for_m3u8(playlist_path(video_path))
else:
raise NotImplementedError(video_path)
await file.write(content.encode(encoding='utf8'))

View File

@ -3,7 +3,6 @@ import json
import logging
import os
import shutil
from pathlib import PurePath
from typing import Any, Dict, Iterable, Literal
import aiofiles
@ -11,6 +10,7 @@ import aiofiles
from blrec.path.helpers import (
cover_path,
danmaku_path,
playlist_path,
raw_danmaku_path,
record_metadata_path,
)
@ -45,31 +45,21 @@ async def discard_dir(path: str, log_level: Literal['INFO', 'DEBUG'] = 'INFO') -
def files_related(video_path: str) -> Iterable[str]:
for path in [
file_paths = [
danmaku_path(video_path),
raw_danmaku_path(video_path),
cover_path(video_path, ext='jpg'),
cover_path(video_path, ext='png'),
]:
]
if video_path.endswith('.m4s'):
file_paths.append(playlist_path(video_path))
for path in file_paths:
if os.path.isfile(path):
yield path
async def copy_files_related(video_path: str) -> None:
loop = asyncio.get_running_loop()
dirname = os.path.dirname(video_path)
for src_path in files_related(video_path):
root, ext = os.path.splitext(src_path)
dst_path = PurePath(dirname).with_suffix(ext)
try:
await loop.run_in_executor(None, shutil.copy, src_path, dst_path)
except Exception as e:
logger.error(f"Failed to copy '{src_path}' to '{dst_path}': {repr(e)}")
else:
logger.info(f"Copied '{src_path}' to '{dst_path}'")
async def get_metadata(flv_path: str) -> Dict[str, Any]:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _get_metadata, flv_path)

View File

@ -17,16 +17,15 @@ from ..flv.helpers import is_valid_flv_file
from ..flv.metadata_analysis import analyse_metadata
from ..flv.metadata_injection import InjectingProgress, inject_metadata
from ..logging.room_id import aio_task_with_room_id
from ..path import danmaku_path, extra_metadata_path, record_metadata_path
from ..path import (
extra_metadata_path,
ffmpeg_metadata_path,
playlist_path,
record_metadata_path,
)
from ..utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin, SupportDebugMixin
from .ffmpeg_metadata import make_metadata_file
from .helpers import (
copy_files_related,
discard_dir,
discard_file,
files_related,
get_extra_metadata,
)
from .helpers import discard_file, files_related, get_extra_metadata
from .models import DeleteStrategy, PostprocessorStatus
from .remux import RemuxingProgress, RemuxingResult, remux_video
from .typing import Progress
@ -144,31 +143,50 @@ class Postprocessor(
@aio_task_with_room_id
async def _worker(self) -> None:
while True:
await self._postprocess()
async def _postprocess(self) -> None:
self._status = PostprocessorStatus.WAITING
self._postprocessing_path = None
self._postprocessing_progress = None
video_path = await self._queue.get()
self._completed_files.append(video_path)
async with self._worker_semaphore:
logger.debug(f'Postprocessing... {video_path}')
await self._wait_for_metadata_file(video_path)
try:
if video_path.endswith('.flv'):
_, ext = os.path.splitext(video_path)
if ext == '.flv':
result_path = await self._process_flv(video_path)
elif ext == '.m4s':
result_path = await self._process_m4s(video_path)
else:
result_path = video_path
if result_path != video_path:
self._completed_files.append(result_path)
await self._emit('video_postprocessing_completed', self, result_path)
files = [result_path, *files_related(result_path)]
await self._emit('postprocessing_completed', self, files)
except Exception as exc:
submit_exception(exc)
finally:
self._queue.task_done()
async def _process_flv(self, video_path: str) -> str:
if not await self._is_vaild_flv_file(video_path):
logger.warning(f'The flv file may be invalid: {video_path}')
if os.path.getsize(video_path) < 1024**2:
continue
return video_path
if self.remux_to_mp4:
self._status = PostprocessorStatus.REMUXING
(
result_path,
remuxing_result,
) = await self._remux_video_to_mp4(video_path)
result_path, remuxing_result = await self._remux_video_to_mp4(video_path)
if not self._debug:
if self._should_delete_source_files(remuxing_result):
await discard_file(video_path)
@ -177,42 +195,26 @@ class Postprocessor(
result_path = await self._inject_extra_metadata(video_path)
else:
result_path = video_path
if not self._debug:
await discard_file(extra_metadata_path(video_path), 'DEBUG')
elif video_path.endswith('.m3u8'):
if self.remux_to_mp4:
return result_path
async def _process_m4s(self, video_path: str) -> str:
if not self.remux_to_mp4:
return video_path
self._status = PostprocessorStatus.REMUXING
(
result_path,
remuxing_result,
) = await self._remux_video_to_mp4(video_path)
await copy_files_related(video_path)
if result_path != video_path:
self._completed_files.append(danmaku_path(result_path))
with suppress(ValueError):
self._completed_files.remove(
danmaku_path(video_path)
)
if not self._debug:
if self._should_delete_source_files(remuxing_result):
await discard_dir(os.path.dirname(video_path))
else:
result_path = video_path
else:
result_path = video_path
result_path, remuxing_result = await self._remux_video_to_mp4(video_path)
self._completed_files.append(result_path)
await self._emit(
'video_postprocessing_completed', self, result_path
)
if not self._debug and self._should_delete_source_files(remuxing_result):
await discard_file(video_path)
await discard_file(playlist_path(video_path))
await discard_file(record_metadata_path(video_path), 'DEBUG')
await discard_file(ffmpeg_metadata_path(video_path), 'DEBUG')
files = [result_path, *files_related(result_path)]
await self._emit('postprocessing_completed', self, files)
except Exception as exc:
submit_exception(exc)
finally:
self._queue.task_done()
return result_path
async def _inject_extra_metadata(self, path: str) -> str:
logger.info(f"Injecting metadata for '{path}' ...")
@ -240,12 +242,16 @@ class Postprocessor(
return path
async def _remux_video_to_mp4(self, in_path: str) -> Tuple[str, RemuxingResult]:
if in_path.endswith('.flv'):
_, ext = os.path.splitext(in_path)
if ext == '.flv':
out_path = str(PurePath(in_path).with_suffix('.mp4'))
metadata_path = await make_metadata_file(in_path)
elif in_path.endswith('.m3u8'):
out_path = str(PurePath(in_path).parent.with_suffix('.mp4'))
metadata_path = await make_metadata_file(in_path)
elif ext == '.m4s':
_in_path = in_path
in_path = playlist_path(in_path)
out_path = str(PurePath(in_path).with_suffix('.mp4'))
metadata_path = await make_metadata_file(_in_path)
else:
raise NotImplementedError(in_path)
@ -254,7 +260,7 @@ class Postprocessor(
if remux_result.is_failed():
logger.error(f"Failed to remux '{in_path}' to '{out_path}'")
result_path = in_path
result_path = _in_path if ext == 'm4s' else in_path
elif remux_result.is_warned():
logger.warning('Remuxing done, but ran into problems.')
result_path = out_path
@ -266,7 +272,7 @@ class Postprocessor(
logger.debug(f'ffmpeg output:\n{remux_result.output}')
if not self._debug and in_path.endswith('.flv'):
if not self._debug and ext == '.flv':
await discard_file(metadata_path, 'DEBUG')
return result_path, remux_result
@ -349,15 +355,17 @@ class Postprocessor(
return False
async def _wait_for_metadata_file(self, video_path: str) -> None:
loop = asyncio.get_running_loop()
_, ext = os.path.splitext(video_path)
if video_path.endswith('.flv'):
if ext == '.flv':
path = extra_metadata_path(video_path)
elif video_path.endswith('.m3u8'):
elif ext == '.m4s':
path = record_metadata_path(video_path)
else:
return
loop = asyncio.get_running_loop()
for _ in range(10):
if await loop.run_in_executor(None, os.path.isfile, path):
break

View File

@ -10,6 +10,8 @@ from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposab
from reactivex.scheduler.currentthreadscheduler import CurrentThreadScheduler
from tqdm import tqdm
from blrec.path.helpers import video_path
__all__ = 'RemuxingResult', 'remux_video'
@ -63,15 +65,9 @@ def remux_video(
) -> Observable[Union[RemuxingProgress, RemuxingResult]]:
SIZE_PATTERN: Final = re.compile(r'size=\s*(?P<number>\d+)(?P<unit>[a-zA-Z]?B)')
if in_path.endswith('.m3u8'):
total = 0
for root, dirs, files in os.walk(os.path.dirname(in_path)):
for filename in files:
if not (filename.endswith('.m4s') or filename.endswith('.ts')):
continue
total += os.path.getsize(os.path.join(root, filename))
postfix = os.path.join(
os.path.basename(os.path.dirname(in_path)), os.path.basename(in_path)
)
_in_path = video_path(in_path)
total = os.path.getsize(_in_path)
postfix = os.path.basename(_in_path)
else:
total = os.path.getsize(in_path)
postfix = os.path.basename(in_path)

View File

@ -18,13 +18,14 @@ from blrec.postprocess import DeleteStrategy
from blrec.utils.string import camel_case
from .typing import (
BarkMessageType,
EmailMessageType,
MessageType,
PushdeerMessageType,
PushplusMessageType,
RecordingMode,
ServerchanMessageType,
TelegramMessageType,
BarkMessageType,
)
logger = logging.getLogger(__name__)
@ -462,16 +463,16 @@ class NotificationSettings(BaseModel):
class MessageTemplateSettings(BaseModel):
began_message_type: str
began_message_type: MessageType
began_message_title: str
began_message_content: str
ended_message_type: str
ended_message_type: MessageType
ended_message_title: str
ended_message_content: str
space_message_type: str
space_message_type: MessageType
space_message_title: str
space_message_content: str
error_message_type: str
error_message_type: MessageType
error_message_title: str
error_message_content: str

View File

@ -93,14 +93,14 @@ class VideoFileStatus(str, Enum):
INJECTING = 'injecting'
COMPLETED = 'completed'
MISSING = 'missing'
BROKEN = 'broken'
UNKNOWN = 'unknown'
class DanmukuFileStatus(str, Enum):
RECORDING = 'recording'
COMPLETED = 'completed'
MISSING = 'missing'
BROKEN = 'broken'
UNKNOWN = 'unknown'
@attr.s(auto_attribs=True, slots=True, frozen=True)

View File

@ -11,13 +11,14 @@ from blrec.bili.models import RoomInfo, UserInfo
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.core import Recorder
from blrec.core.cover_downloader import CoverSaveStrategy
from blrec.core.typing import MetaData
from blrec.event.event_submitters import (
LiveEventSubmitter,
PostprocessorEventSubmitter,
RecorderEventSubmitter,
)
from blrec.flv.metadata_injection import InjectingProgress
from blrec.flv.operators import MetaData, StreamProfile
from blrec.flv.operators import StreamProfile
from blrec.logging.room_id import aio_task_with_room_id
from blrec.postprocess import DeleteStrategy, Postprocessor, PostprocessorStatus
from blrec.postprocess.remux import RemuxingProgress
@ -134,9 +135,6 @@ class RecordTask:
size = os.path.getsize(path)
exists = True
except FileNotFoundError:
if path.endswith('.m3u8'):
mp4_path = str(PurePath(path).parent.with_suffix('.mp4'))
else:
mp4_path = str(PurePath(path).with_suffix('.mp4'))
try:
size = os.path.getsize(mp4_path)
@ -165,7 +163,7 @@ class RecordTask:
status = VideoFileStatus.INJECTING
else:
# disabling recorder by force or stoping task by force
status = VideoFileStatus.BROKEN
status = VideoFileStatus.UNKNOWN
yield VideoFileDetail(path=path, size=size, status=status)
@ -196,7 +194,7 @@ class RecordTask:
status = DanmukuFileStatus.RECORDING
else:
# disabling recorder by force or stoping task by force
status = DanmukuFileStatus.BROKEN
status = DanmukuFileStatus.UNKNOWN
yield DanmakuFileDetail(path=path, size=size, status=status)

View File

@ -10,8 +10,9 @@ from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_expo
from blrec.utils.libc import malloc_trim
from ..bili.exceptions import ApiRequestError
from ..core.typing import MetaData
from ..exception import NotFoundError, submit_exception
from ..flv.operators import MetaData, StreamProfile
from ..flv.operators import StreamProfile
from .models import DanmakuFileDetail, TaskData, TaskParam, VideoFileDetail
from .task import RecordTask

View File

@ -2,24 +2,91 @@ from __future__ import annotations
import json
from subprocess import PIPE, Popen
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
from reactivex import Observable, abc
from reactivex.scheduler import CurrentThreadScheduler
__all__ = ('ffprobe', 'StreamProfile')
StreamProfile = Dict[str, Any]
__all__ = (
'ffprobe',
'ffprobe_on',
'StreamProfile',
'FormatProfile',
'VideoProfile',
'AudioProfile',
)
def ffprobe(data: bytes) -> Observable[StreamProfile]:
def subscribe(
observer: abc.ObserverBase[StreamProfile],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
_scheduler = scheduler or CurrentThreadScheduler()
class VideoProfile(TypedDict, total=False):
index: int
codec_name: str
codec_long_name: str
codec_type: Literal['video']
codec_tag_string: str
codec_tag: str
width: int
height: int
coded_width: int
coded_height: int
closed_captions: int
film_grain: int
has_b_frames: int
level: int
refs: int
is_avc: str
nal_length_size: str
id: str
r_frame_rate: str
avg_frame_rate: str
time_base: str
duration_ts: int
duration: str
extradata_size: int
disposition: Dict[str, int]
tags: Dict[str, Any]
def action(scheduler: abc.SchedulerBase, state: Optional[Any] = None) -> None:
class AudioProfile(TypedDict, total=False):
index: int
codec_name: str
codec_long_name: str
codec_type: Literal['audio']
codec_tag_string: str
codec_tag: str
sample_fmt: str
sample_rate: str
channels: int
channel_layout: str
bits_per_sample: int
id: str
r_frame_rate: str
avg_frame_rate: str
time_base: str
duration_ts: int
duration: str
bit_rate: str
extradata_size: int
disposition: Dict[str, int]
tags: Dict[str, Any]
class FormatProfile(TypedDict, total=False):
filename: str
nb_streams: int
nb_programs: int
format_name: str
format_long_name: str
size: str
probe_score: int
tags: Dict[str, Any]
class StreamProfile(TypedDict, total=False):
streams: List[Union[VideoProfile, AudioProfile]]
format: FormatProfile
def ffprobe(data: bytes) -> StreamProfile:
args = [
'ffprobe',
'-show_streams',
@ -31,13 +98,29 @@ def ffprobe(data: bytes) -> Observable[StreamProfile]:
with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as process:
try:
stdout, stderr = process.communicate(data, timeout=10)
except Exception as e:
stdout, _stderr = process.communicate(data, timeout=10)
profile = json.loads(stdout)
except Exception:
process.kill()
process.wait()
raise
else:
return profile
def ffprobe_on(data: bytes) -> Observable[StreamProfile]:
def subscribe(
observer: abc.ObserverBase[StreamProfile],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
_scheduler = scheduler or CurrentThreadScheduler()
def action(scheduler: abc.SchedulerBase, state: Optional[Any] = None) -> None:
try:
profile = ffprobe(data)
except Exception as e:
observer.on_error(e)
else:
profile = json.loads(stdout)
observer.on_next(profile)
observer.on_completed()

View File

@ -2,6 +2,7 @@ import asyncio
import os
import threading
from abc import ABC, abstractmethod
from concurrent.futures import Future
from typing import Awaitable, TypeVar, final
from blrec.logging.room_id import aio_task_with_room_id
@ -130,10 +131,13 @@ class AsyncCooperationMixin(ABC):
# workaround for `RuntimeError: no running event loop`
submit_exception(exc)
self._run_coroutine(wrapper())
self._call_coroutine(wrapper())
def _run_coroutine(self, coro: Awaitable[_T]) -> _T:
future = asyncio.run_coroutine_threadsafe(self._with_room_id(coro), self._loop)
def _run_coroutine(self, coro: Awaitable[_T]) -> Future[_T]:
return asyncio.run_coroutine_threadsafe(coro, self._loop)
def _call_coroutine(self, coro: Awaitable[_T]) -> _T:
future = self._run_coroutine(coro)
return future.result()
@aio_task_with_room_id

View File

@ -1,41 +1,32 @@
from typing import Any, Dict, List
import attr
from fastapi import APIRouter, BackgroundTasks, Body, Depends, status
from pydantic import PositiveInt, conint
from fastapi import (
APIRouter,
status,
Body,
Depends,
BackgroundTasks,
)
from ..dependencies import task_data_filter, TaskDataFilter
from ..schemas import ResponseMessage
from ..responses import (
not_found_responses,
forbidden_responses,
confict_responses,
accepted_responses,
created_responses,
)
from ...exception import NotFoundError, ForbiddenError
from ...application import Application
from ...exception import ForbiddenError, NotFoundError
from ...utils.ffprobe import StreamProfile
from ..dependencies import TaskDataFilter, task_data_filter
from ..responses import (
accepted_responses,
confict_responses,
created_responses,
forbidden_responses,
not_found_responses,
)
from ..schemas import ResponseMessage
app: Application = None # type: ignore # bypass flake8 F821
router = APIRouter(
prefix='/api/v1/tasks',
tags=['tasks'],
)
router = APIRouter(prefix='/api/v1/tasks', tags=['tasks'])
@router.get('/data')
async def get_task_data(
page: PositiveInt = 1,
size: conint(ge=10, le=100) = 100, # type: ignore
filter: TaskDataFilter = Depends(task_data_filter)
filter: TaskDataFilter = Depends(task_data_filter),
) -> List[Dict[str, Any]]:
start = (page - 1) * size
stop = page * size
@ -51,26 +42,17 @@ async def get_task_data(
return task_data
@router.get(
'/{room_id}/data',
responses={**not_found_responses},
)
@router.get('/{room_id}/data', responses={**not_found_responses})
async def get_one_task_data(room_id: int) -> Dict[str, Any]:
return attr.asdict(app.get_task_data(room_id))
@router.get(
'/{room_id}/param',
responses={**not_found_responses},
)
@router.get('/{room_id}/param', responses={**not_found_responses})
async def get_task_param(room_id: int) -> Dict[str, Any]:
return attr.asdict(app.get_task_param(room_id))
@router.get(
'/{room_id}/metadata',
responses={**not_found_responses},
)
@router.get('/{room_id}/metadata', responses={**not_found_responses})
async def get_task_metadata(room_id: int) -> Dict[str, Any]:
metadata = app.get_task_metadata(room_id)
if not metadata:
@ -78,44 +60,29 @@ async def get_task_metadata(room_id: int) -> Dict[str, Any]:
return attr.asdict(metadata)
@router.get(
'/{room_id}/profile',
responses={**not_found_responses},
)
async def get_task_stream_profile(room_id: int) -> Dict[str, Any]:
@router.get('/{room_id}/profile', responses={**not_found_responses})
async def get_task_stream_profile(room_id: int) -> StreamProfile:
return app.get_task_stream_profile(room_id)
@router.get(
'/{room_id}/videos',
responses={**not_found_responses},
)
@router.get('/{room_id}/videos', responses={**not_found_responses})
async def get_task_video_file_details(room_id: int) -> List[Dict[str, Any]]:
return [attr.asdict(d) for d in app.get_task_video_file_details(room_id)]
@router.get(
'/{room_id}/danmakus',
responses={**not_found_responses},
)
@router.get('/{room_id}/danmakus', responses={**not_found_responses})
async def get_task_danmaku_file_details(room_id: int) -> List[Dict[str, Any]]:
return [attr.asdict(d) for d in app.get_task_danmaku_file_details(room_id)]
@router.post(
'/info',
response_model=ResponseMessage,
responses={**not_found_responses},
)
@router.post('/info', response_model=ResponseMessage, responses={**not_found_responses})
async def update_all_task_infos() -> ResponseMessage:
await app.update_all_task_infos()
return ResponseMessage(message='All task infos have been updated')
@router.post(
'/{room_id}/info',
response_model=ResponseMessage,
responses={**not_found_responses},
'/{room_id}/info', response_model=ResponseMessage, responses={**not_found_responses}
)
async def update_task_info(room_id: int) -> ResponseMessage:
await app.update_task_info(room_id)
@ -123,20 +90,14 @@ async def update_task_info(room_id: int) -> ResponseMessage:
@router.get(
'/{room_id}/cut',
response_model=ResponseMessage,
responses={**not_found_responses},
'/{room_id}/cut', response_model=ResponseMessage, responses={**not_found_responses}
)
async def can_cut_stream(room_id: int) -> ResponseMessage:
if app.can_cut_stream(room_id):
return ResponseMessage(
message='The stream can been cut',
data={'result': True},
)
return ResponseMessage(message='The stream can been cut', data={'result': True})
else:
return ResponseMessage(
message='The stream cannot been cut',
data={'result': False},
message='The stream cannot been cut', data={'result': False}
)
@ -153,9 +114,7 @@ async def cut_stream(room_id: int) -> ResponseMessage:
@router.post(
'/start',
response_model=ResponseMessage,
responses={**not_found_responses},
'/start', response_model=ResponseMessage, responses={**not_found_responses}
)
async def start_all_tasks() -> ResponseMessage:
await app.start_all_tasks()
@ -214,10 +173,7 @@ async def stop_task(
return ResponseMessage(message='The task has been stopped')
@router.post(
'/recorder/enable',
response_model=ResponseMessage,
)
@router.post('/recorder/enable', response_model=ResponseMessage)
async def enable_all_task_recorders() -> ResponseMessage:
await app.enable_all_task_recorders()
return ResponseMessage(message='All task recorders have been enabled')
@ -246,9 +202,7 @@ async def disable_all_task_recorders(
) -> ResponseMessage:
if background:
background_tasks.add_task(app.disable_all_task_recorders, force)
return ResponseMessage(
message='Disabling all task recorders on the background'
)
return ResponseMessage(message='Disabling all task recorders on the background')
await app.disable_all_task_recorders(force)
return ResponseMessage(message='All task recorders have been disabled')
@ -271,9 +225,7 @@ async def disable_task_recorder(
if background:
background_tasks.add_task(app.disable_task_recorder, room_id, force)
return ResponseMessage(
message='Disabling the task recorder on the background'
)
return ResponseMessage(message='Disabling the task recorder on the background')
await app.disable_task_recorder(room_id, force)
return ResponseMessage(message='The task recorder has been disabled')
@ -283,9 +235,7 @@ async def disable_task_recorder(
'/{room_id}',
response_model=ResponseMessage,
status_code=status.HTTP_201_CREATED,
responses={
**created_responses, **confict_responses, **forbidden_responses
},
responses={**created_responses, **confict_responses, **forbidden_responses},
)
async def add_task(room_id: int) -> ResponseMessage:
"""Add a task for a room.
@ -298,24 +248,18 @@ async def add_task(room_id: int) -> ResponseMessage:
"""
real_room_id = await app.add_task(room_id)
return ResponseMessage(
message='Successfully Added Task',
data={'room_id': real_room_id},
message='Successfully Added Task', data={'room_id': real_room_id}
)
@router.delete(
'',
response_model=ResponseMessage,
)
@router.delete('', response_model=ResponseMessage)
async def remove_all_tasks() -> ResponseMessage:
await app.remove_all_tasks()
return ResponseMessage(message='All tasks have been removed')
@router.delete(
'/{room_id}',
response_model=ResponseMessage,
responses={**not_found_responses},
'/{room_id}', response_model=ResponseMessage, responses={**not_found_responses}
)
async def remove_task(room_id: int) -> ResponseMessage:
await app.remove_task(room_id)

View File

@ -74,38 +74,6 @@
</nz-select>
</nz-form-control>
</nz-form-item>
<nz-form-item
class="setting-item"
*ngIf="streamFormatControl.value === 'fmp4'"
>
<nz-form-label
class="setting-label"
nzNoColon
[nzTooltipTitle]="recordingModeTip"
>录制模式</nz-form-label
>
<ng-template #recordingModeTip>
<p>
标准模式: 对下载的流数据进行解析处理,支持自动分割文件等功能。
<br />
原始模式: 直接下载流数据,没有进行解析处理,不支持自动分割文件等功能。
<br />
</p>
</ng-template>
<nz-form-control
class="setting-control select"
[nzWarningTip]="syncFailedWarningTip"
[nzValidateStatus]="
syncStatus.recordingMode ? recordingModeControl : 'warning'
"
>
<nz-select
formControlName="recordingMode"
[nzOptions]="recordingModeOptions"
>
</nz-select>
</nz-form-control>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
@ -224,11 +192,7 @@
</nz-form-item>
<nz-form-item
class="setting-item"
*ngIf="
streamFormatControl.value === 'flv' ||
(streamFormatControl.value === 'fmp4' &&
recordingModeControl.value === 'standard')
"
*ngIf="streamFormatControl.value === 'flv'"
>
<nz-form-label
class="setting-label"

View File

@ -214,7 +214,6 @@ export type PushplusMessageType =
export type TelegramMessageType = MarkdownMessageType | HtmlMessageType;
export type BarkMessageType = TextMessageType;
export interface MessageTemplateSettings {
beganMessageType: string;
beganMessageTitle: string;

View File

@ -11,7 +11,7 @@ const STATUS_MAPPING = new Map([
[VideoFileStatus.REMUXING, '处理中'],
[VideoFileStatus.COMPLETED, '已完成'],
[VideoFileStatus.MISSING, '不存在'],
[VideoFileStatus.BROKEN, '录制中断'],
[VideoFileStatus.UNKNOWN, '???'],
]);
@Pipe({

View File

@ -296,14 +296,14 @@ export enum VideoFileStatus {
INJECTING = 'injecting',
COMPLETED = 'completed',
MISSING = 'missing',
BROKEN = 'broken',
UNKNOWN = 'unknown',
}
export enum DanmakuFileStatus {
RECORDING = 'recording',
COMPLETED = 'completed',
MISSING = 'missing',
BROKEN = 'broken',
UNKNOWN = 'unknown',
}
export interface VideoFileDetail {

View File

@ -52,17 +52,7 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item
class="setting-item filesize-limit"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'flv' ||
((options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4' &&
(options.recorder.recordingMode ||
model.recorder.recordingMode) === 'standard')
"
>
<nz-form-item class="setting-item filesize-limit">
<nz-form-label
class="setting-label"
nzNoColon
@ -97,17 +87,7 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item
class="setting-item duration-limit"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'flv' ||
((options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4' &&
(options.recorder.recordingMode ||
model.recorder.recordingMode) === 'standard')
"
>
<nz-form-item class="setting-item duration-limit">
<nz-form-label
class="setting-label"
nzNoColon
@ -237,48 +217,6 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4'
"
>
<nz-form-label
class="setting-label"
nzNoColon
[nzTooltipTitle]="recordingModeTip"
>录制模式</nz-form-label
>
<ng-template #recordingModeTip>
<p>
标准模式: 对下载的流数据进行解析处理,支持自动分割文件等功能。
<br />
原始模式:
直接下载流数据,没有进行解析处理,不支持自动分割文件等功能。
<br />
</p>
</ng-template>
<nz-form-control class="setting-control select">
<nz-select
name="recordingMode"
[(ngModel)]="model.recorder.recordingMode"
[disabled]="options.recorder.recordingMode === null"
[nzOptions]="recordingModeOptions"
>
</nz-select>
</nz-form-control>
<label
nz-checkbox
[nzChecked]="options.recorder.recordingMode !== null"
(nzCheckedChange)="
options.recorder.recordingMode = $event
? globalSettings.recorder.recordingMode
: null
"
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
@ -440,11 +378,7 @@
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'flv' ||
((options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4' &&
(options.recorder.recordingMode ||
model.recorder.recordingMode) === 'standard')
'flv'
"
>
<nz-form-label
@ -644,17 +578,7 @@
<div ngModelGroup="postprocessing" class="form-group postprocessing">
<h2>文件处理</h2>
<nz-form-item
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'flv' ||
((options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4' &&
(options.recorder.recordingMode ||
model.recorder.recordingMode) === 'standard')
"
>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
nzNoColon