feat: record raw data of HLS

This commit is contained in:
acgnhik 2022-07-31 12:31:22 +08:00
parent 51f4ff61d0
commit afa0b861fa
71 changed files with 1432 additions and 360 deletions

14
FAQ.md
View File

@ -1,5 +1,19 @@
# 常见问题
## HLS 标准录制模式和原始录制模式有什么区别?
| | 标准录制模式 | 原始录制模式 |
| --- | --- | --- |
| ffmpeg | 需要 | 不需要 |
| 资源占用 | 较多 | 较少 |
| 稳定性 | 比较差 | 比较好 |
| 录播文件 | 一个 flv 文件 | 很多片段文件 |
| 录播信息 | 包含在 flv 文件里 | 单独保存为一个文件 (index.meta.json) |
| 播放器支持 | 几乎全部播放器都支持 flv | 支持 m3u8 的播放器很少 (VLC、dandanplay) |
| 自动分割文件 | 支持 | 不支持 |
| 手动分割文件 | 支持 | 不支持 |
| 自动转 mp4 | 支持 | 支持 |
## 如何终止程序?
`ctrl + c`

View File

@ -8,12 +8,13 @@ import aiofiles
import aiohttp
from tenacity import retry, stop_after_attempt, wait_fixed
from ..bili.live import Live
from ..exception import exception_callback
from ..logging.room_id import aio_task_with_room_id
from ..path import cover_path
from ..utils.hash import sha1sum
from ..utils.mixins import SwitchableMixin
from blrec.bili.live import Live
from blrec.exception import exception_callback
from blrec.logging.room_id import aio_task_with_room_id
from blrec.path import cover_path
from blrec.utils.hash import sha1sum
from blrec.utils.mixins import SwitchableMixin
from .stream_recorder import StreamRecorder, StreamRecorderEventListener
__all__ = ('CoverDownloader',)

View File

@ -7,22 +7,23 @@ from typing import Iterator, List, Optional
from tenacity import AsyncRetrying, retry_if_not_exception_type, stop_after_attempt
from .. import __github__, __prog__, __version__
from ..bili.live import Live
from ..core.models import GiftSendMsg, GuardBuyMsg, SuperChatMsg
from ..danmaku.io import DanmakuWriter
from ..danmaku.models import (
from blrec import __github__, __prog__, __version__
from blrec.bili.live import Live
from blrec.core.models import GiftSendMsg, GuardBuyMsg, SuperChatMsg
from blrec.danmaku.io import DanmakuWriter
from blrec.danmaku.models import (
Danmu,
GiftSendRecord,
GuardBuyRecord,
Metadata,
SuperChatRecord,
)
from ..event.event_emitter import EventEmitter, EventListener
from ..exception import exception_callback, submit_exception
from ..logging.room_id import aio_task_with_room_id
from ..path import danmaku_path
from ..utils.mixins import SwitchableMixin
from blrec.event.event_emitter import EventEmitter, EventListener
from blrec.exception import exception_callback, submit_exception
from blrec.logging.room_id import aio_task_with_room_id
from blrec.path import danmaku_path
from blrec.utils.mixins import SwitchableMixin
from .danmaku_receiver import DanmakuReceiver, DanmuMsg
from .statistics import Statistics
from .stream_recorder import StreamRecorder, StreamRecorderEventListener

View File

@ -2,9 +2,10 @@ import logging
from asyncio import Queue, QueueFull
from typing import Final
from ..bili.danmaku_client import DanmakuClient, DanmakuCommand, DanmakuListener
from ..bili.typing import Danmaku
from ..utils.mixins import StoppableMixin
from blrec.bili.danmaku_client import DanmakuClient, DanmakuCommand, DanmakuListener
from blrec.bili.typing import Danmaku
from blrec.utils.mixins import StoppableMixin
from .models import DanmuMsg, GiftSendMsg, GuardBuyMsg, SuperChatMsg
from .typing import DanmakuMsg

View File

@ -3,10 +3,13 @@ from typing import Optional
from reactivex.scheduler import NewThreadScheduler
from ..bili.live import Live
from ..bili.typing import QualityNumber
from ..flv import operators as flv_ops
from ..utils.mixins import SupportDebugMixin
from blrec.bili.live import Live
from blrec.bili.typing import QualityNumber
from blrec.flv import operators as flv_ops
from blrec.flv.metadata_dumper import MetadataDumper
from blrec.utils.mixins import SupportDebugMixin
from . import operators as core_ops
from .stream_recorder_impl import StreamRecorderImpl
__all__ = ('FLVStreamRecorderImpl',)
@ -34,6 +37,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl, SupportDebugMixin):
out_dir=out_dir,
path_template=path_template,
stream_format='flv',
recording_mode='standard',
quality_number=quality_number,
buffer_size=buffer_size,
read_timeout=read_timeout,
@ -43,6 +47,89 @@ class FLVStreamRecorderImpl(StreamRecorderImpl, SupportDebugMixin):
)
self._init_for_debug(live.room_id)
self._stream_fetcher = core_ops.StreamFetcher(
live, self._session, read_timeout=read_timeout
)
self._prober = flv_ops.Prober()
self._dl_statistics = core_ops.StreamStatistics()
self._stream_parser = core_ops.StreamParser(self._stream_param_holder)
self._analyser = flv_ops.Analyser()
self._injector = flv_ops.Injector(self._metadata_provider)
self._join_point_extractor = flv_ops.JoinPointExtractor()
self._limiter = flv_ops.Limiter(filesize_limit, duration_limit)
self._cutter = flv_ops.Cutter()
self._dumper = flv_ops.Dumper(self._path_provider, buffer_size)
self._metadata_dumper = MetadataDumper(
self._dumper, self._analyser, self._join_point_extractor
)
self._recording_monitor = core_ops.RecordingMonitor(
live, lambda: self._analyser.duration
)
self._prober.profiles.subscribe(self._on_profile_updated)
self._dumper.file_opened.subscribe(self._on_video_file_opened)
self._dumper.file_closed.subscribe(self._on_video_file_closed)
self._recording_monitor.interrupted.subscribe(self._on_recording_interrupted)
self._recording_monitor.recovered.subscribe(self._on_recording_recovered)
@property
def read_timeout(self) -> int:
return self._stream_fetcher.read_timeout
@read_timeout.setter
def read_timeout(self, value: int) -> None:
self._stream_fetcher.read_timeout = value
@property
def buffer_size(self) -> int:
return self._dumper.buffer_size
@buffer_size.setter
def buffer_size(self, value: int) -> None:
self._dumper.buffer_size = value
@property
def recording_path(self) -> Optional[str]:
return self._dumper.path
@property
def filesize_limit(self) -> int:
return self._limiter.filesize_limit
@filesize_limit.setter
def filesize_limit(self, value: int) -> None:
self._limiter.filesize_limit = value
@property
def duration_limit(self) -> int:
return self._limiter.duration_limit
@duration_limit.setter
def duration_limit(self, value: int) -> None:
self._limiter.duration_limit = value
@property
def metadata(self) -> Optional[flv_ops.MetaData]:
try:
return self._analyser.make_metadata()
except Exception:
return None
def can_cut_stream(self) -> bool:
return self._cutter.can_cut_stream()
def cut_stream(self) -> bool:
return self._cutter.cut_stream()
def _on_start(self) -> None:
self._metadata_dumper.enable()
def _on_stop(self) -> None:
self._metadata_dumper.disable()
def _run(self) -> None:
self._subscription = (
self._stream_param_holder.get_stream_params() # type: ignore

View File

@ -0,0 +1,105 @@
import logging
from typing import Optional
from reactivex import operators as ops
from reactivex.scheduler import NewThreadScheduler
from blrec.bili.live import Live
from blrec.bili.typing import QualityNumber
from blrec.hls import operators as hls_ops
from blrec.hls.metadata_dumper import MetadataDumper
from . import operators as core_ops
from .stream_recorder_impl import StreamRecorderImpl
__all__ = ('HLSRawStreamRecorderImpl',)
logger = logging.getLogger(__name__)
class HLSRawStreamRecorderImpl(StreamRecorderImpl):
def __init__(
self,
live: Live,
out_dir: str,
path_template: str,
*,
quality_number: QualityNumber = 10000,
buffer_size: Optional[int] = None,
read_timeout: Optional[int] = None,
disconnection_timeout: Optional[int] = None,
filesize_limit: int = 0,
duration_limit: int = 0,
) -> None:
super().__init__(
live=live,
out_dir=out_dir,
path_template=path_template,
stream_format='fmp4',
recording_mode='raw',
quality_number=quality_number,
buffer_size=buffer_size,
read_timeout=read_timeout,
disconnection_timeout=disconnection_timeout,
filesize_limit=filesize_limit,
duration_limit=duration_limit,
)
self._playlist_fetcher = hls_ops.PlaylistFetcher(self._live, self._session)
self._playlist_dumper = hls_ops.PlaylistDumper(self._path_provider)
self._segment_fetcher = hls_ops.SegmentFetcher(self._live, self._session)
self._segment_dumper = hls_ops.SegmentDumper(self._playlist_dumper)
self._ff_metadata_dumper = MetadataDumper(
self._playlist_dumper, self._metadata_provider
)
self._prober = hls_ops.Prober()
self._dl_statistics = core_ops.SizedStatistics()
self._recording_monitor = core_ops.RecordingMonitor(
live, lambda: self._playlist_dumper.duration
)
self._prober.profiles.subscribe(self._on_profile_updated)
self._playlist_dumper.file_opened.subscribe(self._on_video_file_opened)
self._playlist_dumper.file_closed.subscribe(self._on_video_file_closed)
self._recording_monitor.interrupted.subscribe(self._on_recording_interrupted)
self._recording_monitor.recovered.subscribe(self._on_recording_recovered)
@property
def recording_path(self) -> Optional[str]:
return self._playlist_dumper.path
def _on_start(self) -> None:
self._ff_metadata_dumper.enable()
def _on_stop(self) -> None:
self._ff_metadata_dumper.disable()
def _run(self) -> None:
self._subscription = (
self._stream_param_holder.get_stream_params() # type: ignore
.pipe(
self._stream_url_resolver,
ops.subscribe_on(
NewThreadScheduler(self._thread_factory('PlaylistDownloader'))
),
self._playlist_fetcher,
self._recording_monitor,
self._connection_error_handler,
self._request_exception_handler,
self._playlist_dumper,
ops.observe_on(
NewThreadScheduler(self._thread_factory('SegmentDownloader'))
),
self._segment_fetcher,
self._dl_statistics,
self._prober,
self._segment_dumper,
self._rec_statistics,
self._progress_bar,
self._exception_handler,
)
.subscribe(on_completed=self._on_completed)
)

View File

@ -4,9 +4,12 @@ from typing import Optional
from reactivex import operators as ops
from reactivex.scheduler import NewThreadScheduler
from ..bili.live import Live
from ..bili.typing import QualityNumber
from ..flv import operators as flv_ops
from blrec.bili.live import Live
from blrec.bili.typing import QualityNumber
from blrec.flv import operators as flv_ops
from blrec.flv.metadata_dumper import MetadataDumper
from blrec.hls import operators as hls_ops
from . import operators as core_ops
from .stream_recorder_impl import StreamRecorderImpl
@ -35,6 +38,7 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
out_dir=out_dir,
path_template=path_template,
stream_format='fmp4',
recording_mode='standard',
quality_number=quality_number,
buffer_size=buffer_size,
read_timeout=read_timeout,
@ -43,10 +47,83 @@ class HLSStreamRecorderImpl(StreamRecorderImpl):
duration_limit=duration_limit,
)
self._playlist_fetcher = core_ops.PlaylistFetcher(self._live, self._session)
self._playlist_resolver = core_ops.PlaylistResolver()
self._segment_fetcher = core_ops.SegmentFetcher(self._live, self._session)
self._segment_remuxer = core_ops.SegmentRemuxer(live)
self._playlist_fetcher = hls_ops.PlaylistFetcher(self._live, self._session)
self._playlist_resolver = hls_ops.PlaylistResolver()
self._segment_fetcher = hls_ops.SegmentFetcher(self._live, self._session)
self._segment_remuxer = hls_ops.SegmentRemuxer(live)
self._prober = hls_ops.Prober()
self._dl_statistics = core_ops.SizedStatistics()
self._stream_parser = core_ops.StreamParser(
self._stream_param_holder, ignore_eof=True, ignore_value_error=True
)
self._analyser = flv_ops.Analyser()
self._injector = flv_ops.Injector(self._metadata_provider)
self._join_point_extractor = flv_ops.JoinPointExtractor()
self._limiter = flv_ops.Limiter(filesize_limit, duration_limit)
self._cutter = flv_ops.Cutter()
self._dumper = flv_ops.Dumper(self._path_provider, buffer_size)
self._metadata_dumper = MetadataDumper(
self._dumper, self._analyser, self._join_point_extractor
)
self._recording_monitor = core_ops.RecordingMonitor(
live, lambda: self._analyser.duration
)
self._prober.profiles.subscribe(self._on_profile_updated)
self._dumper.file_opened.subscribe(self._on_video_file_opened)
self._dumper.file_closed.subscribe(self._on_video_file_closed)
self._recording_monitor.interrupted.subscribe(self._on_recording_interrupted)
self._recording_monitor.recovered.subscribe(self._on_recording_recovered)
@property
def buffer_size(self) -> int:
return self._dumper.buffer_size
@buffer_size.setter
def buffer_size(self, value: int) -> None:
self._dumper.buffer_size = value
@property
def recording_path(self) -> Optional[str]:
return self._dumper.path
@property
def filesize_limit(self) -> int:
return self._limiter.filesize_limit
@filesize_limit.setter
def filesize_limit(self, value: int) -> None:
self._limiter.filesize_limit = value
@property
def duration_limit(self) -> int:
return self._limiter.duration_limit
@duration_limit.setter
def duration_limit(self, value: int) -> None:
self._limiter.duration_limit = value
@property
def metadata(self) -> Optional[flv_ops.MetaData]:
try:
return self._analyser.make_metadata()
except Exception:
return None
def can_cut_stream(self) -> bool:
return self._cutter.can_cut_stream()
def cut_stream(self) -> bool:
return self._cutter.cut_stream()
def _on_start(self) -> None:
self._metadata_dumper.enable()
def _on_stop(self) -> None:
self._metadata_dumper.disable()
def _run(self) -> None:
self._subscription = (

View File

@ -4,9 +4,9 @@ from collections import OrderedDict
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict, Union
from .. import __github__, __prog__, __version__
from ..bili.helpers import get_quality_name
from ..bili.live import Live
from blrec import __github__, __prog__, __version__
from blrec.bili.helpers import get_quality_name
from blrec.bili.live import Live
if TYPE_CHECKING:
from .stream_recorder_impl import StreamRecorderImpl
@ -54,6 +54,13 @@ class MetadataProvider:
', bluray' if '_bluray' in self._stream_recorder.stream_url else '',
)
if self._stream_recorder.recording_mode == 'standard':
recording_mode_desc = '标准'
elif self._stream_recorder.recording_mode == 'raw':
recording_mode_desc = '原始'
else:
recording_mode_desc = ''
return {
'Title': self._live.room_info.title,
'Artist': self._live.user_info.name,
@ -71,6 +78,7 @@ HLS流可用时间: {hls_stream_available_time}
流主机: {self._stream_recorder.stream_host}
流格式{self._stream_recorder.stream_format}
流画质{stream_quality}
录制模式: {recording_mode_desc}
录制程序{__prog__} v{__version__} {__github__}''',
'description': OrderedDict(
{
@ -87,6 +95,7 @@ HLS流可用时间: {hls_stream_available_time}
'StreamHost': self._stream_recorder.stream_host,
'StreamFormat': self._stream_recorder.stream_format,
'StreamQuality': stream_quality,
'RecordingMode': self._stream_recorder.recording_mode,
'Recorder': f'{__prog__} v{__version__} {__github__}',
}
),

View File

@ -3,8 +3,7 @@ from typing import Literal
import attr
from ..bili.typing import Danmaku
from blrec.bili.typing import Danmaku
logger = logging.getLogger(__name__)

View File

@ -1,13 +1,8 @@
from .connection_error_handler import ConnectionErrorHandler
from .exception_handler import ExceptionHandler
from .hls_prober import HLSProber, StreamProfile
from .playlist_fetcher import PlaylistFetcher
from .playlist_resolver import PlaylistResolver
from .progress_bar import ProgressBar
from .recording_monitor import RecordingMonitor
from .request_exception_handler import RequestExceptionHandler
from .segment_fetcher import InitSectionData, SegmentData, SegmentFetcher
from .segment_remuxer import SegmentRemuxer
from .sized_statistics import SizedStatistics
from .stream_fetcher import StreamFetcher
from .stream_parser import StreamParser
@ -17,20 +12,12 @@ from .stream_url_resolver import StreamURLResolver
__all__ = (
'ConnectionErrorHandler',
'ExceptionHandler',
'HLSProber',
'InitSectionData',
'PlaylistFetcher',
'PlaylistResolver',
'ProgressBar',
'RecordingMonitor',
'RequestExceptionHandler',
'SegmentData',
'SegmentFetcher',
'SegmentRemuxer',
'SizedStatistics',
'StreamFetcher',
'StreamParser',
'StreamProfile',
'StreamStatistics',
'StreamURLResolver',
)

View File

@ -8,9 +8,9 @@ import aiohttp
import requests
from reactivex import Observable, abc
from ...bili.live import Live
from ...utils import operators as utils_ops
from ...utils.mixins import AsyncCooperationMixin
from blrec.bili.live import Live
from blrec.utils import operators as utils_ops
from blrec.utils.mixins import AsyncCooperationMixin
__all__ = ('ConnectionErrorHandler',)

View File

@ -6,9 +6,9 @@ from typing import Optional, TypeVar
from reactivex import Observable, abc
from ...bili.exceptions import LiveRoomEncrypted, LiveRoomHidden, LiveRoomLocked
from ...utils import operators as utils_ops
from ...utils.mixins import AsyncCooperationMixin
from blrec.bili.exceptions import LiveRoomEncrypted, LiveRoomHidden, LiveRoomLocked
from blrec.utils import operators as utils_ops
from blrec.utils.mixins import AsyncCooperationMixin
__all__ = ('ExceptionHandler',)

View File

@ -7,8 +7,8 @@ from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from tqdm import tqdm
from ...bili.live import Live
from ...flv.operators.typing import FLVStream, FLVStreamItem
from blrec.bili.live import Live
from blrec.flv.operators.typing import FLVStream, FLVStreamItem
__all__ = ('ProgressBar',)

View File

@ -1,13 +1,12 @@
from __future__ import annotations
import logging
from typing import Final, Optional, TypeVar
from typing import Callable, Final, Optional, TypeVar
from reactivex import Observable, Subject, abc
from ...bili.live import Live
from ...flv import operators as flv_ops
from ...utils.mixins import AsyncCooperationMixin
from blrec.bili.live import Live
from blrec.utils.mixins import AsyncCooperationMixin
__all__ = ('RecordingMonitor',)
@ -18,10 +17,10 @@ _T = TypeVar('_T')
class RecordingMonitor(AsyncCooperationMixin):
def __init__(self, live: Live, analyser: flv_ops.Analyser) -> None:
def __init__(self, live: Live, duration_provider: Callable[..., float]) -> None:
super().__init__()
self._live = live
self._analyser = analyser
self._duration_provider = duration_provider
self._interrupted: Subject[float] = Subject()
self._recovered: Subject[int] = Subject()
@ -59,7 +58,8 @@ class RecordingMonitor(AsyncCooperationMixin):
if recording:
failed_count += 1
if failed_count == CRITERIA:
self._interrupted.on_next(self._analyser.duration)
duration = self._duration_provider()
self._interrupted.on_next(duration)
observer.on_error(exc)
return source.subscribe(

View File

@ -9,7 +9,7 @@ import requests
import urllib3
from reactivex import Observable, abc
from ...utils import operators as utils_ops
from blrec.utils import operators as utils_ops
__all__ = ('RequestExceptionHandler',)

View File

@ -7,8 +7,8 @@ from typing import Optional
import requests
from reactivex import Observable, abc
from ...bili.live import Live
from ...utils.mixins import AsyncCooperationMixin
from blrec.bili.live import Live
from blrec.utils.mixins import AsyncCooperationMixin
__all__ = ('StreamFetcher',)

View File

@ -6,10 +6,11 @@ import logging
from reactivex import Observable
from reactivex import operators as ops
from ...flv import operators as flv_ops
from ...flv.exceptions import FlvDataError
from ...flv.operators.typing import FLVStream
from ...utils import operators as utils_ops
from blrec.flv import operators as flv_ops
from blrec.flv.exceptions import FlvDataError
from blrec.flv.operators.typing import FLVStream
from blrec.utils import operators as utils_ops
from ..stream_param_holder import StreamParamHolder
__all__ = ('StreamParser',)

View File

@ -9,7 +9,7 @@ import urllib3
from reactivex import Observable, abc
from reactivex import operators as ops
from ...bili.exceptions import (
from blrec.bili.exceptions import (
LiveRoomEncrypted,
LiveRoomHidden,
LiveRoomLocked,
@ -19,9 +19,10 @@ from ...bili.exceptions import (
NoStreamFormatAvailable,
NoStreamQualityAvailable,
)
from ...bili.live import Live
from ...utils import operators as utils_ops
from ...utils.mixins import AsyncCooperationMixin
from blrec.bili.live import Live
from blrec.utils import operators as utils_ops
from blrec.utils.mixins import AsyncCooperationMixin
from ..stream_param_holder import StreamParamHolder, StreamParams
__all__ = ('StreamURLResolver',)

View File

@ -4,9 +4,9 @@ import re
from datetime import datetime
from typing import Tuple
from ..bili.live import Live
from ..path import escape_path
from ..utils.mixins import AsyncCooperationMixin
from blrec.bili.live import Live
from blrec.path import escape_path
from blrec.utils.mixins import AsyncCooperationMixin
__all__ = ('PathProvider',)

View File

@ -8,12 +8,13 @@ import aiofiles
from aiofiles.threadpool.text import AsyncTextIOWrapper
from tenacity import AsyncRetrying, retry_if_not_exception_type, stop_after_attempt
from ..bili.live import Live
from ..event.event_emitter import EventEmitter, EventListener
from ..exception import exception_callback, submit_exception
from ..logging.room_id import aio_task_with_room_id
from ..path import raw_danmaku_path
from ..utils.mixins import SwitchableMixin
from blrec.bili.live import Live
from blrec.event.event_emitter import EventEmitter, EventListener
from blrec.exception import exception_callback, submit_exception
from blrec.logging.room_id import aio_task_with_room_id
from blrec.path import raw_danmaku_path
from blrec.utils.mixins import SwitchableMixin
from .raw_danmaku_receiver import RawDanmakuReceiver
from .stream_recorder import StreamRecorder, StreamRecorderEventListener

View File

@ -2,13 +2,11 @@ import logging
from asyncio import Queue, QueueFull
from typing import Final
from blrec.bili.danmaku_client import DanmakuClient, DanmakuListener
from blrec.bili.typing import Danmaku
from blrec.utils.mixins import StoppableMixin
from ..bili.danmaku_client import DanmakuClient, DanmakuListener
from ..bili.typing import Danmaku
from ..utils.mixins import StoppableMixin
__all__ = 'RawDanmakuReceiver',
__all__ = ('RawDanmakuReceiver',)
logger = logging.getLogger(__name__)

View File

@ -7,15 +7,17 @@ from typing import Iterator, Optional
import humanize
from ..bili.danmaku_client import DanmakuClient
from ..bili.live import Live
from ..bili.live_monitor import LiveEventListener, LiveMonitor
from ..bili.models import RoomInfo
from ..bili.typing import QualityNumber, StreamFormat
from ..event.event_emitter import EventEmitter, EventListener
from ..flv.operators import MetaData, StreamProfile
from ..logging.room_id import aio_task_with_room_id
from ..utils.mixins import AsyncStoppableMixin
from blrec.bili.danmaku_client import DanmakuClient
from blrec.bili.live import Live
from blrec.bili.live_monitor import LiveEventListener, LiveMonitor
from blrec.bili.models import RoomInfo
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.event.event_emitter import EventEmitter, EventListener
from blrec.flv.operators import MetaData, StreamProfile
from blrec.logging.room_id import aio_task_with_room_id
from blrec.setting.typing import RecordingMode
from blrec.utils.mixins import AsyncStoppableMixin
from .cover_downloader import CoverDownloader, CoverSaveStrategy
from .danmaku_dumper import DanmakuDumper, DanmakuDumperEventListener
from .danmaku_receiver import DanmakuReceiver
@ -77,6 +79,7 @@ class Recorder(
path_template: str,
*,
stream_format: StreamFormat = 'flv',
recording_mode: RecordingMode = 'standard',
quality_number: QualityNumber = 10000,
fmp4_stream_timeout: int = 10,
buffer_size: Optional[int] = None,
@ -108,6 +111,7 @@ class Recorder(
out_dir=out_dir,
path_template=path_template,
stream_format=stream_format,
recording_mode=recording_mode,
quality_number=quality_number,
fmp4_stream_timeout=fmp4_stream_timeout,
buffer_size=buffer_size,
@ -156,6 +160,14 @@ class Recorder(
def stream_format(self, value: StreamFormat) -> None:
self._stream_recorder.stream_format = value
@property
def recording_mode(self) -> RecordingMode:
return self._stream_recorder.recording_mode
@recording_mode.setter
def recording_mode(self, value: RecordingMode) -> None:
self._stream_recorder.recording_mode = value
@property
def quality_number(self) -> QualityNumber:
return self._stream_recorder.quality_number
@ -365,6 +377,7 @@ class Recorder(
async def on_live_ended(self, live: Live) -> None:
logger.info('The live has ended')
await asyncio.sleep(3)
self._stream_available = False
self._stream_recorder.stream_available_time = None
await self._stop_recording()

View File

@ -8,7 +8,7 @@ from reactivex import Observable, abc, create
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from reactivex.scheduler.currentthreadscheduler import CurrentThreadScheduler
from ..bili.typing import ApiPlatform, QualityNumber, StreamFormat
from blrec.bili.typing import ApiPlatform, QualityNumber, StreamFormat
__all__ = ('StreamParamHolder',)

View File

@ -3,12 +3,15 @@ import logging
import time
from typing import Iterator, Optional
from ..bili.live import Live
from ..bili.typing import QualityNumber, StreamFormat
from ..event.event_emitter import EventEmitter
from ..flv.operators import MetaData, StreamProfile
from ..utils.mixins import AsyncStoppableMixin
from blrec.bili.live import Live
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.event.event_emitter import EventEmitter
from blrec.flv.operators import MetaData, StreamProfile
from blrec.setting.typing import RecordingMode
from blrec.utils.mixins import AsyncStoppableMixin
from .flv_stream_recorder_impl import FLVStreamRecorderImpl
from .hls_raw_stream_recorder_impl import HLSRawStreamRecorderImpl
from .hls_stream_recorder_impl import HLSStreamRecorderImpl
from .stream_recorder_impl import StreamRecorderEventListener
@ -30,6 +33,7 @@ class StreamRecorder(
path_template: str,
*,
stream_format: StreamFormat = 'flv',
recording_mode: RecordingMode = 'standard',
quality_number: QualityNumber = 10000,
fmp4_stream_timeout: int = 10,
buffer_size: Optional[int] = None,
@ -42,12 +46,16 @@ class StreamRecorder(
self._live = live
self.stream_format = stream_format
self.recording_mode = recording_mode
self.fmp4_stream_timeout = fmp4_stream_timeout
if stream_format == 'flv':
cls = FLVStreamRecorderImpl
elif stream_format == 'fmp4':
cls = HLSStreamRecorderImpl # type: ignore
if recording_mode == 'standard':
cls = HLSStreamRecorderImpl # type: ignore
else:
cls = HLSRawStreamRecorderImpl # type: ignore
else:
logger.warning(
f'The specified stream format ({stream_format}) is '
@ -254,8 +262,8 @@ class StreamRecorder(
async def on_video_file_completed(self, path: str) -> None:
await self._emit('video_file_completed', path)
async def on_stream_recording_interrupted(self, timestamp: int) -> None:
await self._emit('stream_recording_interrupted', timestamp)
async def on_stream_recording_interrupted(self, duration: float) -> None:
await self._emit('stream_recording_interrupted', duration)
async def on_stream_recording_recovered(self, timestamp: int) -> None:
await self._emit('stream_recording_recovered', timestamp)
@ -284,7 +292,10 @@ class StreamRecorder(
if stream_format == 'flv':
cls = FLVStreamRecorderImpl
elif stream_format == 'fmp4':
cls = HLSStreamRecorderImpl # type: ignore
if self.recording_mode == 'standard':
cls = HLSStreamRecorderImpl # type: ignore
else:
cls = HLSRawStreamRecorderImpl # type: ignore
else:
logger.warning(
f'The specified stream format ({stream_format}) is '

View File

@ -9,15 +9,16 @@ import urllib3
from reactivex import abc
from reactivex.typing import StartableFactory, StartableTarget
from ..bili.live import Live
from ..bili.typing import QualityNumber, StreamFormat
from ..event.event_emitter import EventEmitter, EventListener
from ..flv import operators as flv_ops
from ..flv.metadata_dumper import MetadataDumper
from ..flv.operators import StreamProfile
from ..flv.utils import format_timestamp
from ..logging.room_id import aio_task_with_room_id
from ..utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin
from blrec.bili.live import Live
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.event.event_emitter import EventEmitter, EventListener
from blrec.flv import operators as flv_ops
from blrec.flv.operators import StreamProfile
from blrec.flv.utils import format_timestamp
from blrec.logging.room_id import aio_task_with_room_id
from blrec.setting.typing import RecordingMode
from blrec.utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin
from . import operators as core_ops
from .metadata_provider import MetadataProvider
from .path_provider import PathProvider
@ -60,6 +61,7 @@ class StreamRecorderImpl(
path_template: str,
*,
stream_format: StreamFormat = 'flv',
recording_mode: RecordingMode = 'standard',
quality_number: QualityNumber = 10000,
buffer_size: Optional[int] = None,
read_timeout: Optional[int] = None,
@ -71,50 +73,30 @@ class StreamRecorderImpl(
self._live = live
self._session = requests.Session()
self._recording_mode = recording_mode
self._buffer_size = buffer_size
self._read_timeout = read_timeout
self._filesize_limit = filesize_limit
self._duration_limit = duration_limit
self._stream_param_holder = StreamParamHolder(
stream_format=stream_format, quality_number=quality_number
)
self._stream_url_resolver = core_ops.StreamURLResolver(
live, self._stream_param_holder
)
self._stream_fetcher = core_ops.StreamFetcher(
live, self._session, read_timeout=read_timeout
)
self._stream_parser = core_ops.StreamParser(
self._stream_param_holder,
ignore_eof=stream_format != 'flv',
ignore_value_error=stream_format != 'flv',
)
self._progress_bar = core_ops.ProgressBar(live)
self._analyser = flv_ops.Analyser()
self._metadata_provider = MetadataProvider(live, self)
self._injector = flv_ops.Injector(self._metadata_provider)
self._join_point_extractor = flv_ops.JoinPointExtractor()
self._limiter = flv_ops.Limiter(filesize_limit, duration_limit)
self._cutter = flv_ops.Cutter()
self._path_provider = PathProvider(live, out_dir, path_template)
self._dumper = flv_ops.Dumper(self._path_provider, buffer_size)
self._rec_statistics = core_ops.SizedStatistics()
self._recording_monitor = core_ops.RecordingMonitor(live, self._analyser)
self._prober: Union[flv_ops.Prober, core_ops.HLSProber]
self._dl_statistics: Union[core_ops.StreamStatistics, core_ops.SizedStatistics]
if stream_format == 'flv':
self._prober = flv_ops.Prober()
self._dl_statistics = core_ops.StreamStatistics()
else:
self._prober = core_ops.HLSProber()
self._dl_statistics = core_ops.SizedStatistics()
self._request_exception_handler = core_ops.RequestExceptionHandler()
self._connection_error_handler = core_ops.ConnectionErrorHandler(
live, disconnection_timeout=disconnection_timeout
)
self._exception_handler = core_ops.ExceptionHandler()
self._metadata_dumper = MetadataDumper(
self._dumper, self._analyser, self._join_point_extractor
)
self._metadata_dumper.enable()
self._subscription: abc.DisposableBase
self._completed: bool = False
@ -126,37 +108,6 @@ class StreamRecorderImpl(
self._stream_available_time: Optional[int] = None
self._hls_stream_available_time: Optional[int] = None
def on_profile_updated(profile: StreamProfile) -> None:
self._stream_profile = profile
self._prober.profiles.subscribe(on_profile_updated)
def on_file_opened(args: Tuple[str, int]) -> None:
logger.info(f"Video file created: '{args[0]}'")
self._files.append(args[0])
self._record_start_time = args[1]
self._emit_event('video_file_created', *args)
def on_file_closed(path: str) -> None:
logger.info(f"Video file completed: '{path}'")
self._emit_event('video_file_completed', path)
self._dumper.file_opened.subscribe(on_file_opened)
self._dumper.file_closed.subscribe(on_file_closed)
def on_recording_interrupted(duration: float) -> None:
duration_string = format_timestamp(int(duration * 1000))
logger.info(f'Recording interrupted, current duration: {duration_string}')
self._emit_event('stream_recording_interrupted', duration)
def on_recording_recovered(timestamp: int) -> None:
datetime_string = datetime.fromtimestamp(timestamp).isoformat()
logger.info(f'Recording recovered, current date time {(datetime_string)}')
self._emit_event('stream_recording_recovered', timestamp)
self._recording_monitor.interrupted.subscribe(on_recording_interrupted)
self._recording_monitor.recovered.subscribe(on_recording_recovered)
@property
def stream_url(self) -> str:
return self._stream_url_resolver.stream_url
@ -225,6 +176,10 @@ class StreamRecorderImpl(
def stream_format(self) -> StreamFormat:
return self._stream_param_holder.stream_format
@property
def recording_mode(self) -> RecordingMode:
return self._recording_mode
@property
def quality_number(self) -> QualityNumber:
return self._stream_param_holder.quality_number
@ -241,27 +196,27 @@ class StreamRecorderImpl(
@property
def filesize_limit(self) -> int:
return self._limiter.filesize_limit
return self._filesize_limit
@filesize_limit.setter
def filesize_limit(self, value: int) -> None:
self._limiter.filesize_limit = value
self._filesize_limit = value
@property
def duration_limit(self) -> int:
return self._limiter.duration_limit
return self._duration_limit
@duration_limit.setter
def duration_limit(self, value: int) -> None:
self._limiter.duration_limit = value
self._duration_limit = value
@property
def read_timeout(self) -> int:
return self._stream_fetcher.read_timeout
return self._read_timeout
@read_timeout.setter
def read_timeout(self, value: int) -> None:
self._stream_fetcher.read_timeout = value
self._read_timeout = value
@property
def disconnection_timeout(self) -> int:
@ -273,22 +228,19 @@ class StreamRecorderImpl(
@property
def buffer_size(self) -> int:
return self._dumper.buffer_size
return self._buffer_size
@buffer_size.setter
def buffer_size(self, value: int) -> None:
self._dumper.buffer_size = value
self._buffer_size = value
@property
def recording_path(self) -> Optional[str]:
return self._dumper.path
return ''
@property
def metadata(self) -> Optional[flv_ops.MetaData]:
try:
return self._analyser.make_metadata()
except Exception:
return None
return None
@property
def stream_profile(self) -> StreamProfile:
@ -304,10 +256,10 @@ class StreamRecorderImpl(
self._files.clear()
def can_cut_stream(self) -> bool:
return self._cutter.can_cut_stream()
return False
def cut_stream(self) -> bool:
return self._cutter.cut_stream()
return False
def update_progress_bar_info(self) -> None:
self._progress_bar.update_bar_info()
@ -320,6 +272,7 @@ class StreamRecorderImpl(
async def _do_start(self) -> None:
logger.debug('Starting stream recorder...')
self._on_start()
self._reset()
self._run()
logger.debug('Started stream recorder')
@ -332,8 +285,15 @@ class StreamRecorderImpl(
for thread in self._threads:
await self._loop.run_in_executor(None, thread.join, 30)
self._threads.clear()
self._on_stop()
logger.debug('Stopped stream recorder')
def _on_start(self) -> None:
pass
def _on_stop(self) -> None:
pass
@abstractmethod
def _run(self) -> None:
raise NotImplementedError()
@ -362,6 +322,30 @@ class StreamRecorderImpl(
self._rec_statistics.freeze()
self._emit_event('stream_recording_completed')
def _on_profile_updated(self, profile: StreamProfile) -> None:
logger.debug(f'Stream profile: {profile}')
self._stream_profile = profile
def _on_video_file_opened(self, args: Tuple[str, int]) -> None:
logger.info(f"Video file created: '{args[0]}'")
self._files.append(args[0])
self._record_start_time = args[1]
self._emit_event('video_file_created', *args)
def _on_video_file_closed(self, path: str) -> None:
logger.info(f"Video file completed: '{path}'")
self._emit_event('video_file_completed', path)
def _on_recording_interrupted(self, duration: float) -> None:
duration_string = format_timestamp(int(duration * 1000))
logger.info(f'Recording interrupted, current duration: {duration_string}')
self._emit_event('stream_recording_interrupted', duration)
def _on_recording_recovered(self, timestamp: int) -> None:
datetime_string = datetime.fromtimestamp(timestamp).isoformat()
logger.info(f'Recording recovered, current date time {(datetime_string)}')
self._emit_event('stream_recording_recovered', timestamp)
def _emit_event(self, name: str, *args: Any, **kwds: Any) -> None:
self._run_coroutine(self._emit(name, *args, **kwds))

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -10,6 +10,6 @@
<body>
<app-root></app-root>
<noscript>Please enable JavaScript to continue using this application.</noscript>
<script src="runtime.0ce129f346263990.js" type="module"></script><script src="polyfills.4b08448aee19bb22.js" type="module"></script><script src="main.888c50197ddf8040.js" type="module"></script>
<script src="runtime.c15d125b613d9f14.js" type="module"></script><script src="polyfills.4b08448aee19bb22.js" type="module"></script><script src="main.888c50197ddf8040.js" type="module"></script>
</body></html>

View File

@ -1,6 +1,6 @@
{
"configVersion": 1,
"timestamp": 1655391613431,
"timestamp": 1659072298887,
"index": "/index.html",
"assetGroups": [
{
@ -13,16 +13,16 @@
"urls": [
"/103.5b5d2a6e5a8a7479.js",
"/146.92e3b29c4c754544.js",
"/183.ae1a1102b7d5cbdb.js",
"/202.e15e5ae9f06639b8.js",
"/183.90c399afcab1b014.js",
"/202.ad802ed297fef2df.js",
"/45.c90c3cea2bf1a66e.js",
"/66.9faa0b5a6adf9602.js",
"/66.d61b8b935d3ed1ff.js",
"/common.858f777e9296e6f2.js",
"/index.html",
"/main.888c50197ddf8040.js",
"/manifest.webmanifest",
"/polyfills.4b08448aee19bb22.js",
"/runtime.0ce129f346263990.js",
"/runtime.c15d125b613d9f14.js",
"/styles.2e152d608221c2ee.css"
],
"patterns": []
@ -1636,10 +1636,10 @@
"hashTable": {
"/103.5b5d2a6e5a8a7479.js": "cc0240f217015b6d4ddcc14f31fcc42e1c1c282a",
"/146.92e3b29c4c754544.js": "3824de681dd1f982ea69a065cdf54d7a1e781f4d",
"/183.ae1a1102b7d5cbdb.js": "6cb22d60b0a20214212e6050fbbf33926a4c1346",
"/202.e15e5ae9f06639b8.js": "62335dc98644969539760565ff9c3c472d304287",
"/183.90c399afcab1b014.js": "467a8b4c21dace3ae358507932287ca3596051e6",
"/202.ad802ed297fef2df.js": "c66deea0e3fde32c1132430aab75486efb881960",
"/45.c90c3cea2bf1a66e.js": "e5bfb8cf3803593e6b8ea14c90b3d3cb6a066764",
"/66.9faa0b5a6adf9602.js": "c2f418ebb80f35402d9f24e5acaf8167c96f9eb3",
"/66.d61b8b935d3ed1ff.js": "6b81e8268d5a2d2596b0a7926985dd80fb06532a",
"/assets/animal/panda.js": "fec2868bb3053dd2da45f96bbcb86d5116ed72b1",
"/assets/animal/panda.svg": "bebd302cdc601e0ead3a6d2710acf8753f3d83b1",
"/assets/fill/.gitkeep": "da39a3ee5e6b4b0d3255bfef95601890afd80709",
@ -3234,11 +3234,11 @@
"/assets/twotone/warning.js": "fb2d7ea232f3a99bf8f080dbc94c65699232ac01",
"/assets/twotone/warning.svg": "8c7a2d3e765a2e7dd58ac674870c6655cecb0068",
"/common.858f777e9296e6f2.js": "b68ca68e1e214a2537d96935c23410126cc564dd",
"/index.html": "374ebd2a9b656c5ebcbc9f5a4402b345cd4c7c5c",
"/index.html": "feb36a563bdcb300ec006c8094686aaf87d27282",
"/main.888c50197ddf8040.js": "f506b85641a4598b002c21bc49c9a36e0c058326",
"/manifest.webmanifest": "62c1cb8c5ad2af551a956b97013ab55ce77dd586",
"/polyfills.4b08448aee19bb22.js": "8e73f2d42cc13ca353cea5c886d930bd6da08d0d",
"/runtime.0ce129f346263990.js": "98698b10b3f873a761f1e1c7fb5a9bcd2f3830ee",
"/runtime.c15d125b613d9f14.js": "40d99ef24a3f99be0f7fb09de4947b23fbd7c682",
"/styles.2e152d608221c2ee.css": "9830389a46daa5b4511e0dd343aad23ca9f9690f"
},
"navigationUrls": [

View File

@ -1 +0,0 @@
(()=>{"use strict";var e,v={},m={};function r(e){var i=m[e];if(void 0!==i)return i.exports;var t=m[e]={exports:{}};return v[e].call(t.exports,t,t.exports,r),t.exports}r.m=v,e=[],r.O=(i,t,o,f)=>{if(!t){var a=1/0;for(n=0;n<e.length;n++){for(var[t,o,f]=e[n],c=!0,l=0;l<t.length;l++)(!1&f||a>=f)&&Object.keys(r.O).every(p=>r.O[p](t[l]))?t.splice(l--,1):(c=!1,f<a&&(a=f));if(c){e.splice(n--,1);var d=o();void 0!==d&&(i=d)}}return i}f=f||0;for(var n=e.length;n>0&&e[n-1][2]>f;n--)e[n]=e[n-1];e[n]=[t,o,f]},r.n=e=>{var i=e&&e.__esModule?()=>e.default:()=>e;return r.d(i,{a:i}),i},r.d=(e,i)=>{for(var t in i)r.o(i,t)&&!r.o(e,t)&&Object.defineProperty(e,t,{enumerable:!0,get:i[t]})},r.f={},r.e=e=>Promise.all(Object.keys(r.f).reduce((i,t)=>(r.f[t](e,i),i),[])),r.u=e=>(592===e?"common":e)+"."+{45:"c90c3cea2bf1a66e",66:"9faa0b5a6adf9602",103:"5b5d2a6e5a8a7479",146:"92e3b29c4c754544",183:"ae1a1102b7d5cbdb",202:"e15e5ae9f06639b8",592:"858f777e9296e6f2"}[e]+".js",r.miniCssF=e=>{},r.o=(e,i)=>Object.prototype.hasOwnProperty.call(e,i),(()=>{var e={},i="blrec:";r.l=(t,o,f,n)=>{if(e[t])e[t].push(o);else{var a,c;if(void 0!==f)for(var l=document.getElementsByTagName("script"),d=0;d<l.length;d++){var u=l[d];if(u.getAttribute("src")==t||u.getAttribute("data-webpack")==i+f){a=u;break}}a||(c=!0,(a=document.createElement("script")).type="module",a.charset="utf-8",a.timeout=120,r.nc&&a.setAttribute("nonce",r.nc),a.setAttribute("data-webpack",i+f),a.src=r.tu(t)),e[t]=[o];var s=(g,p)=>{a.onerror=a.onload=null,clearTimeout(b);var _=e[t];if(delete e[t],a.parentNode&&a.parentNode.removeChild(a),_&&_.forEach(h=>h(p)),g)return g(p)},b=setTimeout(s.bind(null,void 0,{type:"timeout",target:a}),12e4);a.onerror=s.bind(null,a.onerror),a.onload=s.bind(null,a.onload),c&&document.head.appendChild(a)}}})(),r.r=e=>{"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},(()=>{var e;r.tu=i=>(void 0===e&&(e={createScriptURL:t=>t},"undefined"!=typeof trustedTypes&&trustedTypes.createPolicy&&(e=trustedTypes.createPolicy("angular#bundler",e))),e.createScriptURL(i))})(),r.p="",(()=>{var e={666:0};r.f.j=(o,f)=>{var n=r.o(e,o)?e[o]:void 0;if(0!==n)if(n)f.push(n[2]);else if(666!=o){var a=new Promise((u,s)=>n=e[o]=[u,s]);f.push(n[2]=a);var c=r.p+r.u(o),l=new Error;r.l(c,u=>{if(r.o(e,o)&&(0!==(n=e[o])&&(e[o]=void 0),n)){var s=u&&("load"===u.type?"missing":u.type),b=u&&u.target&&u.target.src;l.message="Loading chunk "+o+" failed.\n("+s+": "+b+")",l.name="ChunkLoadError",l.type=s,l.request=b,n[1](l)}},"chunk-"+o,o)}else e[o]=0},r.O.j=o=>0===e[o];var i=(o,f)=>{var l,d,[n,a,c]=f,u=0;if(n.some(b=>0!==e[b])){for(l in a)r.o(a,l)&&(r.m[l]=a[l]);if(c)var s=c(r)}for(o&&o(f);u<n.length;u++)r.o(e,d=n[u])&&e[d]&&e[d][0](),e[n[u]]=0;return r.O(s)},t=self.webpackChunkblrec=self.webpackChunkblrec||[];t.forEach(i.bind(null,0)),t.push=i.bind(null,t.push.bind(t))})()})();

View File

@ -0,0 +1 @@
(()=>{"use strict";var e,v={},m={};function r(e){var f=m[e];if(void 0!==f)return f.exports;var t=m[e]={exports:{}};return v[e].call(t.exports,t,t.exports,r),t.exports}r.m=v,e=[],r.O=(f,t,i,o)=>{if(!t){var a=1/0;for(n=0;n<e.length;n++){for(var[t,i,o]=e[n],c=!0,l=0;l<t.length;l++)(!1&o||a>=o)&&Object.keys(r.O).every(p=>r.O[p](t[l]))?t.splice(l--,1):(c=!1,o<a&&(a=o));if(c){e.splice(n--,1);var d=i();void 0!==d&&(f=d)}}return f}o=o||0;for(var n=e.length;n>0&&e[n-1][2]>o;n--)e[n]=e[n-1];e[n]=[t,i,o]},r.n=e=>{var f=e&&e.__esModule?()=>e.default:()=>e;return r.d(f,{a:f}),f},r.d=(e,f)=>{for(var t in f)r.o(f,t)&&!r.o(e,t)&&Object.defineProperty(e,t,{enumerable:!0,get:f[t]})},r.f={},r.e=e=>Promise.all(Object.keys(r.f).reduce((f,t)=>(r.f[t](e,f),f),[])),r.u=e=>(592===e?"common":e)+"."+{45:"c90c3cea2bf1a66e",66:"d61b8b935d3ed1ff",103:"5b5d2a6e5a8a7479",146:"92e3b29c4c754544",183:"90c399afcab1b014",202:"ad802ed297fef2df",592:"858f777e9296e6f2"}[e]+".js",r.miniCssF=e=>{},r.o=(e,f)=>Object.prototype.hasOwnProperty.call(e,f),(()=>{var e={},f="blrec:";r.l=(t,i,o,n)=>{if(e[t])e[t].push(i);else{var a,c;if(void 0!==o)for(var l=document.getElementsByTagName("script"),d=0;d<l.length;d++){var u=l[d];if(u.getAttribute("src")==t||u.getAttribute("data-webpack")==f+o){a=u;break}}a||(c=!0,(a=document.createElement("script")).type="module",a.charset="utf-8",a.timeout=120,r.nc&&a.setAttribute("nonce",r.nc),a.setAttribute("data-webpack",f+o),a.src=r.tu(t)),e[t]=[i];var s=(g,p)=>{a.onerror=a.onload=null,clearTimeout(b);var _=e[t];if(delete e[t],a.parentNode&&a.parentNode.removeChild(a),_&&_.forEach(h=>h(p)),g)return g(p)},b=setTimeout(s.bind(null,void 0,{type:"timeout",target:a}),12e4);a.onerror=s.bind(null,a.onerror),a.onload=s.bind(null,a.onload),c&&document.head.appendChild(a)}}})(),r.r=e=>{"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},(()=>{var e;r.tu=f=>(void 0===e&&(e={createScriptURL:t=>t},"undefined"!=typeof trustedTypes&&trustedTypes.createPolicy&&(e=trustedTypes.createPolicy("angular#bundler",e))),e.createScriptURL(f))})(),r.p="",(()=>{var e={666:0};r.f.j=(i,o)=>{var n=r.o(e,i)?e[i]:void 0;if(0!==n)if(n)o.push(n[2]);else if(666!=i){var a=new Promise((u,s)=>n=e[i]=[u,s]);o.push(n[2]=a);var c=r.p+r.u(i),l=new Error;r.l(c,u=>{if(r.o(e,i)&&(0!==(n=e[i])&&(e[i]=void 0),n)){var s=u&&("load"===u.type?"missing":u.type),b=u&&u.target&&u.target.src;l.message="Loading chunk "+i+" failed.\n("+s+": "+b+")",l.name="ChunkLoadError",l.type=s,l.request=b,n[1](l)}},"chunk-"+i,i)}else e[i]=0},r.O.j=i=>0===e[i];var f=(i,o)=>{var l,d,[n,a,c]=o,u=0;if(n.some(b=>0!==e[b])){for(l in a)r.o(a,l)&&(r.m[l]=a[l]);if(c)var s=c(r)}for(i&&i(o);u<n.length;u++)r.o(e,d=n[u])&&e[d]&&e[d][0](),e[n[u]]=0;return r.O(s)},t=self.webpackChunkblrec=self.webpackChunkblrec||[];t.forEach(f.bind(null,0)),t.push=f.bind(null,t.push.bind(t))})()})();

View File

@ -21,7 +21,9 @@ logger = logging.getLogger(__name__)
class SpaceReclaimer(SpaceEventListener, SwitchableMixin):
_SUFFIX_SET = frozenset(('.flv', '.mp4', '.xml', '.jsonl', '.jpg'))
_SUFFIX_SET = frozenset(
('.flv', '.mp4', '.ts', '.m4s', '.m3u8' '.xml', '.jsonl', '.jpg')
)
def __init__(
self,

View File

View File

@ -0,0 +1,2 @@
class SegmentDataCorrupted(ValueError):
pass

View File

@ -0,0 +1,52 @@
from __future__ import annotations
import json
import logging
from contextlib import suppress
from typing import Any, Callable, Dict, Tuple
from blrec.path.helpers import record_metadata_path
from blrec.utils.mixins import SwitchableMixin
from . import operators as hls_ops
__all__ = ('MetadataDumper',)
logger = logging.getLogger(__name__)
class MetadataDumper(SwitchableMixin):
def __init__(
self,
playlist_dumper: hls_ops.PlaylistDumper,
metadata_provider: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> None:
super().__init__()
self._playlist_dumper = playlist_dumper
self._metadata_provider = metadata_provider
self._metadata: Dict[str, Any] = {}
def _do_enable(self) -> None:
self._file_opened_subscription = self._playlist_dumper.file_opened.subscribe(
self._on_playlist_file_opened
)
logger.debug('Enabled metadata dumper')
def _do_disable(self) -> None:
with suppress(Exception):
self._file_opened_subscription.dispose()
del self._file_opened_subscription
self._metadata.clear()
logger.debug('Disabled metadata dumper')
def _on_playlist_file_opened(self, args: Tuple[str, int]) -> None:
playlist_path, _ = args
metadata = self._metadata_provider({})
self._dump_metadata(playlist_path, metadata)
def _dump_metadata(self, playlist_path: str, metadata: Dict[str, Any]) -> None:
path = record_metadata_path(playlist_path)
logger.debug(f"Dumping metadata to file: '{path}'")
with open(path, 'wt', encoding='utf8') as file:
json.dump(metadata, file, ensure_ascii=False)

View File

@ -0,0 +1,20 @@
from .playlist_dumper import PlaylistDumper
from .playlist_fetcher import PlaylistFetcher
from .playlist_resolver import PlaylistResolver
from .prober import Prober, StreamProfile
from .segment_dumper import SegmentDumper
from .segment_fetcher import InitSectionData, SegmentData, SegmentFetcher
from .segment_remuxer import SegmentRemuxer
__all__ = (
'InitSectionData',
'PlaylistDumper',
'PlaylistFetcher',
'PlaylistResolver',
'Prober',
'SegmentData',
'SegmentDumper',
'SegmentFetcher',
'SegmentRemuxer',
'StreamProfile',
)

View File

@ -0,0 +1,183 @@
from __future__ import annotations
import io
import logging
import os
from copy import deepcopy
from decimal import Decimal
from typing import Callable, Optional, Tuple
import m3u8
from reactivex import Observable, Subject, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
__all__ = ('PlaylistDumper',)
logger = logging.getLogger(__name__)
class PlaylistDumper:
def __init__(self, path_provider: Callable[..., Tuple[str, int]]) -> None:
self._path_provider = path_provider
self._file_opened: Subject[Tuple[str, int]] = Subject()
self._file_closed: Subject[str] = Subject()
self._reset()
def _reset(self) -> None:
self._path: str = ''
self._file: Optional[io.TextIOWrapper] = None
self._duration: Decimal = Decimal()
@property
def path(self) -> str:
return self._path
@property
def duration(self) -> float:
return float(self._duration)
@property
def file_opened(self) -> Observable[Tuple[str, int]]:
return self._file_opened
@property
def file_closed(self) -> Observable[str]:
return self._file_closed
def __call__(self, source: Observable[m3u8.M3U8]) -> Observable[m3u8.Segment]:
return self._dump(source)
def _open_file(self) -> None:
path, timestamp = self._path_provider()
root, ext = os.path.splitext(path)
os.makedirs(root, exist_ok=True)
self._path = os.path.join(root, 'index.m3u8')
self._file = open(self._path, 'wt', encoding='utf8') # type: ignore
logger.debug(f'Opened file: {self._path}')
self._file_opened.on_next((self._path, timestamp))
def _close_file(self) -> None:
if self._file is not None and not self._file.closed:
self._file.write('#EXT-X-ENDLIST')
self._file.close()
logger.debug(f'Closed file: {self._path}')
self._file_closed.on_next(self._path)
def _name_of(self, uri: str) -> str:
name, ext = os.path.splitext(uri)
return name
def _sequence_number_of(self, uri: str) -> int:
return int(self._name_of(uri))
def _replace_uri(self, segment: m3u8.Segment) -> m3u8.Segment:
copied_seg = deepcopy(segment)
if init_section := getattr(copied_seg, 'init_section', None):
init_section.uri = f'segments/{init_section.uri}'
uri = segment.uri
name = self._name_of(uri)
copied_seg.uri = 'segments/%s/%s' % (name[:-3], uri)
return copied_seg
def _replace_all_uri(self, playlist: m3u8.M3U8) -> m3u8.M3U8:
copied_playlist = deepcopy(playlist)
copied_playlist.segments = m3u8.SegmentList(
self._replace_uri(s) for s in copied_playlist.segments
)
return copied_playlist
def _update_duration(self, segment: m3u8.Segment) -> None:
self._duration += Decimal(str(segment.duration))
def _dump(self, source: Observable[m3u8.M3U8]) -> Observable[m3u8.Segment]:
def subscribe(
observer: abc.ObserverBase[m3u8.Segment],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
last_segment: Optional[m3u8.Segment] = None
last_sequence_number: Optional[int] = None
first_playlist_dumped: bool = False
self._close_file()
self._reset()
def on_next(playlist: m3u8.M3U8) -> None:
nonlocal last_sequence_number, last_segment, first_playlist_dumped
if playlist.is_endlist:
logger.debug('Playlist ended')
try:
if not first_playlist_dumped:
self._close_file()
self._reset()
self._open_file()
assert self._file is not None
playlist.is_endlist = False
self._file.write(self._replace_all_uri(playlist).dumps())
self._file.flush()
for seg in playlist.segments:
observer.on_next(seg)
self._update_duration(seg)
last_segment = seg
last_sequence_number = self._sequence_number_of(seg.uri)
first_playlist_dumped = True
logger.debug('The first playlist has been dumped')
return
assert self._file is not None
for seg in playlist.segments:
num = self._sequence_number_of(seg.uri)
discontinuity = False
if last_sequence_number is not None:
if last_sequence_number >= num:
continue
if last_sequence_number + 1 != num:
logger.warning(
'Segments discontinuous: '
f'last sequence number: {last_sequence_number}, '
f'current sequence number: {num}'
)
discontinuity = True
new_seg = self._replace_uri(seg)
new_seg.discontinuity = discontinuity
new_last_seg = self._replace_uri(last_segment)
self._file.write(new_seg.dumps(new_last_seg) + '\n')
observer.on_next(seg)
self._update_duration(seg)
last_segment = seg
last_sequence_number = num
except Exception as e:
self._close_file()
self._reset()
observer.on_error(e)
def on_completed() -> None:
self._close_file()
self._reset()
observer.on_completed()
def on_error(e: Exception) -> None:
self._close_file()
self._reset()
observer.on_error(e)
def dispose() -> None:
nonlocal disposed
nonlocal last_segment, last_sequence_number
disposed = True
last_segment = None
last_sequence_number = None
self._close_file()
self._reset()
subscription.disposable = source.subscribe(
on_next, on_error, on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)

View File

@ -12,8 +12,8 @@ from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential
from ...bili.live import Live
from ...utils.mixins import SupportDebugMixin
from blrec.bili.live import Live
from blrec.utils.mixins import SupportDebugMixin
__all__ = ('PlaylistFetcher',)

View File

@ -1,7 +1,8 @@
from __future__ import annotations
import logging
from typing import Final, Optional, Set
import os
from typing import Final, Optional
import m3u8
import urllib3
@ -17,11 +18,16 @@ logging.getLogger(urllib3.__name__).setLevel(logging.WARNING)
class PlaylistResolver:
_MAX_LAST_SEG_URIS: Final[int] = 30
def __call__(self, source: Observable[m3u8.M3U8]) -> Observable[m3u8.Segment]:
return self._solve(source)
def _name_of(self, uri: str) -> str:
name, ext = os.path.splitext(uri)
return name
def _sequence_number_of(self, uri: str) -> int:
return int(self._name_of(uri))
def _solve(self, source: Observable[m3u8.M3U8]) -> Observable[m3u8.Segment]:
def subscribe(
observer: abc.ObserverBase[m3u8.Segment],
@ -30,35 +36,36 @@ class PlaylistResolver:
disposed = False
subscription = SerialDisposable()
last_seg_uris: OrderedSet[str] = OrderedSet()
last_sequence_number: Optional[int] = None
def on_next(playlist: m3u8.M3U8) -> None:
curr_seg_uris: Set[str] = set()
for seg in playlist.segments:
if disposed:
return
curr_seg_uris.add(seg.uri)
if seg.uri not in last_seg_uris:
observer.on_next(seg)
last_seg_uris.add(seg.uri)
if len(last_seg_uris) > self._MAX_LAST_SEG_URIS:
last_seg_uris.pop(0)
if last_seg_uris and not curr_seg_uris.intersection(last_seg_uris):
logger.debug(
'Segments broken!\n'
f'Last segments uris: {last_seg_uris}\n'
f'Current segments uris: {curr_seg_uris}'
)
nonlocal last_sequence_number
if playlist.is_endlist:
logger.debug('Playlist ended')
for seg in playlist.segments:
uri = seg.uri
name = self._name_of(uri)
num = int(name)
if last_sequence_number is not None:
if last_sequence_number >= num:
continue
if last_sequence_number + 1 != num:
logger.warning(
'Segments discontinuous: '
f'last sequence number: {last_sequence_number}, '
f'current sequence number: {num}'
)
seg.discontinuity = True
observer.on_next(seg)
last_sequence_number = num
def dispose() -> None:
nonlocal disposed
nonlocal last_sequence_number
disposed = True
last_seg_uris.clear()
last_sequence_number = None
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler

View File

@ -7,16 +7,17 @@ from typing import List, Optional, Union
from reactivex import Observable, Subject, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ...utils.ffprobe import StreamProfile, ffprobe
from blrec.utils.ffprobe import StreamProfile, ffprobe
from .segment_fetcher import InitSectionData, SegmentData
__all__ = ('HLSProber', 'StreamProfile')
__all__ = ('Prober', 'StreamProfile')
logger = logging.getLogger(__name__)
class HLSProber:
class Prober:
def __init__(self) -> None:
self._profiles: Subject[StreamProfile] = Subject()

View File

@ -0,0 +1,79 @@
import logging
import os
from typing import Optional, Tuple, Union
from reactivex import Observable, Subject, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from blrec.hls.operators.segment_fetcher import InitSectionData, SegmentData
from .playlist_dumper import PlaylistDumper
__all__ = ('SegmentDumper',)
logger = logging.getLogger(__name__)
class SegmentDumper:
def __init__(self, playlist_dumper: PlaylistDumper) -> None:
self._playlist_dumper = playlist_dumper
self._out_dir: str = ''
def on_next(args: Tuple[str, int]) -> None:
path, timestamp = args
self._out_dir = os.path.dirname(path)
self._playlist_dumper.file_opened.subscribe(on_next)
self._file_opened: Subject[Tuple[str, int]] = Subject()
self._file_closed: Subject[str] = Subject()
@property
def file_opened(self) -> Observable[Tuple[str, int]]:
return self._file_opened
@property
def file_closed(self) -> Observable[str]:
return self._file_closed
def __call__(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
return self._dump(source)
def _dump(
self, source: Observable[Union[InitSectionData, SegmentData]]
) -> Observable[Union[InitSectionData, SegmentData]]:
def subscribe(
observer: abc.ObserverBase[Union[InitSectionData, SegmentData]],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
subscription = SerialDisposable()
def on_next(item: Union[InitSectionData, SegmentData]) -> None:
if isinstance(item, InitSectionData):
uri = item.init_section.uri
path = os.path.join(self._out_dir, 'segments', uri)
else:
uri = item.segment.uri
name, ext = os.path.splitext(uri)
path = os.path.join(self._out_dir, 'segments', name[:-3], uri)
os.makedirs(os.path.dirname(path), exist_ok=True)
try:
with open(path, 'wb') as file:
file.write(item.payload)
except Exception as e:
logger.error(f'Failed to dump segmemt: {repr(e)}')
observer.on_error(e)
else:
observer.on_next(item)
def dispose() -> None:
pass
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)

View File

@ -12,7 +12,10 @@ from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential
from ...bili.live import Live
from blrec.bili.live import Live
from blrec.utils.hash import cksum
from ..exceptions import SegmentDataCorrupted
__all__ = ('SegmentFetcher', 'InitSectionData', 'SegmentData')
@ -22,6 +25,7 @@ logger = logging.getLogger(__name__)
@attr.s(auto_attribs=True, slots=True, frozen=True)
class InitSectionData:
init_section: InitializationSection
payload: bytes
def __len__(self) -> int:
@ -30,6 +34,7 @@ class InitSectionData:
@attr.s(auto_attribs=True, slots=True, frozen=True)
class SegmentData:
segment: m3u8.Segment
payload: bytes
def __len__(self) -> int:
@ -56,30 +61,52 @@ class SegmentFetcher:
disposed = False
subscription = SerialDisposable()
init_section: Optional[InitializationSection] = None
last_segment: Optional[m3u8.Segment] = None
def on_next(seg: m3u8.Segment) -> None:
nonlocal init_section
nonlocal last_segment
url: str = ''
try:
if getattr(seg, 'init_section', None) and (
not init_section or seg.init_section.uri != init_section.uri
if hasattr(seg, 'init_section') and (
(
last_segment is None
or seg.init_section != last_segment.init_section
or seg.discontinuity
)
):
url = seg.init_section.absolute_uri
data = self._fetch_segment(url)
init_section = seg.init_section
observer.on_next(InitSectionData(payload=data))
observer.on_next(
InitSectionData(init_section=seg.init_section, payload=data)
)
last_segment = seg
url = seg.absolute_uri
data = self._fetch_segment(url)
observer.on_next(SegmentData(payload=data))
crc32 = seg.title.split('|')[-1]
for _ in range(3):
data = self._fetch_segment(url)
crc32_of_data = cksum(data)
if crc32_of_data == crc32:
break
logger.debug(
'Segment data corrupted: '
f'correct crc32: {crc32}, '
f'crc32 of segment data: {crc32_of_data}, '
f'segment url: {url}'
)
else:
raise SegmentDataCorrupted(crc32, crc32_of_data)
except Exception as e:
logger.warning(f'Failed to fetch segment {url}: {repr(e)}')
else:
observer.on_next(SegmentData(segment=seg, payload=data))
def dispose() -> None:
nonlocal disposed
nonlocal init_section
nonlocal last_segment
disposed = True
init_section = None
last_segment = None
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler

View File

@ -8,8 +8,9 @@ import urllib3
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ...bili.live import Live
from ...utils.io import wait_for
from blrec.bili.live import Live
from blrec.utils.io import wait_for
from ..stream_remuxer import StreamRemuxer
from .segment_fetcher import InitSectionData, SegmentData

View File

@ -1,16 +1,16 @@
import errno
import io
import logging
from contextlib import suppress
import os
import re
import shlex
from contextlib import suppress
from subprocess import PIPE, CalledProcessError, Popen
from threading import Condition, Thread
from typing import Optional, cast
from ..utils.io import wait_for
from ..utils.mixins import StoppableMixin, SupportDebugMixin
from blrec.utils.io import wait_for
from blrec.utils.mixins import StoppableMixin, SupportDebugMixin
logger = logging.getLogger(__name__)
@ -150,7 +150,7 @@ class StreamRemuxer(StoppableMixin, SupportDebugMixin):
self._check_error(line)
if not self._stopped and self._subprocess.returncode not in (0, 255):
# 255: Exiting normally, received signal 2.
# 255: Exiting standardly, received signal 2.
raise CalledProcessError(self._subprocess.returncode, cmd=cmd)
def _check_error(self, line: str) -> None:

View File

@ -1,20 +1,21 @@
from .helpers import (
file_exists,
cover_path,
create_file,
danmaku_path,
cover_path,
raw_danmaku_path,
extra_metadata_path,
escape_path,
extra_metadata_path,
file_exists,
raw_danmaku_path,
record_metadata_path,
)
__all__ = (
'file_exists',
'cover_path',
'create_file',
'danmaku_path',
'cover_path',
'raw_danmaku_path',
'extra_metadata_path',
'escape_path',
'extra_metadata_path',
'file_exists',
'raw_danmaku_path',
'record_metadata_path',
)

View File

@ -1,16 +1,17 @@
import re
import os
import re
from pathlib import PurePath
__all__ = (
'file_exists',
'cover_path',
'create_file',
'danmaku_path',
'cover_path',
'raw_danmaku_path',
'extra_metadata_path',
'escape_path',
'extra_metadata_path',
'ffmpeg_metadata_path',
'record_metadata_path',
'file_exists',
'raw_danmaku_path',
)
@ -40,5 +41,13 @@ def extra_metadata_path(video_path: str) -> str:
return video_path + '.meta.json'
def record_metadata_path(video_path: str) -> str:
return str(PurePath(video_path).with_suffix('.meta.json'))
def ffmpeg_metadata_path(video_path: str) -> str:
return video_path + '.meta'
def escape_path(path: str) -> str:
return re.sub(r'[\\/:*?"<>|]', '', path)

View File

@ -1,25 +1,38 @@
from __future__ import annotations
import asyncio
import json
import logging
from typing import Iterable, cast
from decimal import Decimal
from typing import Iterable, List, Tuple, cast
import aiofiles
import m3u8
from ..flv.helpers import make_comment_for_joinpoints
from ..flv.operators import JoinPoint
from .helpers import get_extra_metadata, get_metadata
from blrec.flv.helpers import make_comment_for_joinpoints
from blrec.flv.operators import JoinPoint
from blrec.flv.utils import format_timestamp
from blrec.path.helpers import ffmpeg_metadata_path
from .helpers import get_extra_metadata, get_metadata, get_record_metadata
logger = logging.getLogger(__name__)
async def make_metadata_file(flv_path: str) -> str:
path = flv_path + '.meta'
async with aiofiles.open(path, 'wb') as f:
content = await _make_metadata_content(flv_path)
await f.write(content.encode(encoding='utf8'))
async def make_metadata_file(video_path: str) -> str:
path = ffmpeg_metadata_path(video_path)
async with aiofiles.open(path, 'wb') as file:
if video_path.endswith('.flv'):
content = await _make_metadata_content_for_flv(video_path)
elif video_path.endswith('.m3u8'):
content = await _make_metadata_content_for_m3u8(video_path)
else:
raise NotImplementedError(video_path)
await file.write(content.encode(encoding='utf8'))
return path
async def _make_metadata_content(flv_path: str) -> str:
async def _make_metadata_content_for_flv(flv_path: str) -> str:
metadata = await get_metadata(flv_path)
try:
extra_metadata = await get_extra_metadata(flv_path)
@ -37,7 +50,7 @@ async def _make_metadata_content(flv_path: str) -> str:
cast(float, extra_metadata.get('duration') or metadata.get('duration'))
* 1000
)
chapters = _make_chapters(join_points, last_timestamp)
chapters = _make_chapters_for_flv(join_points, last_timestamp)
comment = '\\\n'.join(comment.splitlines())
@ -55,7 +68,9 @@ Comment={comment}
"""
def _make_chapters(join_points: Iterable[JoinPoint], last_timestamp: int) -> str:
def _make_chapters_for_flv(
join_points: Iterable[JoinPoint], last_timestamp: int
) -> str:
join_points = filter(lambda p: not p.seamless, join_points)
timestamps = list(map(lambda p: p.timestamp, join_points))
if not timestamps:
@ -78,3 +93,72 @@ END={end}
title=segment \\#{i}
"""
return result
async def _make_metadata_content_for_m3u8(playlist_path: str) -> str:
metadata = await get_record_metadata(playlist_path)
comment = cast(str, metadata.get('Comment', ''))
chapters = ''
timestamps, duration = await _get_discontinuities(playlist_path)
if timestamps:
comment += '\n\n' + _make_comment_for_discontinuities(timestamps)
chapters = _make_chapters_for_m3u8(timestamps, duration)
comment = '\\\n'.join(comment.splitlines())
# ref: https://ffmpeg.org/ffmpeg-formats.html#Metadata-1
return f"""\
;FFMETADATA1
Title={metadata['Title']}
Artist={metadata['Artist']}
Date={metadata['Date']}
# Description may be truncated!
Description={json.dumps(metadata['description'], ensure_ascii=False)}
Comment={comment}
{chapters}
"""
def _make_chapters_for_m3u8(timestamps: Iterable[int], duration: float) -> str:
timestamps = list(timestamps)
if not timestamps:
return ''
timestamps.insert(0, 0)
timestamps.append(int(duration * 1000))
result = ''
for i in range(1, len(timestamps)):
start = timestamps[i - 1]
end = timestamps[i]
if end < start:
logger.warning(f'Chapter end time {end} before start {start}')
end = start
result += f"""\
[CHAPTER]
TIMEBASE=1/1000
START={start}
END={end}
title=segment \\#{i}
"""
return result
def _make_comment_for_discontinuities(timestamps: Iterable[int]) -> str:
return 'HLS片段不连续位置\n' + '\n'.join(
('时间戳:{}'.format(format_timestamp(ts)) for ts in timestamps)
)
async def _get_discontinuities(playlist_path: str) -> Tuple[List[int], float]:
loop = asyncio.get_running_loop()
playlist = await loop.run_in_executor(None, m3u8.load, playlist_path)
duration = Decimal()
timestamps: List[int] = []
for seg in playlist.segments:
if seg.discontinuity:
timestamps.append(int(duration * 1000))
duration += Decimal(str(seg.duration))
return timestamps, float(duration)

View File

@ -1,31 +1,86 @@
import asyncio
import json
import logging
import os
import shutil
from pathlib import PurePath
from typing import Any, Dict, Iterable, Literal
from ..flv.helpers import get_metadata as _get_metadata
import aiofiles
from blrec.path.helpers import (
cover_path,
danmaku_path,
raw_danmaku_path,
record_metadata_path,
)
from ..flv.helpers import get_extra_metadata as _get_extra_metadata
from ..flv.helpers import get_metadata as _get_metadata
logger = logging.getLogger(__name__)
async def discard_files(
paths: Iterable[str],
log_level: Literal['INFO', 'DEBUG'] = 'INFO',
paths: Iterable[str], log_level: Literal['INFO', 'DEBUG'] = 'INFO'
) -> None:
for path in paths:
await discard_file(path, log_level)
async def discard_file(
path: str,
log_level: Literal['INFO', 'DEBUG'] = 'INFO',
) -> None:
async def discard_file(path: str, log_level: Literal['INFO', 'DEBUG'] = 'INFO') -> None:
from ..disk_space import delete_file
await delete_file(path, log_level)
async def discard_dir(path: str, log_level: Literal['INFO', 'DEBUG'] = 'INFO') -> None:
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(None, shutil.rmtree, path)
except Exception as e:
logger.error(f'Failed to delete {path!r}, due to: {repr(e)}')
else:
logger.log(logging.getLevelName(log_level), f'Deleted {path!r}')
async def copy_files_related(video_path: str) -> None:
loop = asyncio.get_running_loop()
dirname = os.path.dirname(video_path)
for src_path in [
danmaku_path(video_path),
raw_danmaku_path(video_path),
cover_path(video_path, ext='jpg'),
cover_path(video_path, ext='png'),
]:
if not os.path.isfile(src_path):
continue
root, ext = os.path.splitext(src_path)
dst_path = PurePath(dirname).with_suffix(ext)
try:
await loop.run_in_executor(None, shutil.copy, src_path, dst_path)
except Exception as e:
logger.error(f"Failed to copy '{src_path}' to '{dst_path}': {repr(e)}")
else:
logger.info(f"Copied '{src_path}' to '{dst_path}'")
async def get_metadata(flv_path: str) -> Dict[str, Any]:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _get_metadata, flv_path)
async def get_record_metadata(video_path: str) -> Dict[str, Any]:
if video_path.endswith('.m3u8'):
path = record_metadata_path(video_path)
else:
raise NotImplementedError(video_path)
async with aiofiles.open(path, 'rb') as file:
data = await file.read()
return json.loads(data)
async def get_extra_metadata(flv_path: str) -> Dict[str, Any]:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _get_extra_metadata, flv_path)

View File

@ -2,9 +2,10 @@ from __future__ import annotations
import asyncio
import logging
import os
from contextlib import suppress
from pathlib import PurePath
from typing import Any, Awaitable, Dict, Final, Iterator, List, Optional, Union
from typing import Any, Awaitable, Dict, Final, Iterator, List, Optional, Tuple, Union
from reactivex.scheduler import ThreadPoolScheduler
@ -19,7 +20,7 @@ from ..logging.room_id import aio_task_with_room_id
from ..path import extra_metadata_path
from ..utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin, SupportDebugMixin
from .ffmpeg_metadata import make_metadata_file
from .helpers import discard_file, get_extra_metadata
from .helpers import copy_files_related, discard_dir, discard_file, get_extra_metadata
from .models import DeleteStrategy, PostprocessorStatus
from .remux import RemuxingProgress, RemuxingResult, remux_video
from .typing import Progress
@ -141,22 +142,44 @@ class Postprocessor(
async with self._worker_semaphore:
logger.debug(f'Postprocessing... {video_path}')
if not await self._is_vaild_flv_file(video_path):
logger.warning(f'The flv file may be invalid: {video_path}')
try:
if self.remux_to_mp4:
self._status = PostprocessorStatus.REMUXING
result_path = await self._remux_flv_to_mp4(video_path)
elif self.inject_extra_metadata:
self._status = PostprocessorStatus.INJECTING
result_path = await self._inject_extra_metadata(video_path)
if video_path.endswith('.flv'):
if not await self._is_vaild_flv_file(video_path):
logger.warning(f'The flv file may be invalid: {video_path}')
if self.remux_to_mp4:
self._status = PostprocessorStatus.REMUXING
(
result_path,
remuxing_result,
) = await self._remux_video_to_mp4(video_path)
if not self._debug:
await discard_file(
extra_metadata_path(video_path), 'DEBUG'
)
if self._should_delete_source_files(remuxing_result):
await discard_file(video_path)
elif self.inject_extra_metadata:
self._status = PostprocessorStatus.INJECTING
result_path = await self._inject_extra_metadata(video_path)
else:
result_path = video_path
elif video_path.endswith('.m3u8'):
if self.remux_to_mp4:
self._status = PostprocessorStatus.REMUXING
(
result_path,
remuxing_result,
) = await self._remux_video_to_mp4(video_path)
await copy_files_related(video_path)
if not self._debug:
if self._should_delete_source_files(remuxing_result):
await discard_dir(os.path.dirname(video_path))
else:
result_path = video_path
else:
result_path = video_path
if not self._debug:
await discard_file(extra_metadata_path(video_path), 'DEBUG')
self._completed_files.append(result_path)
await self._emit(
'video_postprocessing_completed', self, result_path
@ -191,33 +214,37 @@ class Postprocessor(
logger.info(f"Successfully injected metadata for '{path}'")
return path
async def _remux_flv_to_mp4(self, in_path: str) -> str:
out_path = str(PurePath(in_path).with_suffix('.mp4'))
logger.info(f"Remuxing '{in_path}' to '{out_path}' ...")
async def _remux_video_to_mp4(self, in_path: str) -> Tuple[str, RemuxingResult]:
if in_path.endswith('.flv'):
out_path = str(PurePath(in_path).with_suffix('.mp4'))
metadata_path = await make_metadata_file(in_path)
elif in_path.endswith('.m3u8'):
out_path = str(PurePath(in_path).parent.with_suffix('.mp4'))
metadata_path = await make_metadata_file(in_path)
else:
raise NotImplementedError(in_path)
metadata_path = await make_metadata_file(in_path)
logger.info(f"Remuxing '{in_path}' to '{out_path}' ...")
remux_result = await self._remux_video(in_path, out_path, metadata_path)
if remux_result.is_successful():
logger.info(f"Successfully remuxed '{in_path}' to '{out_path}'")
result_path = out_path
if remux_result.is_failed():
logger.error(f"Failed to remux '{in_path}' to '{out_path}'")
result_path = in_path
elif remux_result.is_warned():
logger.warning('Remuxing done, but ran into problems.')
result_path = out_path
elif remux_result.is_failed:
logger.error(f"Failed to remux '{in_path}' to '{out_path}'")
result_path = in_path
elif remux_result.is_successful():
logger.info(f"Successfully remuxed '{in_path}' to '{out_path}'")
result_path = out_path
else:
pass
logger.debug(f'ffmpeg output:\n{remux_result.output}')
if not self._debug:
if not self._debug and in_path.endswith('.flv'):
await discard_file(metadata_path, 'DEBUG')
if self._should_delete_source_files(remux_result):
await discard_file(in_path)
return result_path
return result_path, remux_result
def _analyse_metadata(self, path: str) -> Awaitable[None]:
future: asyncio.Future[None] = asyncio.Future()

View File

@ -19,8 +19,12 @@ class RemuxingProgress:
total: int
_ERROR_PATTERN = re.compile(r'\b(error|missing|invalid|corrupt)\b', re.IGNORECASE)
@attr.s(auto_attribs=True, slots=True, frozen=True)
class RemuxingResult:
return_code: int
output: str
@ -28,13 +32,20 @@ class RemuxingResult:
return self.return_code == 0
def is_successful(self) -> bool:
return self.is_done() and not self.may_timestamps_incorrect()
return (
self.is_done()
and not self.may_timestamps_incorrect()
and not self.has_errors()
)
def is_warned(self) -> bool:
return self.is_done() and self.may_timestamps_incorrect()
def is_failed(self) -> bool:
return not self.is_done()
return not self.is_done() or self.has_errors()
def has_errors(self) -> bool:
return _ERROR_PATTERN.search(self.output) is not None
def may_timestamps_incorrect(self) -> bool:
return 'Non-monotonous DTS in output stream' in self.output
@ -49,8 +60,19 @@ def remux_video(
remove_filler_data: bool = False,
) -> Observable[Union[RemuxingProgress, RemuxingResult]]:
SIZE_PATTERN: Final = re.compile(r'size=\s*(?P<number>\d+)(?P<unit>[a-zA-Z]?B)')
filesize = os.path.getsize(in_path)
filename = os.path.basename(in_path)
if in_path.endswith('.m3u8'):
total = 0
for root, dirs, files in os.walk(os.path.dirname(in_path)):
for filename in files:
if not (filename.endswith('.m4s') or filename.endswith('.ts')):
continue
total += os.path.getsize(os.path.join(root, filename))
postfix = os.path.join(
os.path.basename(os.path.dirname(in_path)), os.path.basename(in_path)
)
else:
total = os.path.getsize(in_path)
postfix = os.path.basename(in_path)
def parse_size(line: str) -> int:
match = SIZE_PATTERN.search(line)
@ -75,7 +97,11 @@ def remux_video(
def should_output_line(line: str) -> bool:
line = line.strip()
return not (line.startswith('frame=') or line.startswith('Press [q]'))
return not (
line.startswith('frame=')
or line.startswith('Press [q]')
or (line.startswith('[hls') and 'Opening' in line and 'for reading' in line)
)
def subscribe(
observer: abc.ObserverBase[Union[RemuxingProgress, RemuxingResult]],
@ -92,11 +118,11 @@ def remux_video(
with tqdm(
desc='Remuxing',
total=filesize,
total=total,
unit='B',
unit_scale=True,
unit_divisor=1024,
postfix=filename,
postfix=postfix,
disable=not show_progress,
) as pbar:
cmd = f'ffmpeg -i "{in_path}"'
@ -130,15 +156,15 @@ def remux_video(
if line.startswith('frame='):
size = parse_size(line)
pbar.update(size - pbar.n)
progress = RemuxingProgress(size, filesize)
progress = RemuxingProgress(size, total)
observer.on_next(progress)
if should_output_line(line):
out_lines.append(line)
if not disposed and process.returncode == 0:
pbar.update(filesize)
progress = RemuxingProgress(filesize, filesize)
pbar.update(total)
progress = RemuxingProgress(total, total)
observer.on_next(progress)
except Exception as e:
observer.on_error(e)

View File

@ -11,15 +11,17 @@ from pydantic import BaseSettings, Field, PrivateAttr, validator
from pydantic.networks import EmailStr, HttpUrl
from typing_extensions import Annotated
from ..bili.typing import QualityNumber, StreamFormat
from ..core.cover_downloader import CoverSaveStrategy
from ..logging.typing import LOG_LEVEL
from ..postprocess import DeleteStrategy
from ..utils.string import camel_case
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.core.cover_downloader import CoverSaveStrategy
from blrec.logging.typing import LOG_LEVEL
from blrec.postprocess import DeleteStrategy
from blrec.utils.string import camel_case
from .typing import (
EmailMessageType,
PushdeerMessageType,
PushplusMessageType,
RecordingMode,
ServerchanMessageType,
TelegramMessageType,
)
@ -142,6 +144,7 @@ class DanmakuSettings(DanmakuOptions):
class RecorderOptions(BaseModel):
stream_format: Optional[StreamFormat]
recording_mode: Optional[RecordingMode]
quality_number: Optional[QualityNumber]
fmp4_stream_timeout: Optional[int]
read_timeout: Optional[int] # seconds
@ -176,6 +179,7 @@ class RecorderOptions(BaseModel):
class RecorderSettings(RecorderOptions):
stream_format: StreamFormat = 'flv'
recording_mode: RecordingMode = 'standard'
quality_number: QualityNumber = 20000 # 4K, the highest quality.
fmp4_stream_timeout: int = 10
read_timeout: int = 3

View File

@ -1,5 +1,7 @@
from typing import AbstractSet, Literal, Union
RecordingMode = Literal['standard', 'raw']
TextMessageType = Literal['text']
HtmlMessageType = Literal['html']
MarkdownMessageType = Literal['markdown']

View File

@ -1,14 +1,16 @@
from __future__ import annotations
from enum import Enum
from typing import Optional
import attr
from ..bili.models import RoomInfo, UserInfo
from ..bili.typing import StreamFormat, QualityNumber
from ..core.cover_downloader import CoverSaveStrategy
from ..postprocess import DeleteStrategy, PostprocessorStatus
from ..postprocess.typing import Progress
from blrec.bili.models import RoomInfo, UserInfo
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.core.cover_downloader import CoverSaveStrategy
from blrec.postprocess import DeleteStrategy, PostprocessorStatus
from blrec.postprocess.typing import Progress
from blrec.setting.typing import RecordingMode
class RunningStatus(str, Enum):
@ -60,6 +62,7 @@ class TaskParam:
save_raw_danmaku: bool
# RecorderSettings
stream_format: StreamFormat
recording_mode: RecordingMode
quality_number: QualityNumber
fmp4_stream_timeout: int
read_timeout: int

View File

@ -3,23 +3,25 @@ import os
from pathlib import Path
from typing import Iterator, Optional
from ..bili.danmaku_client import DanmakuClient
from ..bili.live import Live
from ..bili.live_monitor import LiveMonitor
from ..bili.models import RoomInfo, UserInfo
from ..bili.typing import QualityNumber, StreamFormat
from ..core import Recorder
from ..core.cover_downloader import CoverSaveStrategy
from ..event.event_submitters import (
from blrec.bili.danmaku_client import DanmakuClient
from blrec.bili.live import Live
from blrec.bili.live_monitor import LiveMonitor
from blrec.bili.models import RoomInfo, UserInfo
from blrec.bili.typing import QualityNumber, StreamFormat
from blrec.core import Recorder
from blrec.core.cover_downloader import CoverSaveStrategy
from blrec.event.event_submitters import (
LiveEventSubmitter,
PostprocessorEventSubmitter,
RecorderEventSubmitter,
)
from ..flv.metadata_injection import InjectingProgress
from ..flv.operators import MetaData, StreamProfile
from ..logging.room_id import aio_task_with_room_id
from ..postprocess import DeleteStrategy, Postprocessor, PostprocessorStatus
from ..postprocess.remux import RemuxingProgress
from blrec.flv.metadata_injection import InjectingProgress
from blrec.flv.operators import MetaData, StreamProfile
from blrec.logging.room_id import aio_task_with_room_id
from blrec.postprocess import DeleteStrategy, Postprocessor, PostprocessorStatus
from blrec.postprocess.remux import RemuxingProgress
from blrec.setting.typing import RecordingMode
from .models import (
DanmakuFileDetail,
DanmukuFileStatus,
@ -276,6 +278,14 @@ class RecordTask:
def stream_format(self, value: StreamFormat) -> None:
self._recorder.stream_format = value
@property
def recording_mode(self) -> RecordingMode:
return self._recorder.recording_mode
@recording_mode.setter
def recording_mode(self, value: RecordingMode) -> None:
self._recorder.recording_mode = value
@property
def quality_number(self) -> QualityNumber:
return self._recorder.quality_number

View File

@ -263,6 +263,7 @@ class RecordTaskManager:
) -> None:
task = self._get_task(room_id)
task.stream_format = settings.stream_format
task.recording_mode = settings.recording_mode
task.quality_number = settings.quality_number
task.fmp4_stream_timeout = settings.fmp4_stream_timeout
task.read_timeout = settings.read_timeout
@ -306,6 +307,7 @@ class RecordTaskManager:
cover_save_strategy=task.cover_save_strategy,
save_raw_danmaku=task.save_raw_danmaku,
stream_format=task.stream_format,
recording_mode=task.recording_mode,
quality_number=task.quality_number,
fmp4_stream_timeout=task.fmp4_stream_timeout,
read_timeout=task.read_timeout,

View File

@ -5,7 +5,7 @@ from subprocess import PIPE, Popen
from typing import Any, Dict, Optional
from reactivex import Observable, abc
from reactivex.scheduler import NewThreadScheduler
from reactivex.scheduler import CurrentThreadScheduler
__all__ = ('ffprobe', 'StreamProfile')
@ -17,7 +17,7 @@ def ffprobe(data: bytes) -> Observable[StreamProfile]:
observer: abc.ObserverBase[StreamProfile],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
_scheduler = scheduler or NewThreadScheduler()
_scheduler = scheduler or CurrentThreadScheduler()
def action(scheduler: abc.SchedulerBase, state: Optional[Any] = None) -> None:
args = [

View File

@ -24,7 +24,7 @@
class="setting-label"
nzNoColon
nzTooltipTitle="调用 ffmpeg 进行转换,需要安装 ffmpeg 。"
>flv 转封装为 mp4</nz-form-label
>转封装为 mp4</nz-form-label
>
<nz-form-control
class="setting-control switch"

View File

@ -38,7 +38,10 @@
</nz-select>
</nz-form-control>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-item
class="setting-item"
*ngIf="streamFormatControl.value === 'fmp4'"
>
<nz-form-label
class="setting-label"
nzNoColon
@ -71,6 +74,38 @@
</nz-select>
</nz-form-control>
</nz-form-item>
<nz-form-item
class="setting-item"
*ngIf="streamFormatControl.value === 'fmp4'"
>
<nz-form-label
class="setting-label"
nzNoColon
[nzTooltipTitle]="recordingModeTip"
>录制模式</nz-form-label
>
<ng-template #recordingModeTip>
<p>
标准模式: 对下载的流数据进行解析处理,支持自动分割文件等功能。
<br />
原始模式: 直接下载流数据,没有进行解析处理,不支持自动分割文件等功能。
<br />
</p>
</ng-template>
<nz-form-control
class="setting-control select"
[nzWarningTip]="syncFailedWarningTip"
[nzValidateStatus]="
syncStatus.recordingMode ? recordingModeControl : 'warning'
"
>
<nz-select
formControlName="recordingMode"
[nzOptions]="recordingModeOptions"
>
</nz-select>
</nz-form-control>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
@ -139,7 +174,10 @@
</nz-radio-group>
</nz-form-control>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-item
class="setting-item"
*ngIf="streamFormatControl.value === 'flv'"
>
<nz-form-label
class="setting-label"
nzNoColon
@ -184,7 +222,14 @@
</nz-select>
</nz-form-control>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-item
class="setting-item"
*ngIf="
streamFormatControl.value === 'flv' ||
(streamFormatControl.value === 'fmp4' &&
recordingModeControl.value === 'standard')
"
>
<nz-form-label
class="setting-label"
nzNoColon

View File

@ -16,6 +16,7 @@ import type { Mutable } from '../../shared/utility-types';
import {
BUFFER_OPTIONS,
STREAM_FORMAT_OPTIONS,
RECORDING_MODE_OPTIONS,
QUALITY_OPTIONS,
TIMEOUT_OPTIONS,
DISCONNECTION_TIMEOUT_OPTIONS,
@ -44,6 +45,9 @@ export class RecorderSettingsComponent implements OnInit, OnChanges {
readonly streamFormatOptions = cloneDeep(STREAM_FORMAT_OPTIONS) as Mutable<
typeof STREAM_FORMAT_OPTIONS
>;
readonly recordingModeOptions = cloneDeep(RECORDING_MODE_OPTIONS) as Mutable<
typeof RECORDING_MODE_OPTIONS
>;
readonly fmp4StreamTimeoutOptions = cloneDeep(TIMEOUT_OPTIONS) as Mutable<
typeof TIMEOUT_OPTIONS
>;
@ -70,6 +74,7 @@ export class RecorderSettingsComponent implements OnInit, OnChanges {
) {
this.settingsForm = formBuilder.group({
streamFormat: [''],
recordingMode: [''],
qualityNumber: [''],
fmp4StreamTimeout: [''],
readTimeout: [''],
@ -84,6 +89,10 @@ export class RecorderSettingsComponent implements OnInit, OnChanges {
return this.settingsForm.get('streamFormat') as FormControl;
}
get recordingModeControl() {
return this.settingsForm.get('recordingMode') as FormControl;
}
get qualityNumberControl() {
return this.settingsForm.get('qualityNumber') as FormControl;
}

View File

@ -63,6 +63,11 @@ export const STREAM_FORMAT_OPTIONS = [
{ label: 'HLS (fmp4)', value: 'fmp4' },
] as const;
export const RECORDING_MODE_OPTIONS = [
{ label: '标准', value: 'standard' },
{ label: '原始', value: 'raw' },
] as const;
export const QUALITY_OPTIONS = [
{ label: '4K', value: 20000 },
{ label: '原画', value: 10000 },

View File

@ -19,6 +19,7 @@ export interface DanmakuSettings {
export type DanmakuOptions = Nullable<DanmakuSettings>;
export type StreamFormat = 'flv' | 'ts' | 'fmp4';
export type RecordingMode = 'standard' | 'raw';
export type QualityNumber =
| 20000 // 4K
@ -36,6 +37,7 @@ export enum CoverSaveStrategy {
export interface RecorderSettings {
streamFormat: StreamFormat;
recordingMode: RecordingMode;
qualityNumber: QualityNumber;
fmp4StreamTimeout: number;
readTimeout: number;

View File

@ -285,6 +285,16 @@ export class TaskManagerService {
);
}
canCutStream(roomId: number) {
return this.taskService.canCutStream(roomId).pipe(
tap((ableToCutStream) => {
if (!ableToCutStream) {
this.message.warning(`[${roomId}] 不支持文件切割~`);
}
})
);
}
cutStream(roomId: number) {
return this.taskService.cutStream(roomId).pipe(
tap(

View File

@ -2,6 +2,7 @@ import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { environment } from 'src/environments/environment';
import { ResponseMessage } from '../../../shared/api.models';
@ -163,6 +164,13 @@ export class TaskService {
return this.http.post<ResponseMessage>(url, { force, background });
}
canCutStream(roomId: number) {
const url = apiUrl + `/api/v1/tasks/${roomId}/cut`;
return this.http
.get<{ data: { result: boolean } }>(url)
.pipe(map((response) => response.data.result));
}
cutStream(roomId: number) {
const url = apiUrl + `/api/v1/tasks/${roomId}/cut`;
return this.http.post<null>(url, null);

View File

@ -3,6 +3,7 @@ import {
DeleteStrategy,
StreamFormat,
QualityNumber,
RecordingMode,
} from '../../settings/shared/setting.model';
export interface TaskData {
@ -116,6 +117,7 @@ export interface TaskParam {
readonly save_raw_danmaku: boolean;
readonly stream_format: StreamFormat;
readonly recording_mode: RecordingMode;
readonly quality_number: QualityNumber;
readonly read_timeout: number;
readonly disconnection_timeout: number;

View File

@ -226,7 +226,13 @@ export class TaskItemComponent implements OnChanges, OnDestroy {
cutStream(): void {
if (this.data.task_status.running_status === RunningStatus.RECORDING) {
this.taskManager.cutStream(this.roomId).subscribe();
this.taskManager
.canCutStream(this.roomId)
.subscribe((ableToCutStream) => {
if (ableToCutStream) {
this.taskManager.cutStream(this.roomId).subscribe();
}
});
}
}
}

View File

@ -52,7 +52,17 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-item
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'flv' ||
((options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4' &&
(options.recorder.recordingMode ||
model.recorder.recordingMode) === 'standard')
"
>
<nz-form-label
class="setting-label"
nzNoColon
@ -79,7 +89,17 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-item
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'flv' ||
((options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4' &&
(options.recorder.recordingMode ||
model.recorder.recordingMode) === 'standard')
"
>
<nz-form-label
class="setting-label"
nzNoColon
@ -155,7 +175,13 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-item
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4'
"
>
<nz-form-label
class="setting-label"
nzNoColon
@ -195,6 +221,48 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4'
"
>
<nz-form-label
class="setting-label"
nzNoColon
[nzTooltipTitle]="recordingModeTip"
>录制模式</nz-form-label
>
<ng-template #recordingModeTip>
<p>
标准模式: 对下载的流数据进行解析处理,支持自动分割文件等功能。
<br />
原始模式:
直接下载流数据,没有进行解析处理,不支持自动分割文件等功能。
<br />
</p>
</ng-template>
<nz-form-control class="setting-control select">
<nz-select
name="recordingMode"
[(ngModel)]="model.recorder.recordingMode"
[disabled]="options.recorder.recordingMode === null"
[nzOptions]="recordingModeOptions"
>
</nz-select>
</nz-form-control>
<label
nz-checkbox
[nzChecked]="options.recorder.recordingMode !== null"
(nzCheckedChange)="
options.recorder.recordingMode = $event
? globalSettings.recorder.recordingMode
: null
"
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
@ -286,7 +354,13 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-item
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'flv'
"
>
<nz-form-label
class="setting-label"
nzNoColon
@ -346,7 +420,17 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-item
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'flv' ||
((options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4' &&
(options.recorder.recordingMode ||
model.recorder.recordingMode) === 'standard')
"
>
<nz-form-label
class="setting-label"
nzNoColon
@ -544,7 +628,17 @@
<div ngModelGroup="postprocessing" class="form-group postprocessing">
<h2>文件处理</h2>
<nz-form-item class="setting-item">
<nz-form-item
class="setting-item"
*ngIf="
(options.recorder.streamFormat || model.recorder.streamFormat) ===
'flv' ||
((options.recorder.streamFormat || model.recorder.streamFormat) ===
'fmp4' &&
(options.recorder.recordingMode ||
model.recorder.recordingMode) === 'standard')
"
>
<nz-form-label
class="setting-label"
nzNoColon
@ -577,7 +671,7 @@
class="setting-label"
nzNoColon
nzTooltipTitle="调用 ffmpeg 进行转换,需要安装 ffmpeg 。"
>flv 转封装为 mp4</nz-form-label
>转封装为 mp4</nz-form-label
>
<nz-form-control class="setting-control switch">
<nz-switch

View File

@ -31,6 +31,7 @@ import {
DELETE_STRATEGIES,
SPLIT_FILE_TIP,
COVER_SAVE_STRATEGIES,
RECORDING_MODE_OPTIONS,
} from '../../settings/shared/constants/form';
type OptionsModel = NonNullable<TaskOptions>;
@ -68,6 +69,9 @@ export class TaskSettingsDialogComponent implements OnChanges {
readonly streamFormatOptions = cloneDeep(STREAM_FORMAT_OPTIONS) as Mutable<
typeof STREAM_FORMAT_OPTIONS
>;
readonly recordingModeOptions = cloneDeep(RECORDING_MODE_OPTIONS) as Mutable<
typeof RECORDING_MODE_OPTIONS
>;
readonly fmp4StreamTimeoutOptions = cloneDeep(TIMEOUT_OPTIONS) as Mutable<
typeof TIMEOUT_OPTIONS
>;