release: 1.7.0

close #42
This commit is contained in:
acgnhik 2022-05-06 13:44:44 +08:00
parent d5d9ece36a
commit 5ebeb38b69
45 changed files with 1037 additions and 505 deletions

View File

@ -1,5 +1,6 @@
[flake8]
ignore = D203, W504
max-line-length = 88
ignore = D203, W504, W503
exclude =
__*,
.*,

View File

@ -1,5 +1,23 @@
# 更新日志
## 1.7.0
- 添加封面保存策略
- 添加 Telegram bot 通知
- 添加 PushDeer 通知
- 废弃录制 HLS(ts) 流
- 在设定时间内没有 fmp4 流自动切换录制 flv 流
### P.S.
录制 fmp4 流基本没什么问题了
录制 fmp4 流基本不受网络波动影响,大概是不会录制到二压画质的。
人气比较高会被二压的直播间大都是有 fmp4 流的。
WEB 端直播播放器是 `Hls7Player` 的直播间支持录制 fmp4 流, `fMp4Player` 则不支持。
## 1.6.2
- 忽略 Windows 注册表 JavaScript MIME 设置 (issue #12, 27)

View File

@ -3,4 +3,14 @@ requires = [
"setuptools >= 57.0.0, < 58.0.0",
"wheel >= 0.37, < 0.38.0",
]
build-backend = "setuptools.build_meta"
build-backend = "setuptools.build_meta"
[tool.black]
line-length = 88
target-version = ['py38']
include = '\.py$'
skip-string-normalization = true
skip-magic-trailing-comma = true
[tool.isort]
profile = 'black'

View File

@ -36,10 +36,11 @@ include_package_data = True
python_requires = >= 3.8
install_requires =
typing-extensions >= 3.10.0.0
ordered-set >= 4.1.0, < 5.0.0
fastapi >= 0.70.0, < 0.71.0
email_validator >= 1.1.3, < 2.0.0
click < 8.1.0
typer >= 0.4.0, < 0.5.0
typer >= 0.4.1, < 0.5.0
aiohttp >= 3.8.1, < 4.0.0
requests >= 2.24.0, < 3.0.0
aiofiles >= 0.8.0, < 0.9.0
@ -61,7 +62,9 @@ install_requires =
[options.extras_require]
dev =
flake8 >= 4.0.1
mypy >= 0.910
mypy == 0.910 # https://github.com/samuelcolvin/pydantic/issues/3528
isort >= 5.10.1
black >= 22.3.0
setuptools >= 59.4.0
wheel >= 0.37

View File

@ -1,4 +1,4 @@
__prog__ = 'blrec'
__version__ = '1.6.2'
__version__ = '1.7.0'
__github__ = 'https://github.com/acgnhiki/blrec'

View File

@ -0,0 +1,97 @@
import asyncio
import logging
from enum import Enum
from typing import Set
import aiofiles
import aiohttp
from tenacity import retry, stop_after_attempt, wait_fixed
from ..bili.live import Live
from ..exception import exception_callback
from ..logging.room_id import aio_task_with_room_id
from ..path import cover_path
from ..utils.hash import sha1sum
from ..utils.mixins import SwitchableMixin
from .stream_recorder import StreamRecorder, StreamRecorderEventListener
__all__ = ('CoverDownloader',)
logger = logging.getLogger(__name__)
class CoverSaveStrategy(Enum):
DEFAULT = 'default'
DEDUP = 'dedup'
def __str__(self) -> str:
return self.value
# workaround for value serialization
def __repr__(self) -> str:
return str(self)
class CoverDownloader(StreamRecorderEventListener, SwitchableMixin):
def __init__(
self,
live: Live,
stream_recorder: StreamRecorder,
*,
save_cover: bool = False,
cover_save_strategy: CoverSaveStrategy = CoverSaveStrategy.DEFAULT,
) -> None:
super().__init__()
self._live = live
self._stream_recorder = stream_recorder
self._lock: asyncio.Lock = asyncio.Lock()
self._sha1_set: Set[str] = set()
self.save_cover = save_cover
self.cover_save_strategy = cover_save_strategy
def _do_enable(self) -> None:
self._sha1_set.clear()
self._stream_recorder.add_listener(self)
logger.debug('Enabled cover downloader')
def _do_disable(self) -> None:
self._stream_recorder.remove_listener(self)
logger.debug('Disabled cover downloader')
async def on_video_file_completed(self, video_path: str) -> None:
async with self._lock:
if not self.save_cover:
return
task = asyncio.create_task(self._save_cover(video_path))
task.add_done_callback(exception_callback)
@aio_task_with_room_id
async def _save_cover(self, video_path: str) -> None:
try:
await self._live.update_info()
cover_url = self._live.room_info.cover
data = await self._fetch_cover(cover_url)
sha1 = sha1sum(data)
if (
self.cover_save_strategy == CoverSaveStrategy.DEDUP
and sha1 in self._sha1_set
):
return
path = cover_path(video_path, ext=cover_url.rsplit('.', 1)[-1])
await self._save_file(path, data)
self._sha1_set.add(sha1)
except Exception as e:
logger.error(f'Failed to save cover image: {repr(e)}')
else:
logger.info(f'Saved cover image: {path}')
@retry(reraise=True, wait=wait_fixed(1), stop=stop_after_attempt(3))
async def _fetch_cover(self, url: str) -> bytes:
async with aiohttp.ClientSession(raise_for_status=True) as session:
async with session.get(url) as response:
return await response.read()
async def _save_file(self, path: str, data: bytes) -> None:
async with aiofiles.open(path, 'wb') as file:
await file.write(data)

View File

@ -12,9 +12,7 @@ from tenacity import (
from .. import __version__, __prog__, __github__
from .danmaku_receiver import DanmakuReceiver, DanmuMsg
from .base_stream_recorder import (
BaseStreamRecorder, StreamRecorderEventListener
)
from .stream_recorder import StreamRecorder, StreamRecorderEventListener
from .statistics import StatisticsCalculator
from ..bili.live import Live
from ..exception import exception_callback, submit_exception
@ -51,7 +49,7 @@ class DanmakuDumper(
def __init__(
self,
live: Live,
stream_recorder: BaseStreamRecorder,
stream_recorder: StreamRecorder,
danmaku_receiver: DanmakuReceiver,
*,
danmu_uname: bool = False,
@ -72,6 +70,7 @@ class DanmakuDumper(
self.record_guard_buy = record_guard_buy
self.record_super_chat = record_super_chat
self._lock: asyncio.Lock = asyncio.Lock()
self._path: Optional[str] = None
self._files: List[str] = []
self._calculator = StatisticsCalculator(interval=60)
@ -92,14 +91,6 @@ class DanmakuDumper(
def dumping_path(self) -> Optional[str]:
return self._path
def change_stream_recorder(
self, stream_recorder: BaseStreamRecorder
) -> None:
self._stream_recorder.remove_listener(self)
self._stream_recorder = stream_recorder
self._stream_recorder.add_listener(self)
logger.debug('Changed stream recorder')
def _do_enable(self) -> None:
self._stream_recorder.add_listener(self)
logger.debug('Enabled danmaku dumper')
@ -124,14 +115,16 @@ class DanmakuDumper(
async def on_video_file_created(
self, video_path: str, record_start_time: int
) -> None:
self._path = danmaku_path(video_path)
self._record_start_time = record_start_time
self._files.append(self._path)
self._start_dumping()
async with self._lock:
self._path = danmaku_path(video_path)
self._record_start_time = record_start_time
self._files.append(self._path)
self._start_dumping()
async def on_video_file_completed(self, video_path: str) -> None:
await self._stop_dumping()
self._path = None
async with self._lock:
await self._stop_dumping()
self._path = None
def _start_dumping(self) -> None:
self._create_dump_task()

View File

@ -1,3 +0,0 @@
class FailedToFetchSegments(Exception):
pass

View File

@ -1,51 +1,33 @@
import io
import errno
import logging
from typing import Optional
from urllib.parse import urlparse
from typing import Optional
import urllib3
import requests
import urllib3
from tenacity import TryAgain
from tqdm import tqdm
from tenacity import (
retry_if_result,
retry_if_not_exception_type,
Retrying,
TryAgain,
)
from .stream_analyzer import StreamProfile
from .base_stream_recorder import BaseStreamRecorder, StreamProxy
from .retry import wait_exponential_for_same_exceptions, before_sleep_log
from ..bili.live import Live
from ..bili.typing import StreamFormat, QualityNumber
from ..flv.stream_processor import StreamProcessor
from ..utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin
from ..bili.typing import QualityNumber
from ..flv.exceptions import FlvDataError, FlvStreamCorruptedError
from ..bili.exceptions import (
LiveRoomHidden, LiveRoomLocked, LiveRoomEncrypted, NoStreamAvailable,
)
from ..flv.stream_processor import StreamProcessor
from .stream_analyzer import StreamProfile
from .stream_recorder_impl import StreamProxy, StreamRecorderImpl
__all__ = 'FLVStreamRecorder',
__all__ = 'FLVStreamRecorderImpl',
logger = logging.getLogger(__name__)
class FLVStreamRecorder(
BaseStreamRecorder,
AsyncCooperationMixin,
AsyncStoppableMixin,
):
class FLVStreamRecorderImpl(StreamRecorderImpl):
def __init__(
self,
live: Live,
out_dir: str,
path_template: str,
*,
stream_format: StreamFormat = 'flv',
quality_number: QualityNumber = 10000,
buffer_size: Optional[int] = None,
read_timeout: Optional[int] = None,
@ -57,7 +39,7 @@ class FLVStreamRecorder(
live=live,
out_dir=out_dir,
path_template=path_template,
stream_format=stream_format,
stream_format='flv',
quality_number=quality_number,
buffer_size=buffer_size,
read_timeout=read_timeout,
@ -73,6 +55,7 @@ class FLVStreamRecorder(
desc='Recording',
unit='B',
unit_scale=True,
unit_divisor=1024,
postfix=self._make_pbar_postfix(),
) as progress_bar:
self._progress_bar = progress_bar
@ -116,43 +99,6 @@ class FLVStreamRecorder(
self._emit_event('stream_recording_stopped')
logger.debug('Stream recorder thread stopped')
def _main_loop(self) -> None:
for attempt in Retrying(
reraise=True,
retry=(
retry_if_result(lambda r: not self._stopped) |
retry_if_not_exception_type((OSError, NotImplementedError))
),
wait=wait_exponential_for_same_exceptions(max=60),
before_sleep=before_sleep_log(logger, logging.DEBUG, 'main_loop'),
):
with attempt:
try:
self._streaming_loop()
except NoStreamAvailable as e:
logger.warning(f'No stream available: {repr(e)}')
if not self._stopped:
raise TryAgain
except OSError as e:
logger.critical(repr(e), exc_info=e)
if e.errno == errno.ENOSPC:
# OSError(28, 'No space left on device')
self._handle_exception(e)
self._stopped = True
raise TryAgain
except LiveRoomHidden:
logger.error('The live room has been hidden!')
self._stopped = True
except LiveRoomLocked:
logger.error('The live room has been locked!')
self._stopped = True
except LiveRoomEncrypted:
logger.error('The live room has been encrypted!')
self._stopped = True
except Exception as e:
logger.exception(e)
self._handle_exception(e)
def _streaming_loop(self) -> None:
url = self._get_live_stream_url()

View File

@ -1,68 +1,57 @@
import io
import time
import errno
import logging
from queue import Queue, Empty
from threading import Thread, Event, Lock, Condition
from datetime import datetime
import time
from contextlib import suppress
from datetime import datetime
from queue import Empty, Queue
from threading import Condition, Event, Lock, Thread
from typing import Final, List, Optional, Set
from urllib.parse import urlparse
from typing import List, Set, Optional
import urllib3
import requests
import m3u8
import requests
import urllib3
from m3u8.model import Segment
from tqdm import tqdm
from ordered_set import OrderedSet
from tenacity import (
retry,
wait_exponential,
stop_after_delay,
retry_if_result,
retry_if_exception_type,
retry_if_not_exception_type,
RetryError,
Retrying,
TryAgain,
RetryError,
retry,
retry_if_exception_type,
retry_if_not_exception_type,
retry_if_result,
stop_after_delay,
wait_exponential,
)
from tqdm import tqdm
from .stream_remuxer import StreamRemuxer
from .stream_analyzer import ffprobe, StreamProfile
from .base_stream_recorder import BaseStreamRecorder, StreamProxy
from .exceptions import FailedToFetchSegments
from .retry import wait_exponential_for_same_exceptions, before_sleep_log
from ..bili.live import Live
from ..bili.typing import StreamFormat, QualityNumber
from ..flv.stream_processor import StreamProcessor
from ..bili.typing import QualityNumber
from ..flv.exceptions import FlvDataError, FlvStreamCorruptedError
from ..utils.mixins import (
AsyncCooperationMixin, AsyncStoppableMixin, SupportDebugMixin
)
from ..bili.exceptions import (
LiveRoomHidden, LiveRoomLocked, LiveRoomEncrypted, NoStreamAvailable,
)
from ..flv.stream_processor import StreamProcessor
from ..utils.mixins import SupportDebugMixin
from .stream_analyzer import StreamProfile, ffprobe
from .stream_recorder_impl import StreamProxy, StreamRecorderImpl
from .stream_remuxer import StreamRemuxer
__all__ = 'HLSStreamRecorder',
__all__ = 'HLSStreamRecorderImpl',
logger = logging.getLogger(__name__)
class HLSStreamRecorder(
BaseStreamRecorder,
AsyncCooperationMixin,
AsyncStoppableMixin,
SupportDebugMixin,
):
class FailedToFetchSegments(Exception):
pass
class HLSStreamRecorderImpl(StreamRecorderImpl, SupportDebugMixin):
def __init__(
self,
live: Live,
out_dir: str,
path_template: str,
*,
stream_format: StreamFormat = 'flv',
quality_number: QualityNumber = 10000,
buffer_size: Optional[int] = None,
read_timeout: Optional[int] = None,
@ -74,7 +63,7 @@ class HLSStreamRecorder(
live=live,
out_dir=out_dir,
path_template=path_template,
stream_format=stream_format,
stream_format='fmp4',
quality_number=quality_number,
buffer_size=buffer_size,
read_timeout=read_timeout,
@ -87,7 +76,8 @@ class HLSStreamRecorder(
self._ready_to_fetch_segments = Condition()
self._failed_to_fetch_segments = Event()
self._stream_analysed_lock = Lock()
self._last_segment_uris: Set[str] = set()
self._last_seg_uris: OrderedSet[str] = OrderedSet()
self._MAX_LAST_SEG_URIS: Final[int] = 30
def _run(self) -> None:
logger.debug('Stream recorder thread started')
@ -103,7 +93,10 @@ class HLSStreamRecorder(
self._session = requests.Session()
self._session.headers.update(self._live.headers)
self._stream_remuxer = StreamRemuxer(self._live.room_id)
self._stream_remuxer = StreamRemuxer(
self._live.room_id,
remove_filler_data=True,
)
self._segment_queue: Queue[Segment] = Queue(maxsize=1000)
self._segment_data_queue: Queue[bytes] = Queue(maxsize=100)
self._stream_host_available = Event()
@ -139,7 +132,7 @@ class HLSStreamRecorder(
self._segment_data_feeder_thread.join(timeout=10)
self._stream_remuxer.stop()
self._stream_remuxer.raise_for_exception()
self._last_segment_uris.clear()
self._last_seg_uris.clear()
del self._segment_queue
del self._segment_data_queue
except TryAgain:
@ -153,44 +146,6 @@ class HLSStreamRecorder(
self._emit_event('stream_recording_stopped')
logger.debug('Stream recorder thread stopped')
def _main_loop(self) -> None:
for attempt in Retrying(
reraise=True,
retry=(
retry_if_result(lambda r: not self._stopped) |
retry_if_not_exception_type((OSError, NotImplementedError))
),
wait=wait_exponential_for_same_exceptions(max=60),
before_sleep=before_sleep_log(logger, logging.DEBUG, 'main_loop'),
):
with attempt:
try:
self._streaming_loop()
except NoStreamAvailable as e:
logger.warning(f'No stream available: {repr(e)}')
if not self._stopped:
raise TryAgain
except OSError as e:
logger.critical(repr(e), exc_info=e)
if e.errno == errno.ENOSPC:
# OSError(28, 'No space left on device')
self._handle_exception(e)
self._stopped = True
raise TryAgain
except LiveRoomHidden:
logger.error('The live room has been hidden!')
self._stopped = True
except LiveRoomLocked:
logger.error('The live room has been locked!')
self._stopped = True
except LiveRoomEncrypted:
logger.error('The live room has been encrypted!')
self._stopped = True
except Exception as e:
logger.exception(e)
self._handle_exception(e)
self._stopped = True
def _streaming_loop(self) -> None:
url = self._get_live_stream_url()
@ -247,32 +202,29 @@ class HLSStreamRecorder(
self._stream_analysed = False
continue
uris: Set[str] = set()
curr_seg_uris: Set[str] = set()
for seg in playlist.segments:
uris.add(seg.uri)
if seg.uri not in self._last_segment_uris:
curr_seg_uris.add(seg.uri)
if seg.uri not in self._last_seg_uris:
self._segment_queue.put(seg, timeout=60)
self._last_seg_uris.add(seg.uri)
if len(self._last_seg_uris) > self._MAX_LAST_SEG_URIS:
self._last_seg_uris.pop(0)
if (
self._last_segment_uris and
not uris.intersection(self._last_segment_uris)
self._last_seg_uris and
not curr_seg_uris.intersection(self._last_seg_uris)
):
logger.debug(
'segments broken!\n'
f'last segments: {self._last_segment_uris}\n'
f'current segments: {uris}'
f'last segments uris: {self._last_seg_uris}\n'
f'current segments uris: {curr_seg_uris}'
)
with self._stream_analysed_lock:
self._stream_analysed = False
self._last_segment_uris = uris
if playlist.is_endlist:
logger.debug('playlist ended')
self._run_coroutine(self._live.update_room_info())
if not self._live.is_living():
self._stopped = True
break
time.sleep(1)
@ -338,7 +290,7 @@ class HLSStreamRecorder(
break
except requests.exceptions.ConnectionError as e:
logger.warning(repr(e))
self._connection_recovered.wait()
self._wait_for_connection_error()
except RetryError as e:
logger.warning(repr(e))
break
@ -434,6 +386,7 @@ class HLSStreamRecorder(
desc='Recording',
unit='B',
unit_scale=True,
unit_divisor=1024,
postfix=self._make_pbar_postfix(),
) as progress_bar:
self._progress_bar = progress_bar

View File

@ -12,9 +12,7 @@ from tenacity import (
)
from .raw_danmaku_receiver import RawDanmakuReceiver
from .base_stream_recorder import (
BaseStreamRecorder, StreamRecorderEventListener
)
from .stream_recorder import StreamRecorder, StreamRecorderEventListener
from ..bili.live import Live
from ..exception import exception_callback, submit_exception
from ..event.event_emitter import EventListener, EventEmitter
@ -45,7 +43,7 @@ class RawDanmakuDumper(
def __init__(
self,
live: Live,
stream_recorder: BaseStreamRecorder,
stream_recorder: StreamRecorder,
danmaku_receiver: RawDanmakuReceiver,
) -> None:
super().__init__()
@ -53,13 +51,7 @@ class RawDanmakuDumper(
self._stream_recorder = stream_recorder
self._receiver = danmaku_receiver
def change_stream_recorder(
self, stream_recorder: BaseStreamRecorder
) -> None:
self._stream_recorder.remove_listener(self)
self._stream_recorder = stream_recorder
self._stream_recorder.add_listener(self)
logger.debug('Changed stream recorder')
self._lock: asyncio.Lock = asyncio.Lock()
def _do_enable(self) -> None:
self._stream_recorder.add_listener(self)
@ -72,11 +64,13 @@ class RawDanmakuDumper(
async def on_video_file_created(
self, video_path: str, record_start_time: int
) -> None:
self._path = raw_danmaku_path(video_path)
self._start_dumping()
async with self._lock:
self._path = raw_danmaku_path(video_path)
self._start_dumping()
async def on_video_file_completed(self, video_path: str) -> None:
await self._stop_dumping()
async with self._lock:
await self._stop_dumping()
def _start_dumping(self) -> None:
self._create_dump_task()

View File

@ -1,35 +1,28 @@
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from typing import Iterator, Optional, Type
from typing import Iterator, Optional
import aiohttp
import aiofiles
import humanize
from tenacity import retry, wait_fixed, stop_after_attempt
from .danmaku_receiver import DanmakuReceiver
from .danmaku_dumper import DanmakuDumper, DanmakuDumperEventListener
from .raw_danmaku_receiver import RawDanmakuReceiver
from .raw_danmaku_dumper import RawDanmakuDumper, RawDanmakuDumperEventListener
from .base_stream_recorder import (
BaseStreamRecorder, StreamRecorderEventListener
)
from .stream_analyzer import StreamProfile
from .flv_stream_recorder import FLVStreamRecorder
from .hls_stream_recorder import HLSStreamRecorder
from ..event.event_emitter import EventListener, EventEmitter
from ..flv.data_analyser import MetaData
from ..bili.live import Live
from ..bili.models import RoomInfo
from ..bili.danmaku_client import DanmakuClient
from ..bili.live_monitor import LiveMonitor, LiveEventListener
from ..bili.typing import StreamFormat, QualityNumber
from ..utils.mixins import AsyncStoppableMixin
from ..path import cover_path
from ..bili.live import Live
from ..bili.live_monitor import LiveEventListener, LiveMonitor
from ..bili.models import RoomInfo
from ..bili.typing import QualityNumber, StreamFormat
from ..event.event_emitter import EventEmitter, EventListener
from ..flv.data_analyser import MetaData
from ..logging.room_id import aio_task_with_room_id
from ..utils.mixins import AsyncStoppableMixin
from .cover_downloader import CoverDownloader, CoverSaveStrategy
from .danmaku_dumper import DanmakuDumper, DanmakuDumperEventListener
from .danmaku_receiver import DanmakuReceiver
from .raw_danmaku_dumper import RawDanmakuDumper, RawDanmakuDumperEventListener
from .raw_danmaku_receiver import RawDanmakuReceiver
from .stream_analyzer import StreamProfile
from .stream_recorder import StreamRecorder, StreamRecorderEventListener
__all__ = 'RecorderEventListener', 'Recorder'
@ -47,29 +40,19 @@ class RecorderEventListener(EventListener):
async def on_recording_cancelled(self, recorder: Recorder) -> None:
...
async def on_video_file_created(
self, recorder: Recorder, path: str
) -> None:
async def on_video_file_created(self, recorder: Recorder, path: str) -> None:
...
async def on_video_file_completed(
self, recorder: Recorder, path: str
) -> None:
async def on_video_file_completed(self, recorder: Recorder, path: str) -> None:
...
async def on_danmaku_file_created(
self, recorder: Recorder, path: str
) -> None:
async def on_danmaku_file_created(self, recorder: Recorder, path: str) -> None:
...
async def on_danmaku_file_completed(
self, recorder: Recorder, path: str
) -> None:
async def on_danmaku_file_completed(self, recorder: Recorder, path: str) -> None:
...
async def on_raw_danmaku_file_created(
self, recorder: Recorder, path: str
) -> None:
async def on_raw_danmaku_file_created(self, recorder: Recorder, path: str) -> None:
...
async def on_raw_danmaku_file_completed(
@ -96,6 +79,7 @@ class Recorder(
*,
stream_format: StreamFormat = 'flv',
quality_number: QualityNumber = 10000,
fmp4_stream_timeout: int = 10,
buffer_size: Optional[int] = None,
read_timeout: Optional[int] = None,
disconnection_timeout: Optional[int] = None,
@ -107,6 +91,7 @@ class Recorder(
record_guard_buy: bool = False,
record_super_chat: bool = False,
save_cover: bool = False,
cover_save_strategy: CoverSaveStrategy = CoverSaveStrategy.DEFAULT,
save_raw_danmaku: bool = False,
) -> None:
super().__init__()
@ -114,23 +99,18 @@ class Recorder(
self._live = live
self._danmaku_client = danmaku_client
self._live_monitor = live_monitor
self.save_cover = save_cover
self.save_raw_danmaku = save_raw_danmaku
self._recording: bool = False
self._stream_available: bool = False
cls: Type[BaseStreamRecorder]
if stream_format == 'flv':
cls = FLVStreamRecorder
else:
cls = HLSStreamRecorder
self._stream_recorder = cls(
self._stream_recorder = StreamRecorder(
self._live,
out_dir=out_dir,
path_template=path_template,
stream_format=stream_format,
quality_number=quality_number,
fmp4_stream_timeout=fmp4_stream_timeout,
buffer_size=buffer_size,
read_timeout=read_timeout,
disconnection_timeout=disconnection_timeout,
@ -151,9 +131,14 @@ class Recorder(
)
self._raw_danmaku_receiver = RawDanmakuReceiver(danmaku_client)
self._raw_danmaku_dumper = RawDanmakuDumper(
self._live, self._stream_recorder, self._raw_danmaku_receiver
)
self._cover_downloader = CoverDownloader(
self._live,
self._stream_recorder,
self._raw_danmaku_receiver,
save_cover=save_cover,
cover_save_strategy=cover_save_strategy,
)
@property
@ -181,11 +166,19 @@ class Recorder(
self._stream_recorder.quality_number = value
@property
def real_stream_format(self) -> StreamFormat:
def fmp4_stream_timeout(self) -> int:
return self._stream_recorder.fmp4_stream_timeout
@fmp4_stream_timeout.setter
def fmp4_stream_timeout(self, value: int) -> None:
self._stream_recorder.fmp4_stream_timeout = value
@property
def real_stream_format(self) -> Optional[StreamFormat]:
return self._stream_recorder.real_stream_format
@property
def real_quality_number(self) -> QualityNumber:
def real_quality_number(self) -> Optional[QualityNumber]:
return self._stream_recorder.real_quality_number
@property
@ -252,6 +245,22 @@ class Recorder(
def record_super_chat(self, value: bool) -> None:
self._danmaku_dumper.record_super_chat = value
@property
def save_cover(self) -> bool:
return self._cover_downloader.save_cover
@save_cover.setter
def save_cover(self, value: bool) -> None:
self._cover_downloader.save_cover = value
@property
def cover_save_strategy(self) -> CoverSaveStrategy:
return self._cover_downloader.cover_save_strategy
@cover_save_strategy.setter
def cover_save_strategy(self, value: CoverSaveStrategy) -> None:
self._cover_downloader.cover_save_strategy = value
@property
def stream_url(self) -> str:
return self._stream_recorder.stream_url
@ -375,15 +384,11 @@ class Recorder(
self._print_changed_room_info(room_info)
self._stream_recorder.update_progress_bar_info()
async def on_video_file_created(
self, path: str, record_start_time: int
) -> None:
async def on_video_file_created(self, path: str, record_start_time: int) -> None:
await self._emit('video_file_created', self, path)
async def on_video_file_completed(self, path: str) -> None:
await self._emit('video_file_completed', self, path)
if self.save_cover:
await self._save_cover_image(path)
async def on_danmaku_file_created(self, path: str) -> None:
await self._emit('danmaku_file_created', self, path)
@ -426,7 +431,6 @@ class Recorder(
async def _start_recording(self) -> None:
if self._recording:
return
self._change_stream_recorder()
self._recording = True
if self.save_raw_danmaku:
@ -434,6 +438,7 @@ class Recorder(
self._raw_danmaku_receiver.start()
self._danmaku_dumper.enable()
self._danmaku_receiver.start()
self._cover_downloader.enable()
await self._prepare()
if self._stream_available:
@ -455,6 +460,7 @@ class Recorder(
self._raw_danmaku_receiver.stop()
self._danmaku_dumper.disable()
self._danmaku_receiver.stop()
self._cover_downloader.disable()
if self._stopped:
logger.info('Recording Cancelled')
@ -492,60 +498,6 @@ class Recorder(
if not self._stream_recorder.stopped:
await self.stop()
def _change_stream_recorder(self) -> None:
if self._recording:
logger.debug('Can not change stream recorder while recording')
return
cls: Type[BaseStreamRecorder]
if self.stream_format == 'flv':
cls = FLVStreamRecorder
else:
cls = HLSStreamRecorder
if self._stream_recorder.__class__ == cls:
return
self._stream_recorder.remove_listener(self)
self._stream_recorder = cls(
self._live,
out_dir=self.out_dir,
path_template=self.path_template,
stream_format=self.stream_format,
quality_number=self.quality_number,
buffer_size=self.buffer_size,
read_timeout=self.read_timeout,
disconnection_timeout=self.disconnection_timeout,
filesize_limit=self.filesize_limit,
duration_limit=self.duration_limit,
)
self._stream_recorder.add_listener(self)
self._danmaku_dumper.change_stream_recorder(self._stream_recorder)
self._raw_danmaku_dumper.change_stream_recorder(self._stream_recorder)
logger.debug(f'Changed stream recorder to {cls.__name__}')
@aio_task_with_room_id
async def _save_cover_image(self, video_path: str) -> None:
try:
await self._live.update_info()
url = self._live.room_info.cover
ext = url.rsplit('.', 1)[-1]
path = cover_path(video_path, ext)
await self._save_file(url, path)
except Exception as e:
logger.error(f'Failed to save cover image: {repr(e)}')
else:
logger.info(f'Saved cover image: {path}')
@retry(reraise=True, wait=wait_fixed(1), stop=stop_after_attempt(3))
async def _save_file(self, url: str, path: str) -> None:
async with aiohttp.ClientSession(raise_for_status=True) as session:
async with session.get(url) as response:
async with aiofiles.open(path, 'wb') as file:
await file.write(await response.read())
def _print_waiting_message(self) -> None:
logger.info('Waiting... until the live starts')
@ -554,9 +506,7 @@ class Recorder(
user_info = self._live.user_info
if room_info.live_start_time > 0:
live_start_time = str(
datetime.fromtimestamp(room_info.live_start_time)
)
live_start_time = str(datetime.fromtimestamp(room_info.live_start_time))
else:
live_start_time = 'NULL'

View File

@ -0,0 +1,284 @@
import asyncio
import logging
import time
from typing import Iterator, Optional
from ..bili.live import Live
from ..bili.typing import QualityNumber, StreamFormat
from ..event.event_emitter import EventEmitter
from ..flv.data_analyser import MetaData
from ..utils.mixins import AsyncStoppableMixin
from .flv_stream_recorder_impl import FLVStreamRecorderImpl
from .hls_stream_recorder_impl import HLSStreamRecorderImpl
from .stream_analyzer import StreamProfile
from .stream_recorder_impl import StreamRecorderEventListener
__all__ = 'StreamRecorder', 'StreamRecorderEventListener'
logger = logging.getLogger(__name__)
class StreamRecorder(
StreamRecorderEventListener,
EventEmitter[StreamRecorderEventListener],
AsyncStoppableMixin,
):
def __init__(
self,
live: Live,
out_dir: str,
path_template: str,
*,
stream_format: StreamFormat = 'flv',
quality_number: QualityNumber = 10000,
fmp4_stream_timeout: int = 10,
buffer_size: Optional[int] = None,
read_timeout: Optional[int] = None,
disconnection_timeout: Optional[int] = None,
filesize_limit: int = 0,
duration_limit: int = 0,
) -> None:
super().__init__()
self.stream_format = stream_format
self.fmp4_stream_timeout = fmp4_stream_timeout
if stream_format == 'flv':
cls = FLVStreamRecorderImpl
elif stream_format == 'fmp4':
cls = HLSStreamRecorderImpl # type: ignore
else:
logger.warning(
f'The specified stream format ({stream_format}) is '
'unsupported, will using the stream format (flv) instead.'
)
self.stream_format = 'flv'
cls = FLVStreamRecorderImpl
self._impl = cls(
live=live,
out_dir=out_dir,
path_template=path_template,
quality_number=quality_number,
buffer_size=buffer_size,
read_timeout=read_timeout,
disconnection_timeout=disconnection_timeout,
filesize_limit=filesize_limit,
duration_limit=duration_limit,
)
self._impl.add_listener(self)
@property
def stream_url(self) -> str:
return self._impl.stream_url
@property
def stream_host(self) -> str:
return self._impl.stream_host
@property
def dl_total(self) -> int:
return self._impl.dl_total
@property
def dl_rate(self) -> float:
return self._impl.dl_rate
@property
def rec_elapsed(self) -> float:
return self._impl.rec_elapsed
@property
def rec_total(self) -> int:
return self._impl.rec_total
@property
def rec_rate(self) -> float:
return self._impl.rec_rate
@property
def out_dir(self) -> str:
return self._impl.out_dir
@out_dir.setter
def out_dir(self, value: str) -> None:
self._impl.out_dir = value
@property
def path_template(self) -> str:
return self._impl.path_template
@path_template.setter
def path_template(self, value: str) -> None:
self._impl.path_template = value
@property
def quality_number(self) -> QualityNumber:
return self._impl.quality_number
@quality_number.setter
def quality_number(self, value: QualityNumber) -> None:
self._impl.quality_number = value
@property
def real_stream_format(self) -> Optional[StreamFormat]:
if self.stopped:
return None
return self._impl.stream_format
@property
def real_quality_number(self) -> Optional[QualityNumber]:
return self._impl.real_quality_number
@property
def buffer_size(self) -> int:
return self._impl.buffer_size
@buffer_size.setter
def buffer_size(self, value: int) -> None:
self._impl.buffer_size = value
@property
def read_timeout(self) -> int:
return self._impl.read_timeout
@read_timeout.setter
def read_timeout(self, value: int) -> None:
self._impl.read_timeout = value
@property
def disconnection_timeout(self) -> int:
return self._impl.disconnection_timeout
@disconnection_timeout.setter
def disconnection_timeout(self, value: int) -> None:
self._impl.disconnection_timeout = value
@property
def filesize_limit(self) -> int:
return self._impl.filesize_limit
@filesize_limit.setter
def filesize_limit(self, value: int) -> None:
self._impl.filesize_limit = value
@property
def duration_limit(self) -> int:
return self._impl.duration_limit
@duration_limit.setter
def duration_limit(self, value: int) -> None:
self._impl.duration_limit = value
@property
def recording_path(self) -> Optional[str]:
return self._impl.recording_path
@property
def metadata(self) -> Optional[MetaData]:
return self._impl.metadata
@property
def stream_profile(self) -> StreamProfile:
return self._impl.stream_profile
def has_file(self) -> bool:
return self._impl.has_file()
def get_files(self) -> Iterator[str]:
yield from self._impl.get_files()
def clear_files(self) -> None:
self._impl.clear_files()
def can_cut_stream(self) -> bool:
return self._impl.can_cut_stream()
def cut_stream(self) -> bool:
return self._impl.cut_stream()
def update_progress_bar_info(self) -> None:
self._impl.update_progress_bar_info()
@property
def stopped(self) -> bool:
return self._impl.stopped
async def _do_start(self) -> None:
stream_format = self.stream_format
if stream_format == 'fmp4':
logger.info('Waiting for the fmp4 stream becomes available...')
available = await self._wait_fmp4_stream()
if not available:
logger.warning(
'The specified stream format (fmp4) is not available '
f'in {self.fmp4_stream_timeout} seconcds, '
'falling back to stream format (flv).'
)
stream_format = 'flv'
self._change_impl(stream_format)
await self._impl.start()
async def _do_stop(self) -> None:
await self._impl.stop()
async def on_video_file_created(self, path: str, record_start_time: int) -> None:
await self._emit('video_file_created', path, record_start_time)
async def on_video_file_completed(self, path: str) -> None:
await self._emit('video_file_completed', path)
async def on_stream_recording_stopped(self) -> None:
await self._emit('stream_recording_stopped')
async def _wait_fmp4_stream(self) -> bool:
end_time = time.monotonic() + self.fmp4_stream_timeout
available = False # debounce
while True:
try:
await self._impl._live.get_live_stream_urls(stream_format='fmp4')
except Exception:
available = False
if time.monotonic() > end_time:
return False
else:
if available:
return True
else:
available = True
await asyncio.sleep(1)
def _change_impl(self, stream_format: StreamFormat) -> None:
if stream_format == 'flv':
cls = FLVStreamRecorderImpl
elif stream_format == 'fmp4':
cls = HLSStreamRecorderImpl # type: ignore
else:
logger.warning(
f'The specified stream format ({stream_format}) is '
'unsupported, will using the stream format (flv) instead.'
)
cls = FLVStreamRecorderImpl
if self._impl.__class__ == cls:
return
self._impl.remove_listener(self)
self._impl = cls(
live=self._impl._live,
out_dir=self._impl.out_dir,
path_template=self._impl.path_template,
quality_number=self._impl.quality_number,
buffer_size=self._impl.buffer_size,
read_timeout=self._impl.read_timeout,
disconnection_timeout=self._impl.disconnection_timeout,
filesize_limit=self._impl.filesize_limit,
duration_limit=self._impl.duration_limit,
)
self._impl.add_listener(self)
logger.debug(f'Changed stream recorder impl to {cls.__name__}')

View File

@ -1,53 +1,61 @@
import asyncio
import errno
import io
import logging
import os
import re
import time
import asyncio
import logging
from abc import ABC, abstractmethod
from threading import Thread, Event
from datetime import datetime, timezone, timedelta
from collections import OrderedDict
from typing import Any, BinaryIO, Dict, Iterator, Optional, Tuple
from datetime import datetime, timedelta, timezone
from threading import Thread
from typing import Any, BinaryIO, Dict, Final, Iterator, Optional, Tuple
import aiohttp
import urllib3
from tqdm import tqdm
from rx.subject import Subject
from rx.core import Observable
from rx.subject import Subject
from tenacity import (
Retrying,
TryAgain,
retry,
wait_none,
wait_fixed,
retry_if_exception_type,
retry_if_not_exception_type,
retry_if_result,
stop_after_attempt,
stop_after_delay,
wait_chain,
wait_exponential,
stop_after_delay,
stop_after_attempt,
retry_if_exception_type,
TryAgain,
wait_fixed,
wait_none,
)
from tqdm import tqdm
from .. import __version__, __prog__, __github__
from .stream_remuxer import StreamRemuxer
from .stream_analyzer import StreamProfile
from .statistics import StatisticsCalculator
from ..event.event_emitter import EventListener, EventEmitter
from ..bili.live import Live
from ..bili.typing import ApiPlatform, StreamFormat, QualityNumber
from .. import __github__, __prog__, __version__
from ..bili.exceptions import (
LiveRoomEncrypted,
LiveRoomHidden,
LiveRoomLocked,
NoStreamAvailable,
NoStreamFormatAvailable,
NoStreamQualityAvailable,
)
from ..bili.helpers import get_quality_name
from ..bili.live import Live
from ..bili.typing import ApiPlatform, QualityNumber, StreamFormat
from ..event.event_emitter import EventEmitter, EventListener
from ..flv.data_analyser import MetaData
from ..flv.stream_processor import StreamProcessor, BaseOutputFileManager
from ..flv.stream_processor import BaseOutputFileManager, StreamProcessor
from ..logging.room_id import aio_task_with_room_id
from ..path import escape_path
from ..utils.io import wait_for
from ..utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin
from ..path import escape_path
from ..logging.room_id import aio_task_with_room_id
from ..bili.exceptions import (
NoStreamFormatAvailable, NoStreamCodecAvailable, NoStreamQualityAvailable,
)
from .retry import before_sleep_log, wait_exponential_for_same_exceptions
from .statistics import StatisticsCalculator
from .stream_analyzer import StreamProfile
from .stream_remuxer import StreamRemuxer
__all__ = 'BaseStreamRecorder', 'StreamRecorderEventListener', 'StreamProxy'
__all__ = 'StreamRecorderImpl',
logger = logging.getLogger(__name__)
@ -67,7 +75,7 @@ class StreamRecorderEventListener(EventListener):
...
class BaseStreamRecorder(
class StreamRecorderImpl(
EventEmitter[StreamRecorderEventListener],
AsyncCooperationMixin,
AsyncStoppableMixin,
@ -99,9 +107,8 @@ class BaseStreamRecorder(
live, out_dir, path_template, buffer_size
)
self._stream_format = stream_format
self._stream_format: Final = stream_format
self._quality_number = quality_number
self._real_stream_format: Optional[StreamFormat] = None
self._real_quality_number: Optional[QualityNumber] = None
self._api_platform: ApiPlatform = 'android'
self._use_alternative_stream: bool = False
@ -116,8 +123,6 @@ class BaseStreamRecorder(
self._stream_host: str = ''
self._stream_profile: StreamProfile = {}
self._connection_recovered = Event()
def on_file_created(args: Tuple[str, int]) -> None:
logger.info(f"Video file created: '{args[0]}'")
self._emit_event('video_file_created', *args)
@ -177,11 +182,6 @@ class BaseStreamRecorder(
def stream_format(self) -> StreamFormat:
return self._stream_format
@stream_format.setter
def stream_format(self, value: StreamFormat) -> None:
self._stream_format = value
self._real_stream_format = None
@property
def quality_number(self) -> QualityNumber:
return self._quality_number
@ -189,15 +189,12 @@ class BaseStreamRecorder(
@quality_number.setter
def quality_number(self, value: QualityNumber) -> None:
self._quality_number = value
self._real_quality_number = None
@property
def real_stream_format(self) -> StreamFormat:
return self._real_stream_format or self.stream_format
@property
def real_quality_number(self) -> QualityNumber:
return self._real_quality_number or self.quality_number
def real_quality_number(self) -> Optional[QualityNumber]:
if self.stopped:
return None
return self._real_quality_number
@property
def filesize_limit(self) -> int:
@ -263,16 +260,20 @@ class BaseStreamRecorder(
if self._progress_bar is not None:
self._progress_bar.set_postfix_str(self._make_pbar_postfix())
async def _do_start(self) -> None:
logger.debug('Starting stream recorder...')
def _reset(self) -> None:
self._dl_calculator.reset()
self._rec_calculator.reset()
self._stream_url = ''
self._stream_host = ''
self._stream_profile = {}
self._api_platform = 'android'
self._real_quality_number = None
self._use_alternative_stream = False
self._connection_recovered.clear()
self._fall_back_stream_format = False
async def _do_start(self) -> None:
logger.debug('Starting stream recorder...')
self._reset()
self._thread = Thread(
target=self._run, name=f'StreamRecorder::{self._live.room_id}'
)
@ -290,6 +291,44 @@ class BaseStreamRecorder(
def _run(self) -> None:
raise NotImplementedError()
@abstractmethod
def _streaming_loop(self) -> None:
raise NotImplementedError()
def _main_loop(self) -> None:
for attempt in Retrying(
reraise=True,
retry=(
retry_if_result(lambda r: not self._stopped) |
retry_if_not_exception_type((NotImplementedError))
),
wait=wait_exponential_for_same_exceptions(max=60),
before_sleep=before_sleep_log(logger, logging.DEBUG, 'main_loop'),
):
with attempt:
try:
self._streaming_loop()
except (NoStreamAvailable, NoStreamFormatAvailable) as e:
logger.warning(f'Failed to get live stream url: {repr(e)}')
except OSError as e:
logger.critical(repr(e), exc_info=e)
if e.errno == errno.ENOSPC:
# OSError(28, 'No space left on device')
self._handle_exception(e)
self._stopped = True
except LiveRoomHidden:
logger.error('The live room has been hidden!')
self._stopped = True
except LiveRoomLocked:
logger.error('The live room has been locked!')
self._stopped = True
except LiveRoomEncrypted:
logger.error('The live room has been encrypted!')
self._stopped = True
except Exception as e:
logger.exception(e)
self._handle_exception(e)
def _rotate_api_platform(self) -> None:
if self._api_platform == 'android':
self._api_platform = 'web'
@ -305,8 +344,8 @@ class BaseStreamRecorder(
stop=stop_after_attempt(300),
)
def _get_live_stream_url(self) -> str:
fmt = self._stream_format
qn = self._real_quality_number or self.quality_number
fmt = self._real_stream_format or self.stream_format
logger.info(
f'Getting the live stream url... qn: {qn}, format: {fmt}, '
f'api platform: {self._api_platform}, '
@ -327,35 +366,11 @@ class BaseStreamRecorder(
)
self._real_quality_number = 10000
raise TryAgain
except NoStreamFormatAvailable:
if fmt == 'fmp4':
logger.info(
'The specified stream format (fmp4) is not available, '
'falling back to stream format (ts).'
)
self._real_stream_format = 'ts'
elif fmt == 'ts':
logger.info(
'The specified stream format (ts) is not available, '
'falling back to stream format (flv).'
)
self._real_stream_format = 'flv'
else:
raise NotImplementedError(fmt)
raise TryAgain
except NoStreamCodecAvailable as e:
logger.warning(repr(e))
raise TryAgain
except Exception as e:
logger.warning(f'Failed to get live stream urls: {repr(e)}')
self._rotate_api_platform()
raise TryAgain
else:
logger.info(
f'Adopted the stream format ({fmt}) and quality ({qn})'
)
self._real_quality_number = qn
self._real_stream_format = fmt
if not self._use_alternative_stream:
url = urls[0]
@ -366,8 +381,9 @@ class BaseStreamRecorder(
self._use_alternative_stream = False
self._rotate_api_platform()
logger.info(
'No alternative stream url available, will using the primary'
f' stream url from {self._api_platform} api instead.'
'No alternative stream url available, '
'will using the primary stream url '
f'from {self._api_platform} api instead.'
)
raise TryAgain
logger.info(f"Got live stream url: '{url}'")
@ -380,16 +396,7 @@ class BaseStreamRecorder(
logger.debug(f'Retry {name} after {seconds} seconds')
time.sleep(seconds)
def _wait_for_connection_error(self) -> None:
Thread(
target=self._conectivity_checker,
name=f'ConectivityChecker::{self._live.room_id}',
daemon=True,
).start()
self._connection_recovered.wait()
self._connection_recovered.clear()
def _conectivity_checker(self, check_interval: int = 3) -> None:
def _wait_for_connection_error(self, check_interval: int = 3) -> None:
timeout = self.disconnection_timeout
logger.info(f'Waiting {timeout} seconds for connection recovery... ')
timebase = time.monotonic()
@ -397,11 +404,10 @@ class BaseStreamRecorder(
if timeout is not None and time.monotonic() - timebase > timeout:
logger.error(f'Connection not recovered in {timeout} seconds')
self._stopped = True
self._connection_recovered.set()
break
time.sleep(check_interval)
else:
logger.info('Connection recovered')
self._connection_recovered.set()
def _make_pbar_postfix(self) -> str:
return '{room_id} - {user_name}: {room_title}'.format(
@ -434,7 +440,7 @@ B站直播录像
房间号{self._live.room_info.room_id}
开播时间{live_start_time}
流主机: {self._stream_host}
流格式{self._real_stream_format}
流格式{self._stream_format}
流画质{stream_quality}
录制程序{__prog__} v{__version__} {__github__}''',
'description': OrderedDict({
@ -446,7 +452,7 @@ B站直播录像
'ParentArea': self._live.room_info.parent_area_name,
'LiveStartTime': str(live_start_time),
'StreamHost': self._stream_host,
'StreamFormat': self._real_stream_format,
'StreamFormat': self._stream_format,
'StreamQuality': stream_quality,
'Recorder': f'{__prog__} v{__version__} {__github__}',
})

View File

@ -1,22 +1,20 @@
import re
import os
import io
import errno
import shlex
import io
import logging
from threading import Thread, Condition
from subprocess import Popen, PIPE, CalledProcessError
import os
import re
import shlex
from subprocess import PIPE, CalledProcessError, Popen
from threading import Condition, Thread
from typing import Optional, cast
from ..utils.mixins import StoppableMixin, SupportDebugMixin
from ..utils.io import wait_for
from ..utils.mixins import StoppableMixin, SupportDebugMixin
logger = logging.getLogger(__name__)
__all__ = 'StreamRemuxer',
__all__ = ('StreamRemuxer',)
class FFmpegError(Exception):
@ -28,10 +26,10 @@ class StreamRemuxer(StoppableMixin, SupportDebugMixin):
r'\b(error|failed|missing|invalid|corrupt)\b', re.IGNORECASE
)
def __init__(self, room_id: int, bufsize: int = 1024 * 1024) -> None:
def __init__(self, room_id: int, remove_filler_data: bool = False) -> None:
super().__init__()
self._room_id = room_id
self._bufsize = bufsize
self._remove_filler_data = remove_filler_data
self._exception: Optional[Exception] = None
self._ready = Condition()
self._env = None
@ -83,9 +81,7 @@ class StreamRemuxer(StoppableMixin, SupportDebugMixin):
def _do_start(self) -> None:
logger.debug('Starting stream remuxer...')
self._thread = Thread(
target=self._run,
name=f'StreamRemuxer::{self._room_id}',
daemon=True,
target=self._run, name=f'StreamRemuxer::{self._room_id}', daemon=True
)
self._thread.start()
@ -124,21 +120,21 @@ class StreamRemuxer(StoppableMixin, SupportDebugMixin):
logger.debug('Stopped stream remuxer')
def _run_subprocess(self) -> None:
cmd = 'ffmpeg -xerror -i pipe:0 -c copy -copyts -f flv pipe:1'
cmd = 'ffmpeg -xerror -i pipe:0 -c copy -copyts'
if self._remove_filler_data:
cmd += ' -bsf:v filter_units=remove_types=12'
cmd += ' -f flv pipe:1'
args = shlex.split(cmd)
with Popen(
args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
bufsize=self._bufsize, env=self._env,
args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=self._env
) as self._subprocess:
with self._ready:
self._ready.notify_all()
assert self._subprocess.stderr is not None
with io.TextIOWrapper(
self._subprocess.stderr,
encoding='utf-8',
errors='backslashreplace'
self._subprocess.stderr, encoding='utf-8', errors='backslashreplace'
) as stderr:
while not self._stopped:
line = wait_for(stderr.readline, timeout=10)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -10,6 +10,6 @@
<body>
<app-root></app-root>
<noscript>Please enable JavaScript to continue using this application.</noscript>
<script src="runtime.5296fd12ffdfadbe.js" type="module"></script><script src="polyfills.4b08448aee19bb22.js" type="module"></script><script src="main.b9234f0840c7101a.js" type="module"></script>
<script src="runtime.6dfcba40e2a24845.js" type="module"></script><script src="polyfills.4b08448aee19bb22.js" type="module"></script><script src="main.b9234f0840c7101a.js" type="module"></script>
</body></html>

View File

@ -1,6 +1,6 @@
{
"configVersion": 1,
"timestamp": 1651566774775,
"timestamp": 1651812628293,
"index": "/index.html",
"assetGroups": [
{
@ -14,15 +14,15 @@
"/103.5b5d2a6e5a8a7479.js",
"/146.92e3b29c4c754544.js",
"/45.c90c3cea2bf1a66e.js",
"/474.88f730916af2dc81.js",
"/66.17103bf51c59b5c8.js",
"/869.ac675e78fa0ea7cf.js",
"/474.c2cc33b068a782fc.js",
"/66.31f5b9ae46ae9005.js",
"/869.42b1fd9a88732b97.js",
"/common.858f777e9296e6f2.js",
"/index.html",
"/main.b9234f0840c7101a.js",
"/manifest.webmanifest",
"/polyfills.4b08448aee19bb22.js",
"/runtime.5296fd12ffdfadbe.js",
"/runtime.6dfcba40e2a24845.js",
"/styles.1f581691b230dc4d.css"
],
"patterns": []
@ -1637,9 +1637,9 @@
"/103.5b5d2a6e5a8a7479.js": "cc0240f217015b6d4ddcc14f31fcc42e1c1c282a",
"/146.92e3b29c4c754544.js": "3824de681dd1f982ea69a065cdf54d7a1e781f4d",
"/45.c90c3cea2bf1a66e.js": "e5bfb8cf3803593e6b8ea14c90b3d3cb6a066764",
"/474.88f730916af2dc81.js": "e7cb3e7bd68c162633d94c8c848dea2daeac8bc3",
"/66.17103bf51c59b5c8.js": "67c9bb3ac7e7c7c25ebe1db69fc87890a2fdc184",
"/869.ac675e78fa0ea7cf.js": "f45052016cb5201d5784b3f261e719d96bd1b153",
"/474.c2cc33b068a782fc.js": "cb59c0b560cdceeccf9c17df5e9be76bfa942775",
"/66.31f5b9ae46ae9005.js": "cc22d2582d8e4c2a83e089d5a1ec32619e439ccd",
"/869.42b1fd9a88732b97.js": "ca5c951f04d02218b3fe7dc5c022dad22bf36eca",
"/assets/animal/panda.js": "fec2868bb3053dd2da45f96bbcb86d5116ed72b1",
"/assets/animal/panda.svg": "bebd302cdc601e0ead3a6d2710acf8753f3d83b1",
"/assets/fill/.gitkeep": "da39a3ee5e6b4b0d3255bfef95601890afd80709",
@ -3234,11 +3234,11 @@
"/assets/twotone/warning.js": "fb2d7ea232f3a99bf8f080dbc94c65699232ac01",
"/assets/twotone/warning.svg": "8c7a2d3e765a2e7dd58ac674870c6655cecb0068",
"/common.858f777e9296e6f2.js": "b68ca68e1e214a2537d96935c23410126cc564dd",
"/index.html": "4957097d609200632fe355aef8f7603a3bb1addc",
"/index.html": "02b6c1c31185bec91e297c4f224b2d193f9c981c",
"/main.b9234f0840c7101a.js": "c8c7b588c070b957a2659f62d6a77de284aa2233",
"/manifest.webmanifest": "62c1cb8c5ad2af551a956b97013ab55ce77dd586",
"/polyfills.4b08448aee19bb22.js": "8e73f2d42cc13ca353cea5c886d930bd6da08d0d",
"/runtime.5296fd12ffdfadbe.js": "5b84a91028ab9e0daaaf89d70f2d12c48d5e358e",
"/runtime.6dfcba40e2a24845.js": "9dc2eec70103e3ee2249dde68eeb09910a7ae02d",
"/styles.1f581691b230dc4d.css": "6f5befbbad57c2b2e80aae855139744b8010d150"
},
"navigationUrls": [

View File

@ -1 +1 @@
(()=>{"use strict";var e,v={},m={};function r(e){var i=m[e];if(void 0!==i)return i.exports;var t=m[e]={exports:{}};return v[e].call(t.exports,t,t.exports,r),t.exports}r.m=v,e=[],r.O=(i,t,o,f)=>{if(!t){var a=1/0;for(n=0;n<e.length;n++){for(var[t,o,f]=e[n],c=!0,l=0;l<t.length;l++)(!1&f||a>=f)&&Object.keys(r.O).every(b=>r.O[b](t[l]))?t.splice(l--,1):(c=!1,f<a&&(a=f));if(c){e.splice(n--,1);var d=o();void 0!==d&&(i=d)}}return i}f=f||0;for(var n=e.length;n>0&&e[n-1][2]>f;n--)e[n]=e[n-1];e[n]=[t,o,f]},r.n=e=>{var i=e&&e.__esModule?()=>e.default:()=>e;return r.d(i,{a:i}),i},r.d=(e,i)=>{for(var t in i)r.o(i,t)&&!r.o(e,t)&&Object.defineProperty(e,t,{enumerable:!0,get:i[t]})},r.f={},r.e=e=>Promise.all(Object.keys(r.f).reduce((i,t)=>(r.f[t](e,i),i),[])),r.u=e=>(592===e?"common":e)+"."+{45:"c90c3cea2bf1a66e",66:"17103bf51c59b5c8",103:"5b5d2a6e5a8a7479",146:"92e3b29c4c754544",474:"88f730916af2dc81",592:"858f777e9296e6f2",869:"ac675e78fa0ea7cf"}[e]+".js",r.miniCssF=e=>{},r.o=(e,i)=>Object.prototype.hasOwnProperty.call(e,i),(()=>{var e={},i="blrec:";r.l=(t,o,f,n)=>{if(e[t])e[t].push(o);else{var a,c;if(void 0!==f)for(var l=document.getElementsByTagName("script"),d=0;d<l.length;d++){var u=l[d];if(u.getAttribute("src")==t||u.getAttribute("data-webpack")==i+f){a=u;break}}a||(c=!0,(a=document.createElement("script")).type="module",a.charset="utf-8",a.timeout=120,r.nc&&a.setAttribute("nonce",r.nc),a.setAttribute("data-webpack",i+f),a.src=r.tu(t)),e[t]=[o];var s=(g,b)=>{a.onerror=a.onload=null,clearTimeout(p);var _=e[t];if(delete e[t],a.parentNode&&a.parentNode.removeChild(a),_&&_.forEach(h=>h(b)),g)return g(b)},p=setTimeout(s.bind(null,void 0,{type:"timeout",target:a}),12e4);a.onerror=s.bind(null,a.onerror),a.onload=s.bind(null,a.onload),c&&document.head.appendChild(a)}}})(),r.r=e=>{"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},(()=>{var e;r.tu=i=>(void 0===e&&(e={createScriptURL:t=>t},"undefined"!=typeof trustedTypes&&trustedTypes.createPolicy&&(e=trustedTypes.createPolicy("angular#bundler",e))),e.createScriptURL(i))})(),r.p="",(()=>{var e={666:0};r.f.j=(o,f)=>{var n=r.o(e,o)?e[o]:void 0;if(0!==n)if(n)f.push(n[2]);else if(666!=o){var a=new Promise((u,s)=>n=e[o]=[u,s]);f.push(n[2]=a);var c=r.p+r.u(o),l=new Error;r.l(c,u=>{if(r.o(e,o)&&(0!==(n=e[o])&&(e[o]=void 0),n)){var s=u&&("load"===u.type?"missing":u.type),p=u&&u.target&&u.target.src;l.message="Loading chunk "+o+" failed.\n("+s+": "+p+")",l.name="ChunkLoadError",l.type=s,l.request=p,n[1](l)}},"chunk-"+o,o)}else e[o]=0},r.O.j=o=>0===e[o];var i=(o,f)=>{var l,d,[n,a,c]=f,u=0;if(n.some(p=>0!==e[p])){for(l in a)r.o(a,l)&&(r.m[l]=a[l]);if(c)var s=c(r)}for(o&&o(f);u<n.length;u++)r.o(e,d=n[u])&&e[d]&&e[d][0](),e[n[u]]=0;return r.O(s)},t=self.webpackChunkblrec=self.webpackChunkblrec||[];t.forEach(i.bind(null,0)),t.push=i.bind(null,t.push.bind(t))})()})();
(()=>{"use strict";var e,v={},m={};function r(e){var i=m[e];if(void 0!==i)return i.exports;var t=m[e]={exports:{}};return v[e].call(t.exports,t,t.exports,r),t.exports}r.m=v,e=[],r.O=(i,t,o,f)=>{if(!t){var a=1/0;for(n=0;n<e.length;n++){for(var[t,o,f]=e[n],c=!0,d=0;d<t.length;d++)(!1&f||a>=f)&&Object.keys(r.O).every(b=>r.O[b](t[d]))?t.splice(d--,1):(c=!1,f<a&&(a=f));if(c){e.splice(n--,1);var u=o();void 0!==u&&(i=u)}}return i}f=f||0;for(var n=e.length;n>0&&e[n-1][2]>f;n--)e[n]=e[n-1];e[n]=[t,o,f]},r.n=e=>{var i=e&&e.__esModule?()=>e.default:()=>e;return r.d(i,{a:i}),i},r.d=(e,i)=>{for(var t in i)r.o(i,t)&&!r.o(e,t)&&Object.defineProperty(e,t,{enumerable:!0,get:i[t]})},r.f={},r.e=e=>Promise.all(Object.keys(r.f).reduce((i,t)=>(r.f[t](e,i),i),[])),r.u=e=>(592===e?"common":e)+"."+{45:"c90c3cea2bf1a66e",66:"31f5b9ae46ae9005",103:"5b5d2a6e5a8a7479",146:"92e3b29c4c754544",474:"c2cc33b068a782fc",592:"858f777e9296e6f2",869:"42b1fd9a88732b97"}[e]+".js",r.miniCssF=e=>{},r.o=(e,i)=>Object.prototype.hasOwnProperty.call(e,i),(()=>{var e={},i="blrec:";r.l=(t,o,f,n)=>{if(e[t])e[t].push(o);else{var a,c;if(void 0!==f)for(var d=document.getElementsByTagName("script"),u=0;u<d.length;u++){var l=d[u];if(l.getAttribute("src")==t||l.getAttribute("data-webpack")==i+f){a=l;break}}a||(c=!0,(a=document.createElement("script")).type="module",a.charset="utf-8",a.timeout=120,r.nc&&a.setAttribute("nonce",r.nc),a.setAttribute("data-webpack",i+f),a.src=r.tu(t)),e[t]=[o];var s=(g,b)=>{a.onerror=a.onload=null,clearTimeout(p);var _=e[t];if(delete e[t],a.parentNode&&a.parentNode.removeChild(a),_&&_.forEach(h=>h(b)),g)return g(b)},p=setTimeout(s.bind(null,void 0,{type:"timeout",target:a}),12e4);a.onerror=s.bind(null,a.onerror),a.onload=s.bind(null,a.onload),c&&document.head.appendChild(a)}}})(),r.r=e=>{"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},(()=>{var e;r.tu=i=>(void 0===e&&(e={createScriptURL:t=>t},"undefined"!=typeof trustedTypes&&trustedTypes.createPolicy&&(e=trustedTypes.createPolicy("angular#bundler",e))),e.createScriptURL(i))})(),r.p="",(()=>{var e={666:0};r.f.j=(o,f)=>{var n=r.o(e,o)?e[o]:void 0;if(0!==n)if(n)f.push(n[2]);else if(666!=o){var a=new Promise((l,s)=>n=e[o]=[l,s]);f.push(n[2]=a);var c=r.p+r.u(o),d=new Error;r.l(c,l=>{if(r.o(e,o)&&(0!==(n=e[o])&&(e[o]=void 0),n)){var s=l&&("load"===l.type?"missing":l.type),p=l&&l.target&&l.target.src;d.message="Loading chunk "+o+" failed.\n("+s+": "+p+")",d.name="ChunkLoadError",d.type=s,d.request=p,n[1](d)}},"chunk-"+o,o)}else e[o]=0},r.O.j=o=>0===e[o];var i=(o,f)=>{var d,u,[n,a,c]=f,l=0;if(n.some(p=>0!==e[p])){for(d in a)r.o(a,d)&&(r.m[d]=a[d]);if(c)var s=c(r)}for(o&&o(f);l<n.length;l++)r.o(e,u=n[l])&&e[u]&&e[u][0](),e[n[l]]=0;return r.O(s)},t=self.webpackChunkblrec=self.webpackChunkblrec||[];t.forEach(i.bind(null,0)),t.push=i.bind(null,t.push.bind(t))})()})();

View File

@ -41,6 +41,8 @@ def read_tags_in_duration(
yield tag
if tag.timestamp > 0:
break
else:
raise EOFError('no tags')
start = tag.timestamp
end = start + duration

View File

@ -381,11 +381,26 @@ class StreamProcessor:
bytes_io = io.BytesIO()
writer = FlvWriter(bytes_io)
writer.write_header(flv_header)
writer.write_tag(self._parameters_checker.last_metadata_tag)
writer.write_tag(self._parameters_checker.last_video_header_tag)
writer.write_tag(
self._correct_ts(
self._parameters_checker.last_metadata_tag,
-self._parameters_checker.last_metadata_tag.timestamp,
)
)
writer.write_tag(
self._correct_ts(
self._parameters_checker.last_video_header_tag,
-self._parameters_checker.last_video_header_tag.timestamp,
)
)
if self._parameters_checker.last_audio_header_tag is not None:
writer.write_tag(self._parameters_checker.last_audio_header_tag)
writer.write_tag(first_data_tag)
writer.write_tag(
self._correct_ts(
self._parameters_checker.last_audio_header_tag,
-self._parameters_checker.last_audio_header_tag.timestamp,
)
)
writer.write_tag(self._correct_ts(first_data_tag, -first_data_tag.timestamp))
def on_next(profile: StreamProfile) -> None:
self._stream_profile_updates.on_next(profile)

View File

@ -5,7 +5,7 @@ import ssl
from abc import ABC, abstractmethod
from email.message import EmailMessage
from http.client import HTTPException
from typing import Final, Literal, TypedDict, cast
from typing import Final, Literal, TypedDict, Dict, Any, cast
from urllib.parse import urljoin
import aiohttp
@ -192,7 +192,7 @@ class Pushplus(MessagingProvider):
class TelegramResponse(TypedDict):
ok: bool
result: dict
result: Dict[str, Any]
class Telegram(MessagingProvider):

View File

@ -245,6 +245,7 @@ class Postprocessor(
out_path,
metadata_path,
report_progress=True,
remove_filler_data=True,
).subscribe(
on_next,
lambda e: future.set_exception(e),

View File

@ -61,12 +61,20 @@ class VideoRemuxer:
in_path: str,
out_path: str,
metadata_path: Optional[str] = None,
*,
remove_filler_data: bool = False,
) -> RemuxResult:
cmd = f'ffmpeg -i "{in_path}"'
if metadata_path is not None:
cmd = f'ffmpeg -i "{in_path}" -i "{metadata_path}" ' \
f'-map_metadata 1 -codec copy "{out_path}" -y'
else:
cmd = f'ffmpeg -i "{in_path}" -codec copy "{out_path}" -y'
cmd += f' -i "{metadata_path}" -map_metadata 1'
cmd += ' -codec copy'
if remove_filler_data:
# https://forum.doom9.org/showthread.php?t=152051
# ISO_IEC_14496-10_2020(E)
# Table 7-1 NAL unit type codes, syntax element categories, and NAL unit type classes # noqa
# 7.4.2.7 Filler data RBSP semantics
cmd += ' -bsf:v filter_units=remove_types=12'
cmd += f' "{out_path}" -y'
args = shlex.split(cmd)
out_lines: List[str] = []
@ -140,6 +148,7 @@ def remux_video(
metadata_path: Optional[str] = None,
*,
report_progress: bool = False,
remove_filler_data: bool = False,
) -> Observable:
def subscribe(
observer: Observer[Union[RemuxProgress, RemuxResult]],
@ -167,7 +176,12 @@ def remux_video(
)
try:
result = remuxer.remux(in_path, out_path, metadata_path)
result = remuxer.remux(
in_path,
out_path,
metadata_path,
remove_filler_data=remove_filler_data,
)
except Exception as e:
observer.on_error(e)
else:

View File

@ -20,6 +20,7 @@ from pydantic.networks import HttpUrl, EmailStr
from ..bili.typing import StreamFormat, QualityNumber
from ..postprocess import DeleteStrategy
from ..core.cover_downloader import CoverSaveStrategy
from ..logging.typing import LOG_LEVEL
from ..utils.string import camel_case
@ -144,12 +145,21 @@ class DanmakuSettings(DanmakuOptions):
class RecorderOptions(BaseModel):
stream_format: Optional[StreamFormat]
quality_number: Optional[QualityNumber]
fmp4_stream_timeout: Optional[int]
read_timeout: Optional[int] # seconds
disconnection_timeout: Optional[int] # seconds
buffer_size: Annotated[ # bytes
Optional[int], Field(ge=4096, le=1024 ** 2 * 512, multiple_of=2)
]
save_cover: Optional[bool]
cover_save_strategy: Optional[CoverSaveStrategy]
@validator('fmp4_stream_timeout')
def _validate_fmp4_stream_timeout(cls, v: Optional[int]) -> Optional[int]:
if v is not None:
allowed_values = frozenset((3, 5, 10, 30, 60, 180, 300, 600))
cls._validate_with_collection(v, allowed_values)
return v
@validator('read_timeout')
def _validate_read_timeout(cls, value: Optional[int]) -> Optional[int]:
@ -171,12 +181,14 @@ class RecorderOptions(BaseModel):
class RecorderSettings(RecorderOptions):
stream_format: StreamFormat = 'flv'
quality_number: QualityNumber = 20000 # 4K, the highest quality.
fmp4_stream_timeout: int = 10
read_timeout: int = 3
disconnection_timeout: int = 600
buffer_size: Annotated[
int, Field(ge=4096, le=1024 ** 2 * 512, multiple_of=2)
] = 8192
save_cover: bool = False
cover_save_strategy: CoverSaveStrategy = CoverSaveStrategy.DEFAULT
class PostprocessingOptions(BaseModel):
@ -465,8 +477,8 @@ class Settings(BaseModel):
version: str = '1.0'
tasks: Annotated[List[TaskSettings], Field(max_items=100)] = []
output: OutputSettings = OutputSettings()
logging: LoggingSettings = LoggingSettings()
output: OutputSettings = OutputSettings() # type: ignore
logging: LoggingSettings = LoggingSettings() # type: ignore
header: HeaderSettings = HeaderSettings()
danmaku: DanmakuSettings = DanmakuSettings()
recorder: RecorderSettings = RecorderSettings()

View File

@ -6,6 +6,7 @@ import attr
from ..bili.models import RoomInfo, UserInfo
from ..bili.typing import StreamFormat, QualityNumber
from ..core.cover_downloader import CoverSaveStrategy
from ..postprocess import DeleteStrategy, PostprocessorStatus
from ..postprocess.typing import Progress
@ -32,8 +33,8 @@ class TaskStatus:
rec_rate: float # Number of Bytes per second
danmu_total: int # Number of Danmu in total
danmu_rate: float # Number of Danmu per minutes
real_stream_format: StreamFormat
real_quality_number: QualityNumber
real_stream_format: Optional[StreamFormat]
real_quality_number: Optional[QualityNumber]
recording_path: Optional[str] = None
postprocessor_status: PostprocessorStatus = PostprocessorStatus.WAITING
postprocessing_path: Optional[str] = None
@ -60,10 +61,12 @@ class TaskParam:
# RecorderSettings
stream_format: StreamFormat
quality_number: QualityNumber
fmp4_stream_timeout: int
read_timeout: int
disconnection_timeout: Optional[int]
buffer_size: int
save_cover: bool
cover_save_strategy: CoverSaveStrategy
# PostprocessingOptions
remux_to_mp4: bool
inject_extra_metadata: bool

View File

@ -19,6 +19,7 @@ from ..bili.live_monitor import LiveMonitor
from ..bili.typing import StreamFormat, QualityNumber
from ..core import Recorder
from ..core.stream_analyzer import StreamProfile
from ..core.cover_downloader import CoverSaveStrategy
from ..postprocess import Postprocessor, PostprocessorStatus, DeleteStrategy
from ..postprocess.remuxer import RemuxProgress
from ..flv.metadata_injector import InjectProgress
@ -257,6 +258,14 @@ class RecordTask:
def save_cover(self, value: bool) -> None:
self._recorder.save_cover = value
@property
def cover_save_strategy(self) -> CoverSaveStrategy:
return self._recorder.cover_save_strategy
@cover_save_strategy.setter
def cover_save_strategy(self, value: CoverSaveStrategy) -> None:
self._recorder.cover_save_strategy = value
@property
def save_raw_danmaku(self) -> bool:
return self._recorder.save_raw_danmaku
@ -282,11 +291,19 @@ class RecordTask:
self._recorder.quality_number = value
@property
def real_stream_format(self) -> StreamFormat:
def fmp4_stream_timeout(self) -> int:
return self._recorder.fmp4_stream_timeout
@fmp4_stream_timeout.setter
def fmp4_stream_timeout(self, value: int) -> None:
self._recorder.fmp4_stream_timeout = value
@property
def real_stream_format(self) -> Optional[StreamFormat]:
return self._recorder.real_stream_format
@property
def real_quality_number(self) -> QualityNumber:
def real_quality_number(self) -> Optional[QualityNumber]:
return self._recorder.real_quality_number
@property

View File

@ -285,10 +285,12 @@ class RecordTaskManager:
task = self._get_task(room_id)
task.stream_format = settings.stream_format
task.quality_number = settings.quality_number
task.fmp4_stream_timeout = settings.fmp4_stream_timeout
task.read_timeout = settings.read_timeout
task.disconnection_timeout = settings.disconnection_timeout
task.buffer_size = settings.buffer_size
task.save_cover = settings.save_cover
task.cover_save_strategy = settings.cover_save_strategy
def apply_task_postprocessing_settings(
self, room_id: int, settings: PostprocessingSettings
@ -322,9 +324,11 @@ class RecordTaskManager:
record_guard_buy=task.record_guard_buy,
record_super_chat=task.record_super_chat,
save_cover=task.save_cover,
cover_save_strategy=task.cover_save_strategy,
save_raw_danmaku=task.save_raw_danmaku,
stream_format=task.stream_format,
quality_number=task.quality_number,
fmp4_stream_timeout=task.fmp4_stream_timeout,
read_timeout=task.read_timeout,
disconnection_timeout=task.disconnection_timeout,
buffer_size=task.buffer_size,

View File

@ -15,7 +15,7 @@
<input
id="server"
type="url"
placeholder="默认为官方服务器 https://api2.pushdeer.com"
placeholder="默认为官方服务器https://api2.pushdeer.com"
nz-input
formControlName="server"
/>

View File

@ -8,15 +8,20 @@
>
<ng-template #streamFormatTip>
<p>
选择要录制的直播流格式<br />
选择要录制的直播流格式
<br />
FLV 网络不稳定容易中断丢失数据 <br />
HLS (ts) 基本不受本地网络影响 <br />
HLS (fmp4) 只有少数直播间支持 <br />
FLV: 网络不稳定容易中断丢失数据或录制到二压画质
<br />
P.S.<br />
非 FLV 格式需要 ffmpeg<br />
HLS (fmp4) 不支持会自动切换到 HLS (ts)<br />
HLS (fmp4): 基本不受网络波动影响,但只有部分直播间支持。
<br />
P.S.
<br />
录制 HLS 流需要 ffmpeg
<br />
在设定时间内没有 fmp4 流会自动切换录制 flv 流
<br />
WEB 端直播播放器是 Hls7Player 的直播间支持录制 fmp4 流, fMp4Player
则不支持。
</p>
</ng-template>
<nz-form-control
@ -33,6 +38,39 @@
</nz-select>
</nz-form-control>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
nzNoColon
[nzTooltipTitle]="fmp4StreamTimeoutTip"
>fmp4 流等待时间</nz-form-label
>
<ng-template #fmp4StreamTimeoutTip>
<p>
如果超过所设置的等待时间 fmp4 流还没有就切换为录制 flv 流
<br />
fmp4 流在刚推流是没有的,要过一会才有。
<br />
fmp4 流出现的时间和直播延迟有关,一般都在 10 秒内,但也有延迟比较大超过
1 分钟的。
<br />
推荐全局设置为 10 秒,个别延迟比较大的直播间单独设置。
</p>
</ng-template>
<nz-form-control
class="setting-control select"
[nzWarningTip]="syncFailedWarningTip"
[nzValidateStatus]="
syncStatus.fmp4StreamTimeout ? fmp4StreamTimeoutControl : 'warning'
"
>
<nz-select
formControlName="fmp4StreamTimeout"
[nzOptions]="fmp4StreamTimeoutOptions"
>
</nz-select>
</nz-form-control>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
@ -66,6 +104,41 @@
<nz-switch formControlName="saveCover"></nz-switch>
</nz-form-control>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
nzNoColon
[nzTooltipTitle]="coverSaveStrategyTip"
>封面保存策略</nz-form-label
>
<ng-template #coverSaveStrategyTip>
<p>
默认: 每个分割的录播文件对应保存一个封面文件,不管封面是否相同。<br />
去重: 相同的封面只保存一次<br />
P.S.
<br />
判断是否相同是依据封面数据的 sha1只在单次录制内有效。
</p>
</ng-template>
<nz-form-control
class="setting-control radio"
[nzWarningTip]="syncFailedWarningTip"
[nzValidateStatus]="
syncStatus.coverSaveStrategy ? coverSaveStrategyControl : 'warning'
"
>
<nz-radio-group
formControlName="coverSaveStrategy"
[nzDisabled]="!saveCoverControl.value"
>
<ng-container *ngFor="let strategy of coverSaveStrategies">
<label nz-radio-button [nzValue]="strategy.value">{{
strategy.label
}}</label>
</ng-container>
</nz-radio-group>
</nz-form-control>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
@ -84,7 +157,7 @@
: 'warning'
"
>
<nz-select formControlName="readTimeout" [nzOptions]="timeoutOptions">
<nz-select formControlName="readTimeout" [nzOptions]="readTimeoutOptions">
</nz-select>
</nz-form-control>
</nz-form-item>

View File

@ -20,6 +20,7 @@ import {
TIMEOUT_OPTIONS,
DISCONNECTION_TIMEOUT_OPTIONS,
SYNC_FAILED_WARNING_TIP,
COVER_SAVE_STRATEGIES,
} from '../shared/constants/form';
import { RecorderSettings } from '../shared/setting.model';
import {
@ -43,10 +44,13 @@ export class RecorderSettingsComponent implements OnInit, OnChanges {
readonly streamFormatOptions = cloneDeep(STREAM_FORMAT_OPTIONS) as Mutable<
typeof STREAM_FORMAT_OPTIONS
>;
readonly fmp4StreamTimeoutOptions = cloneDeep(TIMEOUT_OPTIONS) as Mutable<
typeof TIMEOUT_OPTIONS
>;
readonly qualityOptions = cloneDeep(QUALITY_OPTIONS) as Mutable<
typeof QUALITY_OPTIONS
>;
readonly timeoutOptions = cloneDeep(TIMEOUT_OPTIONS) as Mutable<
readonly readTimeoutOptions = cloneDeep(TIMEOUT_OPTIONS) as Mutable<
typeof TIMEOUT_OPTIONS
>;
readonly disconnectionTimeoutOptions = cloneDeep(
@ -55,6 +59,9 @@ export class RecorderSettingsComponent implements OnInit, OnChanges {
readonly bufferOptions = cloneDeep(BUFFER_OPTIONS) as Mutable<
typeof BUFFER_OPTIONS
>;
readonly coverSaveStrategies = cloneDeep(COVER_SAVE_STRATEGIES) as Mutable<
typeof COVER_SAVE_STRATEGIES
>;
constructor(
formBuilder: FormBuilder,
@ -64,10 +71,12 @@ export class RecorderSettingsComponent implements OnInit, OnChanges {
this.settingsForm = formBuilder.group({
streamFormat: [''],
qualityNumber: [''],
fmp4StreamTimeout: [''],
readTimeout: [''],
disconnectionTimeout: [''],
bufferSize: [''],
saveCover: [''],
coverSaveStrategy: [''],
});
}
@ -79,6 +88,10 @@ export class RecorderSettingsComponent implements OnInit, OnChanges {
return this.settingsForm.get('qualityNumber') as FormControl;
}
get fmp4StreamTimeoutControl() {
return this.settingsForm.get('fmp4StreamTimeout') as FormControl;
}
get readTimeoutControl() {
return this.settingsForm.get('readTimeout') as FormControl;
}
@ -95,6 +108,10 @@ export class RecorderSettingsComponent implements OnInit, OnChanges {
return this.settingsForm.get('saveCover') as FormControl;
}
get coverSaveStrategyControl() {
return this.settingsForm.get('coverSaveStrategy') as FormControl;
}
ngOnChanges(): void {
this.syncStatus = mapValues(this.settings, () => true);
this.settingsForm.setValue(this.settings);

View File

@ -1,4 +1,4 @@
import { DeleteStrategy } from '../setting.model';
import { CoverSaveStrategy, DeleteStrategy } from '../setting.model';
import range from 'lodash-es/range';
@ -52,9 +52,14 @@ export const DELETE_STRATEGIES = [
{ label: '从不', value: DeleteStrategy.NEVER },
] as const;
export const COVER_SAVE_STRATEGIES = [
{ label: '默认', value: CoverSaveStrategy.DEFAULT },
{ label: '去重', value: CoverSaveStrategy.DEDUP },
] as const;
export const STREAM_FORMAT_OPTIONS = [
{ label: 'FLV', value: 'flv' },
{ label: 'HLS (ts)', value: 'ts' },
// { label: 'HLS (ts)', value: 'ts' },
{ label: 'HLS (fmp4)', value: 'fmp4' },
] as const;

View File

@ -29,13 +29,20 @@ export type QualityNumber =
| 150 // 高清
| 80; // 流畅
export enum CoverSaveStrategy {
DEFAULT = 'default',
DEDUP = 'dedup',
}
export interface RecorderSettings {
streamFormat: StreamFormat;
qualityNumber: QualityNumber;
fmp4StreamTimeout: number;
readTimeout: number;
disconnectionTimeout: number;
bufferSize: number;
saveCover: boolean;
coverSaveStrategy: CoverSaveStrategy;
}
export type RecorderOptions = Nullable<RecorderSettings>;

View File

@ -62,11 +62,19 @@
<span class="label">格式画质</span
><span class="value">
<span>
{{ data.task_status.real_stream_format }}
{{
data.task_status.real_stream_format
? data.task_status.real_stream_format
: "N/A"
}}
</span>
<span>
{{ data.task_status.real_quality_number | quality }}
({{ data.task_status.real_quality_number
{{
data.task_status.real_quality_number
? (data.task_status.real_quality_number | quality)
: "N/A"
}}
({{ data.task_status.real_quality_number ?? "N/A"
}}<ng-container *ngIf="isBlurayStreamQuality()">, bluray</ng-container
>)
</span>

View File

@ -32,10 +32,12 @@ export class TaskManagerService {
return this.taskService.updateTaskInfo(roomId).pipe(
tap(
() => {
this.message.success('成功刷新任务的数据');
this.message.success(`[${roomId}] 成功刷新任务的数据`);
},
(error: HttpErrorResponse) => {
this.message.error(`刷新任务的数据出错: ${error.message}`);
this.message.error(
`[${roomId}] 刷新任务的数据出错: ${error.message}`
);
}
)
);
@ -101,10 +103,10 @@ export class TaskManagerService {
return this.taskService.removeTask(roomId).pipe(
tap(
() => {
this.message.success('任务已删除');
this.message.success(`[${roomId}] 任务已删除`);
},
(error: HttpErrorResponse) => {
this.message.error(`删除任务出错: ${error.message}`);
this.message.error(`[${roomId}] 删除任务出错: ${error.message}`);
}
)
);
@ -129,18 +131,18 @@ export class TaskManagerService {
}
startTask(roomId: number): Observable<ResponseMessage> {
const messageId = this.message.loading('正在运行任务...', {
const messageId = this.message.loading(`[${roomId}] 正在运行任务...`, {
nzDuration: 0,
}).messageId;
return this.taskService.startTask(roomId).pipe(
tap(
() => {
this.message.remove(messageId);
this.message.success('成功运行任务');
this.message.success(`[${roomId}] 成功运行任务`);
},
(error: HttpErrorResponse) => {
this.message.remove(messageId);
this.message.error(`运行任务出错: ${error.message}`);
this.message.error(`[${roomId}] 运行任务出错: ${error.message}`);
}
)
);
@ -168,18 +170,18 @@ export class TaskManagerService {
roomId: number,
force: boolean = false
): Observable<ResponseMessage> {
const messageId = this.message.loading('正在停止任务...', {
const messageId = this.message.loading(`[${roomId}] 正在停止任务...`, {
nzDuration: 0,
}).messageId;
return this.taskService.stopTask(roomId, force).pipe(
tap(
() => {
this.message.remove(messageId);
this.message.success('成功停止任务');
this.message.success(`[${roomId}] 成功停止任务`);
},
(error: HttpErrorResponse) => {
this.message.remove(messageId);
this.message.error(`停止任务出错: ${error.message}`);
this.message.error(`[${roomId}] 停止任务出错: ${error.message}`);
}
)
);
@ -204,18 +206,18 @@ export class TaskManagerService {
}
enableRecorder(roomId: number): Observable<ResponseMessage> {
const messageId = this.message.loading('正在开启录制...', {
const messageId = this.message.loading(`[${roomId}] 正在开启录制...`, {
nzDuration: 0,
}).messageId;
return this.taskService.enableTaskRecorder(roomId).pipe(
tap(
() => {
this.message.remove(messageId);
this.message.success('成功开启录制');
this.message.success(`[${roomId}] 成功开启录制`);
},
(error: HttpErrorResponse) => {
this.message.remove(messageId);
this.message.error(`开启录制出错: ${error.message}`);
this.message.error(`[${roomId}] 开启录制出错: ${error.message}`);
}
)
);
@ -248,18 +250,18 @@ export class TaskManagerService {
roomId: number,
force: boolean = false
): Observable<ResponseMessage> {
const messageId = this.message.loading('正在关闭录制...', {
const messageId = this.message.loading(`[${roomId}] 正在关闭录制...`, {
nzDuration: 0,
}).messageId;
return this.taskService.disableTaskRecorder(roomId, force).pipe(
tap(
() => {
this.message.remove(messageId);
this.message.success('成功关闭录制');
this.message.success(`[${roomId}] 成功关闭录制`);
},
(error: HttpErrorResponse) => {
this.message.remove(messageId);
this.message.error(`关闭录制出错: ${error.message}`);
this.message.error(`[${roomId}] 关闭录制出错: ${error.message}`);
}
)
);
@ -287,13 +289,13 @@ export class TaskManagerService {
return this.taskService.cutStream(roomId).pipe(
tap(
() => {
this.message.success('文件切割已触发');
this.message.success(`[${roomId}] 文件切割已触发`);
},
(error: HttpErrorResponse) => {
if (error.status == 403) {
this.message.warning('时长太短不能切割,请稍后再试。');
this.message.warning(`[${roomId}] 时长太短不能切割,请稍后再试。`);
} else {
this.message.error(`切割文件出错: ${error.message}`);
this.message.error(`[${roomId}] 切割文件出错: ${error.message}`);
}
}
)

View File

@ -91,8 +91,8 @@ export interface TaskStatus {
readonly rec_rate: number;
readonly danmu_total: number;
readonly danmu_rate: number;
readonly real_stream_format: StreamFormat;
readonly real_quality_number: QualityNumber;
readonly real_stream_format: StreamFormat | null;
readonly real_quality_number: QualityNumber | null;
readonly recording_path: string | null;
readonly postprocessor_status: PostprocessorStatus;
readonly postprocessing_path: string | null;

View File

@ -47,7 +47,11 @@
nzTooltipTitle="录制画质"
nzTooltipPlacement="leftTop"
>
{{ status.real_quality_number | quality }}
{{
status.real_quality_number
? (status.real_quality_number | quality)
: ""
}}
</span>
</p>
</div>

View File

@ -29,11 +29,13 @@
[nzValueTemplate]="recordingQuality"
></nz-statistic>
<ng-template #recordingQuality>{{
(taskStatus.real_quality_number | quality)! +
" " +
"(" +
taskStatus.real_quality_number +
")"
taskStatus.real_quality_number
? (taskStatus.real_quality_number | quality) +
" " +
"(" +
taskStatus.real_quality_number +
")"
: ""
}}</ng-template>
<nz-statistic

View File

@ -119,15 +119,20 @@
>
<ng-template #streamFormatTip>
<p>
选择要录制的直播流格式<br />
选择要录制的直播流格式
<br />
FLV 网络不稳定容易中断丢失数据 <br />
HLS (ts) 基本不受本地网络影响 <br />
HLS (fmp4) 只有少数直播间支持 <br />
FLV: 网络不稳定容易中断丢失数据或录制到二压画质
<br />
P.S.<br />
非 FLV 格式需要 ffmpeg<br />
HLS (fmp4) 不支持会自动切换到 HLS (ts)<br />
HLS (fmp4): 基本不受网络波动影响,但只有部分直播间支持。
<br />
P.S.
<br />
录制 HLS 流需要 ffmpeg
<br />
在设定时间内没有 fmp4 流会自动切换录制 flv 流
<br />
WEB 端直播播放器是 Hls7Player 的直播间支持录制 fmp4 流, fMp4Player
则不支持。
</p>
</ng-template>
<nz-form-control class="setting-control select">
@ -150,6 +155,46 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
nzNoColon
[nzTooltipTitle]="fmp4StreamTimeoutTip"
>fmp4 流等待时间</nz-form-label
>
<ng-template #fmp4StreamTimeoutTip>
<p>
如果超过所设置的等待时间 fmp4 流还没有就切换为录制 flv 流
<br />
fmp4 流在刚推流是没有的,要过一会才有。
<br />
fmp4 流出现的时间和直播延迟有关,一般都在 10
秒内,但也有延迟比较大超过 1 分钟的。
<br />
推荐全局设置为 10 秒,个别延迟比较大的直播间单独设置。
</p>
</ng-template>
<nz-form-control class="setting-control select">
<nz-select
#fmp4StreamTimeout="ngModel"
name="fmp4StreamTimeout"
[(ngModel)]="model.recorder.fmp4StreamTimeout"
[disabled]="options.recorder.fmp4StreamTimeout === null"
[nzOptions]="fmp4StreamTimeoutOptions"
>
</nz-select>
</nz-form-control>
<label
nz-checkbox
[nzChecked]="options.recorder.fmp4StreamTimeout !== null"
(nzCheckedChange)="
options.recorder.fmp4StreamTimeout = $event
? globalSettings.recorder.fmp4StreamTimeout
: null
"
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
@ -202,6 +247,45 @@
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
nzNoColon
[nzTooltipTitle]="coverSaveStrategyTip"
>封面保存策略</nz-form-label
>
<ng-template #coverSaveStrategyTip>
<p>
默认:
每个分割的录播文件对应保存一个封面文件,不管封面是否相同。<br />
去重: 相同的封面只保存一次<br />
P.S.
<br />
判断是否相同是依据封面数据的 sha1只在单次录制内有效。
</p>
</ng-template>
<nz-form-control class="setting-control select">
<nz-select
name="coverSaveStrategy"
[(ngModel)]="model.recorder.coverSaveStrategy"
[disabled]="
options.recorder.coverSaveStrategy === null ||
!options.recorder.saveCover
"
[nzOptions]="coverSaveStrategies"
></nz-select>
</nz-form-control>
<label
nz-checkbox
[nzChecked]="options.recorder.coverSaveStrategy !== null"
(nzCheckedChange)="
options.recorder.coverSaveStrategy = $event
? globalSettings.recorder.coverSaveStrategy
: null
"
>覆盖全局设置</label
>
</nz-form-item>
<nz-form-item class="setting-item">
<nz-form-label
class="setting-label"
@ -219,7 +303,7 @@
name="readTimeout"
[(ngModel)]="model.recorder.readTimeout"
[disabled]="options.recorder.readTimeout === null"
[nzOptions]="timeoutOptions"
[nzOptions]="readTimeoutOptions"
>
</nz-select>
</nz-form-control>

View File

@ -30,6 +30,7 @@ import {
BUFFER_OPTIONS,
DELETE_STRATEGIES,
SPLIT_FILE_TIP,
COVER_SAVE_STRATEGIES,
} from '../../settings/shared/constants/form';
type OptionsModel = NonNullable<TaskOptions>;
@ -67,10 +68,13 @@ export class TaskSettingsDialogComponent implements OnChanges {
readonly streamFormatOptions = cloneDeep(STREAM_FORMAT_OPTIONS) as Mutable<
typeof STREAM_FORMAT_OPTIONS
>;
readonly fmp4StreamTimeoutOptions = cloneDeep(TIMEOUT_OPTIONS) as Mutable<
typeof TIMEOUT_OPTIONS
>;
readonly qualityOptions = cloneDeep(QUALITY_OPTIONS) as Mutable<
typeof QUALITY_OPTIONS
>;
readonly timeoutOptions = cloneDeep(TIMEOUT_OPTIONS) as Mutable<
readonly readTimeoutOptions = cloneDeep(TIMEOUT_OPTIONS) as Mutable<
typeof TIMEOUT_OPTIONS
>;
readonly disconnectionTimeoutOptions = cloneDeep(
@ -82,6 +86,9 @@ export class TaskSettingsDialogComponent implements OnChanges {
readonly deleteStrategies = cloneDeep(DELETE_STRATEGIES) as Mutable<
typeof DELETE_STRATEGIES
>;
readonly coverSaveStrategies = cloneDeep(COVER_SAVE_STRATEGIES) as Mutable<
typeof COVER_SAVE_STRATEGIES
>;
model!: OptionsModel;
options!: TaskOptions;

View File

@ -3,5 +3,11 @@
{
"path": "."
}
]
],
"settings": {
"python.linting.mypyPath": "mypy",
"python.linting.flake8Path": "flake8",
"python.formatting.blackPath": "black",
"python.sortImports.path": "isort"
}
}